大数据-玩转数据-Flink RedisSink

一、添加Redis Connector依赖

具体版本根据实际情况确定

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.1.5</version>
</dependency>

二、启动redis

参见大数据-玩转数据-Redis 安装与使用

三、编写代码

package com.lyh.flink06;import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;public class SinkRedis {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {@Overridepublic Integer getKey(Integer key) throws Exception {return key.intValue();}});FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("hadoop100").setPort(6379).setMaxTotal(100).setMaxIdle(10).setMinIdle(2).setTimeout(10*1000).setDatabase(0).setPassword("redis").build();keyedStream.addSink(new RedisSink<>(conf, new RedisMapper<Integer>() {@Overridepublic RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.SET);}@Overridepublic String getKeyFromData(Integer integer) {return integer.toString();}@Overridepublic String getValueFromData(Integer integer) {return integer.toString();}}));env.execute();}
}

可以根据要写入的redis的不同数据类型进行调整

四、查询结果

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/66413.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java代理模式——静态代理与动态代理

代理模式 代理模式允许你为其他对象提供一个代理&#xff0c;以控制对这个对象的访问。代理模式在不改变实际对象的情况下&#xff0c;可以在访问对象时添加额外的功能。 可以理解为代理模式为被代理对象创造了一个替身&#xff0c;调用者可以通过这个替身去实现这个被代理对…

UGUI事件系统EventSystem

一. 事件系统概述 Unity的事件系统具有通过鼠标、键盘、游戏控制柄、触摸操作等输入方式&#xff0c;将事件发送给对象的功能。事件系统通过场景中EventSystem对象的组件EventSystem和Standalone Input Module发挥功能。EventSystem对象通常实在创建画布的同时被创建的&#xf…

【报错】ModuleNotFoundError: No module named ‘websocket‘

1 报错 ModuleNotFoundError: No module named websocket 2 解决方法 pip install websocket 1 报错 AttributeError: module websocket has no attribute enableTrace 2 分析 一般是由于websocket的依赖包没有安装造成的。websocket.enableTrace()方法是在websocket-cli…

【校招VIP】前端JS语言考点之px rem等单位

考点介绍&#xff1a; rem vm等问题是前端面试里的高频题型。但是不少同学并不能很清楚的说明为什么在有px单位之后&#xff0c;还需要rem单位&#xff1f;往往会往不对的自适应方向回答。 作为基础性问题&#xff0c;只要回答不出来&#xff0c;面试就通过不了&#xff0c;需要…

学习SOLIDWORKS Simulation有限元分析就是如此简单

当工程师完成产品设计后&#xff0c;一般都会想验证一下设计是否满足要求&#xff0c;此时SOLIDWORKS Simulation有限元分析就是很好的工具&#xff0c; SOLIDWORKS 创新的将复杂的分析过程和术语进行“包装”&#xff0c;使其操作比较简单&#xff0c;同时功能比较强大&#x…

【NLP】训练LLM的不同方式

一、说明 在大型语言模型&#xff08;LLM&#xff09;领域&#xff0c;有各种各样的 训练机制&#xff0c;具有不同的手段&#xff0c;要求和目标。由于它们服务于不同的目的&#xff0c;因此重要的是不要将它们相互混淆&#xff0c;并了解它们适用的不同场景。 在本文中&#…

leetcode375. 猜数字大小 II(动态规划-java)

猜数字大小 II lc - 375 猜数字大小 II题目描述暴力递归 记忆化搜索代码演示动态规划 动态规划 lc - 375 猜数字大小 II 题目描述 我们正在玩一个猜数游戏&#xff0c;游戏规则如下&#xff1a; 我从 1 到 n 之间选择一个数字。 你来猜我选了哪个数字。 如果你猜到正确的数字&…

微型导轨在包棉机中的作用

随着工业革命的开展&#xff0c;各种人工智能设备的迅猛发展&#xff0c;为了适应高速发展的工业自动化&#xff0c;越来越多的工业企业开始采用微型导轨&#xff0c;尤其是在包棉机中的应用。 包棉机是一种用于加工棉花的机械设备&#xff0c;它的主要功能是将原始棉花经过清洁…

Wlan——CAPWAP隧道的建立过程

CAPWAP基本概念 CAPWAP全称为无线接入点的控制和配置协议&#xff08;Control And Provisioning of Wireless Access Points Protocol Specification&#xff09; CAPWAP协议主要作用 1、AP可以通过CAPWAP实现自动发现AC 2、AC通过CAPWAP协议对AP进行管理、业务配置下发 3…

电信流失用户画像

三大运营商电信、联通、移动&#xff0c;都想扩大自己的客户群体。据研究&#xff0c;获取新客户所需的成本远高于保留现有客户的成本。因此为了满足在激烈竞争中的优势&#xff0c;提前预测出用户是否会流失&#xff0c;采取保留措施成为一大挑战。本文和你一起探索电信流失客…

7.1 动手实现AlexNet

AlexNet引入了dropput层 代码 import torch from torch import nn from d2l import torch as d2lnet nn.Sequential(# 样本数为1,通道数为96,11x11的卷积核,步幅为4&#xff0c;减少输出的高度和深度。 LeNet的通道数才6&#xff0c;此处96&#xff0c;为什么要增加这么多通…

VS2017 查看dll

dumpbin /exports xxxx.dll 导出函数 dumpbin /dependents xxxx.dll 依赖关系