11_Pulsar Adaptors适配器、kafka适配器、Spark适配器

2.3. Pulsar Adaptors适配器
2.3.1.kafka适配器
2.3.2.Spark适配器

2.3. Pulsar Adaptors适配器

2.3.1.kafka适配器

Pulsar 为使用 Apache Kafka Java 客户端 API 编写的应用程序提供了一个简单的解决方案。
在生产者中, 如果想不改变原有kafka的代码架构, 就切换到Pulsar的平台中, 那么Pulsar adaptor on kafka就变的非常的有用了, 它可以帮助我们在不改变原有kafka的代码基础上, 即可接入pulsar, 但是需要注意, 相关配置信息需要进行一些调整, 例如: 地址与topic

  • 1- 需要导入Pulsar集成kafka的依赖包, 删除掉原有Kafka-client包
<dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client-kafka</artifactId> <version>2.8.0</version> 
</dependency>

注: 目前Pulsar并在Maven中央仓库中并没有提供Pulsar-client-kafka 2.8.1的包, 故此处导入2.8.0

  • 2-编写生产者
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class KafkaAdaptorProducer {public static void main(String[] args) throws ExecutionException, InterruptedException {//1. 创建kafka生产者的核心类对象: KafkaProducer// 1.1: 创建生产者配置对象: 设置相关配置Properties props = new Properties();props.put("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");// 消息的确认方案props.put("acks", "all");// key序列化类型props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");// value 序列化类型props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");Producer<String, String> producer = new KafkaProducer<>(props); //2. 发送数据 for (int i = 0; i < 10; i++) { //2.1: 创建 生产者数据承载对象 一个对象代表是一条消息数据ProducerRecord<String, String> producerRecord = new ProducerRecord<>("persistent://public/default/txn_t1",Integer.toString(i), Integer.toString(i)); producer.send(producerRecord).get(); }//3. 释放资源 producer.close();}}
  • 3-编写消费者
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;public class KafkaAdaptorConsumer {public static void main(String[] args) {//1. 创建kafka的消费者的核心对象: KafkaConsumer//1.1: 创建消费者配置对象, 并设置相关的参数:Properties props = new Properties();props.setProperty("bootstrap.servers", "pulsar://node1:6650,node2:6650,node3:6650");//消费者组的 idprops.setProperty("group.id", "test");//是否启动消费者自动提交消费偏移量props.setProperty("enable.auto.commit", "true");//每间隔多长时间提交一次偏移量:单位 毫秒props.setProperty("auto.commit.interval.ms","1000");//key 反序列化props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");//val 发序列化props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);//2. 给消费者设置订阅topic:consumer.subscribe(Arrays.asList("persistent://public/default/txn_t1"));//3. 循环获取相关的消息数据while (true) {//3.1: 从kafka中获取消息数据: 参数表示等待超时时间//注意: 如果没有获取到数据, 返回一个空集合对象, 如果数据集合中有多个 ConsumerRecord 对象ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//3.2 遍历ConsumerRecords 获取每一个 ConsumerRecord 对象 : ConsumerRecord 消费者数据承载对象, 一个对象就是一条消息for (ConsumerRecord<String, String> record : records) {String massage = record.value();System.out.println("消息数据为:"+massage);}} } 
}
  • 4- 先运行消费者, 进行监听, 然后运行生产者, 观察消费者是否可以正常消费到数据
    在这里插入图片描述

2.3.2.Spark适配器

Pulsar 的 Spark Streaming 接收器是一个自定义的接收器,它使用 Apache Spark Streaming 能够从 Pulsar 接
收原始数据。

应用程序可以通过 Spark Streaming receiver 接收 Resilient Distributed Dataset (RDD) 格式的数据,并可
以通过多种方式对其进行处理。

  • 1-导入相关的依赖包
<dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-spark</artifactId><version>2.8.0</version>
</dependency>
  • 2-编写spark的流式代码
String serviceUrl = "pulsar://localhost:6650/"; 
String topic = "persistent://public/default/test_src"; 
String subs = "test_sub"; 
SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Pulsar Spark Example"); 
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(60)); 
ConsumerConfigurationData<byte[]> pulsarConf = new ConsumerConfigurationData(); 
Set<String> set = new HashSet<>(); 
set.add(topic); 
pulsarConf.setTopicNames(set); 
pulsarConf.setSubscriptionName(subs); 
SparkStreamingPulsarReceiver pulsarReceiver = new SparkStreamingPulsarReceiver( 
serviceUrl, 
pulsarConf, 
new AuthenticationDisabled()); 
JavaReceiverInputDStream<byte[]> lineDStream = jsc.receiverStream(pulsarReceiver);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/62331.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【ArcGIS】经纬度数据转化成平面坐标数据

将点位置导入Gis中&#xff0c;如下&#xff08;经纬度表征位置&#xff09;&#xff1a; 如何利用Gis将其转化为平面坐标呢&#xff1f; Step1 坐标变换 坐标变换&#xff0c;打开ArcToolbox&#xff0c;找到“数据管理工具”->“投影和变换”->“要素”->“投影”…

ChatGPT已闯入学术界,Elsevier推出AI工具

2022年11月&#xff0c;OpenAI公司发布了ChatGPT&#xff0c;这是迄今为止人工智能在现实世界中最重要的应用之一。 当前&#xff0c;互联网搜索引擎中出现了越来越多的人工智能&#xff08;AI&#xff09;聊天机器人&#xff0c;例如谷歌的Bard和微软的Bing&#xff0c;看起来…

掌握 JVM 调优命令

点击下方关注我&#xff0c;然后右上角点击...“设为星标”&#xff0c;就能第一时间收到更新推送啦~~~ JVM 日常调优总结起来就是&#xff1a;首先通过 jps 命令查看当前进程&#xff0c;然后根据 pid 通过 jinfo 命令查看和修改 jvm 参数&#xff0c;通过 jstat 命令查看 cla…

Go 语言并发编程 及 进阶与依赖管理

1.0 从并发编程本质了解Go高性能的本质 1.1 Goroutine 协程可以理解为轻量级线程&#xff1b; Go更适合高并发场景原因之一&#xff1a;Go语言一次可以创建上万协成&#xff1b; “快速”&#xff1a;开多个协成 打印。 go func(): 在函数前加 go 代表 创建协程; time.Sleep():…

软件开发人员这样跟踪时间,简单又有效!

作为软件开发人员&#xff0c;会有大量的代码需要编写&#xff0c;有大量的错误需要解决&#xff0c;有大量的功能需要构建。问题是&#xff0c;错误计算的任务堆积如山&#xff0c;很难正确掌握时间。 令人震惊的现实是&#xff0c;近一半的开发人员每周花在软件项目上的时间…

免费实用的日记应用:Day One for Mac中文版

Day One for Mac是一款运行在Mac平台上的日记软件&#xff0c;你可以使用Day One for mac通过快速菜单栏条目、提醒系统和鼓舞人心的信息来编写更多内容&#xff0c;day one mac版还支持Dropbox同步功能&#xff0c;想要day one mac中文免费版的朋友赶紧来试试吧&#xff01; …

AI模型公司如何定位 ?

AI模型公司如何定位 ? 企业与消费者&#xff1f; 和 多用途与利基市场&#xff1f; 文本将分解每个象限。 消费类和多用途 最有价值的象限并引发了人工智能热潮。 顶级公司&#xff1a; Open AI - 通过 ChatGPT 为消费者构建&#xff0c;并通过其旗舰 GPT 模型为企…

小鱼深度产品测评之:阿里云容器服务器ASK,一款不需购买节点,即可直接部署容器应用。

容器服务器ASK测评 1、引言2、帮助文档3、集群3.1集群列表3.1.1 详情3.1.1.1概览 tab3.1.1.2基本信息 tab3.1.1.4集群资源 tab3.1.1.5 集群日志 tab3.1.1.6 集群任务 tab 3.1.2 应用管理3.1.2.1 详情3.1.2.2 详情3.1.2.3 伸缩3.1.2.4 监控 3.1.3 查看日志3.1.3.1 集群日志3.1.3…

Vue组件化开发思想;Vue的全局组件;Vue的局部组件;Vue的开发模式和解析;Vue CLI安装和使用;Vue项目的创建方式–Vite

目录 1_Vue组件化开发思想1.1_认识组件化开发1.2_Vue的组件化1.3_注册组件的方式 2_Vue的全局组件3_Vue的局部组件4_Vue的开发模式和解析4.1_Vue的开发模式4.2_单文件的特点4.3_如何支持SFC4.4_VSCode对SFC文件的支持 5_Vue CLI安装和使用5.1_Vue CLI脚手架5.2_Vue CLI 安装和使…

基于SLAM的规划算法仿真复现|SLAM|智能规划

图片来自百度百科 前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 首先是博主的高质量博客的汇总&#xff0c;这个专栏里面的博客&#xff0c;都是博主最最用心写的一部分&#xff0c;干货满满&#xff0c;希望对大家有帮助。 高质量博客汇总https://blog.csdn.n…

给大家推荐9个Linux高效运维命令技巧!

文章目录 前言一、实用的 xargs 命令二、命令或脚本后台运行三、找出当前系统内存使用量较高的进程四、找出当前系统CPU使用量较高的进程五、同时查看多个日志或数据文件六、持续ping并将结果记录到日志七、查看tcp连接状态八、查找80端口请求数最高的前20个IP九、ssh实现端口转…

STM32F429IGT6使用CubeMX配置GPIO点亮LED灯

1、硬件电路 2、设置RCC&#xff0c;选择高速外部时钟HSE,时钟设置为180MHz 3、配置GPIO引脚 4、生成工程配置 5、部分代码 6、实验现象