成熟丰满熟妇高潮XXXXX,人妻无码AV中文系列久久兔费 ,国产精品一国产精品,国精品午夜福利视频不卡麻豆

您好,歡迎來(lái)到九壹網(wǎng)。
搜索
您的當(dāng)前位置:首頁(yè)Flink消費(fèi)Kafka數(shù)據(jù)并把實(shí)時(shí)計(jì)算的結(jié)果導(dǎo)入到Redis

Flink消費(fèi)Kafka數(shù)據(jù)并把實(shí)時(shí)計(jì)算的結(jié)果導(dǎo)入到Redis

來(lái)源:九壹網(wǎng)
?Flink消費(fèi)Kafka數(shù)據(jù)并把實(shí)時(shí)計(jì)算的結(jié)果導(dǎo)?到Redis

1. 完成的場(chǎng)景

在很多?數(shù)據(jù)場(chǎng)景下,要求數(shù)據(jù)形成數(shù)據(jù)流的形式進(jìn)?計(jì)算和存儲(chǔ)。上篇博客介紹了Flink消費(fèi)Kafka數(shù)據(jù)實(shí)現(xiàn)Wordcount計(jì)算,這篇博客需要完成的是將實(shí)時(shí)計(jì)算的結(jié)果寫到redis。當(dāng)kafka從其他端獲取數(shù)據(jù)?刻到Flink計(jì)算,F(xiàn)link計(jì)算完后結(jié)果寫到Redis,整個(gè)過(guò)程就像流??樣形成了數(shù)據(jù)流的處理

2. 代碼

添加第三?依賴

org.apache.flink flink-clients_2.11 1.4.0

org.apache.flink

flink-streaming-java_2.11 1.4.0

org.apache.flink flink-java 1.4.0

org.apache.flink

flink-connector-kafka-0.9_2.11 1.4.0

org.apache.flink

flink-connector-redis_2.10 1.1.5

注意這?的版本最好統(tǒng)?選1.4.0,flink-redis的版本最好選1.1.5,?低版本或其他版本會(huì)遇到包沖突或者不同包的同?類不同等邏輯或者第版本有些類沒有等java通?的?些問(wèn)題邏輯代碼

package com.scn;

import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;import org.apache.flink.streaming.connectors.redis.RedisSink;

import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;

import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;import org.apache.flink.streaming.util.serialization.SimpleStringSchema;import org.apache.flink.util.Collector;import java.util.Properties;

public class FilnkCostKafka {

public static void main(String[] args) throws Exception {

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000);

Properties properties = new Properties();

properties.setProperty(\"bootstrap.servers\ properties.setProperty(\"zookeeper.connect\ properties.setProperty(\"group.id\

FlinkKafkaConsumer09 myConsumer = new FlinkKafkaConsumer09(\"test\ DataStream stream = env.addSource(myConsumer);

DataStream> counts = stream.flatMap(new LineSplitter()).keyBy(0).sum(1); //實(shí)例化Flink和Redis關(guān)聯(lián)類FlinkJedisPoolConfig,設(shè)置Redis端?

FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost(\"127.0.0.1\").build(); //實(shí)例化RedisSink,并通過(guò)flink的addSink的?式將flink計(jì)算的結(jié)果插?到redis

counts.addSink(new RedisSink>(conf,new RedisExampleMapper())); env.execute(\"WordCount from Kafka data\"); }

public static final class LineSplitter implements FlatMapFunction> { private static final long serialVersionUID = 1L;

public void flatMap(String value, Collector> out) { String[] tokens = value.toLowerCase().split(\"\\\\W+\"); for (String token : tokens) { if (token.length() > 0) {

out.collect(new Tuple2(token, 1)); }

} } }

//指定Redis key并將flink數(shù)據(jù)類型映射到Redis數(shù)據(jù)類型

public static final class RedisExampleMapper implements RedisMapper>{ public RedisCommandDescription getCommandDescription() {

return new RedisCommandDescription(RedisCommand.HSET, \"flink\"); }

public String getKeyFromData(Tuple2 data) { return data.f0; }

public String getValueFromData(Tuple2 data) { return data.f1.toString(); } }}

編寫?個(gè)測(cè)試類

package com.scn;

import redis.clients.jedis.Jedis;

public class RedisTest {

public static void main(String args[]){ Jedis jedis=new Jedis(\"127.0.0.1\");

System.out.println(\"Server is running: \" + jedis.ping()); System.out.println(\"result:\"+jedis.hgetAll(\"flink\")); }}

3. 測(cè)試

啟動(dòng)Redis服務(wù)

redis-server

執(zhí)?FilnkCostKafka main?法沒有跑出異常信息證明啟動(dòng)沒有問(wèn)題在kafka producer端輸出?些數(shù)據(jù)

執(zhí)?測(cè)試類RedisTest的main?法會(huì)輸出:

Server is running: PONG

result:{flink=2, newyork=1, will=1, kafka=2, wolrd=2, go=1, i=1, meijiasheng=1, is=1, hello=6, myname=1, redis=2}

可以看到數(shù)據(jù)已經(jīng)流到Redis

因篇幅問(wèn)題不能全部顯示,請(qǐng)點(diǎn)此查看更多更全內(nèi)容

Copyright ? 2019- 91gzw.com 版權(quán)所有 湘ICP備2023023988號(hào)-2

違法及侵權(quán)請(qǐng)聯(lián)系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市萬(wàn)商天勤律師事務(wù)所王興未律師提供法律服務(wù)