五分钟,Docker安装flink,并使用flinksql消费kafka数据

1、拉取flink镜像,创建网络

docker pull flink
docker network create flink-network

2、创建 jobmanager

# 创建 JobManager docker run \-itd \--name=jobmanager \--publish 8081:8081 \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest jobmanager 

3、创建 taskmanager

# 创建 TaskManager docker run \-itd \--name=taskmanager \--network flink-network \--env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" \flink:latest taskmanager 

4、访问 http://localhost:8081/

在这里插入图片描述

4.1 修改Task Slots

默认的Slots num是1,我们可以修改为5:
修改的目录是jobmanager和taskmanager的/opt/flink/confflink-conf.yaml文件:
在这里插入图片描述
在这里插入图片描述
修改taskmanager.numberOfTaskSlots:即可。
注意:默认的docker容器中没有vi/vim命令,可以使用docker cp命令,复制出来修改,然后在复制回去,如下:

docker cp taskmanager:/opt/flink/conf/flink-conf.yaml .
docker cp flink-conf.yaml taskmanager:/opt/flink/conf/

在这里插入图片描述

5、通过flinksql消费Kafka

确保有一个可用的kafka,如果没有,可以五分钟内,Docker搭建一个
Docker安装kafka 3.5
并且通过python,简单写一个生产者
Python生产、消费Kafka

5.1 导入flink-sql-connector-kafka jar包

顾名思义,用于连接flinksql和kafka。
进入flink

docker exec -it jobmanager /bin/bash

进入 flink的bin目录

cd /opt/flink/bin

查看flink版本:

flink --version

可以看出,我的版本是1.18.0
在这里插入图片描述
根据自己的flink版本,下载对应的 flink-sql-connector-kafka jar包
https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka
因为我是1.18.0,所以选择下图的版本包:
在这里插入图片描述
点进去进行下载:
在这里插入图片描述

将下载的jar包,分别在jobmanager,taskmanager /opt/flink/lib目录下,注意,是两个都要放,如下图:
在这里插入图片描述
可以使用docker cp test.txt jobmanager:/opt/flink/lib命令,用户宿主机和docker容器文件传输。把test.txt换成对应的jar包即可

5.2 flinksql消费kafka

进入jobmanager中,执行

cd /opt/flink/bin
sql-client.sh

在这里插入图片描述
在这里插入图片描述
Flink SQL执行以下语句:

CREATE TABLE KafkaTable (`count_num` STRING,`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH ('connector' = 'kafka','topic' = 'kafka_demo','properties.bootstrap.servers' = '192.168.10.15:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','format' = 'json'
);show tables;
select * from KafkaTable;

可以看到Flink在消费kafka数据,如下图:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/211050.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

PMP 考试的含金量怎么样?

这里可以三个思考题和三个价值点帮你认识PMP考试。 三个思维题 1.工作环境 PMP证书含金量的一个很大因素,就是考证的人是否对PMP证书有比较强的实际需求。相反,如果只是听别人说,PMP证书很好,不管工作中是否有需要,…

一条Update语句的执行过程是怎样的?

先看第一个问题,这里做个简单描述 ,因为我们着重还是看Update MySQL执行一条Select语句是怎么运行的? 这个问题大家在面试的时候大家都背过类似的题,而且网上也有很多答案,这里分享一个大致流程介绍,关于…

RTOS的任务触发底层逻辑

(定时器用于计时和触发事件,任务则由调度器进行调度和执行:每当时钟节拍到达时,系统会触发一个称为 tick 中断的事件。当 tick 中断发生时,操作系统会在中断服务例程中执行一定的处理,其中包括更新任务的运…

Flink之状态TTL机制内容详解

1 状态TTL机制 状态的 TTL机制就是Flink提供的自动化删除状态中的过期数据,配置 TTL的 API可以做到对状态中的数据进行冷热数据分离,将热数据一直保存在状态存储器中,将冷数据进行定期删除. 1.1 API简介 TTL常用API如下: API注解setTtl(Time.seconds(…))配置过期时长,当状态…

window非gui形式运行jmeter脚本

配置jmeter环境 新增1个环境变量: JMETER_HOMED:\Tools\apache-jmeter-5.0 【jmeter文件夹】 编辑CLASSPATH: CLASSPATH后面加上 %JMETER_HOME%\lib\ext\ApacheJMeter_core.jar; %JMETER_HOME%\lib\jorphan.jar; 编辑path: path后面加上 %JM…

Java基层卫生健康云综合管理(云his)系统源码

云HIS(Cloud-Based Healthcare Information System)是基于云计算的医院健康卫生信息系统。它运用云计算、大数据、物联网等新兴信息技术,按照现代医疗卫生管理要求,在一定区域范围内以数字化形式提供医疗卫生行业数据收集、存储、…

基于C#实现赫夫曼树

赫夫曼树又称最优二叉树,也就是带权路径最短的树,对于赫夫曼树,我想大家对它是非常的熟悉,也知道它的应用场景,但是有没有自己亲手写过,这个我就不清楚了,不管以前写没写,这一篇我们…

可视化大屏时代的到来:智慧城市管理的新思路

随着科技的不断发展,智能芯片作为一种新型的电子元件,被广泛应用于各个领域,其中智慧芯片可视化大屏是一种重要的应用形式。 一、智慧芯片可视化大屏的优势 智慧芯片可视化大屏是一种将智能芯片与大屏幕显示技术相结合的产品,山海…

四肽-3——增加皮肤光滑度、紧致度,让肌肤看起来更年轻

Caprooyl四肽-3,也称为KGHK Caproic acid,是一种基于TGF-(转化生长因子β)的仿生脂肽结构,在细胞外基质成分的自然产生中发挥重要作用。肽序列是Lys-Gly-His-Lys。它可以减少细纹和皱纹的出现,提高皮肤弹性…

神经网络训练技巧

1. 逐渐增加训练数据规模,比如先在小数据集上训练,之后再增大数据集继续训练。

硬件开发笔记(十三):RK3568底板电路HDMI2.0模块原理图分析、HDMI硬件接口详解

若该文为原创文章,转载请注明原文出处 本文章博客地址:https://hpzwl.blog.csdn.net/article/details/134504319 红胖子网络科技博文大全:开发技术集合(包含Qt实用技术、树莓派、三维、OpenCV、OpenGL、ffmpeg、OSG、单片机、软硬…

Vatee万腾的数字时代探险:vatee科技力量的未来洞悉

在数字化的时代潮流中,Vatee万腾以其强大的科技力量,正在进行一场前所未有的数字时代探险。 Vatee万腾的数字时代探险源于其对未来的洞悉。通过深度研究和前瞻性思考,他们将科技力量与未来趋势相结合,勾勒出数字时代的新蓝图&…