Elasticsearch 集成---Spark Streaming 框架集成

一.Spark Streaming 框架介绍

Spark Streaming Spark core API 的扩展,支持实时数据流的处理,并且具有可扩展,
高吞吐量,容错的特点。
数据可以从许多来源获取,如 Kafka Flume Kinesis TCP sockets
并且可以使用复杂的算法进行处理,这些算法使用诸如 map reduce join window 等高
级函数表示。 最后,处理后的数据可以推送到文件系统,数据库等。 实际上,您可以将
Spark 的机器学习和图形处理算法应用于数据流。

二.框架集成

1. 创建 Maven 项目

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.atguigu.es</groupId><artifactId>es-sparkstreaming</artifactId><version>1.0</version><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target></properties><dependencies><dependency><groupId>org.apache.spark</groupId><artifactId>spark-core_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.12</artifactId><version>3.0.0</version></dependency><dependency><groupId>org.elasticsearch</groupId><artifactId>elasticsearch</artifactId><version>7.8.0</version></dependency><!-- elasticsearch的客户端 --><dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId><version>7.8.0</version></dependency><!-- elasticsearch依赖2.x的log4j --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>2.8.2</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>2.8.2</version></dependency><!--        <dependency>--><!--            <groupId>com.fasterxml.jackson.core</groupId>--><!--            <artifactId>jackson-databind</artifactId>--><!--            <version>2.11.1</version>--><!--        </dependency>--><!--        &lt;!&ndash; junit单元测试 &ndash;&gt;--><!--        <dependency>--><!--            <groupId>junit</groupId>--><!--            <artifactId>junit</artifactId>--><!--            <version>4.12</version>--><!--        </dependency>--></dependencies>
</project>

2.功能实现

package com.atguigu.esimport org.apache.http.HttpHost
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.elasticsearch.action.index.{IndexRequest, IndexResponse}
import org.elasticsearch.client.{RequestOptions, RestClient, RestHighLevelClient}
import org.elasticsearch.common.xcontent.XContentTypeobject SparkStreamingESTest {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[*]").setAppName("ESTest")val ssc = new StreamingContext(sparkConf, Seconds(3))val ds: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)ds.foreachRDD(rdd => {rdd.foreach(data => {val client = new RestHighLevelClient(RestClient.builder(new HttpHost("localhost",9200, "http")))val ss = data.split(" ")val request = new IndexRequest()request.index("product").id(ss(0))val json =s"""| {  "data" : "${ss(1)}" }|""".stripMarginrequest.source(json, XContentType.JSON)val response: IndexResponse = client.index(request, RequestOptions.DEFAULT)println(response.getResult)client.close()})})ssc.start()ssc.awaitTermination()}
}

3.界面截图

三.安装NetCat

1.下载网址:netcat 1.11 for Win32/Win64

2.解压压缩包

右键zip文件-->解压到当前文件夹

3.配置环境变量

右键此电脑-->属性-->高级系统设置-->环境变量

四.测试

Window + R  重新启动cmd命令窗口

4.1测试:输入 nc -l -p 9999

4.2 启动测试

4.3 cmd输入 1001 jianzi

 4.4 postman 查看

get    http://127.0.0.1:9200/product/_doc/1001

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/87740.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文解读 | ScanNet:室内场景的丰富注释3D重建

原创 | 文 BFT机器人 大型的、有标记的数据集的可用性是为了利用做有监督的深度学习方法的一个关键要求。但是在RGB-D场景理解的背景下&#xff0c;可用的数据非常少,通常是当前的数据集覆盖了一小范围的场景视图&#xff0c;并且具有有限的语义注释。 为了解决这个问题&#…

JVM知识点(二)

1、G1垃圾收集器 -XX:MaxGCPauseMillis10&#xff0c;G1的参数&#xff0c;表示在任意1s时间内&#xff0c;停顿时间不能超过10ms&#xff1b;G1将堆切分成很多小堆区&#xff08;Region&#xff09;&#xff0c;每一个Region可以是Eden、Survivor或Old区&#xff1b;这些区在…

树状表格父节点选择 - 在Vue.js中实现仅选择父节点的树状表格功能

功能介绍 本文介绍了如何在Vue.js框架下实现一个树状表格&#xff0c;其中只支持选择父节点的复选框。通过这个功能&#xff0c;用户可以方便地选择表格中的父节点&#xff0c;而无需关心子节点的选择。代码示例和详细的实现步骤将展示如何使用Vue.js的相关特性和组件实现这个功…

线性代数(五) 线性空间

前言 《线性代数(三) 线性方程组&向量空间》我通过解线性方程组的方式去理解线性空间。此章从另一个角度去理解 空间是什么 大家较熟悉的&#xff1a;平面直角坐标系是最常见的二维空间 空间由无穷多个坐标点组成 每个坐标点就是一个向量 反过来&#xff0c;也可说&…

传统品牌如何通过3D虚拟数字人定制和动捕设备加速年轻化发展?

步入Z时代&#xff0c;年轻一代消费者的生活方式深受互联网技术和媒介环境影响&#xff0c;对新潮事物感兴趣&#xff0c;消费思维也相对前卫&#xff0c;品牌需要探索契合Z世代的消费观念&#xff0c;寻找新的链接拉近品牌与消费者的距离&#xff0c;而3D虚拟数字人定制可以帮…

c#在MVC Api(.net framework)当中使用Swagger,以及Demo下载

主要的步骤就是创建项目&#xff0c;通过nuget 添加Swashbuckle包&#xff0c;然后在SwaggerConfig当中进行相关的配置。 具体的步骤&#xff0c;可以参考下面的链接&#xff1a; https://www.cnblogs.com/94pm/p/8046580.htmlhttps://blog.csdn.net/xiaouncle/article/detail…

macOS 安装 Homebrew 详细过程

文章目录 macOS 安装 Homebrew 详细过程Homebrew 简介Homebrew 安装过程设置环境变量安装 Homebrew安装完成后续设置(重要)设置环境变量homebrew 镜像源设置macOS 安装 Homebrew 详细过程 本文讲解了如何使用中科大源安装 Homebrew 的安装过程,文章里面的所有步骤都是必要的,需…

ARM寄存器组

CM3 拥有通用寄存器 R0‐R15 以及一些特殊功能寄存器。 R0-R7&#xff0c;通用目的寄存器 R0-R7也被称为低组寄存器&#xff0c;所有指令可以访问它们&#xff0c;它们的字长为32位&#xff0c;复位后的初始值是不可预料的。 R8-R12&#xff0c;通用目的寄存器 R8-R12也被称…

【详解】文本检测OCR模型的评价指标

关于文本检测OCR模型的评价指标 前言&#xff1a;网上关于评价标准乱七八糟的&#xff0c;有关于单词的&#xff0c;有关于段落的&#xff0c;似乎没见过谁解释一下常见论文中常用的评价指标具体是怎么计算的&#xff0c;比如DBNet&#xff0c;比如RCNN&#xff0c;这似乎好像…

C语言的发展及特点

1. C语言的发展历程 C语言作为计算机编程领域的重要里程碑&#xff0c;其发展历程承载着无数开发者的智慧和创新。C语言诞生于20世纪70年代初&#xff0c;由计算机科学家Dennis Ritchie在贝尔实验室首次推出。当时&#xff0c;Ritchie的目标是为Unix操作系统开发一门能够更方便…

LiveGBS伴侣

【1】LiveGBS 简介 LiveGBS是一套支持国标(GB28181)流媒体服务软件。 国标无插件;提供用户管理及Web可视化页面管理&#xff1b; 提供设备状态管理&#xff0c;可实时查看设备是否掉线等信息&#xff1b; 实时流媒体处理&#xff0c;PS&#xff08;TS&#xff09;转ES&…

2023.8.28日论文阅读

文章目录 NestFuse: An Infrared and Visible Image Fusion Architecture based on Nest Connection and Spatial/Channel Attention Models(2020的论文)本文方法 LRRNet: A Novel Representation Learning Guided Fusion Network for Infrared and Visible Images本文方法学习…