

Flink: Consuming Kafka Data and Writing the Real-Time Computation Results to Redis

Source: 九壹網 (91gzw.com)

1. The Scenario

In many big-data scenarios, data needs to be computed and stored as a continuous stream. The previous post showed how to consume Kafka data with Flink and compute a word count; this post writes those real-time results to Redis. As soon as Kafka receives data from a producer it is handed to Flink for computation, and once Flink finishes, the result is written to Redis, so the whole pipeline processes the data like flowing water.

2. Code

Add the third-party dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
    <version>1.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.10</artifactId>
    <version>1.1.5</version>
</dependency>

Note: it is best to use 1.4.0 consistently for the Flink artifacts, and 1.1.5 for flink-connector-redis. Lower or mismatched versions tend to cause common Java dependency problems: package conflicts, incompatible versions of the same class coming from different packages, or classes that simply do not exist in a given version.

package com.scn;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class FilnkCostKafka {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(1000);

        // The property values were truncated in the original post;
        // local single-node defaults are assumed here.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
        properties.setProperty("zookeeper.connect", "127.0.0.1:2181");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer09<String> myConsumer =
                new FlinkKafkaConsumer09<>("test", new SimpleStringSchema(), properties);
        DataStream<String> stream = env.addSource(myConsumer);

        DataStream<Tuple2<String, Integer>> counts =
                stream.flatMap(new LineSplitter()).keyBy(0).sum(1);

        // Configure the Flink-Redis connection (host of the Redis server)
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();

        // Create a RedisSink and attach it via addSink so the computed results are written to Redis
        counts.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisExampleMapper()));

        env.execute("WordCount from Kafka data");
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }

    // Specify the Redis key and map the Flink data types to Redis data types
    public static final class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {

        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "flink");
        }

        public String getKeyFromData(Tuple2<String, Integer> data) {
            return data.f0;
        }

        public String getValueFromData(Tuple2<String, Integer> data) {
            return data.f1.toString();
        }
    }
}
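The flatMap → keyBy(0) → sum(1) pipeline above is just an incremental word count. As a rough sketch of what LineSplitter plus the rolling sum computes, the same logic on plain Java collections looks like this (a simplified illustration without Flink, using a hypothetical `count` helper, not the streaming implementation):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WordCountSketch {
    // Tokenize a line exactly as LineSplitter does and fold the (word, 1)
    // pairs into a running total, mimicking keyBy(0).sum(1).
    static Map<String, Integer> count(Map<String, Integer> counts, String line) {
        for (String token : line.toLowerCase().split("\\W+")) {
            if (token.length() > 0) {
                counts.merge(token, 1, Integer::sum);  // rolling sum per key
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        count(counts, "Hello world");
        count(counts, "hello Flink");
        System.out.println(counts);  // {hello=2, world=1, flink=1}
    }
}
```

In the streaming job, each new line from Kafka updates these running counts, and every updated (word, count) tuple is pushed to the Redis sink.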

Write a test class:

package com.scn;

import redis.clients.jedis.Jedis;

public class RedisTest {

    public static void main(String[] args) {
        Jedis jedis = new Jedis("127.0.0.1");
        System.out.println("Server is running: " + jedis.ping());
        System.out.println("result:" + jedis.hgetAll("flink"));
    }
}

3. Testing

Start the Redis server:

redis-server

Run the main method of FilnkCostKafka. If no exception is thrown, the job started successfully. Then send some data from the Kafka producer side (for example with the kafka-console-producer console tool).

Run the main method of the RedisTest class; it prints:

Server is running: PONG

result:{flink=2, newyork=1, will=1, kafka=2, wolrd=2, go=1, i=1, meijiasheng=1, is=1, hello=6, myname=1, redis=2}
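Because RedisExampleMapper uses RedisCommand.HSET with the additional key "flink", every incoming tuple becomes `HSET flink <word> <count>`: each word is one field of a single Redis hash, a newer count for the same word overwrites the older one, and `hgetAll("flink")` returns the whole map, which is exactly the output above. A minimal sketch of that hash semantics with a plain Java map (no Redis involved; `hset` here is a hypothetical stand-in for the Redis command):

```java
import java.util.HashMap;
import java.util.Map;

public class HsetSketch {
    // Simulates HSET <key> <field> <value>: a later write to the same field
    // overwrites the earlier value, which is why the sink can keep emitting
    // updated counts for the same word without growing the hash.
    static void hset(Map<String, Map<String, String>> store,
                     String key, String field, String value) {
        store.computeIfAbsent(key, k -> new HashMap<>()).put(field, value);
    }

    public static void main(String[] args) {
        Map<String, Map<String, String>> store = new HashMap<>();
        hset(store, "flink", "hello", "1");
        hset(store, "flink", "hello", "2");  // updated count overwrites the old one
        hset(store, "flink", "redis", "1");
        System.out.println(store.get("flink"));  // equivalent of hgetAll("flink")
    }
}
```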

You can see that the data has flowed into Redis.

