本文为博主原创,未经授权,严禁转载及使用。
本文链接:https://blog.csdn.net/zyooooxie/article/details/109588072
前面刚写了《requests发请求 操作Elasticsearch - Search》(https://blog.csdn.net/zyooooxie/article/details/123730279),这次再来分享下使用 elasticsearch 库的操作;
【实际这篇博客推迟发布N个月】
个人博客:https://blog.csdn.net/zyooooxie
【以下所有内容仅为个人项目经历,如有不同,纯属正常】
Python Client
https://www.elastic.co/guide/en/elasticsearch/client/index.html
我使用的版本是 7.17.0;
https://pypi.org/project/elasticsearch/7.17.0/
https://www.elastic.co/guide/en/elasticsearch/client/python-api/7.17/overview.html
https://elasticsearch-py.readthedocs.io/en/v7.17.0/index.html
"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
@email: zyooooxie@gmail.com
"""import time
import traceback
import sys
import json
import string
import math
import random
from typing import Optional, Union, List, Any
from user_log import Logfrom elasticsearch import Elasticsearch
from elasticsearch.helpers import BulkIndexErrorgl_es_host_new = 'http://1.1.1.1:1111'
gl_es_host_new_2 = ['http://1.1.1.1:1111', 'http://2.2.2.2:2222']# ``port`` needs to be an int.
gl_es_host_new_3 = [{'host': '2.2.2.2', 'port': 2222}]
gl_es_host_new_4 = [{'host': '2.2.2.2', 'port': 2222}, {'host': '1.1.1.1', 'port': 1111}]gl_es_auth = ('es_username', 'es_password')gl_type = '_doc'gl_search_dict = {'size': 100, 'from': 0, "sort": {"xxxXXX": {"order": "desc"}}}# pip install elasticsearch==7.17.0
# https://pypi.org/project/elasticsearch/7.17.0/
# https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs.html# https://elasticsearch-py.readthedocs.io/en/v7.17.0/api.html# doc_type 不建议使用 【Specifying types in requests is deprecated】
# https://www.elastic.co/guide/en/elasticsearch/reference/7.17/removal-of-types.html# Note that in 7.0, _doc is a permanent part of the path, and represents the endpoint name rather than the document type.
# In Elasticsearch 7.0, each API will support typeless requests, and specifying a type will produce a deprecation warning.
搜索
"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
@email: zyooooxie@gmail.com
"""def connect_es_client(hosts: Union[str, list], auth: tuple):""":param hosts::param auth::return:"""client = Elasticsearch(hosts,sniff_on_start=True, # sniff before doing anythingsniff_on_node_failure=True, # refresh nodes after a node fails to respondrequest_timeout=60,http_auth=auth) # HTTP authentication uses the http_auth parameter by passing in a username and password within a tupleLog.error('连接-{}'.format(client))return clientdef close_es_client(client: Elasticsearch):""":param client::return:"""client.close()Log.error('断开连接')def _es_search(index_str: str, client: Elasticsearch,size_: int = 10000, from_: int = 0,sort_: Union[str, dict] = {"seq": {"order": "desc"}},get_more_10000: bool = False,**kwargs):""":param index_str::param client::param size_::param from_::param sort_: query 传值是 {"seq": {"order": "desc"}} ; body 是 'seq:desc';:param get_more_10000:是否查询超过10000条的数据:param kwargs: 不建议使用 body传参;查全部时,啥都不传;:return:"""# 索引不存在时,返回值是 Noneif not client.indices.exists(index=index_str):return None# from + size must be less than or equal to: [10000]assert size_ + from_ <= 10000# # ✅ New usage:# es.search(query={...})## # ❌ Deprecated usage:# es.search(body={"query": {...}})Log.debug(locals())# search() 的 from: Defaults to 0. 
size: Defaults to 10.# 但有时候为了查出来所有数据,size 默认给 最大值10000,from 默认给0;res = client.search(index=index_str, size=size_, from_=from_, sort=sort_, **kwargs)total = res.get('hits').get('total').get('value')Log.info(f'total:{total}')hits_len = len(res.get('hits').get('hits'))Log.info(f'hits有:{hits_len}条')result = _search_10000(hits_len=hits_len, first_search_result=res, locals_=locals(),client=client, first_search_size=size_, get_more_10000=get_more_10000)Log.info(result[-10:])Log.info(f'search返回的结果有:{len(result)}条')return resultdef _search_10000(client: Elasticsearch, hits_len: int, first_search_result: dict, locals_: dict,first_search_size: int,get_more_10000: bool = False):""":param client::param hits_len::param first_search_result::param locals_::param first_search_size::param get_more_10000::return:"""if hits_len < first_search_size or not get_more_10000:if hits_len:return first_search_result.get('hits').get('hits')else:return []else:return __search_10000_get_result(client=client, locals_=locals_)def __search_10000_get_result(client: Elasticsearch, locals_: dict):""":param client::param locals_::return:"""from xxx_use.common_functions import compare_dict_keyone_choice = random.getrandbits(2)Log.info(one_choice)if not one_choice:Log.info('scroll + scan')scroll_list = __scroll(client=client, locals_=locals_)scan_list = __scan(client=client, locals_=locals_)# 很多时候 因为sort值不同scroll_list = __change_before_compare(scroll_list)scan_list = __change_before_compare(scan_list)compare_dict_key(scroll_list, scan_list, assert_0=True)compare_dict_key(scan_list, scroll_list, assert_0=True)return scroll_listelif one_choice == 1:Log.info('scroll')return __scroll(client=client, locals_=locals_)elif one_choice == 2:Log.info('scan')return __scan(client=client, locals_=locals_)else:# return __limit(client=client, locals_=locals_)# 不推荐Log.info('指定seq范围 【自己造的假数据 确保 每条都有seq】')limit_list = __limit(client=client, locals_=locals_)scan_list = __scan(client=client, locals_=locals_)limit_list = 
__change_before_compare(limit_list)scan_list = __change_before_compare(scan_list)compare_dict_key(limit_list, scan_list, assert_0=True)compare_dict_key(scan_list, limit_list, assert_0=True)return limit_listdef __change_before_compare(result_list: list):"""scroll + scan 结果比较前 对数据做个统一:param result_list::return:"""for rl in result_list:# 每个结果还有一个 _score ,它衡量了文档与查询的匹配程度。默认情况下,首先返回最相关的文档结果,就是说,返回的文档是按照 _score 降序排列的。rl.pop('sort', '不存在key')rl.pop('_score', '不存在key')return result_listdef __scan(client: Elasticsearch, locals_: dict):""":param client::param locals_::return:"""# https://elasticsearch-py.readthedocs.io/en/v7.17.0/helpers.html#scanfrom elasticsearch.helpers import scan# query 要传的是 body for the search() api# query={"query": {"match": {"blog": "zyooooxie"}}}result = scan(client=client, index=locals_.get('index_str'), query=locals_.get('kwargs'),size=5000,scroll="3m") # Any additional keyword arguments will be passed to the initial search() callLog.info(f'{result}, {type(result)}')res = [gr for gr in result]Log.info(len(res))return resdef __scroll(client: Elasticsearch, locals_: dict):""":param client::param locals_::return:"""# https://elasticsearch-py.readthedocs.io/en/v7.17.0/api.html#elasticsearch.Elasticsearch.scrollscroll_time = '3m'search_res = client.search(index=locals_.get('index_str'), scroll=scroll_time,query=locals_.get('kwargs').get('query'),size=5000,sort=['_doc'])scroll_id = search_res.get('_scroll_id')Log.info(scroll_id)total = search_res.get('hits').get('total').get('value')Log.info(f'总共有{total}条')res = search_res.get('hits').get('hits')while True:scroll_res = client.scroll(scroll_id=scroll_id, scroll=scroll_time)scroll_id = scroll_res.get('_scroll_id')data = scroll_res.get('hits').get('hits')res.extend(data)if not data:breakassert total == len(res)# Search context are automatically removed when the scroll timeout has been exceeded.# 手动清理,using the clear-scroll APIclear_res = client.clear_scroll(scroll_id=scroll_id)Log.info(clear_res)return 
resdef __limit(client: Elasticsearch, locals_: dict):""":param client::param locals_::return:"""seq_max: int = get_seq_max(client=client, index_str=locals_.get('index_str'))query = locals_.get('kwargs').get('query')search_size = 10000 # search的传参 取最大limit_size = 5000 # 查询时 以seq排序,每次取的长度assert limit_size <= search_sizeres = list()for i in range(math.ceil(seq_max / limit_size)):query_new = {'bool': {'must': [query,{'range': {'seq': {'gt': limit_size * i, 'lte': limit_size * (i + 1)}}} # gt、lte]}}# Log.info(query_new)search_res = client.search(index=locals_.get('index_str'),query=query_new,size=search_size)data = search_res.get('hits').get('hits')res.extend(data)else:Log.info(len(res))return res
本文链接:https://blog.csdn.net/zyooooxie/article/details/109588072
个人博客 https://blog.csdn.net/zyooooxie