Kafka和Spark Streaming的组合使用(Spark 3.5.1)

一、安装Kafka

1.执行以下命令完成Kafka的安装:
cd ~  //默认压缩包放在根目录
sudo tar -zxf  kafka_2.11-2.3.1.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.11-2.3.1 kafka-2.3.1
sudo chown -R qiangzi ./kafka-2.3.1

二、启动Kafaka

1.首先需要启动Kafka,打开一个终端,输入下面命令启动Zookeeper服务:
cd  /usr/local/kafka-2.3.1
./bin/zookeeper-server-start.sh  config/zookeeper.properties

注意:以上现象是Zookeeper服务器已经启动,正在处于服务状态。不要关闭!

2.打开第二个终端,输入下面命令启动Kafka服务:
cd  /usr/local/kafka-2.3.1
./bin/kafka-server-start.sh  config/server.properties//加了“&”的命令,Kafka就会在后台运行,即使关闭了这个终端,Kafka也会一直在后台运行。
bin/kafka-server-start.sh  config/server.properties  &

注意:同样不要误以为死机了,而是Kafka服务器已经启动,正在处于服务状态。

三、创建Topic

1.再打开第三个终端,然后输入下面命令创建一个自定义名称为“wordsender”的Topic:
cd /usr/local/kafka-2.3.1
./bin/kafka-topics.sh --create --zookeeper localhost:2181 \
> --replication-factor 1 --partitions 1 \
> --topic wordsender
2.然后,可以执行如下命令,查看名称为“wordsender”的Topic是否已经成功创建:
./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.再新开一个终端(记作“监控输入终端”),执行如下命令监控Kafka收到的文本:
cd /usr/local/kafka-2.3.1
bin/kafka-console-consumer.sh \
> --bootstrap-server localhost:9092 --topic wordsender

注意,所有这些终端窗口都不要关闭,要继续留着后面使用。

四、Spark准备工作

Kafka和Flume等高级输入源,需要依赖独立的库(jar文件),因此,需要为Spark添加相关jar包。访问MVNREPOSITORY官网(http://mvnrepository.com),下载spark-streaming-kafka-0-10_2.12-3.5.1.jar和spark-token-provider-kafka-0-10_2.12-3.5.1.jar文件,其中,2.12表示Scala的版本号,3.2.0表示Spark版本号。然后,把这两个文件复制到Spark目录的jars目录下(即“/usr/local/spark/jars”目录)。此外,还需要把“/usr/local/kafka/libs”目录下的kafka-clients-2.6.0.jar文件复制到Spark目录的jars目录下。

sudo mv ./spark-streaming-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/
sudo mv ./spark-token-provider-kafka-0-10_2.12-3.5.1.jar /usr/local/spark-3.5.1/jars/

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-streaming-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

spark-streaming-kafka-0-10_2.12-3.5.1.jar的下载页面:

Maven Repository: org.apache.spark » spark-token-provider-kafka-0-10_2.12 » 3.5.1 (mvnrepository.com)

进入下载页面以后,如下图所示,点击红色方框内的“jar”,就可以下载JAR包了。

五、编写Spark Streaming程序使用Kafka数据源

1.编写生产者(Producer)程序
(1)新打开一个终端,然后,执行如下命令创建代码目录和代码文件:
cd /usr/local/spark-3.5.1
mkdir mycode
cd ./mycode
mkdir kafka
mkdir -p kafka/src/main/scala
vi kafka/src/main/scala/KafkaWordProducer.scala
(2)使用vi编辑器新建了KafkaWordProducer.scala

它是用来产生一系列字符串的程序,会产生随机的整数序列,每个整数被当作一个单词,提供给KafkaWordCount程序去进行词频统计。请在KafkaWordProducer.scala中输入以下代码:

import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
object KafkaWordProducer {def main(args: Array[String]) {if (args.length < 4) {System.err.println("Usage: KafkaWordProducer <metadataBrokerList> <topic> " +"<messagesPerSec> <wordsPerMessage>")System.exit(1)}val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args// Zookeeper connection propertiesval props = new HashMap[String, Object]()props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")val producer = new KafkaProducer[String, String](props)// Send some messageswhile(true) {(1 to messagesPerSec.toInt).foreach { messageNum =>val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).
toString).mkString(" ")print(str)println()val message = new ProducerRecord[String, String](topic, null, str)producer.send(message)}Thread.sleep(1000)}}
}
2.编写消费者(Consumer)程序

在“/usr/local/spark/mycode/kafka/src/main/scala”目录下创建代码文件KafkaWordCount.scala,用于单词词频统计,它会把KafkaWordProducer发送过来的单词进行词频统计,代码内容如下:

cd /usr/local/spark-3.5.1/mycode
vi kafka/src/main/scala/KafkaWordCount.scala
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribeobject KafkaWordCount{def main(args:Array[String]){val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")val sc = new SparkContext(sparkConf)sc.setLogLevel("ERROR")val ssc = new StreamingContext(sc,Seconds(10))ssc.checkpoint("file:///usr/local/spark-3.5.1/mycode/kafka/checkpoint") //设置检查点,如果存放在HDFS上面,则写成类似ssc.checkpoint("/user/hadoop/checkpoint")这种形式,但是,要启动Hadoopval kafkaParams = Map[String, Object]("bootstrap.servers" -> "localhost:9092","key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> "use_a_separate_group_id_for_each_stream","auto.offset.reset" -> "latest","enable.auto.commit" -> (true: java.lang.Boolean))val topics = Array("wordsender")val stream = KafkaUtils.createDirectStream[String, String](ssc,PreferConsistent,Subscribe[String, String](topics, kafkaParams))stream.foreachRDD(rdd => {val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRangesval maped: RDD[(String, String)] = rdd.map(record => (record.key,record.value))val lines = maped.map(_._2)val words = lines.flatMap(_.split(" "))val pair = words.map(x => (x,1))val wordCounts = pair.reduceByKey(_+_)wordCounts.foreach(println)})ssc.startssc.awaitTermination}
}
3.在路径“file:///usr/local/spark/mycode/kafka/”下创建“checkpoint”目录作为预写式日志的存放路径。
cd ./kafka
mkdir checkpoint
4.继续在当前目录下创建StreamingExamples.scala代码文件,用于设置log4j:
cd /usr/local/spark-3.5.1/mycode/
vi kafka/src/main/scala/StreamingExamples.scala/*StreamingExamples.scala*/
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}                                                                                 /** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {/** Set reasonable logging levels for streaming if the user has not configured log4j. */def setStreamingLogLevels() {val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElementsif (!log4jInitialized) {// We first log something to initialize Spark's default logging, then we override the// logging level.logInfo("Setting log level to [WARN] for streaming example." +" To override add a custom log4j.properties to the classpath.")Logger.getRootLogger.setLevel(Level.WARN)}                                                                                                                     }                                                                                                                     } 
5.编译打包程序

现在在“/usr/local/spark/mycode/kafka/src/main/scala”目录下,就有了如下3个代码文件:

然后,执行下面命令新建一个simple.sbt文件:

cd /usr/local/spark-3.5.1/mycode/kafka/
vim simple.sbt

在simple.sbt中输入以下代码:

name := "Simple Project"
version := "1.0"
scalaVersion := "2.12.18"
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.5.1"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "3.5.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "3.5.1"
libraryDependencies += "org.apache.kafka" %% "kafka-clients" % "2.6.0"

然后执行下面命令,进行编译打包:

cd  /usr/local/spark/mycode/kafka/
/usr/local/sbt-1.9.0/sbt/sbt  package

打包成功界面

6. 运行程序

首先,启动Hadoop,因为如果前面KafkaWordCount.scala代码文件中采用了ssc.checkpoint
("/user/hadoop/checkpoint")这种形式,这时的检查点是被写入HDFS,因此需要启动Hadoop。启动Hadoop的命令如下:

cd  /usr/local/hadoop-2.10.1
./sbin/start-dfs.sh
或者
start-dfs.sh
start-yarn.sh

启动Hadoop成功以后,就可以测试刚才生成的词频统计程序了。
要注意,之前已经启动了Zookeeper服务和Kafka服务,因为之前那些终端窗口都没有关闭,所以,这些服务一直都在运行。如果不小心关闭了之前的终端窗口,那就参照前面的内容,再次启动Zookeeper服务,启动Kafka服务。
然后,新打开一个终端,执行如下命令,运行“KafkaWordProducer”程序,生成一些单词(是一堆整数形式的单词):

cd  /usr/local/spark/mycode/kafka/
/usr/local/spark/bin/spark-submit  \
> --class "KafkaWordProducer"   \
> ./target/scala-2.12/simple-project_2.12-1.0.jar  \
> localhost:9092  wordsender  3  5
 /usr/local/spark-3.5.1/bin/spark-submit --class "KafkaWordProducer" --packages org.apache.kafkkafka-clients:2.6.0 ./target/scala-2.12/simple-project_2.12-1.0.jar localhost:9092 wordsender 3 5

注意,上面命令中,“localhost:9092 wordsender 3 5”是提供给KafkaWordProducer程序的4个输入参数,第1个参数“localhost:9092”是Kafka的Broker的地址,第2个参数“wordsender”是Topic的名称,我们在KafkaWordCount.scala代码中已经把Topic名称写死掉,所以,KafkaWordCount程序只能接收名称为“wordsender”的Topic。第3个参数“3”表示每秒发送3条消息,第4个参数“5”表示每条消息包含5个单词(实际上就是5个整数)。
执行上面命令后,屏幕上会不断滚动出现类似如下的新单词:

不要关闭这个终端窗口,让它一直不断发送单词。然后,再打开一个终端,执行下面命令,运行KafkaWordCount程序,执行词频统计:

cd  /usr/local/spark/mycode/kafka/
/usr/local/spark/bin/spark-submit  \
> --class "KafkaWordCount"  \
> ./target/scala-2.12/simple-project_2.12-1.0.jar
出现报错信息,暂时无法解决

待完善!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/678771.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Qt应用开发(拓展篇)——图表 QChart

一、前言 QChart是一个图形库模块&#xff0c;它可以实现不同类型的序列和其他图表相关对象(如图例和轴)的图形表示。要在布局中简单地显示图表&#xff0c;可以使用QChartView来代替QChart。此外&#xff0c;线条、样条、面积和散点序列可以通过使用QPolarChart类表示为极坐标…

给window电脑安装Linux系统

身为小白的我们在安装Linux系统时会遇到很多麻烦&#xff0c;没有接触过的命令&#xff0c;没有实际操作的经验&#xff0c;觉得Linux遥不可及&#xff0c;这篇博客讲述了我安装Linux的经历与安装过程遇到的问题与解决方案。我是为了学习Linux开发而安装&#xff0c;目的不同安…

重置密码之后无法ssh登录

背景描述 我这边有个服务器S&#xff0c;我从ServerA可以ssh上去&#xff0c;但是我从堡垒机B无法ssh上去&#xff1b;一开始以为是密码问题&#xff0c;手动重置密码&#xff0c;但是依然无法登录进去&#xff1b;一直提示密码错误&#xff1b;改了好几次密码都不行 问题原因…

WordPress MasterStudy LMS插件 SQL注入漏洞复现(CVE-2024-1512)

0x01 产品简介 WordPress和WordPress plugin都是WordPress基金会的产品。WordPress是一套使用PHP语言开发的博客平台。该平台支持在PHP和MySQL的服务器上架设个人博客网站。WordPress plugin是一个应用插件。 0x02 漏洞概述 WordPress Plugin MasterStudy LMS 3.2.5 版本及之…

HTTP常见面试题(二)

3.1 HTTP 常见面试题 HTTP特性 HTTP 常见到版本有 HTTP/1.1&#xff0c;HTTP/2.0&#xff0c;HTTP/3.0&#xff0c;不同版本的 HTTP 特性是不一样的。 HTTP/1.1 的优点有哪些&#xff1f; HTTP 最突出的优点是「简单、灵活和易于扩展、应用广泛和跨平台」。 1. 简单 HTTP…

【无标题】基于GIS、Python机器学习技术的地质灾害风险评价、易发性分析与信息化建库及灾后重建中的实践技术

理解地质灾害形成机理与成灾模式&#xff1b;从空间数据处理、信息化指标空间数据库构建、致灾因子提取&#xff0c;空间分析、危险性评价与制图分析等方面掌握GIS在灾害危险性评价中的方法&#xff1b;运用地质灾害危险性评价原理和技术方法 原文链接&#xff1a;基于GIS、Py…

Parallels Desktop 19 for Mac v19.3.0.54924中文破解版

Parallels Desktop 19 for Mac v19.3.0.54924中文破解版是一款强大的虚拟机软件&#xff0c;支持多操作系统&#xff0c;提供卓越的虚拟化技术&#xff0c;确保流畅稳定的运行。新增特色功能如共享打印、TouchID集成等&#xff0c;提供便捷高效的虚拟机体验。界面美观现代&…

运行SpringBoot项目失败?异常显示Can‘t load IA 32-bit .dll on a AMD 64-bit platform,让我来看看~

原因是&#xff0c;我放入jdk的bin文件夹下的tcnative-1.dll文件是32位的&#xff0c;那么肯定是无法在AMD 64位平台上加载IA 32位.dll。但是网站上给出的都是32位呀&#xff0c;没有64位怎么办&#xff1a; 其实当我们把“tomcat-native-1.2.34-openssl-1.1.1o-win32-bin.zip”…

[公开课学习]台大李宏毅-自注意力机制 Transformer

自注意力机制 存在一些问题&#xff0c;将vector set/sequence作为input&#xff0c;例如&#xff1a; 文字处理&#xff1a;将文字用one-hot表示&#xff0c;或者向量空间的向量表示&#xff0c;然后进行翻译任务等语音处理&#xff1a;25ms音频作为一个向量&#xff0c;10m…

Day_2

1. 菜品管理 新增菜品 接口设计 1. 根据类型查询分类&#xff08;分类管理已完成&#xff09; 查看接口文档即可 2. 文件上传 创建Bucket 采用的是阿里云的OSS对象存储服务 新增AccessKey 3. 菜品的新增逻辑 代码开发 1. 文件上传接口开发 为了提高代码的解耦性&#…

Javaweb项目 博客系统(后端代码编写)

准备工作,创建项目 引入依赖 1.servlet 2.mysql 3.jackson 导入前端代码 1.博客列表页 2.博客详情页 3.登录页 4.博客编辑页 接下来要进行的操作就是两大方面 1.前端和服务器的交互 2.服务器和数据库的交互 进行数据库设计创建数据库和数据表 一把需要把建库建表的操作写错sq…

嘉楠堪智 CanMV K230 进行 Linux、RT-smart 系统开发

本文记录学习、使用 K230 SDK 进行 Linux、RT-smart 系统的开发的一些关键步骤&#xff0c;如何获取系统源代码&#xff0c;如何配置环境&#xff0c;如何使用 Docker 进行编译&#xff0c;获得系统文件。 具体详细的教程&#xff0c;可以学习 CanMV K230 教程。 目录 一、S…