kettle从入门到精通 第五十三课 ETL之kettle MQTT/RabbitMQ consumer实战

1、上一节课我们学习了MQTT producer 生产者步骤,MQTT consumer消费者步骤。该步骤可以从支持MRQTT协议的中间件获取数据,该步骤和kafka consumer 一样可以处理实时数据交互,如下图所示:

 2、双击步骤打开MQTT consumer 配置窗口,如下图所示:

Step name:自定义步骤名称。

Transformation:设置子转换,该子转换的作用是从中间件读取流数据,然后将字段返回给MQTT consumer步骤进行使用。

Connection:指定此步骤将连接的 MQTT 服务器的地址,如127.0.0.1:1883(注意这里的端口是1883,不是5672)

Client ID:指定 MQTT 客户端的唯一 ID。MQTT 服务器使用此客户端 ID 来识别每个不同的客户端及其当前状态。

Topics name:在主题名称字段中,输入您希望订阅流数据(消息)的 MQTT 主题的名称。其实这里的topic是RabbitMQ中的routing key(另外这里的routing key 一定不要绑定队列,否则MQTT consumer步骤无法接收数据)。

Quality of Service:是消息传递的保证级别。选择以下选项之一。
至多一次(0),这是默认值
至少一次(1)
恰好一次(2)

 3、安全验证设置,如下图所示:

Username:MQTT服务器的用户名,如admin

Password:MQTT服务器的密码。

Use secure protocol:选择此选项以定义连接的 SSL 属性,本次不做介绍。

 3、批次设置,使用此选项卡来指定在处理之前要拉取多少消息。您可以指定消息数量和/或特定的时间量。

Duration(ms):
请指定一个以毫秒为单位的时间。此值表示在执行转换之前该步骤将花费多少时间来收集记录。
如果将此选项设置为0,则根据参数Number of records记录数触发消费。要运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Number of records
指定一个数字。在每收集到‘X’条记录之后,指定的转换将被执行,并且这些‘X’条记录将被传递给转换过程。
如果将此选项设置为0,则将参数Duration按持续时间触发消费。为了运行转换,持续时间或记录数选项都必须包含一个大于0的值。

Maximum concurrent batches

指定用于同时收集记录的最大批次数。默认值为1,表示使用单个批次来收集记录。仅当您的消费者步骤无法跟上数据流的速度时,才应使用此选项。
您的计算环境必须具备足够的 CPU 和内存来进行此实现。如果您的环境无法处理指定的最大并发批次数,将会出现错误。


Message prefetch limit
请指定此步骤将排队等待处理的传入消息的限制,即从 kfakfa broker接收到的消息。
设置此值会强制kafka broker处理超过指定限制的消息的背压。默认排队消息的数量是100000条。

 4、字段设置,这里采用默认值就行了,不用调整。

5、子转换结果字段设置,选择子转换返回数据的步骤。 

 6、同上一节课,本次不再介绍。

 7、子转换配置,将Get records from stream步骤拉到画布,然后填写Message、Topic两个字段,类型都是设置为String即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/637920.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

golang学习笔记(net/http库基本使用)

关于net/http库 我们先看看标准库net/http如何处理一个请求。 import ("fmt""log""net/http" )var count 0func main() {http.HandleFunc("/", handler)http.HandleFunc("/count", counter)log.Fatal(http.ListenAndServ…

ChatGPT研究论文提示词集合2-【形成假设、设计研究方法】

点击下方▼▼▼▼链接直达AIPaperPass ! AIPaperPass - AI论文写作指导平台 目录 1.形成假设 2.设计研究方法 3.书籍介绍 AIPaperPass智能论文写作平台 近期小编按照学术论文的流程,精心准备一套学术研究各个流程的提示词集合。总共14个步骤&#…

【设计模式】单例模式|最常用的设计模式

写在前面 单例模式是最常用的设计模式之一,虽然简单,但是还是有一些小坑点需要注意。本文介绍单例模式并使用go语言实现一遍单例模式。 单例模式介绍 简介 单例模式保证一个类仅有一个实例,并提供一个访问它的全局访问点。 使用场景&#…

Hadoop高可用(High Availability)

Hadoop HA(High Availability) Hadoop 的高可用性,重点是确保存储(HDFS)和计算(YARN)资源在面对节点故障时能够继续正常运行。 1.概述 所谓HA(High Availablity)&…

Linux动态追踪——eBPF

目录 摘要 1 什么是 eBPF 2 eBPF 支持的功能 3 BCC 4 编写脚本 5 总结 6 附 摘要 ftrace 和 perf 与 ebpf 同为 linux 内核提供的动态追踪工具,其中 ftrace 侧重于事件跟踪和内核行为的实时分析,perf 更侧重于性能分析和事件统计,与…

ROS摄像机标定

文章目录 一、环境准备二、摄像头标定2.1 为什么要标定2.2 标定前准备2.2.1 标定板2.2.2 摄像头调焦 2.3 开始标定2.4 测试标定结果 总结参考资料 一、环境准备 安装usb_cam相机驱动 sudo apt-get install ros-noetic-usb-cam 安装标定功能包 sudo apt-get install ros-noet…

Vitis HLS 学习笔记--HLS优化指令示例-目录

目录 1. 示例集合概述 2. 内容分析 2.1 array_partition 2.2 bind_op_storage 2.3 burst_rw 2.4 critical_path 2.5 custom_datatype 2.6 dataflow_stream 2.7 dataflow_stream_array 2.8 dependence_inter 2.9 gmem_2banks 2.10 kernel_chain 2.11 lmem_2rw 2.1…

从零实现诗词GPT大模型:实现Transformer架构

专栏规划: https://qibin.blog.csdn.net/article/details/137728228 首先说明一下,跟其他文章不太一样,在本篇文章中不会对Transformer架构中的自注意力机制进行讲解,而是后面单独1~2篇文章详细讲解自注意力机制,我认为由浅入深的先了解Transformer整体架构和其中比较简单…

Java之类和对象

一面向对象的初步认知 1.什么是面向对象 Java是一门纯面向对象的语言(Object Oriented Program,简称OOP),在面向对象的世界里,一切皆为对象。面向对象是解决问题的一种思想,主要依靠对象之间的交互完成一件事情。用面向对象的思想…

政安晨:【Keras机器学习示例演绎】(八)—— 利用 PointNet 进行点云分割

目录 简介 导入 下载数据集 加载数据集 构建数据集 预处理 创建 TensorFlow 数据集 PointNet 模型 排列不变性 变换不变性 点之间的相互作用 实例化模型 训练 直观了解培训情况 推论 最后说明 政安晨的个人主页:政安晨 欢迎 👍点赞✍评论…

【AI开发:音频】二、GPT-SoVITS使用方法和过程中出现的问题(GPU版)

1.FileNotFoundError: [Errno 2] No such file or directory: logs/guanshenxxx/2-name2text-0.txt 这个问题中包含了两个: 第一个:No module named pyopenjtalk 我的电脑出现的就是这个 解决:pip install pyopenjtalk 第二个&#xff1a…

Wpf 使用 Prism 实战开发Day21

配置默认首页 当应用程序启动时&#xff0c;默认显示首页 一.实现思路&#xff0c;通过自定义接口来配置应用程序加载完成时&#xff0c;设置默认显示页 步骤1.创建自定义 IConfigureService 接口 namespace MyToDo.Common {/// <summary>/// 配置默认显示页接口/// <…