Operating Elasticsearch with Python Scripts [Part 1]

This article is the blogger's original work; reproduction or use without authorization is strictly prohibited.
Link to this article: https://blog.csdn.net/zyooooxie/article/details/109588072

I recently wrote about operating Elasticsearch - Search by sending requests with the requests library (https://blog.csdn.net/zyooooxie/article/details/123730279); this time I'd like to share how to use the elasticsearch library instead.
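For context, here is a minimal sketch of that requests-based approach. It is not the code from that earlier post; the host, index name and credentials below are placeholders only.

import requests

# Placeholders only - point these at your own cluster and index.
res = requests.post('http://1.1.1.1:1111/my_index/_search',
                    json={'query': {'match': {'blog': 'zyooooxie'}}, 'size': 10},
                    auth=('es_username', 'es_password'),
                    timeout=60)
print(res.json()['hits']['total']['value'])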

[In fact, this post was delayed by N months before being published]

Personal blog: https://blog.csdn.net/zyooooxie

[Everything below reflects my own project experience only; if yours differs, that is perfectly normal]

Python Client


https://www.elastic.co/guide/en/elasticsearch/client/index.html

I am using version 7.17.0;

https://pypi.org/project/elasticsearch/7.17.0/

https://www.elastic.co/guide/en/elasticsearch/client/python-api/7.17/overview.html

https://elasticsearch-py.readthedocs.io/en/v7.17.0/index.html
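Before the full helper functions below, a minimal sketch of creating a client and checking the connection. The host and credentials are placeholders, and the constructor arguments mirror the ones used later in this post.

from elasticsearch import Elasticsearch

# Placeholders - replace with your own cluster address and credentials.
es = Elasticsearch('http://1.1.1.1:1111', http_auth=('es_username', 'es_password'), request_timeout=60)

print(es.ping())   # True when the cluster is reachable
print(es.info())   # cluster name, version, ...

es.close()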

"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
@email: zyooooxie@gmail.com
"""import time
import traceback
import sys
import json
import string
import math
import random
from typing import Optional, Union, List, Any
from user_log import Log
from elasticsearch import Elasticsearch
from elasticsearch.helpers import BulkIndexError

gl_es_host_new = 'http://1.1.1.1:1111'
gl_es_host_new_2 = ['http://1.1.1.1:1111', 'http://2.2.2.2:2222']

# ``port`` needs to be an int.
gl_es_host_new_3 = [{'host': '2.2.2.2', 'port': 2222}]
gl_es_host_new_4 = [{'host': '2.2.2.2', 'port': 2222}, {'host': '1.1.1.1', 'port': 1111}]

gl_es_auth = ('es_username', 'es_password')

gl_type = '_doc'

gl_search_dict = {'size': 100, 'from': 0, "sort": {"xxxXXX": {"order": "desc"}}}

# pip install elasticsearch==7.17.0
# https://pypi.org/project/elasticsearch/7.17.0/
# https://www.elastic.co/guide/en/elasticsearch/reference/7.17/docs.html
# https://elasticsearch-py.readthedocs.io/en/v7.17.0/api.html

# doc_type is not recommended [Specifying types in requests is deprecated]
# https://www.elastic.co/guide/en/elasticsearch/reference/7.17/removal-of-types.html

# Note that in 7.0, _doc is a permanent part of the path, and represents the endpoint name rather than the document type.
# In Elasticsearch 7.0, each API will support typeless requests, and specifying a type will produce a deprecation warning.
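To illustrate the typeless style described in the comments above, a small sketch. The index name and document are made up, and document=/query= are the newer per-field parameters that replace the deprecated body= (my assumption for 7.17, based on the client docs).

# Hypothetical index and document, for illustration only.
es = Elasticsearch(gl_es_host_new, http_auth=gl_es_auth)

es.index(index='zyooooxie_test', id='1', document={'blog': 'zyooooxie', 'seq': 1})   # PUT /zyooooxie_test/_doc/1
print(es.get(index='zyooooxie_test', id='1').get('_source'))                         # GET /zyooooxie_test/_doc/1
print(es.search(index='zyooooxie_test', query={'match': {'blog': 'zyooooxie'}}))     # no doc_type anywhere

es.close()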

Search

"""
@blog: https://blog.csdn.net/zyooooxie
@qq: 153132336
@email: zyooooxie@gmail.com
"""def connect_es_client(hosts: Union[str, list], auth: tuple):""":param hosts::param auth::return:"""client = Elasticsearch(hosts,sniff_on_start=True,  # sniff before doing anythingsniff_on_node_failure=True,  # refresh nodes after a node fails to respondrequest_timeout=60,http_auth=auth)  # HTTP authentication uses the http_auth parameter by passing in a username and password within a tupleLog.error('连接-{}'.format(client))return clientdef close_es_client(client: Elasticsearch):""":param client::return:"""client.close()Log.error('断开连接')def _es_search(index_str: str, client: Elasticsearch,size_: int = 10000, from_: int = 0,sort_: Union[str, dict] = {"seq": {"order": "desc"}},get_more_10000: bool = False,**kwargs):""":param index_str::param client::param size_::param from_::param sort_: query 传值是 {"seq": {"order": "desc"}} ; body 是 'seq:desc';:param get_more_10000:是否查询超过10000条的数据:param kwargs: 不建议使用 body传参;查全部时,啥都不传;:return:"""# 索引不存在时,返回值是 Noneif not client.indices.exists(index=index_str):return None# from + size must be less than or equal to: [10000]assert size_ + from_ <= 10000# # ✅ New usage:# es.search(query={...})## # ❌ Deprecated usage:# es.search(body={"query": {...}})Log.debug(locals())# search() 的 from: Defaults to 0.   size: Defaults to 10.# 但有时候为了查出来所有数据,size 默认给 最大值10000,from 默认给0;res = client.search(index=index_str, size=size_, from_=from_, sort=sort_, **kwargs)total = res.get('hits').get('total').get('value')Log.info(f'total:{total}')hits_len = len(res.get('hits').get('hits'))Log.info(f'hits有:{hits_len}条')result = _search_10000(hits_len=hits_len, first_search_result=res, locals_=locals(),client=client, first_search_size=size_, get_more_10000=get_more_10000)Log.info(result[-10:])Log.info(f'search返回的结果有:{len(result)}条')return resultdef _search_10000(client: Elasticsearch, hits_len: int, first_search_result: dict, locals_: dict,first_search_size: int,get_more_10000: bool = False):""":param client::param hits_len::param first_search_result::param locals_::param first_search_size::param get_more_10000::return:"""if hits_len < first_search_size or not get_more_10000:if hits_len:return first_search_result.get('hits').get('hits')else:return []else:return __search_10000_get_result(client=client, locals_=locals_)def __search_10000_get_result(client: Elasticsearch, locals_: dict):""":param client::param locals_::return:"""from xxx_use.common_functions import compare_dict_keyone_choice = random.getrandbits(2)Log.info(one_choice)if not one_choice:Log.info('scroll + scan')scroll_list = __scroll(client=client, locals_=locals_)scan_list = __scan(client=client, locals_=locals_)# 很多时候 因为sort值不同scroll_list = __change_before_compare(scroll_list)scan_list = __change_before_compare(scan_list)compare_dict_key(scroll_list, scan_list, assert_0=True)compare_dict_key(scan_list, scroll_list, assert_0=True)return scroll_listelif one_choice == 1:Log.info('scroll')return __scroll(client=client, locals_=locals_)elif one_choice == 2:Log.info('scan')return __scan(client=client, locals_=locals_)else:# return __limit(client=client, locals_=locals_)# 不推荐Log.info('指定seq范围 【自己造的假数据 确保 每条都有seq】')limit_list = __limit(client=client, locals_=locals_)scan_list = __scan(client=client, locals_=locals_)limit_list = __change_before_compare(limit_list)scan_list = __change_before_compare(scan_list)compare_dict_key(limit_list, scan_list, assert_0=True)compare_dict_key(scan_list, limit_list, assert_0=True)return limit_listdef __change_before_compare(result_list: list):"""scroll + scan 结果比较前 对数据做个统一:param 
result_list::return:"""for rl in result_list:# 每个结果还有一个 _score ,它衡量了文档与查询的匹配程度。默认情况下,首先返回最相关的文档结果,就是说,返回的文档是按照 _score 降序排列的。rl.pop('sort', '不存在key')rl.pop('_score', '不存在key')return result_listdef __scan(client: Elasticsearch, locals_: dict):""":param client::param locals_::return:"""# https://elasticsearch-py.readthedocs.io/en/v7.17.0/helpers.html#scanfrom elasticsearch.helpers import scan# query 要传的是 body for the search() api# query={"query": {"match": {"blog": "zyooooxie"}}}result = scan(client=client, index=locals_.get('index_str'), query=locals_.get('kwargs'),size=5000,scroll="3m")  # Any additional keyword arguments will be passed to the initial search() callLog.info(f'{result}, {type(result)}')res = [gr for gr in result]Log.info(len(res))return resdef __scroll(client: Elasticsearch, locals_: dict):""":param client::param locals_::return:"""# https://elasticsearch-py.readthedocs.io/en/v7.17.0/api.html#elasticsearch.Elasticsearch.scrollscroll_time = '3m'search_res = client.search(index=locals_.get('index_str'), scroll=scroll_time,query=locals_.get('kwargs').get('query'),size=5000,sort=['_doc'])scroll_id = search_res.get('_scroll_id')Log.info(scroll_id)total = search_res.get('hits').get('total').get('value')Log.info(f'总共有{total}条')res = search_res.get('hits').get('hits')while True:scroll_res = client.scroll(scroll_id=scroll_id, scroll=scroll_time)scroll_id = scroll_res.get('_scroll_id')data = scroll_res.get('hits').get('hits')res.extend(data)if not data:breakassert total == len(res)# Search context are automatically removed when the scroll timeout has been exceeded.# 手动清理,using the clear-scroll APIclear_res = client.clear_scroll(scroll_id=scroll_id)Log.info(clear_res)return resdef __limit(client: Elasticsearch, locals_: dict):""":param client::param locals_::return:"""seq_max: int = get_seq_max(client=client, index_str=locals_.get('index_str'))query = locals_.get('kwargs').get('query')search_size = 10000  # search的传参 取最大limit_size = 5000  # 查询时 以seq排序,每次取的长度assert limit_size <= search_sizeres = list()for i in range(math.ceil(seq_max / limit_size)):query_new = {'bool': {'must': [query,{'range': {'seq': {'gt': limit_size * i, 'lte': limit_size * (i + 1)}}}  # gt、lte]}}# Log.info(query_new)search_res = client.search(index=locals_.get('index_str'),query=query_new,size=search_size)data = search_res.get('hits').get('hits')res.extend(data)else:Log.info(len(res))return res
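A hedged usage sketch of the helpers above: the index name, field names and queries are made up, it assumes the cluster holds my fake test data with a seq field on every document, and get_seq_max (used by __limit) is assumed to live elsewhere in the project.

if __name__ == '__main__':
    es_client = connect_es_client(hosts=gl_es_host_new_2, auth=gl_es_auth)

    # First 100 hits, newest seq first (extra kwargs are forwarded to client.search()).
    part = _es_search('zyooooxie_test', es_client, size_=100, from_=0,
                      query={'match': {'blog': 'zyooooxie'}})

    # All hits, even beyond 10000; falls back to scroll / scan / seq-range paging.
    everything = _es_search('zyooooxie_test', es_client, get_more_10000=True,
                            query={'match_all': {}})

    close_es_client(es_client)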

Link to this article: https://blog.csdn.net/zyooooxie/article/details/109588072

Personal blog: https://blog.csdn.net/zyooooxie
