【天衍系列 04】深入理解Flink的ElasticsearchSink组件:实时数据流如何无缝地流向Elasticsearch

文章目录

  • 01 Elasticsearch Sink 基础概念
  • 02 Elasticsearch Sink 工作原理
  • 03 Elasticsearch Sink 核心组件
  • 04 Elasticsearch Sink 配置参数
  • 05 Elasticsearch Sink 依赖管理
  • 06 Elasticsearch Sink 初阶实战
  • 07 Elasticsearch Sink 进阶实战
    • 7.1 包结构 & 项目配置
      • 项目配置application.properties
      • 日志配置log4j2.properties
      • 项目pom.xml文件
    • 7.2 实体类ElasticsearchEntity
    • 7.3 客户端工厂类CustomRestClientFactory
    • 7.4 回调函数类CustomRequestConfigCallback
    • 7.5 客户端配置类CustomHttpClientConfigCallback
    • 7.6 Es操作类CustomElasticsearchSinkFunction
    • 7.7 异常处理类CustomActionRequestFailureHandler
    • 7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

01 Elasticsearch Sink 基础概念

Flink的Elasticsearch Sink是用于将Flink数据流(DataStream)中的数据发送到Elasticsearch的组件。它是Flink的一个连接器(Connector),用于实现将实时处理的结果或数据持续地写入Elasticsearch集群中的索引中。

下面是一些关于Flink的Elasticsearch Sink的基础概念:

  1. 数据源(Source):Flink数据流的源头,可以是各种数据源,例如Kafka、文件系统、Socket等。Elasticsearch Sink通常是连接到Flink数据流的末端,用于将最终处理结果或数据写入Elasticsearch。
  2. Elasticsearch集群:一个或多个Elasticsearch节点的集合,用于存储和处理数据。Elasticsearch提供了分布式的数据存储和搜索功能。
  3. 索引(Index):在Elasticsearch中,索引是存储相关数据的地方,类似于关系数据库中的表。每个索引可以包含多个文档(Document),每个文档包含一个或多个字段(Field)。
  4. 文档(Document):在Elasticsearch中,文档是最小的数据单元。它们以JSON格式表示,并存储在索引中。
  5. Elasticsearch Sink:是Flink的一个数据接收器,用于将数据流中的数据发送到Elasticsearch集群中的特定索引。Sink负责将Flink数据流中的事件转换为Elasticsearch要求的格式,并将其发送到指定的索引。
  6. 序列化与映射:在将数据写入Elasticsearch之前,通常需要对数据进行序列化和映射。序列化是将数据从Flink的内部表示转换为Elasticsearch要求的JSON格式。映射则是定义如何将Flink数据流中的字段映射到Elasticsearch文档中的字段。
  7. 并行度控制:Elasticsearch Sink支持并行度控制,可以根据需要调整并发写入Elasticsearch的任务数量。这有助于优化性能并避免对Elasticsearch集群造成过大的负载。

总的来说,Flink的Elasticsearch Sink是一个关键的组件,用于将实时处理的结果或数据可靠地写入Elasticsearch中,从而支持各种实时数据分析和搜索应用。

02 Elasticsearch Sink 工作原理

Elasticsearch Sink 是 Apache Flink 提供的一个连接器,用于将 Flink 数据流中的数据发送到 Elasticsearch 集群中。以下是 Elasticsearch Sink 的工作原理:

  1. 数据流入 Flink 程序: 数据首先从外部数据源(如 Kafka、RabbitMQ、文件系统等)进入到 Flink 程序中。Flink 以流式处理的方式处理数据,这意味着数据会一条一条地进入 Flink 的数据流中。
  2. 数据转换与处理: 一旦数据进入 Flink,您可以对数据进行各种转换和处理。这可能包括数据清洗、转换、聚合、窗口操作等。在您的 Flink 程序中,您可以通过各种 Flink 的算子来实现这些转换和处理。
  3. Elasticsearch Sink 的配置: 当需要将数据写入 Elasticsearch 时,您需要配置 Elasticsearch Sink。这通常包括指定 Elasticsearch 集群的地址、端口、索引名称等信息。您还可以配置其他参数,例如批量写入的大小、超时时间等。
  4. 数据发送到 Elasticsearch: 一旦配置完成,Elasticsearch Sink 会将 Flink 数据流中的数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。通常,Elasticsearch Sink 会将数据批量发送到 Elasticsearch,以提高写入的效率和性能。
  5. 序列化与映射: 在发送数据之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式,并根据 Elasticsearch 索引的映射规则进行字段映射。这确保了发送到 Elasticsearch 的数据与索引的结构一致。
  6. 容错与错误处理: Flink 提供了容错机制来确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Flink 会自动进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。
  7. 性能优化: 为了提高性能,Elasticsearch Sink 可以通过调整批量写入的大小、并发度等参数来优化性能。这可以减少与 Elasticsearch 的通信开销,并提高写入的效率。

总的来说,Elasticsearch Sink 通过将 Flink 数据流中的数据转换为 JSON 格式,并利用 Elasticsearch 的 REST API 将数据发送到指定的索引中,实现了将实时流数据写入 Elasticsearch 的功能。

03 Elasticsearch Sink 核心组件

Elasticsearch Sink 在 Apache Flink 中是一个核心组件,它负责将 Flink 数据流中的数据发送到 Elasticsearch。下面是 Elasticsearch Sink 的核心组件:

  1. SinkFunction: SinkFunction 是 Flink 中的一个接口,用于定义将数据发送到外部系统的逻辑。在 Elasticsearch Sink 中,您需要实现 SinkFunction 接口,以将 Flink 数据流中的数据发送到 Elasticsearch。通常,您需要在 SinkFunction 中实现将数据转换为 JSON 格式,并通过 Elasticsearch 的 REST API 将数据发送到指定的索引中。
  2. BulkProcessor: BulkProcessor 是 Elasticsearch Java 客户端提供的一个功能,用于批量写入数据到 Elasticsearch。在 Elasticsearch Sink 中,BulkProcessor 负责将 Flink 数据流中的数据批量发送到 Elasticsearch。您可以通过 BulkProcessor 来配置批量写入的大小、并发度等参数,以优化写入性能。
  3. TransportClient 或 RestHighLevelClient: 在 Elasticsearch Sink 中,您可以使用 Elasticsearch Java 客户端的 TransportClient 或 RestHighLevelClient 来与 Elasticsearch 集群进行通信。这些客户端提供了与 Elasticsearch 集群交互的接口,使您可以发送数据到 Elasticsearch、执行查询、索引管理等操作。
  4. 序列化器(Serializer): 在将数据发送到 Elasticsearch 之前,通常需要将 Flink 数据流中的数据序列化为 JSON 格式。序列化器负责将 Flink 数据流中的数据转换为 Elasticsearch 所需的 JSON 格式。您可以根据具体的数据类型和业务需求来实现自定义的序列化器。
  5. Elasticsearch 连接配置: 在 Elasticsearch Sink 中,您需要配置与 Elasticsearch 集群的连接信息,包括 Elasticsearch 集群的地址、端口、索引名称等。这些配置信息通常在初始化 Elasticsearch Sink 时进行设置,并在发送数据时使用。
  6. 容错与错误处理机制: Elasticsearch Sink 需要具备容错和错误处理机制,以确保数据的可靠性和一致性。如果在数据发送过程中发生错误,例如网络故障或 Elasticsearch 集群不可用,Sink 需要能够进行故障恢复,并重新发送丢失的数据,以确保数据不会丢失。

这些组件共同作用,构成了 Elasticsearch Sink 在 Flink 中的核心功能,使得 Flink 用户可以轻松地将实时流数据发送到 Elasticsearch,并实现各种实时数据分析和搜索应用。

04 Elasticsearch Sink 配置参数

nodes :Elasticsearch 集群的节点地址列表

port :Elasticsearch 集群的端口

Elasticsearch 集群的节点地址列表

scheme : Elasticsearch 集群的通信协议,http或https

type :Elasticsearch 集群的文档类型,es7以后是_doc

index :Elasticsearch 集群的索引名称

bulkFlushMaxActions :内部批量处理器,刷新前最大缓存的操作数

bulkFlushMaxSizeMb :刷新前最大缓存的数据量(以兆字节为单位)

bulkFlushInterval :刷新的时间间隔(不论缓存操作的数量或大小如何)

bulkFlushBackoff :是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。

bulkFlushBackoffDelay :设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试

bulkFlushBackoffRetries :设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试

connectTimeout :设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常

socketTimeout :设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。

connectionRequestTimeout :设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。

redirectsEnabled :设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。

maxRedirects :客户端允许的最大重定向次数

authenticationEnabled :启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。

circularRedirectsAllowed :设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。

contentCompressionEnabled :设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。

expectContinueEnabled :设置是否启用 “Expect: continue” 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。

normalizeUri :设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等

05 Elasticsearch Sink 依赖管理

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_1.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_1.12</artifactId><version>1.14.4</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_1.12</artifactId><version>1.14.4</version>
</dependency>

06 Elasticsearch Sink 初阶实战

package com.aurora.demo;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;/*** 描述:Flink集成Elasticsearch Connector连接器快速入门运行demo* 实现实时数据流如何无缝地流向Elasticsearch** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 22:25:58*/
public class ElasticsearchSinkStreamJobQuickDemo {private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobQuickDemo.class);public static void main(String[] args) throws Exception {// 创建elasticsearch集群的httpHost连接HttpHost httpHost = new HttpHost("localhost", 9200, "http");List<HttpHost> httpHosts = new ArrayList<>();httpHosts.add(httpHost);// 创建elasticsearchSinkFunction函数对象,专门用于处理数据写入elasticsearchSink算子队列,会自动创建索引ElasticsearchSinkFunction<JSONObject> elasticsearchSinkFunction = new ElasticsearchSinkFunction<JSONObject>() {@Overridepublic void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {String transId = element.getString("transId");String tradeTime = element.getString("tradeTime");String index = "flink_" + tradeTime;logger.info("交易流水={},数据写入索引{}成功", transId, index);IndexRequest indexRequest = Requests.indexRequest().index(index).type("_doc").id(transId).source(element, XContentType.JSON);indexer.add(indexRequest);}};// 构建elasticsearchSink算子BuilderElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction);// 每个请求最多发送的文档数量esSinkBuilder.setBulkFlushMaxActions(1);// 每次发送请求的时间间隔esSinkBuilder.setBulkFlushInterval(1000);//构建elasticsearchSink算子ElasticsearchSink<JSONObject> sink = esSinkBuilder.build();// 自定义数据源,模拟生产环境交易接入,每秒下发一个json格式数据SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {@Overridepublic void run(SourceContext sourceContext) throws Exception {while (true) {//交易流水号String tradeId = UUID.randomUUID().toString();//交易发生时间戳long timeStamp = System.currentTimeMillis();//交易发生金额long tradeAmount = new Random().nextInt(1000);//交易名称String tradeName = "支付宝转账";JSONObject dataObj = new JSONObject();dataObj.put("transId", tradeId);dataObj.put("timeStamp", timeStamp);dataObj.put("tradeTime", dateUtil(timeStamp));dataObj.put("tradeAmount", tradeAmount);dataObj.put("tradeName", tradeName);//模拟生产,每隔1秒生成一笔交易Thread.sleep(1000);logger.info("源交易流水={},原始报文={}", tradeId, dataObj.toJSONString());sourceContext.collect(dataObj);}}@Overridepublic void cancel() {}};// 创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建数据源DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);// 数据源写入数据算子,进行输出到elasticsearchdataStreamSource.addSink(sink);// 执行任务env.execute();}/*** 描述:时间格式化工具类** @param timestamp 时间戳* @return {@code String }*/private static String dateUtil(long timestamp) {//时间戳加工timestamp = timestamp / 1000;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");// 格式化日期时间对象为指定格式的字符串String dateTimeFormat = formatter.format(dateTime);return dateTimeFormat;}
}

启动上述作业后,根据对应的交易流水号查询es,或者查询es的索引数据,但是索引数据一般是一段时间才更新

验证1:检查索引数据变化
http://127.0.0.1:9200/_cat/indices?v

在这里插入图片描述

验证2:根据id查询es的文档记录

在这里插入图片描述
在这里插入图片描述

07 Elasticsearch Sink 进阶实战

进阶实战主要是包括ElasticsearchSink的各种参数配置,以及性能调优

7.1 包结构 & 项目配置

在这里插入图片描述

项目配置application.properties

es.cluster.hosts=localhost
es.cluster.port=9200
es.cluster.scheme=http
es.cluster.type=_doc
es.cluster.indexPrefix=flink_#内部批量处理器,刷新前最大缓存的操作数
es.cluster.bulkFlushMaxActions=1
#刷新前最大缓存的数据量(以兆字节为单位)
es.cluster.bulkFlushMaxSizeMb=10
#刷新的时间间隔(不论缓存操作的数量或大小如何)
es.cluster.bulkFlushInterval=10000#是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。
es.cluster.bulkFlushBackoff=false
#设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试
es.cluster.bulkFlushBackoffDelay=10000
#设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试
es.cluster.bulkFlushBackoffRetries=3#设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常
es.cluster.connectTimeout=10000
#设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。
es.cluster.socketTimeout=10000
#设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。
es.cluster.connectionRequestTimeout=10000
设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。
es.cluster.redirectsEnabled=false
#客户端允许的最大重定向次数
es.cluster.maxRedirects=3#启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。
es.cluster.authenticationEnabled=false
#设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。
es.cluster.circularRedirectsAllowed=false
#设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。
es.cluster.contentCompressionEnabled=false
#设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。
es.cluster.expectContinueEnabled=false
#设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。
es.cluster.normalizeUri=false

日志配置log4j2.properties

rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp

项目pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.aurora</groupId><artifactId>aurora_elasticsearch_connector</artifactId><version>1.0-SNAPSHOT</version><!--属性设置--><properties><!--java_JDK版本--><java.version>1.8</java.version><!--maven打包插件--><maven.plugin.version>3.8.1</maven.plugin.version><!--编译编码UTF-8--><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><!--输出报告编码UTF-8--><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><!--json数据格式处理工具--><fastjson.version>1.2.75</fastjson.version><!--log4j版本--><log4j.version>2.17.1</log4j.version><!--flink版本--><flink.version>1.14.4</flink.version><!--scala版本--><scala.binary.version>2.12</scala.binary.version></properties><!--依赖管理--><dependencies><!-- fastJson工具类依赖 start --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>${fastjson.version}</version></dependency><!-- fastJson工具类依赖 end --><!-- log4j日志框架依赖 start --><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-slf4j-impl</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-api</artifactId><version>${log4j.version}</version></dependency><dependency><groupId>org.apache.logging.log4j</groupId><artifactId>log4j-core</artifactId><version>${log4j.version}</version></dependency><!-- log4j日志框架依赖 end --><!-- Flink基础依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink基础依赖 end --><!-- Flink Elasticsearch 连接器依赖 start --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId><version>${flink.version}</version></dependency><!-- Flink Elasticsearch 连接器依赖 end --></dependencies><!--编译打包--><build><finalName>${project.name}</finalName><!--资源文件打包--><resources><resource><directory>src/main/resources</directory></resource><resource><directory>src/main/java</directory><includes><include>**/*.xml</include></includes></resource></resources><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><artifactSet><excludes><exclude>org.apache.flink:force-shading</exclude><exclude>org.google.code.flindbugs:jar305</exclude><exclude>org.slf4j:*</exclude><excluder>org.apache.logging.log4j:*</excluder></excludes></artifactSet><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters><transformers><transformerimplementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>com.aurora.demo,ElasticsearchSinkStreamingJobDemo</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins><!--插件统一管理--><pluginManagement><plugins><!--maven打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><version>${spring.boot.version}</version><configuration><fork>true</fork><finalName>${project.build.finalName}</finalName></configuration><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin><!--编译打包插件--><plugin><artifactId>maven-compiler-plugin</artifactId><version>${maven.plugin.version}</version><configuration><source>${java.version}</source><target>${java.version}</target><encoding>UTF-8</encoding><compilerArgs><arg>-parameters</arg></compilerArgs></configuration></plugin></plugins></pluginManagement></build></project>

7.2 实体类ElasticsearchEntity

package com.aurora.advanced;import java.io.Serializable;/*** 描述:elasticsearch实体类** @author 浅夏的猫* @version 1.0.0* @date 2024-02-10 20:08:20*/
public class ElasticsearchEntity implements Serializable {private static final long serialVersionUID = 1L;/*** 集群地址* */private String hosts;/*** 集群端口* */private Integer port;/***执行计划* */private String scheme;/*** 文档类型,es7一般都是_doc* */private String type;/*** 索引前缀* */private String indexPrefix;/*** 内部批量处理器,刷新前最大缓存的操作数* */private Integer bulkFlushMaxActions=1;/*** 刷新前最大缓存的数据量(以兆字节为单位)* */private Integer bulkFlushMaxSizeMb=10;/*** 刷新的时间间隔(不论缓存操作的数量或大小如何)* */private Integer bulkFlushInterval=10000;/*** 是否启用批量写入的退避策略,当Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。* 此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。* */private Boolean bulkFlushBackoff=false;/*** 设置批量写入的退避延迟时间,在发生写入失败后,等待指定的延迟时间后再进行重试* */private Integer bulkFlushBackoffDelay=10000;/*** 设置批量写入的最大重试次数,设置在写入失败后的最大重试次数。超过这个次数后,将不再重试* */private Integer bulkFlushBackoffRetries=3;/*** 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常* */private Integer connectTimeout=10000;/*** 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。* */private Integer socketTimeout=10000;/*** 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。* */private Integer connectionRequestTimeout=10000;/*** 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。* */private Boolean redirectsEnabled=false;/*** 客户端允许的最大重定向次数* */private Integer maxRedirects=3;/*** 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。* */private Boolean authenticationEnabled=true;/*** 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。* */private Boolean circularRedirectsAllowed=false;/*** 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。* */private Boolean contentCompressionEnabled=false;/*** 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。* 如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。* */private Boolean expectContinueEnabled=false;/*** 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。* */private Boolean normalizeUri=false;/*** 用于设置 HTTP 请求的路径前缀。* 这个配置选项通常用于设置反向代理或者负载均衡器等中间件与 Elasticsearch 集群之间的连接* */private String pathPrefix;public String getHosts() {return hosts;}public void setHosts(String hosts) {this.hosts = hosts;}public Integer getPort() {return port;}public void setPort(Integer port) {this.port = port;}public String getScheme() {return scheme;}public void setScheme(String scheme) {this.scheme = scheme;}public String getType() {return type;}public void setType(String type) {this.type = type;}public String getIndexPrefix() {return indexPrefix;}public void setIndexPrefix(String indexPrefix) {this.indexPrefix = indexPrefix;}public Integer getBulkFlushMaxActions() {return bulkFlushMaxActions;}public void setBulkFlushMaxActions(Integer bulkFlushMaxActions) {this.bulkFlushMaxActions = bulkFlushMaxActions;}public Integer getBulkFlushMaxSizeMb() {return bulkFlushMaxSizeMb;}public void setBulkFlushMaxSizeMb(Integer bulkFlushMaxSizeMb) {this.bulkFlushMaxSizeMb = bulkFlushMaxSizeMb;}public Integer getBulkFlushInterval() {return bulkFlushInterval;}public void setBulkFlushInterval(Integer bulkFlushInterval) {this.bulkFlushInterval = bulkFlushInterval;}public Boolean getBulkFlushBackoff() {return bulkFlushBackoff;}public void setBulkFlushBackoff(Boolean bulkFlushBackoff) {this.bulkFlushBackoff = bulkFlushBackoff;}public Integer getBulkFlushBackoffDelay() {return bulkFlushBackoffDelay;}public void setBulkFlushBackoffDelay(Integer bulkFlushBackoffDelay) {this.bulkFlushBackoffDelay = bulkFlushBackoffDelay;}public Integer getBulkFlushBackoffRetries() {return bulkFlushBackoffRetries;}public void setBulkFlushBackoffRetries(Integer bulkFlushBackoffRetries) {this.bulkFlushBackoffRetries = bulkFlushBackoffRetries;}public Integer getConnectTimeout() {return connectTimeout;}public void setConnectTimeout(Integer connectTimeout) {this.connectTimeout = connectTimeout;}public Integer getSocketTimeout() {return socketTimeout;}public void setSocketTimeout(Integer socketTimeout) {this.socketTimeout = socketTimeout;}public Integer getConnectionRequestTimeout() {return connectionRequestTimeout;}public void setConnectionRequestTimeout(Integer connectionRequestTimeout) {this.connectionRequestTimeout = connectionRequestTimeout;}public Boolean getRedirectsEnabled() {return redirectsEnabled;}public void setRedirectsEnabled(Boolean redirectsEnabled) {this.redirectsEnabled = redirectsEnabled;}public Integer getMaxRedirects() {return maxRedirects;}public void setMaxRedirects(Integer maxRedirects) {this.maxRedirects = maxRedirects;}public Boolean getAuthenticationEnabled() {return authenticationEnabled;}public void setAuthenticationEnabled(Boolean authenticationEnabled) {this.authenticationEnabled = authenticationEnabled;}public Boolean getCircularRedirectsAllowed() {return circularRedirectsAllowed;}public void setCircularRedirectsAllowed(Boolean circularRedirectsAllowed) {this.circularRedirectsAllowed = circularRedirectsAllowed;}public Boolean getContentCompressionEnabled() {return contentCompressionEnabled;}public void setContentCompressionEnabled(Boolean contentCompressionEnabled) {this.contentCompressionEnabled = contentCompressionEnabled;}public Boolean getExpectContinueEnabled() {return expectContinueEnabled;}public void setExpectContinueEnabled(Boolean expectContinueEnabled) {this.expectContinueEnabled = expectContinueEnabled;}public Boolean getNormalizeUri() {return normalizeUri;}public void setNormalizeUri(Boolean normalizeUri) {this.normalizeUri = normalizeUri;}public String getPathPrefix() {return pathPrefix;}public void setPathPrefix(String pathPrefix) {this.pathPrefix = pathPrefix;}
}

7.3 客户端工厂类CustomRestClientFactory

作用:设置用于创建 Elasticsearch REST 客户端的工厂,可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口

package com.aurora.advanced;import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.http.Header;
import org.apache.http.message.BasicHeader;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:设置用于创建 Elasticsearch REST 客户端的工厂* 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 00:12:15*/
public class CustomRestClientFactory implements RestClientFactory {private ElasticsearchEntity elasticsearchEntity;public CustomRestClientFactory(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic void configureRestClientBuilder(RestClientBuilder restClientBuilder) {//设置默认的 HTTP 头部信息,这些信息将在每个请求中包含Header contentType = new BasicHeader("Content-Type", "application/json");Header authorization = new BasicHeader("Authorization", "Bearer your_access_token");Header[] headers = {contentType, authorization};restClientBuilder.setDefaultHeaders(headers);//设置用于监听节点故障的监听器。当节点发生故障时,可以执行特定的操作restClientBuilder.setFailureListener(new RestClient.FailureListener());//配置用于选择与之通信的节点的策略。这涉及到 Elasticsearch 集群中多个节点的选择。restClientBuilder.setNodeSelector(NodeSelector.ANY);//为每个请求设置路径前缀。这可以用于将请求定向到特定的子路径。if(StringUtils.isNoneBlank(elasticsearchEntity.getPathPrefix())){restClientBuilder.setPathPrefix(elasticsearchEntity.getPathPrefix());}//允许在创建每个请求的时候进行额外的请求配置。restClientBuilder.setRequestConfigCallback(new CustomRequestConfigCallback(elasticsearchEntity));//允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置。restClientBuilder.setHttpClientConfigCallback(new CustomHttpClientConfigCallback(elasticsearchEntity));//设置是否启用严格的废弃模式,用于警告有关已弃用功能的使用。restClientBuilder.setStrictDeprecationMode(false);}
}

7.4 回调函数类CustomRequestConfigCallback

作用:允许在创建每个请求的时候进行额外的请求配置

package com.aurora.advanced;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:* 允许在创建每个请求的时候进行额外的请求配置* @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 23:24:42*/
public class CustomRequestConfigCallback implements RestClientBuilder.RequestConfigCallback {private ElasticsearchEntity elasticsearchEntity;public CustomRequestConfigCallback(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder custom) {// 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());// 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());// 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());// 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());// 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());// 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());// 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());// 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。//  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());// 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());// 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范custom.setCookieSpec(new DefaultCookieSpec().toString());return custom;}
}

7.5 客户端配置类CustomHttpClientConfigCallback

作用:允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置

package com.aurora.advanced;import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.cookie.DefaultCookieSpec;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;/*** 描述:客户端配置* 允许在创建 CloseableHttpClient 实例时进行额外的 HTTP 客户端配置** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 23:28:15*/
public class CustomHttpClientConfigCallback implements RestClientBuilder.HttpClientConfigCallback {private ElasticsearchEntity elasticsearchEntity;CustomHttpClientConfigCallback(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {RequestConfig.Builder custom = RequestConfig.custom();// 设置与 Elasticsearch 集群建立连接的超时时间,单位为毫秒。在指定的时间内无法建立连接将会抛出连接超时异常custom.setConnectTimeout(elasticsearchEntity.getConnectTimeout());// 设置与 Elasticsearch 连接的套接字超时时间,单位为毫秒。该参数定义了在建立连接后从服务器读取数据的超时时间。如果在指定的时间内没有读取到数据,将会抛出超时异常。custom.setSocketTimeout(elasticsearchEntity.getSocketTimeout());// 设置连接请求超时时间,单位为毫秒。该参数表示从连接池获取连接的超时时间。如果在指定的时间内无法获得连接,将会抛出连接请求超时异常。custom.setConnectionRequestTimeout(elasticsearchEntity.getConnectionRequestTimeout());// 设置是否允许重定向。如果设置为true,则当遇到重定向响应时,客户端将跟随重定向并继续请求;如果设置为false,重定向响应将被视为错误。custom.setRedirectsEnabled(elasticsearchEntity.getRedirectsEnabled());// 设置最大重定向次数。当允许重定向时,该参数指定在遇到重定向响应时,最多可以重定向的次数。custom.setMaxRedirects(elasticsearchEntity.getMaxRedirects());// 启用身份验证功能。通过设置该参数为true,可以提供用户名和密码进行身份验证,以连接到 Elasticsearch 集群。custom.setAuthenticationEnabled(elasticsearchEntity.getAuthenticationEnabled());// 设置是否允许循环重定向。如果设置为true,则允许在重定向过程中发生循环重定向;如果设置为false,则在检测到循环重定向时,将会抛出异常。custom.setCircularRedirectsAllowed(elasticsearchEntity.getCircularRedirectsAllowed());// 设置是否启用内容压缩。如果设置为true,则允许客户端和 Elasticsearch 之间进行内容压缩,以减少数据传输量。custom.setContentCompressionEnabled(elasticsearchEntity.getContentCompressionEnabled());// 设置是否启用 "Expect: continue" 机制。当设置为true时,在发送请求之前,客户端会发送一个请求头部,询问服务器是否接受请求的主体部分。//  如果服务器响应允许继续发送请求主体,则客户端会继续发送请求;如果服务器响应拒绝继续发送请求主体,则客户端会放弃该请求。custom.setExpectContinueEnabled(elasticsearchEntity.getExpectContinueEnabled());// 设置是否标准化 URI。如果设置为true,则客户端会尝试标准化请求 URI,以便消除多余和重复的斜杠等。custom.setNormalizeUri(elasticsearchEntity.getNormalizeUri());// 设置使用的 Cookie 规范。可以指定客户端在处理与 Elasticsearch 服务器之间的 Cookie 交互时使用的 Cookie 规范custom.setCookieSpec(new DefaultCookieSpec().toString());return httpAsyncClientBuilder.setDefaultRequestConfig(custom.build());}
}

7.6 Es操作类CustomElasticsearchSinkFunction

作用:实时把数据写入到队列中,再通过批量提交到Elasticsearch中,实现数据写入

package com.aurora.advanced;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 描述:自定义elasticsearch sink 算子函数* ElasticsearchSinkFunction 是用于将数据流写入 Elasticsearch 的接口。* 它允许您自定义如何将 Flink 流式处理的数据写入 Elasticsearch 索引** @author 浅夏的猫* @version 1.0.0* @date 2024-02-12 23:49:22*/
public class CustomElasticsearchSinkFunction implements ElasticsearchSinkFunction<JSONObject> {private static final Logger logger = LoggerFactory.getLogger(CustomElasticsearchSinkFunction.class);private ElasticsearchEntity elasticsearchEntity;public CustomElasticsearchSinkFunction(ElasticsearchEntity elasticsearchEntity) {this.elasticsearchEntity = elasticsearchEntity;}@Overridepublic void process(JSONObject element, RuntimeContext runtimeContext, RequestIndexer indexer) {String transId = element.getString("transId");String tradeTime = element.getString("tradeTime");String index = elasticsearchEntity.getIndexPrefix() + tradeTime;logger.info("交易流水={},数据写入索引{}成功", tradeTime, index);IndexRequest indexRequest = Requests.indexRequest().index(index).type(elasticsearchEntity.getType()).id(transId).source(element, XContentType.JSON);indexer.add(indexRequest);}
}

7.7 异常处理类CustomActionRequestFailureHandler

作用:当sink写Elasticsearch出现异常时,可以自定义操作策略

package com.aurora.advanced;import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** 描述:es写入异常处理** @author 浅夏的猫* @version 1.0.0* @date 2024-02-13 00:04:24*/
public class CustomActionRequestFailureHandler implements ActionRequestFailureHandler {private static final Logger logger = LoggerFactory.getLogger(CustomActionRequestFailureHandler.class);@Overridepublic void onFailure(ActionRequest action, Throwable throwable, int restStatusCode, RequestIndexer requestIndexer) throws Throwable {// 处理不同类型的异常if (throwable instanceof EsRejectedExecutionException) {// 如果是由于线程池饱和导致的拒绝执行异常,可以采取相应的处理措施logger.warn("Elasticsearch action execution was rejected due to thread pool saturation.");// 这里你可以选择执行重试或者其他处理逻辑,例如将数据写入到一个备用存储// 例如: indexer.add(createAnotherRequest(action));} else {// 对于其他类型的异常,默认返回放弃策略logger.error("Unhandled failure, abandoning request: {}", action.toString());}}
}

7.8 作业主类ElasticsearchSinkStreamJobAdvancedDemo

package com.aurora.advanced;import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;/*** 描述:Flink集成Elasticsearch Connector连接器进阶Demo* 实现实时数据流如何无缝地流向Elasticsearch** @author 浅夏的猫* @version 1.0.0* @date 2024-02-11 22:06:45*/
public class ElasticsearchSinkStreamJobAdvancedDemo {private static final Logger logger = LoggerFactory.getLogger(ElasticsearchSinkStreamJobAdvancedDemo.class);public static void main(String[] args) {try {// 读取配置参数ElasticsearchEntity elasticsearchEntity = paramsInit();// 设置elasticsearch节点List<HttpHost> httpHosts = esClusterHttpHostHandler(elasticsearchEntity);// 创建esSinkFunction函数ElasticsearchSinkFunction<JSONObject> esSinkFunction = new CustomElasticsearchSinkFunction(elasticsearchEntity);// 构建ElasticsearchSink算子builderElasticsearchSink.Builder<JSONObject> esSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts, esSinkFunction);// es参数配置esBuilderHandler(esSinkBuilder, elasticsearchEntity);// 构建sink算子ElasticsearchSink<JSONObject> esSink = esSinkBuilder.build();// 自定义数据源,模拟生产环境交易接入,json格式数据SourceFunction<JSONObject> dataSource = new SourceFunction<JSONObject>() {@Overridepublic void run(SourceContext sourceContext) throws Exception {while (true) {//交易流水号String tradeId = UUID.randomUUID().toString();//交易发生时间戳long timeStamp = System.currentTimeMillis();//交易发生金额long tradeAmount = new Random().nextInt(100);//交易名称String tradeName = "支付宝转账";JSONObject dataObj = new JSONObject();dataObj.put("transId", tradeId);dataObj.put("timeStamp", timeStamp);dataObj.put("tradeTime", dateUtil(timeStamp));dataObj.put("tradeAmount", tradeAmount);dataObj.put("tradeName", tradeName);//模拟生产,每隔1秒生成一笔交易Thread.sleep(1000);logger.info("交易接入,原始报文={}", dataObj.toJSONString());sourceContext.collect(dataObj);}}@Overridepublic void cancel() {}};// 创建运行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 构建数据源DataStreamSource<JSONObject> dataStreamSource = env.addSource(dataSource);// 构建sink算子dataStreamSource.addSink(esSink);// 运行作业env.execute();} catch (Exception e) {e.printStackTrace();}}/*** 描述:Flink参数配置读取** @return {@code ElasticsearchEntity }* @throws IOException*/private static ElasticsearchEntity paramsInit() throws IOException {// 通过flink内置工具类获取命令行参数String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink_connector_elasticsearch\\src\\main\\resources\\application.properties";ParameterTool paramsMap = ParameterTool.fromPropertiesFile(propertiesFilePath);ElasticsearchEntity elasticsearchEntity = new ElasticsearchEntity();String hosts = paramsMap.get("es.cluster.hosts");int port = paramsMap.getInt("es.cluster.port");String scheme = paramsMap.get("es.cluster.scheme");String type = paramsMap.get("es.cluster.type");String indexPrefix = paramsMap.get("es.cluster.indexPrefix");int bulkFlushMaxActions = paramsMap.getInt("es.cluster.bulkFlushMaxActions");int bulkFlushMaxSizeMb = paramsMap.getInt("es.cluster.bulkFlushMaxSizeMb");int bulkFlushInterval = paramsMap.getInt("es.cluster.bulkFlushInterval");boolean bulkFlushBackoff = paramsMap.getBoolean("es.cluster.bulkFlushBackoff");int bulkFlushBackoffDelay = paramsMap.getInt("es.cluster.bulkFlushBackoffDelay");int bulkFlushBackoffRetries = paramsMap.getInt("es.cluster.bulkFlushBackoffRetries");int connectTimeout = paramsMap.getInt("es.cluster.connectTimeout");int socketTimeout = paramsMap.getInt("es.cluster.socketTimeout");int connectionRequestTimeout = paramsMap.getInt("es.cluster.connectionRequestTimeout");boolean redirectsEnabled = paramsMap.getBoolean("es.cluster.redirectsEnabled");int maxRedirects = paramsMap.getInt("es.cluster.maxRedirects");boolean authenticationEnabled = paramsMap.getBoolean("es.cluster.authenticationEnabled");boolean circularRedirectsAllowed = paramsMap.getBoolean("es.cluster.circularRedirectsAllowed");boolean contentCompressionEnabled = paramsMap.getBoolean("es.cluster.contentCompressionEnabled");boolean expectContinueEnabled = paramsMap.getBoolean("es.cluster.expectContinueEnabled");boolean normalizeUri = paramsMap.getBoolean("es.cluster.normalizeUri");elasticsearchEntity.setHosts(hosts);elasticsearchEntity.setPort(port);elasticsearchEntity.setScheme(scheme);elasticsearchEntity.setType(type);elasticsearchEntity.setIndexPrefix(indexPrefix);elasticsearchEntity.setBulkFlushMaxActions(bulkFlushMaxActions);elasticsearchEntity.setBulkFlushMaxSizeMb(bulkFlushMaxSizeMb);elasticsearchEntity.setBulkFlushInterval(bulkFlushInterval);elasticsearchEntity.setBulkFlushBackoff(bulkFlushBackoff);elasticsearchEntity.setBulkFlushBackoffDelay(bulkFlushBackoffDelay);elasticsearchEntity.setBulkFlushBackoffRetries(bulkFlushBackoffRetries);elasticsearchEntity.setConnectTimeout(connectTimeout);elasticsearchEntity.setSocketTimeout(socketTimeout);elasticsearchEntity.setConnectionRequestTimeout(connectionRequestTimeout);elasticsearchEntity.setRedirectsEnabled(redirectsEnabled);elasticsearchEntity.setMaxRedirects(maxRedirects);elasticsearchEntity.setAuthenticationEnabled(authenticationEnabled);elasticsearchEntity.setCircularRedirectsAllowed(circularRedirectsAllowed);elasticsearchEntity.setExpectContinueEnabled(expectContinueEnabled);elasticsearchEntity.setContentCompressionEnabled(contentCompressionEnabled);elasticsearchEntity.setNormalizeUri(normalizeUri);return elasticsearchEntity;}/*** 描述:时间格式化工具类** @param timestamp 时间戳* @return {@code String }*/private static String dateUtil(long timestamp) {timestamp = timestamp / 1000;// 将时间戳转换为 LocalDateTime 对象LocalDateTime dateTime = LocalDateTime.ofInstant(Instant.ofEpochSecond(timestamp), ZoneId.systemDefault());// 定义日期时间格式DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMdd");// 格式化日期时间对象为指定格式的字符串String dateTimeFormat = formatter.format(dateTime);return dateTimeFormat;}/*** 描述:es参数配置** @param esSinkBuilder       esSinkBuilder建造器* @param elasticsearchEntity es实体类*/private static void esBuilderHandler(ElasticsearchSink.Builder<JSONObject> esSinkBuilder, ElasticsearchEntity elasticsearchEntity) {// 设置触发批量写入的最大动作数,// 解释:当达到指定的最大动作数时,将触发批量写入到 Elasticsearch。如果你希望在每次写入到 Elasticsearch 时都进行批量写入,可以将该值设置为 1esSinkBuilder.setBulkFlushMaxActions(elasticsearchEntity.getBulkFlushMaxActions());// 设置触发批量写入的最大数据量// 解释:当写入的数据量达到指定的最大值时,将触发批量写入到 Elasticsearch。单位为 MBesSinkBuilder.setBulkFlushMaxSizeMb(elasticsearchEntity.getBulkFlushMaxSizeMb());// 设置批量写入的时间间隔// 解释:每隔指定的时间间隔,无论是否达到最大动作数或最大数据量,都会触发批量写入esSinkBuilder.setBulkFlushInterval(elasticsearchEntity.getBulkFlushInterval());// 启用批量写入的退避策略// 解释:当 Elasticsearch 写入失败时,可以启用退避策略,以避免频繁的重试。此时,setBulkFlushBackoffDelay 和 setBulkFlushBackoffRetries 参数生效。esSinkBuilder.setBulkFlushBackoff(elasticsearchEntity.getBulkFlushBackoff());// 设置批量写入的退避延迟时间// 解释:在发生写入失败后,等待指定的延迟时间后再进行重试esSinkBuilder.setBulkFlushBackoffDelay(elasticsearchEntity.getBulkFlushBackoffDelay());// 设置批量写入的最大重试次数// 解释:设置在写入失败后的最大重试次数。超过这个次数后,将不再重试esSinkBuilder.setBulkFlushBackoffRetries(elasticsearchEntity.getBulkFlushBackoffRetries());// 设置写入失败时的处理策略// 解释:可以自定义处理失败的策略,实现 ElasticsearchSinkFunction.FailureHandler 接口esSinkBuilder.setFailureHandler(new CustomActionRequestFailureHandler());// 设置用于创建 Elasticsearch REST 客户端的工厂// 解释:可以自定义创建 Elasticsearch REST 客户端的逻辑,实现 ElasticsearchSinkBase.RestClientFactory 接口esSinkBuilder.setRestClientFactory(new CustomRestClientFactory(elasticsearchEntity));}/*** 描述:* elasticsearch 节点配置** @param elasticsearchEntity es实体类* @return {@code List<HttpHost> }*/private static List<HttpHost> esClusterHttpHostHandler(ElasticsearchEntity elasticsearchEntity) {List<HttpHost> httpHosts = new ArrayList<>();String[] clusterArray = elasticsearchEntity.getHosts().split(",");for (String node : clusterArray) {httpHosts.add(new HttpHost(node, elasticsearchEntity.getPort(), elasticsearchEntity.getScheme()));}return httpHosts;}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/472005.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习从入门到不想放弃-7

上一章的内容 深度学习从入门到不想放弃-6 (qq.com) 今天讲的也算基础(这个系列后来我一寻思,全是基础 ),但是可能要着重说下,今天讲前向计算和反向传播,在哪儿它都永远是核心,不管面对什么模型 前向计算: 有的叫也叫正向传播,正向计算的,有的直接把前向的方法梯度下…

程序全家桶 | 机器学习之心【Python机器学习/深度学习程序全家桶】

理论背景 机器学习&#xff08;Machine Learning&#xff09;是一种人工智能&#xff08;Artificial Intelligence&#xff09;领域的技术和方法&#xff0c;通过使用数据和统计模型&#xff0c;使计算机系统能够自动学习和改进&#xff0c;而无需明确地进行编程。机器学习使计…

接口测试全流程扫盲

扫盲内容&#xff1a; 1.什么是接口&#xff1f; 2.接口都有哪些类型&#xff1f; 3.接口的本质是什么&#xff1f; 4.什么是接口测试&#xff1f; 5.问什么要做接口测试&#xff1f; 6.怎样做接口测试&#xff1f; 7.接口测测试点是什么&#xff1f; 8.接口测试都要掌…

数据结构与算法:双向链表

朋友们大家好啊&#xff0c;在上节完成单链表的讲解后&#xff0c;我们本篇文章来对带头循环双向链表进行讲解 双向链表 双向链表、头节点和循环的介绍构建双向链表节点的构建初始化双向循环链表&#xff08;空链表&#xff09;销毁双向链表 链表的打印双向链表头尾的插与删尾插…

2024】前端,该卷什么呢?_2024-02-16

2024已来&#xff0c;过去的 2023 可以说是具有里程碑意义的一年&#xff0c;ChatGPT 的炸裂式发展&#xff0c;很多大佬都亲自入场整活儿&#xff0c;你不得不说&#xff0c;人工智能时代的未来已来&#xff0c;大势所趋&#xff0c;不可阻挡。随着生成式AI的迅猛发展&#xf…

AI:128-基于机器学习的建筑物能源消耗预测

🚀点击这里跳转到本专栏,可查阅专栏顶置最新的指南宝典~ 🎉🎊🎉 你的技术旅程将在这里启航! 从基础到实践,深入学习。无论你是初学者还是经验丰富的老手,对于本专栏案例和项目实践都有参考学习意义。 ✨✨✨ 每一个案例都附带有在本地跑过的关键代码,详细讲解供…

怎么恢复电脑重装前的数据?介绍几种有效的方法

在日常生活和工作中&#xff0c;电脑已成为我们不可或缺的工具。然而&#xff0c;有时候我们会遇到一些突发情况&#xff0c;比如电脑系统崩溃需要重新安装系统。在这个过程中&#xff0c;我们可能会失去一些重要的数据&#xff0c;比如照片、文档、视频等。这些数据可能包含着…

【NLP】MHA、MQA、GQA机制的区别

Note LLama2的注意力机制使用了GQA。三种机制的图如下&#xff1a; MHA机制&#xff08;Multi-head Attention&#xff09; MHA&#xff08;Multi-head Attention&#xff09;是标准的多头注意力机制&#xff0c;包含h个Query、Key 和 Value 矩阵。所有注意力头的 Key 和 V…

BBC英式口语~发音练习~笔记整理

参考资料 原视频地址&#xff1a; https://www.bilibili.com/video/BV1D7411n7bS/?spm_id_from333.1245.0.0&vd_source5986fc7c8e6d754f3ca44233573aeaff 笔记图片

Java Web 中forward 和 redirect 的区别

前言 在Java Web开发中&#xff0c;页面跳转是构建用户界面和实现业务逻辑的重要组成部分。Forward&#xff08;转发&#xff09;和Redirect&#xff08;重定向&#xff09;是两种常见的跳转方式&#xff0c;它们分别具有不同的特点和适用场景。正确地选择和使用这两种跳转方式…

html的列表标签

列表标签 列表在html里面经常会用到的&#xff0c;主要使用来布局的&#xff0c;使其整齐好看. 无序列表 无序列表[重要]&#xff1a; ul &#xff0c;li 示例代码1&#xff1a; 对应的效果&#xff1a; 无序列表的属性 属性值描述typedisc&#xff0c;square&#xff0c;…

文生视频:Sora模型报告总结

作为世界模拟器的视频生成模型 我们探索视频数据生成模型的大规模训练。具体来说&#xff0c;我们在可变持续时间、分辨率和宽高比的视频和图像上联合训练文本条件扩散模型。我们利用对视频和图像潜在代码的时空补丁进行操作的变压器架构。我们最大的模型 Sora 能够生成一分钟…