1. 完成的場景
在很多?數(shù)據(jù)場景下,要求數(shù)據(jù)形成數(shù)據(jù)流的形式進?計算和存儲。上篇博客介紹了Flink消費Kafka數(shù)據(jù)實現(xiàn)Wordcount計算,這篇博客需要完成的是將實時計算的結果寫到redis。當kafka從其他端獲取數(shù)據(jù)?刻到Flink計算,F(xiàn)link計算完后結果寫到Redis,整個過程就像流??樣形成了數(shù)據(jù)流的處理
2. 代碼
添加第三?依賴
注意這?的版本最好統(tǒng)?選1.4.0,flink-redis的版本最好選1.1.5,?低版本或其他版本會遇到包沖突或者不同包的同?類不同等邏輯或者第版本有些類沒有等java通?的?些問題邏輯代碼
package com.scn;
import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import org.apache.flink.streaming.util.serialization.SimpleStringSchema;import org.apache.flink.util.Collector;import java.util.Properties;
public class FilnkCostKafka {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000);
Properties properties = new Properties();
properties.setProperty(\"bootstrap.servers\ properties.setProperty(\"zookeeper.connect\ properties.setProperty(\"group.id\
FlinkKafkaConsumer09 DataStream FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(\"127.0.0.1\").build(); //實例化RedisSink,并通過flink的addSink的?式將flink計算的結果插?到redis counts.addSink(new RedisSink public static final class LineSplitter implements FlatMapFunction public void flatMap(String value, Collector out.collect(new Tuple2 } } } //指定Redis key并將flink數(shù)據(jù)類型映射到Redis數(shù)據(jù)類型 public static final class RedisExampleMapper implements RedisMapper return new RedisCommandDescription(RedisCommand.HSET, \"flink\"); } public String getKeyFromData(Tuple2 public String getValueFromData(Tuple2 編寫?個測試類 package com.scn; import redis.clients.jedis.Jedis; public class RedisTest { public static void main(String args[]){ Jedis jedis=new Jedis(\"127.0.0.1\"); System.out.println(\"Server is running: \" + jedis.ping()); System.out.println(\"result:\"+jedis.hgetAll(\"flink\")); }} 3. 測試 啟動Redis服務 redis-server 執(zhí)?FilnkCostKafka main?法沒有跑出異常信息證明啟動沒有問題在kafka producer端輸出?些數(shù)據(jù) 執(zhí)?測試類RedisTest的main?法會輸出: Server is running: PONG result:{flink=2, newyork=1, will=1, kafka=2, wolrd=2, go=1, i=1, meijiasheng=1, is=1, hello=6, myname=1, redis=2} 可以看到數(shù)據(jù)已經流到Redis 因篇幅問題不能全部顯示,請點此查看更多更全內容
Copyright ? 2019- 91gzw.com 版權所有 湘ICP備2023023988號-2
違法及侵權請聯(lián)系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市萬商天勤律師事務所王興未律師提供法律服務