Flink 输出至 Elasticsearch

【1】引入pom.xml依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch6_2.12</artifactId><version>1.10.0</version>
</dependency>

【2】ES6 Scala代码,自动导入的scala包需要修改为scala._ 否则会出现错误。

package com.zzx.flinkimport java.utilimport org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink
import org.apache.http.HttpHost
import org.elasticsearch.client.Requestsobject EsSinkTest {def main(args: Array[String]): Unit = {// 创建一个流处理执行环境val env = StreamExecutionEnvironment.getExecutionEnvironment//从文件中读取数据并转换为 类val inputStreamFromFile: DataStream[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")//转换val dataStream: DataStream[SensorReading] = inputStreamFromFile.map( data => {var dataArray = data.split(",")SensorReading(dataArray(0),dataArray(1).toLong,dataArray(2).toDouble)})//定义一个 HttpHostsval httpHost = new util.ArrayList[HttpHost]()//默认 9200 我的修改为了 9201httpHost.add(new HttpHost("192.168.1.12",9200,"http"))httpHost.add(new HttpHost("127.0.0.1",9200,"http"))//定义一个 ElasticSearchFuntion 操作 es的functionval esSinkFunc = new ElasticsearchSinkFunction[SensorReading] {//element 每一条数据 通过 index 发送override def process(element: SensorReading, runtimeContext: RuntimeContext, index: RequestIndexer): Unit = {//包装写入 es 的数据val dataSource = new util.HashMap[String,String]()dataSource.put("sensor_id",element.id)dataSource.put("temp",element.temperature.toString)dataSource.put("ts",element.timestamp.toString)//indexval indexRequest = Requests.indexRequest().index("sensor_temp").`type`("readingdata").source(dataSource)index.add(indexRequest)println("saved successfully " + element.toString)}}//输出值 esdataStream.addSink(new ElasticsearchSink.Builder[SensorReading](httpHost,esSinkFunc).build())env.execute("es")}
}

【3】ES6输出展示

​ [点击并拖拽以移动] ​​

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/307678.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

01的token的年度总结

​ 大家好&#xff0c;我是token&#xff0c;一个热爱.NET的普通人&#xff0c;同样我来自湖南衡阳&#xff0c;再次之前我已经遇到非常多的湖南衡阳的老乡&#xff0c;比如李哥。 ​ 在这里一年中&#xff0c;我的成长也是非常迅速的&#xff0c;每一年的的每一天&#xff0c…

机器学习分类模型

机器学习常见分类模型及特点 机器学习常见分类模型优缺点 决策树模型 决策树&#xff08;Decision Tree&#xff09;是一类常见的机器学习方法&#xff0c;可应用于分类与回归任务&#xff0c;这里主要讨论分类决策树。决策树是基于树结构来进行决策的。下图是使用决策树来决定…

Observer观察者模式(组件协作)

观察者模式&#xff08;组件协作&#xff09; 链接&#xff1a;观察者模式实例代码 解析 目的 在软件构建过程中&#xff0c;我们需要为某些对象建立一种“通知依赖关系” ——一个对象&#xff08;目标对象&#xff09;的状态发生改变&#xff0c;所有的依赖对象&#xff0…

Decorator装饰模式(单一责任)

Decorator&#xff08;装饰模式&#xff1a;单一责任模式&#xff09; 链接&#xff1a;装饰模式实例代码 解析 目的 在某些情况下我们可能会“过度地使用继承来扩展对象的功能”&#xff0c;由于继承为类型引入的静态特质&#xff0c;使得这种扩展方式缺乏灵活性&#xff…

《Spring Cloud学习笔记:分布式事务Seata》

1.分布式事务理论基础 1.1.本地事务 本地事务&#xff0c;也就是传统的单机事务&#xff0c;在传统的数据库事务中&#xff0c;必须要满足ACID四个原则&#xff1a; 1.2.分布式事务 分布式事务&#xff0c;就是指不是在单个服务或单个数据库架构下产生的事务。 分布式事务是…

Element Plus中表格树型结构,像el-tree的属性check-strictly一样,实现遵循父子不互相关联

实现效果&#xff1a; 勾选父节点时&#xff0c;不影响字节点的选中状态。先上效果如图&#xff1a; 勾选一个字节的&#xff0c;父节点不是半选状态&#xff0c;勾选了父节点&#xff0c;子节点没有被选中。 实现思路 借助el-table的select回调事件&#xff0c;而不是借助…

堪比Postman!这款IDEA插件真好用!

Postman是大家最常用的API调试工具&#xff0c;那么有没有一种方法可以不用手动写入接口到Postman&#xff0c;即可进行接口调试操作&#xff1f;今天给大家推荐一款IDEA插件&#xff1a;Apipost Helper&#xff0c;写完代码就可以调试接口并一键生成接口文档&#xff01;而且还…

AI智能分析网关V4算法在幼儿园视频监管系统的应用与设计

一、背景需求 在科技浪潮的推动下&#xff0c;智慧化监管已然成为幼儿园管理发展的必然趋势。通过引入尖端技术手段&#xff0c;智慧幼儿园监管解决方案不仅显著提升了管理效率&#xff0c;更为孩子们的安全与健康筑起一道坚实的屏障。为了全方位守护幼儿的平安&#xff0c;幼…

【Unity美术】Unity工程师对3D模型需要达到的了解【一】

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;Uni…

在 Golang 应用程序中管理多个数据库

掌握在 Golang 项目中处理多个数据库的艺术 在当前软件开发领域中&#xff0c;处理单个应用程序内的多个数据库的需求越来越普遍。具有强大功能的 Golang 是处理此类任务的绝佳解决方案&#xff0c;无论您是与多个数据源合作还是仅为增强组织和可扩展性而分隔数据。在本文中&a…

算法学习系列(十六):二维数组填充数字问题

目录 引言一、思路二、代码模板三、例题总结1.回字蛇形矩阵2.三角填充3.回文填充二 引言 关于这个二维数组填数问题我碰到过很多次&#xff0c;不管是找工作笔试面试&#xff0c;还是在算法竞赛&#xff0c;而且这种问题都有很多种变形&#xff0c;当初学算法的时候让我很是头…

深度解析 | 什么是超融合数据中心网络?

数据中心网络连接数据中心内部通用计算、存储和高性能计算资源&#xff0c;服务器间的所有数据交互都要经由网络转发。当前&#xff0c;IT架构、计算和存储技术都在发生重大变革&#xff0c;驱动数据中心网络从原来的多张网络独立部署向全以太化演进。而传统的以太网无法满足存…