#ES配置
elasticsearch:
#enabled=true时,elasticsearch配置类或方法才会被启用或执行
enabled: true
#请求方式
http: http
#集群地址多个用,逗号隔开并加上端口号,如127.0.0.x:9200,127.0.0.x:9201,127.0.0.x:9203
hosts: 1.1.1.1
#单个端口号
port: 9200
#最大连接数 (maxConnTotal):设置总的最大连接数,取决于业务的并发量。500-2000 之间较为合理。
max-conn: 500
#每个节点的最大连接数 (maxConnPerRoute):控制每个节点的最大连接数,建议 50-100 之间。
max-per-route-conn: 50
#用户名与密码
xpack:
user: elastic
password: xxxxxx
elasticsearch:
#enabled=true时,elasticsearch配置类或方法才会被启用或执行
enabled: true
#请求方式
http: http
#集群地址多个用,逗号隔开并加上端口号,如127.0.0.x:9200,127.0.0.x:9201,127.0.0.x:9203
hosts: 1.1.1.1
#单个端口号
port: 9200
#最大连接数 (maxConnTotal):设置总的最大连接数,取决于业务的并发量。500-2000 之间较为合理。
max-conn: 500
#每个节点的最大连接数 (maxConnPerRoute):控制每个节点的最大连接数,建议 50-100 之间。
max-per-route-conn: 50
#用户名与密码
xpack:
user: elastic
password: xxxxxx
import cn.hutool.core.util.NumberUtil;
import cn.hutool.core.util.StrUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
/**
* @Auther: qqj
* @Date: 2025/01/10 10:15
* @Description:es8客户端配置
*/
@Configuration
@Slf4j
@ConditionalOnProperty(value = "elasticsearch.enabled", havingValue = "true")
public class ElasticsearchConfig {
@Value("${elasticsearch.hosts}")
private String hosts;
@Value("${elasticsearch.port}")
private Integer port;
@Value("${elasticsearch.max-conn}")
private Integer maxConn;
@Value("${elasticsearch.max-per-route-conn}")
private Integer maxPerRouteConn;
@Value("${elasticsearch.xpack.user}")
private String xpackUser;
@Value("${elasticsearch.xpack.password}")
private String xpackPassword;
@Value("${elasticsearch.http}")
private String http;
/**
* 同步方式
*
* @return ElasticsearchClient
*/
@Bean
public ElasticsearchClient elasticsearchClient() {
return new ElasticsearchClient(clientInit());
}
/**
* 异步方式
*
* @return ElasticsearchClient
*/
// @Bean
// public ElasticsearchAsyncClient elasticsearchAsyncClient() {
// return new ElasticsearchAsyncClient(clientInit());
// }
/**
*初始化ElasticsearchTransport对象
* @return
*/
public ElasticsearchTransport clientInit() {
// 将hosts字符串按逗号分割成服务器地址数组,将主机名字符串按逗号分割成数组
String[] hostStrs = hosts.split(",");
// 获取服务器地址数组的长度
int len = hostStrs.length;
if (0 == len) {
// 如果长度为0,表示没有配置服务器地址,记录错误日志
log.error("ElasticsearchClient 配置错误!");
}
// 创建一个HttpHost数组,用于存储Elasticsearch集群的主机信息
HttpHost[] httpHosts = new HttpHost[hostStrs.length];
// 遍历主机名数组,将每个主机名转换为HttpHost对象并存储到httpHosts数组中
for (int i = 0; i < hostStrs.length; i++) {
String host = hostStrs[i];
// 创建HttpHost对象,指定主机名、端口号和协议(http)
httpHosts[i] = new HttpHost(host, port, http);
}
ElasticsearchTransport transport = null;
// 如果只有一个服务器地址,表示不是集群
if (1 == len) {
// 如果没有配置账号和密码
if (StrUtil.isEmpty(xpackUser) && StrUtil.isEmpty(xpackPassword)) {
// 调用getElasticsearchTransport方法,传入转换后的HttpHost数组,获取ElasticsearchTransport对象
transport = getElasticsearchTransport(maxConn,maxPerRouteConn,toHttpHost());
} else {
// 如果配置了账号和密码
// 调用getElasticsearchTransport方法,传入账号密码和转换后的HttpHost数组,获取ElasticsearchTransport对象
transport = getElasticsearchTransport(maxConn,maxPerRouteConn,xpackUser, xpackPassword, toHttpHost());
}
} else {
// 如果有多个服务器地址,表示是集群
// 如果没有配置账号和密码
if (StrUtil.isEmpty(xpackUser) && StrUtil.isEmpty(xpackPassword)) {
// 调用getElasticsearchTransport方法,传入转换后的HttpHost数组,获取ElasticsearchTransport对象
transport = getElasticsearchTransport(maxConn,maxPerRouteConn,toHttpHost());
} else {
// 如果配置了账号和密码
// 调用getElasticsearchTransport方法,传入账号密码和转换后的HttpHost数组,获取ElasticsearchTransport对象
transport = getElasticsearchTransport(maxConn,maxPerRouteConn,xpackUser, xpackPassword, toHttpHost());
}
}
// 返回一个新的ElasticsearchClient对象,传入设置好的ElasticsearchTransport对象
// 创建Elasticsearch客户端对象
// ElasticsearchClient esClient = new ElasticsearchClient(transport);
// 返回ElasticsearchTransport对象,后续可以同步或异步调用
return transport;
}
private HttpHost[] toHttpHost() {
// 将hosts字符串按逗号分割成数组
String[] hostArray = hosts.split(",");
// 获取服务器地址数组的长度
int len = hostArray.length;
// 如果hosts字符串分割后的数组长度为0,表示配置无效
if (0 == len) {
// 如果长度为0,表示没有配置服务器地址,记录错误日志
throw new RuntimeException("ElasticsearchClient 配置错误!");
}
// 创建HttpHost数组,长度与hostArray相同
HttpHost[] httpHosts = new HttpHost[hostArray.length];
// 定义HttpHost对象
HttpHost httpHost;
// 遍历hostArray数组
for (int i = 0; i < hostArray.length; i++) {
//初始化port
Integer portOther = port;
// 获取每个host字符串
String host = hostArray[i];
// 将每个host字符串按冒号分割成字符串数组
String[] strings = host.split(":");
//集群host
String hostOtherStr = strings[0];
if(strings.length>1){
//集群端口
String portOtherStr = strings[1];
if(NumberUtil.isNumber(portOtherStr)){
portOther = NumberUtil.parseInt(portOtherStr);
}
}
// 创建HttpHost对象,设置主机名、端口号和协议
httpHost = new HttpHost(hostOtherStr, portOther, http);
// 将HttpHost对象添加到httpHosts数组中
httpHosts[i] = httpHost;
}
// 返回HttpHost数组
return httpHosts;
}
/**
-最大连接数 (maxConnTotal):设置总的最大连接数,取决于业务的并发量。500-2000 之间较为合理。
-每个节点的最大连接数 (maxConnPerRoute):控制每个节点的最大连接数,建议 50-100 之间。
-IO 线程数 (setIoThreadCount):根据 CPU 核心数设置,通常为 2-4 倍 CPU 核心数。
-连接超时、套接字超时、获取连接超时:一般设置为 10-30 秒,复杂查询或大数据量操作可适当增加到 20-60 秒。
-失败监听器 (setFailureListener):自定义重试和故障处理逻辑,确保高可用性。
*/
private static ElasticsearchTransport getElasticsearchTransport(Integer maxConn,Integer maxPerRouteConn,String xpackUser, String xpackPassword, HttpHost... hosts) {
// 创建一个基本的凭证提供者,用于存储账号密码
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// 设置凭证提供者的认证范围为任意,使用提供的用户名和密码
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(xpackUser, xpackPassword));
// 创建RestClientBuilder对象,用于构建RestClient
RestClientBuilder restClientBuilder = RestClient.builder(hosts);
// 设置RestClient的HttpClient配置回调
// 定义一个HTTP客户端配置回调接口的实现,用于自定义HTTP客户端的配置
// 这里包括了自签证书的设置以及账号密码的配置
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
// 设置最大连接数,控制整个客户端的连接数
httpClientBuilder.setMaxConnTotal(maxConn);
// 设置每个路由的最大连接数,控制每个服务器地址的连接数
httpClientBuilder.setMaxConnPerRoute(maxPerRouteConn);
// 设置默认请求头,指定内容类型为JSON
// httpClientBuilder.setDefaultHeaders(Stream.of(new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())).collect(Collectors.toList()));
// 添加一个响应拦截器,用于在响应中添加X-Elastic-Product头,值为Elasticsearch
httpClientBuilder.addInterceptorLast((HttpResponseInterceptor) (response, context)-> response.addHeader("X-Elastic-Product", "Elasticsearch"));
// 设置默认的用户认证提供者
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// 设置主机名验证器为NoopHostnameVerifier,不进行主机名验证
httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
// 设置自定义的SSL上下文,用于处理自签证书
httpClientBuilder.setSSLContext(buildSSLContext());
// 返回配置后的HttpClientBuilder
return httpClientBuilder;
});
// 配置连接超时、套接字超时、获取连接超时
restClientBuilder.setRequestConfigCallback(builder ->
builder.setConnectTimeout(20000) // 设置连接超时时间为20000毫秒
.setSocketTimeout(20000) // 设置套接字超时时间为20000毫秒
.setConnectionRequestTimeout(20000) // 设置获取连接超时时间为20000毫秒
);
// 创建RestClient
RestClient client = restClientBuilder.build();
// 创建并返回一个新的ElasticsearchTransport对象
// ElasticsearchTransport是Elasticsearch客户端与Elasticsearch集群通信的底层传输层
// 使用RestClient作为底层传输,并使用JacksonJsonpMapper作为JSON序列化和反序列化工具
ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper());
return transport;
}
private static ElasticsearchTransport getElasticsearchTransport(Integer maxConn,Integer maxPerRouteConn,HttpHost... hosts) {
// 使用RestClient.builder()方法创建RestClient对象,传入一个或多个HttpHost对象作为参数
// 这些HttpHost对象包含了Elasticsearch集群的主机地址和端口号
RestClientBuilder restClientBuilder = RestClient.builder(hosts);
// 设置RestClient的HttpClient配置回调
// 定义一个HTTP客户端配置回调接口的实现,用于自定义HTTP客户端的配置
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
// 设置最大连接数,控制整个客户端的连接数
httpClientBuilder.setMaxConnTotal(maxConn);
// 设置每个路由的最大连接数,控制每个服务器地址的连接数
httpClientBuilder.setMaxConnPerRoute(maxPerRouteConn);
// 设置默认请求头,指定内容类型为JSON
// httpClientBuilder.setDefaultHeaders(Collections.singletonList(new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())));
// 添加一个响应拦截器,用于在响应中添加X-Elastic-Product头,值为Elasticsearch
httpClientBuilder.addInterceptorLast((HttpResponseInterceptor) (response, context)-> response.addHeader("X-Elastic-Product", "Elasticsearch"));
// 返回配置后的HttpClientBuilder
return httpClientBuilder;
});
// 配置连接超时、套接字超时、获取连接超时
restClientBuilder.setRequestConfigCallback(builder ->
builder.setConnectTimeout(20000) // 设置连接超时时间为20000毫秒
.setSocketTimeout(20000) // 设置套接字超时时间为20000毫秒
.setConnectionRequestTimeout(20000) // 设置获取连接超时时间为20000毫秒
);
// 创建RestClient
RestClient client = restClientBuilder.build();
// 创建并返回一个新的ElasticsearchTransport对象
// ElasticsearchTransport是Elasticsearch客户端与Elasticsearch集群通信的底层传输层
// 使用RestClient作为底层传输,并使用JacksonJsonpMapper作为JSON序列化和反序列化工具
ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper());
return transport;
}
private static SSLContext buildSSLContext() {
// 初始化SSLContext对象为null
SSLContext sslContext = null;
String fileName = "es01.crt";
// 加载 CA 证书
// Path cacertificatePath = Paths.get("path/to/http_ca.crt");
// 创建一个ClassPathResource对象,用于从类路径中加载指定的文件
ClassPathResource resource = new ClassPathResource(fileName);
boolean exists = isFileExists(resource);
if (exists) {
try {
// 创建一个CertificateFactory实例,用于生成X.509证书
CertificateFactory factory = CertificateFactory.getInstance("X.509");
// 定义一个Certificate对象,用于存储受信任的CA证书
Certificate trustedCa;
// 使用try-with-resources语句自动关闭InputStream
try (InputStream is = resource.getInputStream()) {
// 从输入流中生成证书
trustedCa = factory.generateCertificate(is);
}
// 创建一个KeyStore实例,用于存储受信任的证书
KeyStore trustStore = KeyStore.getInstance("pkcs12");
// 加载KeyStore,这里传入null表示不加载任何文件,因为我们要手动设置证书
trustStore.load(null, null);
// 将受信任的CA证书添加到KeyStore中
trustStore.setCertificateEntry("ca", trustedCa);
// 创建一个SSLContextBuilder实例,用于构建SSLContext
SSLContextBuilder sslContextBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
// 构建SSLContext
sslContext = sslContextBuilder.build();
} catch (CertificateException | IOException | KeyStoreException | NoSuchAlgorithmException |
KeyManagementException e) {
// 如果在构建SSLContext过程中发生异常,记录错误日志
log.error("ES连接认证失败", e);
}
}else{
log.error("文件 " + fileName + " 不存在于类路径中。");
}
// 返回构建好的SSLContext对象
return sslContext;
}
public static boolean isFileExists(ClassPathResource resource) {
try {
// 尝试获取文件的输入流
InputStream inputStream = resource.getInputStream();
// 如果能够成功获取到输入流,说明文件存在
// 关闭输入流以释放资源
inputStream.close();
return true;
} catch (Exception e) {
// 如果在获取输入流时发生异常,说明文件不存在
return false;
}
}
}
import cn.hutool.core.util.StrUtil;
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.ElasticsearchTransport;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponseInterceptor;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.ssl.SSLContextBuilder;
import org.apache.http.ssl.SSLContexts;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import javax.net.ssl.SSLContext;
import java.io.IOException;
import java.io.InputStream;
import java.security.KeyManagementException;
import java.security.KeyStore;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.security.cert.Certificate;
import java.security.cert.CertificateException;
import java.security.cert.CertificateFactory;
/**
* @Auther: qqj
* @Date: 2025/01/10 10:15
* @Description:es8客户端配置
*/
@Configuration
@Slf4j
@ConditionalOnProperty(value = "elasticsearch.enabled", havingValue = "true")
public class ElasticsearchConfig {
@Value("${elasticsearch.hosts}")
private String hosts;
@Value("${elasticsearch.port}")
private Integer port;
@Value("${elasticsearch.max-conn}")
private Integer maxConn;
@Value("${elasticsearch.max-per-route-conn}")
private Integer maxPerRouteConn;
@Value("${elasticsearch.xpack.user}")
private String xpackUser;
@Value("${elasticsearch.xpack.password}")
private String xpackPassword;
@Value("${elasticsearch.http}")
private String http;
/**
* 同步方式
*
* @return ElasticsearchClient
*/
@Bean
public ElasticsearchClient elasticsearchClient() {
return new ElasticsearchClient(clientInit());
}
/**
* 异步方式
*
* @return ElasticsearchClient
*/
// @Bean
// public ElasticsearchAsyncClient elasticsearchAsyncClient() {
// return new ElasticsearchAsyncClient(clientInit());
// }
/**
*初始化ElasticsearchTransport对象
* @return
*/
public ElasticsearchTransport clientInit() {
// 将hosts字符串按逗号分割成服务器地址数组,将主机名字符串按逗号分割成数组
String[] hostStrs = hosts.split(",");
// 获取服务器地址数组的长度
int len = hostStrs.length;
if (0 == len) {
// 如果长度为0,表示没有配置服务器地址,记录错误日志
log.error("ElasticsearchClient 配置错误!");
}
// 创建一个HttpHost数组,用于存储Elasticsearch集群的主机信息
HttpHost[] httpHosts = new HttpHost[hostStrs.length];
// 遍历主机名数组,将每个主机名转换为HttpHost对象并存储到httpHosts数组中
for (int i = 0; i < hostStrs.length; i++) {
String host = hostStrs[i];
// 创建HttpHost对象,指定主机名、端口号和协议(http)
httpHosts[i] = new HttpHost(host, port, http);
}
ElasticsearchTransport transport = null;
// 如果只有一个服务器地址,表示不是集群
if (1 == len) {
// 如果没有配置账号和密码
if (StrUtil.isEmpty(xpackUser) && StrUtil.isEmpty(xpackPassword)) {
// 调用getElasticsearchTransport方法,传入转换后的HttpHost数组,获取ElasticsearchTransport对象
transport = getElasticsearchTransport(maxConn,maxPerRouteConn,toHttpHost());
} else {
// 如果配置了账号和密码
// 调用getElasticsearchTransport方法,传入账号密码和转换后的HttpHost数组,获取ElasticsearchTransport对象
transport = getElasticsearchTransport(maxConn,maxPerRouteConn,xpackUser, xpackPassword, toHttpHost());
}
} else {
// 如果有多个服务器地址,表示是集群
// 如果没有配置账号和密码
if (StrUtil.isEmpty(xpackUser) && StrUtil.isEmpty(xpackPassword)) {
// 调用getElasticsearchTransport方法,传入转换后的HttpHost数组,获取ElasticsearchTransport对象
transport = getElasticsearchTransport(maxConn,maxPerRouteConn,toHttpHost());
} else {
// 如果配置了账号和密码
// 调用getElasticsearchTransport方法,传入账号密码和转换后的HttpHost数组,获取ElasticsearchTransport对象
transport = getElasticsearchTransport(maxConn,maxPerRouteConn,xpackUser, xpackPassword, toHttpHost());
}
}
// 返回一个新的ElasticsearchClient对象,传入设置好的ElasticsearchTransport对象
// 创建Elasticsearch客户端对象
// ElasticsearchClient esClient = new ElasticsearchClient(transport);
// 返回ElasticsearchTransport对象,后续可以同步或异步调用
return transport;
}
private HttpHost[] toHttpHost() {
// 将hosts字符串按逗号分割成数组
String[] hostArray = hosts.split(",");
// 获取服务器地址数组的长度
int len = hostArray.length;
// 如果hosts字符串分割后的数组长度为0,表示配置无效
if (0 == len) {
// 如果长度为0,表示没有配置服务器地址,记录错误日志
throw new RuntimeException("ElasticsearchClient 配置错误!");
}
// 创建HttpHost数组,长度与hostArray相同
HttpHost[] httpHosts = new HttpHost[hostArray.length];
// 定义HttpHost对象
HttpHost httpHost;
// 遍历hostArray数组
for (int i = 0; i < hostArray.length; i++) {
//初始化port
Integer portOther = port;
// 获取每个host字符串
String host = hostArray[i];
// 将每个host字符串按冒号分割成字符串数组
String[] strings = host.split(":");
//集群host
String hostOtherStr = strings[0];
if(strings.length>1){
//集群端口
String portOtherStr = strings[1];
if(NumberUtil.isNumber(portOtherStr)){
portOther = NumberUtil.parseInt(portOtherStr);
}
}
// 创建HttpHost对象,设置主机名、端口号和协议
httpHost = new HttpHost(hostOtherStr, portOther, http);
// 将HttpHost对象添加到httpHosts数组中
httpHosts[i] = httpHost;
}
// 返回HttpHost数组
return httpHosts;
}
/**
-最大连接数 (maxConnTotal):设置总的最大连接数,取决于业务的并发量。500-2000 之间较为合理。
-每个节点的最大连接数 (maxConnPerRoute):控制每个节点的最大连接数,建议 50-100 之间。
-IO 线程数 (setIoThreadCount):根据 CPU 核心数设置,通常为 2-4 倍 CPU 核心数。
-连接超时、套接字超时、获取连接超时:一般设置为 10-30 秒,复杂查询或大数据量操作可适当增加到 20-60 秒。
-失败监听器 (setFailureListener):自定义重试和故障处理逻辑,确保高可用性。
*/
private static ElasticsearchTransport getElasticsearchTransport(Integer maxConn,Integer maxPerRouteConn,String xpackUser, String xpackPassword, HttpHost... hosts) {
// 创建一个基本的凭证提供者,用于存储账号密码
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
// 设置凭证提供者的认证范围为任意,使用提供的用户名和密码
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(xpackUser, xpackPassword));
// 创建RestClientBuilder对象,用于构建RestClient
RestClientBuilder restClientBuilder = RestClient.builder(hosts);
// 设置RestClient的HttpClient配置回调
// 定义一个HTTP客户端配置回调接口的实现,用于自定义HTTP客户端的配置
// 这里包括了自签证书的设置以及账号密码的配置
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
// 设置最大连接数,控制整个客户端的连接数
httpClientBuilder.setMaxConnTotal(maxConn);
// 设置每个路由的最大连接数,控制每个服务器地址的连接数
httpClientBuilder.setMaxConnPerRoute(maxPerRouteConn);
// 设置默认请求头,指定内容类型为JSON
// httpClientBuilder.setDefaultHeaders(Stream.of(new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())).collect(Collectors.toList()));
// 添加一个响应拦截器,用于在响应中添加X-Elastic-Product头,值为Elasticsearch
httpClientBuilder.addInterceptorLast((HttpResponseInterceptor) (response, context)-> response.addHeader("X-Elastic-Product", "Elasticsearch"));
// 设置默认的用户认证提供者
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
// 设置主机名验证器为NoopHostnameVerifier,不进行主机名验证
httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
// 设置自定义的SSL上下文,用于处理自签证书
httpClientBuilder.setSSLContext(buildSSLContext());
// 返回配置后的HttpClientBuilder
return httpClientBuilder;
});
// 配置连接超时、套接字超时、获取连接超时
restClientBuilder.setRequestConfigCallback(builder ->
builder.setConnectTimeout(20000) // 设置连接超时时间为20000毫秒
.setSocketTimeout(20000) // 设置套接字超时时间为20000毫秒
.setConnectionRequestTimeout(20000) // 设置获取连接超时时间为20000毫秒
);
// 创建RestClient
RestClient client = restClientBuilder.build();
// 创建并返回一个新的ElasticsearchTransport对象
// ElasticsearchTransport是Elasticsearch客户端与Elasticsearch集群通信的底层传输层
// 使用RestClient作为底层传输,并使用JacksonJsonpMapper作为JSON序列化和反序列化工具
ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper());
return transport;
}
private static ElasticsearchTransport getElasticsearchTransport(Integer maxConn,Integer maxPerRouteConn,HttpHost... hosts) {
// 使用RestClient.builder()方法创建RestClient对象,传入一个或多个HttpHost对象作为参数
// 这些HttpHost对象包含了Elasticsearch集群的主机地址和端口号
RestClientBuilder restClientBuilder = RestClient.builder(hosts);
// 设置RestClient的HttpClient配置回调
// 定义一个HTTP客户端配置回调接口的实现,用于自定义HTTP客户端的配置
restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
// 设置最大连接数,控制整个客户端的连接数
httpClientBuilder.setMaxConnTotal(maxConn);
// 设置每个路由的最大连接数,控制每个服务器地址的连接数
httpClientBuilder.setMaxConnPerRoute(maxPerRouteConn);
// 设置默认请求头,指定内容类型为JSON
// httpClientBuilder.setDefaultHeaders(Collections.singletonList(new BasicHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString())));
// 添加一个响应拦截器,用于在响应中添加X-Elastic-Product头,值为Elasticsearch
httpClientBuilder.addInterceptorLast((HttpResponseInterceptor) (response, context)-> response.addHeader("X-Elastic-Product", "Elasticsearch"));
// 返回配置后的HttpClientBuilder
return httpClientBuilder;
});
// 配置连接超时、套接字超时、获取连接超时
restClientBuilder.setRequestConfigCallback(builder ->
builder.setConnectTimeout(20000) // 设置连接超时时间为20000毫秒
.setSocketTimeout(20000) // 设置套接字超时时间为20000毫秒
.setConnectionRequestTimeout(20000) // 设置获取连接超时时间为20000毫秒
);
// 创建RestClient
RestClient client = restClientBuilder.build();
// 创建并返回一个新的ElasticsearchTransport对象
// ElasticsearchTransport是Elasticsearch客户端与Elasticsearch集群通信的底层传输层
// 使用RestClient作为底层传输,并使用JacksonJsonpMapper作为JSON序列化和反序列化工具
ElasticsearchTransport transport = new RestClientTransport(client, new JacksonJsonpMapper());
return transport;
}
private static SSLContext buildSSLContext() {
// 初始化SSLContext对象为null
SSLContext sslContext = null;
String fileName = "es01.crt";
// 加载 CA 证书
// Path cacertificatePath = Paths.get("path/to/http_ca.crt");
// 创建一个ClassPathResource对象,用于从类路径中加载指定的文件
ClassPathResource resource = new ClassPathResource(fileName);
boolean exists = isFileExists(resource);
if (exists) {
try {
// 创建一个CertificateFactory实例,用于生成X.509证书
CertificateFactory factory = CertificateFactory.getInstance("X.509");
// 定义一个Certificate对象,用于存储受信任的CA证书
Certificate trustedCa;
// 使用try-with-resources语句自动关闭InputStream
try (InputStream is = resource.getInputStream()) {
// 从输入流中生成证书
trustedCa = factory.generateCertificate(is);
}
// 创建一个KeyStore实例,用于存储受信任的证书
KeyStore trustStore = KeyStore.getInstance("pkcs12");
// 加载KeyStore,这里传入null表示不加载任何文件,因为我们要手动设置证书
trustStore.load(null, null);
// 将受信任的CA证书添加到KeyStore中
trustStore.setCertificateEntry("ca", trustedCa);
// 创建一个SSLContextBuilder实例,用于构建SSLContext
SSLContextBuilder sslContextBuilder = SSLContexts.custom().loadTrustMaterial(trustStore, null);
// 构建SSLContext
sslContext = sslContextBuilder.build();
} catch (CertificateException | IOException | KeyStoreException | NoSuchAlgorithmException |
KeyManagementException e) {
// 如果在构建SSLContext过程中发生异常,记录错误日志
log.error("ES连接认证失败", e);
}
}else{
log.error("文件 " + fileName + " 不存在于类路径中。");
}
// 返回构建好的SSLContext对象
return sslContext;
}
public static boolean isFileExists(ClassPathResource resource) {
try {
// 尝试获取文件的输入流
InputStream inputStream = resource.getInputStream();
// 如果能够成功获取到输入流,说明文件存在
// 关闭输入流以释放资源
inputStream.close();
return true;
} catch (Exception e) {
// 如果在获取输入流时发生异常,说明文件不存在
return false;
}
}
}
/**
* 定时更新任务
* @return Boolean
*/
@Override
public Boolean regularlyUpdate() {
try {
//初始化索引
initIndex();
//批量插入数据
initAddressMetaData();
}catch (Exception e){
log.error("定时更新任务异常",e);
}
return Boolean.TRUE;
}
public void initAddressMetaData() throws Exception {
try {
//5天
long isTime = 5*24*60*60;
if(CollUtil.isNotEmpty(listGroup)){
//获取所有街道名称
Set<String> streetNmSet = listGroup.stream().filter(vo -> StrUtil.isNotEmpty(vo.getJd())).map(vo -> vo.getJd()).collect(Collectors.toSet());
if(CollUtil.isNotEmpty(streetNmSet)){
//4.根据街道名称去除
String streetNameJoin = CollUtil.join(streetNmSet, "|");
//存入reids中
if(StrUtil.isNotEmpty(streetNameJoin)){
redisService.setAndExpire(CommonConstants.STREET_NAME_JOIN, streetNameJoin,isTime);
}
}
//获取所有社区名称
Set<String> communityNmSet = listGroup.stream().filter(vo -> StrUtil.isNotEmpty(vo.getSqmc())).map(vo -> vo.getSqmc()).collect(Collectors.toSet());
if(CollUtil.isNotEmpty(communityNmSet)){
//5.根据社区名称去除
String communityNameJoin = CollUtil.join(communityNmSet, "|");
//存入reids中
if(StrUtil.isNotEmpty(communityNameJoin)){
redisService.setAndExpire(CommonConstants.COMMUNITY_NAME_JOIN, communityNameJoin,isTime);
}
}
//获取所有街路巷名称
Set<String> roadNmSet = listGroup.stream().filter(vo -> StrUtil.isNotEmpty(vo.getJlx())).map(vo -> vo.getJlx()).collect(Collectors.toSet());
if(CollUtil.isNotEmpty(roadNmSet)){
//6.根据街路巷名称去除
String roadNameJoin = CollUtil.join(roadNmSet, "|");
//存入reids中
if(StrUtil.isNotEmpty(roadNameJoin)){
redisService.setAndExpire(CommonConstants.ROAD_NAME_JOIN, roadNameJoin,isTime);
}
}
}
if(CollUtil.isNotEmpty(addressCompletionVOList)){
bulkAddDate(ADDRESS_COMPLETION_ES_INDEX_ALIASES, "id", addressCompletionVOList);
}
} catch (Exception e) {
log.error("初始化地址字典库失败", e);
throw new Exception("初始化失败");
}
}
/**
* 批量插入ES
*批量插入文档
* @param indexName 索引
* @param idName id名称
* @param list 数据集合
*/
public void bulkAddDate(String indexName, String idName, List<AddressCompletionVO> list) {
try {
// 检查传入的list是否为空或长度为0,如果是则直接返回
if (null == list || list.size() <= 0) {
return;
}
// 检查索引名称是否为空,如果是则直接返回
if (StrUtil.isEmpty(indexName)) {
return;
}
// 创建BulkRequest的构建器
BulkRequest.Builder request = new BulkRequest.Builder();
// 遍历传入的list,为每个Map对象创建一个bulk操作
for (AddressCompletionVO map : list) {
// 如果idName为null,表示不需要指定文档id,Elasticsearch会自动生成
if (idName == null) {
request.operations(op -> op.create(c -> c
.index(indexName) // 指定索引名称
.document(map) // 设置文档内容
)
);
} else {
// 如果idName不为null,从map中获取idName对应的值作为文档id
if (StrUtil.isNotEmpty(idName)) {
String idStr = StrUtil.toString(map.getId());
request.operations(op -> op.create(c -> c
.index(indexName) // 指定索引名称
.id(idStr) // 设置文档id
.document(map) // 设置文档内容
)
);
}
}
}
// 发送同步bulk请求
BulkResponse bulkResponse = elasticsearchClient.bulk(request.build());
// 处理bulk响应
if (bulkResponse.errors()) {
// 如果存在错误,遍历每个bulk响应项并打印错误信息
for (BulkResponseItem item : bulkResponse.items()) {
if (item.error() != null) {
log.error(item.error().reason());
}
}
} else {
// 如果没有错误,遍历每个bulk响应项并打印操作结果
for (BulkResponseItem item : bulkResponse.items()) {
String index = item.index(); // 获取索引名称
String result = item.result(); // 获取操作结果
int status = item.status(); // 获取操作状态码
// 根据操作结果打印不同的日志信息
if (StrUtil.contains(result, "created")) {
log.info("新增成功,{}:{}" , index , status);
} else if (StrUtil.contains(result, "update")) {
log.info("修改成功,{}:{}" , index , status);
} else if (StrUtil.contains(result, "delete")) {
log.info("删除成功,{}:{}" , index , status);
}
}
}
} catch (ElasticsearchException | IOException e) {
// 捕获ElasticsearchException和IOException异常,并打印错误日志
log.error("批量插入ES数据失败", e);
}
}
/**
* 初始化索引
* @throws IOException
*/
public void initIndex() throws IOException {
// 获取当前日期,格式为yyyyMMdd
String nowDateStr = DateUtil.format(DateUtil.date(), DatePattern.PURE_DATE_PATTERN);
// 拼接索引名称,包含日期
// String esIndexStr = ADDRESS_COMPLETION_ES_INDEX + nowDateStr;
String esIndexStr = ADDRESS_COMPLETION_ES_INDEX + "aut";
// 检查索引是否存在
BooleanResponse exists = elasticsearchClient.indices()
.exists(c -> c.index(ADDRESS_COMPLETION_ES_INDEX_ALIASES));
// 如果索引存在,则先删除索引,避免后面创建时索引已存在导致不创建
if (exists.value()) {
deleteIndex(esIndexStr);
// 此处注释掉return,表示即使索引存在并被删除,后续代码仍会继续执行创建索引的操作
// log.info("索引已存在");
// return;
}
// 获取Elasticsearch的映射配置文件内容
String mapping = getEsMapping("AddressCompletion.json");
// 将映射配置文件内容解析为JsonParser对象
JsonParser documentJSON = Json.createParser(new StringReader(mapping));
// 构建自定义分析器,用于处理特定的分词需求
Analyzer commaAnalyzer = new Analyzer.Builder()
.pattern(new PatternAnalyzer.Builder().pattern("#").build())
.build();
// 构建索引设置,包括分片数、副本数、刷新间隔以及自定义分析器等配置
IndexSettings settings = IndexSettings.of(sBuilder -> sBuilder
.index(iBuilder -> iBuilder
// 设置索引有2个分片
.numberOfShards("2")
// 设置索引有1个副本
.numberOfReplicas("1")
// 设置索引的刷新间隔为3秒
.refreshInterval(t -> t.time("3s"))
).analysis(an->an.
// 新建一个名为"comma"的分词器,使用上面构建的自定义分析器
analyzer("comma",commaAnalyzer)
)
);
// 创建索引,并设置索引名称、索引设置、映射配置以及别名等
CreateIndexResponse createIndexResponse = elasticsearchClient.indices().create(createIndexBuilder ->
createIndexBuilder.index(esIndexStr)
.settings(settings)
.mappings(TypeMapping._DESERIALIZER.deserialize(documentJSON, new JacksonJsonpMapper()))
.aliases(ADDRESS_COMPLETION_ES_INDEX_ALIASES, aliases ->
aliases.isWriteIndex(true))
);
// 获取创建索引的响应状态
boolean acknowledged = createIndexResponse.acknowledged(); // 确认索引是否已创建
boolean shardsAcknowledged = createIndexResponse.shardsAcknowledged(); // 确认分片是否已创建
String index = createIndexResponse.index(); // 获取创建的索引名称
// 记录创建索引的相关日志信息,方便后续查看和调试
log.info("创建索引状态:{}", acknowledged);
log.info("已确认的分片:{}", shardsAcknowledged);
log.info("索引名称:{}", index);
}
private String getEsMapping(String fileName) {
// 从类路径下的/es-mappings/目录中获取指定文件的输入流
InputStream resourceAsStream = this.getClass().getResourceAsStream("/es-mappings/" + fileName);
try {
// 创建StringBuilder用于拼接文件内容
StringBuilder sb = new StringBuilder();
// 使用BufferedReader读取输入流,指定字符集为UTF-8
BufferedReader br = new BufferedReader(new InputStreamReader(resourceAsStream, StandardCharsets.UTF_8));
String line;
// 逐行读取文件内容并追加到StringBuilder中,每行后追加换行符
while ((line = br.readLine()) != null) {
sb.append(line).append("\n");
}
// 关闭输入流
resourceAsStream.close();
// 返回拼接后的文件内容字符串
return sb.toString();
// 下面注释掉的代码是另一种读取文件内容为字符串的方式,使用CharStreams工具类
// String jsonString = CharStreams.toString(new InputStreamReader(resourceAsStream, Charsets.UTF_8));
} catch (Exception e) {
// 如果读取过程中出现异常,则抛出IllegalArgumentException,并携带异常信息
throw new IllegalArgumentException("读取es配置异常:", e);
}
}
/**
* 删除索引
* @param indexName 索引
*/
public void deleteIndex(String indexName) {
try {
//1.判断是否存在索引
ExistsRequest existsRequest = ExistsRequest.of(s -> s.index(indexName));
boolean isExit = elasticsearchClient.indices().exists(existsRequest).value();
//2.删除索引
if(isExit){
// 创建获取索引请求,使用通配符匹配索引
GetIndexRequest requestGet = GetIndexRequest.of(s -> s.index(indexName));
//查询索引
// Set<String> indexKeySets = elasticsearchClient.indices().get(requestGet).result().keySet();
// 执行获取索引操作
GetIndexResponse getIndexResponse = elasticsearchClient.indices().get(requestGet);
// 获取索引信息
Map<String, IndexState> result = getIndexResponse.result();
// 遍历并打印索引信息
// result.forEach((k, v) -> log.info("\n key = {},\n value = {}", k, v));
if(ObjectUtil.isNotNull(result)&&result.size()>0){
result.forEach((k, v)->{
try {
// 创建删除索引请求
//设置超时,等待所有节点确认索引删除
DeleteIndexRequest request = DeleteIndexRequest.of(s -> s
.index(k).timeout(a->a.time("2s"))
);
//设置IndicesOptions控制如何解决不可用的索引以及如何扩展通配符表达式
// 执行删除索引操作
DeleteIndexResponse response = elasticsearchClient.indices().delete(request);
// 检查删除操作是否被确认
if (response.acknowledged()) {
log.info("索引 {} 删除成功", k);
} else {
log.warn("索引 {} 删除未被确认", k);
}
} catch (ElasticsearchException e) {
if (e.status() == 404) {
log.info("索引不存在,无需删除",e);
} else {
log.error("删除索引时发生错误", e);
}
} catch (IOException e) {
log.error("与Elasticsearch通信时发生I/O错误", e);
}
});
}
}else{
log.info("索引不存在,无需删除");
}
} catch (ElasticsearchException e) {
if (e.status() == 404) {
log.info("索引不存在,无需删除",e);
} else {
log.error("删除索引时发生错误", e);
}
} catch (IOException e) {
log.error("与Elasticsearch通信时发生I/O错误", e);
}
}
* 定时更新任务
* @return Boolean
*/
@Override
public Boolean regularlyUpdate() {
try {
//初始化索引
initIndex();
//批量插入数据
initAddressMetaData();
}catch (Exception e){
log.error("定时更新任务异常",e);
}
return Boolean.TRUE;
}
public void initAddressMetaData() throws Exception {
try {
//5天
long isTime = 5*24*60*60;
if(CollUtil.isNotEmpty(listGroup)){
//获取所有街道名称
Set<String> streetNmSet = listGroup.stream().filter(vo -> StrUtil.isNotEmpty(vo.getJd())).map(vo -> vo.getJd()).collect(Collectors.toSet());
if(CollUtil.isNotEmpty(streetNmSet)){
//4.根据街道名称去除
String streetNameJoin = CollUtil.join(streetNmSet, "|");
//存入reids中
if(StrUtil.isNotEmpty(streetNameJoin)){
redisService.setAndExpire(CommonConstants.STREET_NAME_JOIN, streetNameJoin,isTime);
}
}
//获取所有社区名称
Set<String> communityNmSet = listGroup.stream().filter(vo -> StrUtil.isNotEmpty(vo.getSqmc())).map(vo -> vo.getSqmc()).collect(Collectors.toSet());
if(CollUtil.isNotEmpty(communityNmSet)){
//5.根据社区名称去除
String communityNameJoin = CollUtil.join(communityNmSet, "|");
//存入reids中
if(StrUtil.isNotEmpty(communityNameJoin)){
redisService.setAndExpire(CommonConstants.COMMUNITY_NAME_JOIN, communityNameJoin,isTime);
}
}
//获取所有街路巷名称
Set<String> roadNmSet = listGroup.stream().filter(vo -> StrUtil.isNotEmpty(vo.getJlx())).map(vo -> vo.getJlx()).collect(Collectors.toSet());
if(CollUtil.isNotEmpty(roadNmSet)){
//6.根据街路巷名称去除
String roadNameJoin = CollUtil.join(roadNmSet, "|");
//存入reids中
if(StrUtil.isNotEmpty(roadNameJoin)){
redisService.setAndExpire(CommonConstants.ROAD_NAME_JOIN, roadNameJoin,isTime);
}
}
}
if(CollUtil.isNotEmpty(addressCompletionVOList)){
bulkAddDate(ADDRESS_COMPLETION_ES_INDEX_ALIASES, "id", addressCompletionVOList);
}
} catch (Exception e) {
log.error("初始化地址字典库失败", e);
throw new Exception("初始化失败");
}
}
/**
* 批量插入ES
*批量插入文档
* @param indexName 索引
* @param idName id名称
* @param list 数据集合
*/
public void bulkAddDate(String indexName, String idName, List<AddressCompletionVO> list) {
try {
// 检查传入的list是否为空或长度为0,如果是则直接返回
if (null == list || list.size() <= 0) {
return;
}
// 检查索引名称是否为空,如果是则直接返回
if (StrUtil.isEmpty(indexName)) {
return;
}
// 创建BulkRequest的构建器
BulkRequest.Builder request = new BulkRequest.Builder();
// 遍历传入的list,为每个Map对象创建一个bulk操作
for (AddressCompletionVO map : list) {
// 如果idName为null,表示不需要指定文档id,Elasticsearch会自动生成
if (idName == null) {
request.operations(op -> op.create(c -> c
.index(indexName) // 指定索引名称
.document(map) // 设置文档内容
)
);
} else {
// 如果idName不为null,从map中获取idName对应的值作为文档id
if (StrUtil.isNotEmpty(idName)) {
String idStr = StrUtil.toString(map.getId());
request.operations(op -> op.create(c -> c
.index(indexName) // 指定索引名称
.id(idStr) // 设置文档id
.document(map) // 设置文档内容
)
);
}
}
}
// 发送同步bulk请求
BulkResponse bulkResponse = elasticsearchClient.bulk(request.build());
// 处理bulk响应
if (bulkResponse.errors()) {
// 如果存在错误,遍历每个bulk响应项并打印错误信息
for (BulkResponseItem item : bulkResponse.items()) {
if (item.error() != null) {
log.error(item.error().reason());
}
}
} else {
// 如果没有错误,遍历每个bulk响应项并打印操作结果
for (BulkResponseItem item : bulkResponse.items()) {
String index = item.index(); // 获取索引名称
String result = item.result(); // 获取操作结果
int status = item.status(); // 获取操作状态码
// 根据操作结果打印不同的日志信息
if (StrUtil.contains(result, "created")) {
log.info("新增成功,{}:{}" , index , status);
} else if (StrUtil.contains(result, "update")) {
log.info("修改成功,{}:{}" , index , status);
} else if (StrUtil.contains(result, "delete")) {
log.info("删除成功,{}:{}" , index , status);
}
}
}
} catch (ElasticsearchException | IOException e) {
// 捕获ElasticsearchException和IOException异常,并打印错误日志
log.error("批量插入ES数据失败", e);
}
}
/**
* 初始化索引
* @throws IOException
*/
public void initIndex() throws IOException {
// 获取当前日期,格式为yyyyMMdd
String nowDateStr = DateUtil.format(DateUtil.date(), DatePattern.PURE_DATE_PATTERN);
// 拼接索引名称,包含日期
// String esIndexStr = ADDRESS_COMPLETION_ES_INDEX + nowDateStr;
String esIndexStr = ADDRESS_COMPLETION_ES_INDEX + "aut";
// 检查索引是否存在
BooleanResponse exists = elasticsearchClient.indices()
.exists(c -> c.index(ADDRESS_COMPLETION_ES_INDEX_ALIASES));
// 如果索引存在,则先删除索引,避免后面创建时索引已存在导致不创建
if (exists.value()) {
deleteIndex(esIndexStr);
// 此处注释掉return,表示即使索引存在并被删除,后续代码仍会继续执行创建索引的操作
// log.info("索引已存在");
// return;
}
// 获取Elasticsearch的映射配置文件内容
String mapping = getEsMapping("AddressCompletion.json");
// 将映射配置文件内容解析为JsonParser对象
JsonParser documentJSON = Json.createParser(new StringReader(mapping));
// 构建自定义分析器,用于处理特定的分词需求
Analyzer commaAnalyzer = new Analyzer.Builder()
.pattern(new PatternAnalyzer.Builder().pattern("#").build())
.build();
// 构建索引设置,包括分片数、副本数、刷新间隔以及自定义分析器等配置
IndexSettings settings = IndexSettings.of(sBuilder -> sBuilder
.index(iBuilder -> iBuilder
// 设置索引有2个分片
.numberOfShards("2")
// 设置索引有1个副本
.numberOfReplicas("1")
// 设置索引的刷新间隔为3秒
.refreshInterval(t -> t.time("3s"))
).analysis(an->an.
// 新建一个名为"comma"的分词器,使用上面构建的自定义分析器
analyzer("comma",commaAnalyzer)
)
);
// 创建索引,并设置索引名称、索引设置、映射配置以及别名等
CreateIndexResponse createIndexResponse = elasticsearchClient.indices().create(createIndexBuilder ->
createIndexBuilder.index(esIndexStr)
.settings(settings)
.mappings(TypeMapping._DESERIALIZER.deserialize(documentJSON, new JacksonJsonpMapper()))
.aliases(ADDRESS_COMPLETION_ES_INDEX_ALIASES, aliases ->
aliases.isWriteIndex(true))
);
// 获取创建索引的响应状态
boolean acknowledged = createIndexResponse.acknowledged(); // 确认索引是否已创建
boolean shardsAcknowledged = createIndexResponse.shardsAcknowledged(); // 确认分片是否已创建
String index = createIndexResponse.index(); // 获取创建的索引名称
// 记录创建索引的相关日志信息,方便后续查看和调试
log.info("创建索引状态:{}", acknowledged);
log.info("已确认的分片:{}", shardsAcknowledged);
log.info("索引名称:{}", index);
}
private String getEsMapping(String fileName) {
// 从类路径下的/es-mappings/目录中获取指定文件的输入流
InputStream resourceAsStream = this.getClass().getResourceAsStream("/es-mappings/" + fileName);
try {
// 创建StringBuilder用于拼接文件内容
StringBuilder sb = new StringBuilder();
// 使用BufferedReader读取输入流,指定字符集为UTF-8
BufferedReader br = new BufferedReader(new InputStreamReader(resourceAsStream, StandardCharsets.UTF_8));
String line;
// 逐行读取文件内容并追加到StringBuilder中,每行后追加换行符
while ((line = br.readLine()) != null) {
sb.append(line).append("\n");
}
// 关闭输入流
resourceAsStream.close();
// 返回拼接后的文件内容字符串
return sb.toString();
// 下面注释掉的代码是另一种读取文件内容为字符串的方式,使用CharStreams工具类
// String jsonString = CharStreams.toString(new InputStreamReader(resourceAsStream, Charsets.UTF_8));
} catch (Exception e) {
// 如果读取过程中出现异常,则抛出IllegalArgumentException,并携带异常信息
throw new IllegalArgumentException("读取es配置异常:", e);
}
}
/**
* 删除索引
* @param indexName 索引
*/
public void deleteIndex(String indexName) {
try {
//1.判断是否存在索引
ExistsRequest existsRequest = ExistsRequest.of(s -> s.index(indexName));
boolean isExit = elasticsearchClient.indices().exists(existsRequest).value();
//2.删除索引
if(isExit){
// 创建获取索引请求,使用通配符匹配索引
GetIndexRequest requestGet = GetIndexRequest.of(s -> s.index(indexName));
//查询索引
// Set<String> indexKeySets = elasticsearchClient.indices().get(requestGet).result().keySet();
// 执行获取索引操作
GetIndexResponse getIndexResponse = elasticsearchClient.indices().get(requestGet);
// 获取索引信息
Map<String, IndexState> result = getIndexResponse.result();
// 遍历并打印索引信息
// result.forEach((k, v) -> log.info("\n key = {},\n value = {}", k, v));
if(ObjectUtil.isNotNull(result)&&result.size()>0){
result.forEach((k, v)->{
try {
// 创建删除索引请求
//设置超时,等待所有节点确认索引删除
DeleteIndexRequest request = DeleteIndexRequest.of(s -> s
.index(k).timeout(a->a.time("2s"))
);
//设置IndicesOptions控制如何解决不可用的索引以及如何扩展通配符表达式
// 执行删除索引操作
DeleteIndexResponse response = elasticsearchClient.indices().delete(request);
// 检查删除操作是否被确认
if (response.acknowledged()) {
log.info("索引 {} 删除成功", k);
} else {
log.warn("索引 {} 删除未被确认", k);
}
} catch (ElasticsearchException e) {
if (e.status() == 404) {
log.info("索引不存在,无需删除",e);
} else {
log.error("删除索引时发生错误", e);
}
} catch (IOException e) {
log.error("与Elasticsearch通信时发生I/O错误", e);
}
});
}
}else{
log.info("索引不存在,无需删除");
}
} catch (ElasticsearchException e) {
if (e.status() == 404) {
log.info("索引不存在,无需删除",e);
} else {
log.error("删除索引时发生错误", e);
}
} catch (IOException e) {
log.error("与Elasticsearch通信时发生I/O错误", e);
}
}