Kafka传输数据到Spark Streaming通过编写程序java、scala程序实现操作

一、案例说明

现有一电商网站数据文件,名为buyer_favorite1,记录了用户对商品的收藏数据,数据以“\t”键分割,数据内容及数据格式如下:
在这里插入图片描述

二、前置准备工作

项目环境说明
Linux Ubuntu 16.04jdk-7u75-linux-x64scala-2.10.4kafka_2.10-0.8.2.2spark-1.6.0-bin-hadoop2.6

开启hadoop集群,zookeeper服务,开启kafka服务。再另开启一个窗口,在/apps/kafka/bin目录下创建一个topic。

/apps/zookeeper/bin/zkServer.sh start 
cd /apps/kafka  
bin/kafka-server-start.sh config/server.properties &  
cd /apps/kafka  
bin/kafka-topics.sh \  
--create \  
--zookeeper localhost:2181 \  
--replication-factor 1 \  
--topic kafkasendspark \  
--partitions 1

三、编写程序代码创建kafka的producer

1、新创一个文件folder命名为lib,并将jar包添加进来。(可以从我的博客主页资源里面下载)
2、进入以下界面,移除Scala Library。

在这里插入图片描述

3、操作完成后,再点击Add Library选项

在这里插入图片描述

4、进入以下界面

在这里插入图片描述

5、点击完成即可
6、最后创建如下项目结构的文件

在这里插入图片描述

四、编写代码,运行程序

编写生产者代码

package my.kafka;  
import java.io.BufferedReader;  
import java.io.File;  
import java.io.FileNotFoundException;  
import java.io.FileReader;  
import java.io.IOException;  
import java.util.Properties;  
import kafka.javaapi.producer.Producer;  
import kafka.producer.KeyedMessage;  
import kafka.producer.ProducerConfig;  
public class KafkaSend {  private final Producer<String, String> producer;  public final static String TOPIC = "kafkasendspark";  public KafkaSend(){  Properties props = new Properties();  // 此处配置的是kafka的端口  props.put("metadata.broker.list", "localhost:9092");  // 配置value的序列化类  props.put("serializer.class", "kafka.serializer.StringEncoder");  // 配置key的序列化类  props.put("key.serializer.class", "kafka.serializer.StringEncoder");  props.put("request.required.acks", "-1");  producer = new Producer<String, String>(new ProducerConfig(props));  }  void produce() {  int lineNo = 1;  File file = new File("/data/case6/buyer_favorite1");  BufferedReader reader = null;  try {  reader = new BufferedReader(new FileReader(file));  String tempString = null;  while ( (tempString = reader.readLine()) != null ) {  String key = String.valueOf(lineNo);  String data = tempString;  producer.send(new KeyedMessage<String, String>(TOPIC, key, data));  System.out.println(data);  lineNo++;  Thread.sleep(100);  }  } catch (FileNotFoundException e) {  System.err.println(e.getMessage());  } catch (IOException e) {  System.err.println(e.getMessage());  } catch (InterruptedException e) {  System.err.println(e.getMessage());  }  }  public static void main(String[] args) {  System.out.println("start");  new KafkaSend().produce();  System.out.println("finish");  }  
}  

编写消费者代码

package my.scala  
import org.apache.spark.SparkConf  
import org.apache.spark.streaming.StreamingContext  
import org.apache.spark.streaming.Seconds  
import scala.collection.immutable.Map  
import org.apache.spark.streaming.kafka.KafkaUtils  
import kafka.serializer.StringDecoder  
import kafka.serializer.StringDecoder  
object SparkReceive {  def main(args: Array[String]) {  val sparkConf = new SparkConf().setAppName("countuser").setMaster("local")  val ssc = new StreamingContext(sparkConf, Seconds(2))  ssc.checkpoint("checkpoint")  val topics = Set("kafkasendspark")  val brokers = "localhost:9092"  val zkQuorum = "localhost:2181"  val kafkaParams = Map[String, String](  "metadata.broker.list" -> brokers,  "serializer.class" -> "kafka.serializer.StringEncoder"  )  val lines = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc, kafkaParams, topics)  val addFunc = (currValues: Seq[Int], prevValueState: Option[Int]) => {  //通过Spark内部的reduceByKey按key规约,然后这里传入某key当前批次的Seq/List,再计算当前批次的总和  val currentCount = currValues.sum  // 已累加的值  val previousCount = prevValueState.getOrElse(0)  // 返回累加后的结果,是一个Option[Int]类型  Some(currentCount + previousCount)  }  val result=lines.map(line => (line._2.split("\t")) ).map( row => (row(0),1) ).updateStateByKey[Int](addFunc).print()  ssc.start();  ssc.awaitTermination()  }  
}  

五、运行程序

在Eclipse的SparkReceive类中右键并点击==>Run As==>Scala Application选项。

然后在KafkaSend类中:右键点击==>Run As==>Jave Application选项。

即可在控制窗口Console中查看输出结果为:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/18570.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hadoop知识点

1.HDFS写数据流程 客户端通过Distributed FileSystem模块向NameNode请求上传文件&#xff0c;NameNode检查目标文件是否已存在&#xff0c;父目录是否存在。NameNode返回是否可以上传。客户端请求第一个 Block上传到哪几个DataNode服务器上。NameNode返回3个DataNode节点&#…

车载以太网 - SomeIP - 协议用例 - RPC

目录 RPC Protocol specification 1、Cleint和Server端应该为一个服务实例的所有的Methodsevents使用一个TCP连接

Android Java代码与JNI交互字符串转换(四)

🔥 Android Studio 版本 🔥 🔥 创建JNIString.java 🔥 package com.cmake.ndk1.jni;public class JNIString {static{System.loadLibrary("string-lib");}public native String callNativeString(String str);public native void stringMethod(String str)…

pd虚拟机 Parallels Desktop ,让你能够在Mac电脑上同时运行Windows和macOS,简直太棒了!

最近我发现了一款软件——Parallels Desktop&#xff0c;它让我能够在Mac电脑上同时运行Windows和macOS&#xff0c;简直太棒了&#xff01;我想向大家推荐这款软件&#xff0c;因为它为我的工作和生活带来了巨大的便利和乐趣。 永久访问 https://www.hereitis.cn/soft/Paralle…

6.2.3 网络基本服务---文件传输协议(FTP)

6.2.3 网络基本服务—文件传输协议&#xff08;FTP&#xff09; FTP&#xff08;File Transfer Protocol&#xff09;是Internet上使用最为广泛的文件传送协议&#xff0c;FTP提供交互式的访问&#xff0c;允许客户上传文件到服务器或者从服务器下载文件&#xff0c;FTP屏蔽了…

《黑马头条》 ElectricSearch 分词器 联想词 MangoDB day08-平台管理[实战]作业

07 app端文章搜索 1) 今日内容介绍 1.1)App端搜索-效果图 1.2)今日内容 2) 搭建ElasticSearch环境 2.1) 拉取镜像 docker pull elasticsearch:7.4.0 2.2) 创建容器 docker run -id --name elasticsearch -d --restartalways -p 9200:9200 -p 9300:9300 -v /usr/share/elasticse…

关于Vue 、vue2、vue3

vue优点&#xff1f;vue2、vue3响应式比较&#xff1f; &#xff08;1&#xff09; 响应式编程 Vue 会自动对页面中某些数据的变化做出响应。通过 MVVM 思想实现数据的双向绑定&#xff0c;让开发者不用再操作 DOM 对象&#xff0c;有更多的时间去思考业务逻辑。 组件化开发…

springboot服务端接口公网远程调试,并实现HTTP服务监听

文章目录 前言1. 本地环境搭建1.1 环境参数1.2 搭建springboot服务项目 2. 内网穿透2.1 安装配置cpolar内网穿透2.1.1 windows系统2.1.2 linux系统 2.2 创建隧道映射本地端口2.3 测试公网地址 3. 固定公网地址3.1 保留一个二级子域名3.2 配置二级子域名3.2 测试使用固定公网地址…

探索全球市场:初创品牌海外营销策略解析

​随着全球化进程的不断推进&#xff0c;越来越多的初创品牌意识到海外市场的巨大潜力&#xff0c;并希望能够将自己的品牌推广到更广阔的国际舞台上。然而&#xff0c;对于初创品牌来说&#xff0c;进军海外市场并开展品牌营销是一项具有挑战性的任务。本文Nox聚星将介绍一些初…

Vue前后端分离项目 【实战篇】

一、shiro中session的共享问题&#x1f349; 1.演示问题&#x1f95d; (1)启动shiro-springboot的集群项目&#x1f353; (2)修改nginx的配置&#x1f353; (3)测试&#x1f353; 使用swagger测试需要在过滤器中放行 //测试路径 http://localhost:8080/doc.html登录成功后访…

ModaHub魔搭社区:向量数据库Zilliz Cloud集群、Collection 及 Entity教程

目录 集群 Collection 字段 Schema 索引 Entity Zilliz Cloud 集群由全托管 Milvus 实例及相关计算资源构成。您可以在 Zilliz Cloud 集群中创建 Collection,然后在 Collection 中插入 Entity。Zilliz Cloud 集群中的 Collection 类似于关系型数据库中的表。Collection …

请求响应-json参数的接收

JSON参数 JSON参数&#xff1a;JSON数据键名与形参对象属性名&#xff08;即实体类中的成员变量&#xff09;相同&#xff0c;定义POJO实体类即可接收参数&#xff0c;需要使用RequestBody标识前端发送JSON格式的数据&#xff0c;要使用POST方法发送请求&#xff0c;因为JSON格…