1. 完成的場(chǎng)景
在很多?數(shù)據(jù)場(chǎng)景下,要求數(shù)據(jù)形成數(shù)據(jù)流的形式進(jìn)?計(jì)算和存儲(chǔ)。上篇博客介紹了Flink消費(fèi)Kafka數(shù)據(jù)實(shí)現(xiàn)Wordcount計(jì)算,這篇博客需要完成的是將實(shí)時(shí)計(jì)算的結(jié)果寫到redis。當(dāng)kafka從其他端獲取數(shù)據(jù)?刻到Flink計(jì)算,F(xiàn)link計(jì)算完后結(jié)果寫到Redis,整個(gè)過(guò)程就像流??樣形成了數(shù)據(jù)流的處理
2. 代碼
添加第三?依賴
注意這?的版本最好統(tǒng)?選1.4.0,flink-redis的版本最好選1.1.5,?低版本或其他版本會(huì)遇到包沖突或者不同包的同?類不同等邏輯或者第版本有些類沒有等java通?的?些問(wèn)題邏輯代碼
package com.scn;
import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import org.apache.flink.streaming.util.serialization.SimpleStringSchema;import org.apache.flink.util.Collector;import java.util.Properties;
public class FilnkCostKafka {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000);
Properties properties = new Properties();
properties.setProperty(\"bootstrap.servers\ properties.setProperty(\"zookeeper.connect\ properties.setProperty(\"group.id\
FlinkKafkaConsumer09 DataStream FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(\"127.0.0.1\").build(); //實(shí)例化RedisSink,并通過(guò)flink的addSink的?式將flink計(jì)算的結(jié)果插?到redis counts.addSink(new RedisSink public static final class LineSplitter implements FlatMapFunction public void flatMap(String value, Collector out.collect(new Tuple2 } } } //指定Redis key并將flink數(shù)據(jù)類型映射到Redis數(shù)據(jù)類型 public static final class RedisExampleMapper implements RedisMapper return new RedisCommandDescription(RedisCommand.HSET, \"flink\"); } public String getKeyFromData(Tuple2 public String getValueFromData(Tuple2 編寫?個(gè)測(cè)試類 package com.scn; import redis.clients.jedis.Jedis; public class RedisTest { public static void main(String args[]){ Jedis jedis=new Jedis(\"127.0.0.1\"); System.out.println(\"Server is running: \" + jedis.ping()); System.out.println(\"result:\"+jedis.hgetAll(\"flink\")); }} 3. 測(cè)試 啟動(dòng)Redis服務(wù) redis-server 執(zhí)?FilnkCostKafka main?法沒有跑出異常信息證明啟動(dòng)沒有問(wèn)題在kafka producer端輸出?些數(shù)據(jù) 執(zhí)?測(cè)試類RedisTest的main?法會(huì)輸出: Server is running: PONG result:{flink=2, newyork=1, will=1, kafka=2, wolrd=2, go=1, i=1, meijiasheng=1, is=1, hello=6, myname=1, redis=2} 可以看到數(shù)據(jù)已經(jīng)流到Redis 因篇幅問(wèn)題不能全部顯示,請(qǐng)點(diǎn)此查看更多更全內(nèi)容
Copyright ? 2019- 91gzw.com 版權(quán)所有 湘ICP備2023023988號(hào)-2
違法及侵權(quán)請(qǐng)聯(lián)系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市萬(wàn)商天勤律師事務(wù)所王興未律師提供法律服務(wù)