2023_Spark_实验二十四:SparkStreaming读取Kafka数据源:使用Direct方式

SparkStreaming读取Kafka数据源:使用Direct方式

一、前提工作

  • 安装了zookeeper

  • 安装了Kafka

  • 实验环境:kafka + zookeeper + spark

  • 实验流程

二、实验内容

实验要求:实现的从kafka读取实现wordcount程序

启动zookeeper

zk.sh start# zk.sh脚本 参考教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

启动Kafka

kf.sh start# kf.sh 参照教程 https://blog.csdn.net/pblh123/article/details/134730738?spm=1001.2014.3001.5502

 (测试用,实验不做)创建Kafka主题,如test,可参考:Kafka的安装与基本操作

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

--bootstrap-server  连接的Kafka Broker主机名称和端口号

--create 创建主题

--describe 查看主题详细描述

# 创建kafka主题测试
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --create --bootstrap-server hd1:9092 --replication-factor 3 --partitions 1 --topic gnutest2# 再次查看first主题的详情
/opt/module/kafka_2.12-3.0.0/bin/kafka-topics.sh --bootstrap-server hd1:9092 --describe --topic gnutest2

启动Kafka控制台生产者,可参考:Kafka的安装与基本操作

# 创建kafka生产者
/opt/module/kafka_2.12-3.0.0/bin/kafka-console-producer.sh --bootstrap-server hd1:9092 --topic gnutest2

创建maven项目

添加kafka依赖

       <!--- 添加streaming依赖 ---><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.13</artifactId><version>${spark.version}</version></dependency><!--- 添加streaming kafka依赖 ---><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.13</artifactId><version>3.4.1</version></dependency>

编写程序,如下所示:

package examsimport org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeimport java.lang/*** @projectName SparkLearning2023  * @package exams  * @className exams.SparkStreamingReadKafka  * @description ${description}  * @author pblh123* @date 2023/12/1 15:19* @version 1.0**/object SparkStreamingReadKafka {def main(args: Array[String]): Unit = {//  1. 创建spark,sc对象if (args.length != 2) {println("您需要输入一个参数")System.exit(5)}val musrl: String = args(0)val spark: SparkSession = new SparkSession.Builder().appName(s"${this.getClass.getSimpleName}").master(musrl).getOrCreate()val sc: SparkContext = spark.sparkContext// 生成streamingContext对象val ssc: StreamingContext = new StreamingContext(sc, Seconds(5))//  2. 代码主体val bststrapServers = args(1)val kafkaParms: Map[String, Object] = Map[String, Object]("bootstrap.servers" -> bststrapServers, //kafka列表"key.deserializer" -> classOf[StringDeserializer], k和v 的序列化类型"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream", //消费者组"auto.offset.reset" -> "latest", //如果没有记录偏移量,第一次从最开始读,有偏移量,接着偏移量读"enable.auto.commit" -> (true: java.lang.Boolean) // 消费者不自动提交偏移量)val topics = Array("gnutest2", "t100")// createDirectStream: 主动拉取数据val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParms))val mapDStream: DStream[(String, String)] = stream.map(record => (record.key, record.value))//kafka 是一个key value 格式的, 默认key 为null ,一般用不上val resultRDD: DStream[(String, Int)] = mapDStream.flatMap(_._2.split(" ")).map((_, 1)).reduceByKey(_ + _)// 打印resultRDD.print()//  3. 关闭sc,spark对象ssc.start()ssc.awaitTermination()ssc.stop()sc.stop()spark.stop()}
}

配置输入参数

生产者追加数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/239016.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【方案】智慧林业:如何基于EasyCVR视频能力搭建智能林业监控系统

随着人类进程的发展。城市化范围的扩大&#xff0c;森林覆盖率越来越低&#xff0c;为保障地球环境&#xff0c;保护人类生存的净土&#xff0c;森林的保护与监管迫在眉睫。TSINGSEE青犀智慧林业智能视频监控系统方案的设计&#xff0c;旨在利用现代科技手段提高林业管理的效率…

2023年AI时代中小企业智能化发展报告

今天分享的是AI系列深度研究报告&#xff1a;《2023年AI时代中小企业智能化发展报告》。 &#xff08;报告出品方&#xff1a;创业邦&#xff09; 报告共计&#xff1a;47页 AI——中小企业的智能化增长利器 继蒸汽机、电气化、信息化时代之后&#xff0c;由第四次工业革命开…

[HTML]Web前端开发技术6(HTML5、CSS3、JavaScript )DIV与SPAN,盒模型,Overflow——喵喵画网页

希望你开心&#xff0c;希望你健康&#xff0c;希望你幸福&#xff0c;希望你点赞&#xff01; 最后的最后&#xff0c;关注喵&#xff0c;关注喵&#xff0c;关注喵&#xff0c;佬佬会看到更多有趣的博客哦&#xff01;&#xff01;&#xff01; 喵喵喵&#xff0c;你对我真的…

【U8+】用友U8修改凭证提示:外部凭证在总账系统中不能修改。

【问题描述】 用友U8中&#xff0c;在总账模块中修改凭证的时候&#xff0c;提示&#xff1a; 外部凭证在总账系统中不能修改。 【原因分析】 在软件中&#xff0c;使用其他模块的情况下&#xff0c; 其他模块生成的凭证都会传到总账模块中&#xff0c;进而这些由其他模块生成…

【开源】基于JAVA语言的考研专业课程管理系统

项目编号&#xff1a; S 035 &#xff0c;文末获取源码。 \color{red}{项目编号&#xff1a;S035&#xff0c;文末获取源码。} 项目编号&#xff1a;S035&#xff0c;文末获取源码。 目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 数据中心模块2.2 考研高校模块2.3 高…

Tkinter 面向对象框架《二》

一、说明 Tkinter 教程 开发完整的 Tkinter 面向对象应用程序开发完整的 Tkinter 面向对象应用程序。 即使OOP的高手&#xff0c;也未必对面向对象全部掌握。至于 Tkinter的OOP编程&#xff0c;其实高手们也是在摸索实践中。 为了面向对象和Tkinter参与本教程。如果你来这里纯…

测评补单助力亚马逊,速卖通,国际站卖家抢占市场,提升转化和评分

想要快速提升商品的销量&#xff0c;测评补单这种方法见效是最快的。特别是新品上线&#xff0c;缺少用户评价&#xff0c;转化率不好&#xff0c;很多商家新品上线都会做测评补单&#xff0c;搞些商品好评&#xff0c;不但可以提升转化&#xff0c;同时在平台也可以获得更多展…

向光有光megauging使用说明书(二,轻量级的visionpro)

测试程序暂时支持80万&#xff08;包含1024*768&#xff09;以上的gige工业相机 我们程序中使用注意力机制&#xff0c;其实就是感兴趣区域&#xff08;roi&#xff0c;你看过我前面博文&#xff0c;就应该明白&#xff09;精神的延伸&#xff0c;我们只处理全景图中的1024*76…

抑郁症由什么引起?

抑郁症的发生并不是单一原因所导致&#xff0c;而是多种因素相互作用的结果。以下是一些主要的原因&#xff1a; 首先&#xff0c;生物学因素在抑郁症的发病中起到了关键作用。研究显示&#xff0c;抑郁症可能与遗传有关&#xff0c;家族中有患抑郁症的成员会增加个体患病的风…

Windows的常用cmd命令总结

文章目录 一.盘符切换二: cd命令(打开文件/文件夹)三:查看目录四.创建和删除文件夹五.查看本机ip地址六.清除当前屏幕七.复制文件到另一个地方八.移动文件到另一个地方九.删除文件&#xff08;不能删除文件夹&#xff09;十.测试网络连接十一.停止任务进程Windows快捷键总结大全…

Java研学-IO流(三)

六 字节流 – 字节输出流系列 OutPutStream体系 1 OutPutStream系列 – 字节输出流 // 表示字节输出流所有类的超类&#xff0c;输出流接受输出字节并将其发送到某个接收器 public abstract class OutputStreamFileOutputStream/BufferedOutputStream 2 FileOutputStream类设…

Elasticsearch分词器--空格分词器(whitespace analyzer)

介绍 文本分析&#xff0c;是将全文本转换为一系列单词的过程&#xff0c;也叫分词。analysis是通过analyzer(分词器)来实现的&#xff0c;可以使用Elasticearch内置的分词器&#xff0c;也可以自己去定制一些分词器。除了在数据写入时将词条进行转换&#xff0c;那么在查询的时…