flask 异步任务celery中运行ipython或python repl出现阻塞

news/2024/12/26 11:13:51/文章来源:https://www.cnblogs.com/Gimm/p/18631651

问题场景:上传文件

调用上传文件接口,异步任务解析文件,解析中需要执行python代码,此时会出现阻塞

启动celery命令

 celery -A app.celery worker -P gevent -c 1 --loglevel INFO   -Q nltable 

代码:

import logging
import time
import datetime
from celery import shared_task
from services.nltable.table_async_service import TableAsyncService
import asynciologger = logging.getLogger(__name__)@shared_task(queue='nltable')
def async_read_table_info_task(enterprise_id: str, table_id: str):"""异步添加读取表格信息任务到队列Usage: async_read_table_info_task.delay(enterprise_id, table_id)"""start_at = time.perf_counter()current_time = datetime.datetime.now()formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")logger.info(f"开始添加读取表格信息任务到队列, [{formatted_time}] enterprise_id: {enterprise_id}, table_id: {table_id} ")TableAsyncService.sync_update_table_infos(enterprise_id, table_id)end_at = time.perf_counter()logger.info(f"读取表格信息任务添加到队列成功, 耗时: {end_at - start_at:.2f}秒")
class TableAsyncService:@staticmethoddef sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:"""同步更新表格信息"""logger.info("start")from langchain_experimental.utilities import PythonREPLlogger.info("2222")python_repl = PythonREPL()logger.info("111")res = python_repl.run("print(1)")logger.info(res)

执行结果

[2024-12-25 18:17:32,517: INFO/MainProcess] Task tasks.read_table_tasks.async_read_table_info_task[92ae44b2-d791-4706-960a-477ef70206d3] received
[2024-12-25 18:17:32,518: INFO/MainProcess] 开始添加读取表格信息任务到队列, [2024-12-25 18:17:32] enterprise_id: 1750727272898039810, table_id: eb9d87a6-6bac-47ec-8e7d-5598259aa760    
[2024-12-25 18:17:32,518: INFO/MainProcess] start
[2024-12-25 18:17:32,518: INFO/MainProcess] 2222
[2024-12-25 18:17:32,518: INFO/MainProcess] 111
[2024-12-25 18:17:32,519: WARNING/MainProcess] Python REPL can execute arbitrary code. Use with caution.

参考说明文档
https://docs.celeryq.dev/en/stable/userguide/application.html

    @classmethoddef worker(cls,command: str,globals: Optional[Dict],locals: Optional[Dict],queue: multiprocessing.Queue,) -> None:old_stdout = sys.stdoutsys.stdout = mystdout = StringIO()try:logger.info("self.worker")cleaned_command = cls.sanitize_input(command)exec(cleaned_command, globals, locals)sys.stdout = old_stdoutlogger.info(f"sys.stdout {sys.stdout}")logger.info(f"{mystdout.getvalue()}")queue.put(mystdout.getvalue())logger.info(f"put")except Exception as e:sys.stdout = old_stdoutqueue.put(repr(e))def run(self, command: str, timeout: Optional[int] = None) -> str:"""Run command with own globals/locals and returns anything printed.Timeout after the specified number of seconds."""# Warn against dangers of PythonREPLwarn_once()queue: multiprocessing.Queue = multiprocessing.Queue()logger.info(f"langchain 123 {timeout}  {queue}")# Only use multiprocessing if we are enforcing a timeoutif timeout is not None:# create a Processp = multiprocessing.Process(target=self.worker, args=(command, self.globals, self.locals, queue))# start itp.start()# wait for the process to finish or kill it after timeout secondsp.join(timeout)if p.is_alive():p.terminate()return "Execution timed out"else:self.worker(command, self.globals, self.locals, queue)# get the result from the worker functionlogger.info(f"queue.get {queue.get()}")return queue.get()

执行结果

[2024-12-26 09:29:57,229: INFO/MainProcess] langchain 123 None  <multiprocessing.queues.Queue object at 0x0000021FB5C92BD0>
[2024-12-26 09:29:57,229: INFO/MainProcess] self.worker
[2024-12-26 09:29:57,230: INFO/MainProcess] sys.stdout <celery.utils.log.LoggingProxy object at 0x0000021FB5B37E50>
[2024-12-26 09:29:57,230: INFO/MainProcess] 1[2024-12-26 09:29:57,230: INFO/MainProcess] put

无法执行

        logger.info(f"queue.get {queue.get()}")return queue.get()
  1. 尝试设置超时时间
        res = python_repl.run("print(1)",timeout=10)logger.info(f"res {res}")

此时能执行run方法中的日志

[2024-12-26 09:57:00,765: INFO/MainProcess] queue.get 1

但return语句出现阻塞

  1. 测试去掉方法,直接在任务中执行

class TableAsyncService:@staticmethoddef sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:"""同步更新表格信息"""from langchain_experimental.utilities import PythonREPLfrom gevent.queue import Queuefrom gevent import spawnqueue: multiprocessing.Queue = multiprocessing.Queue()python_repl = PythonREPL()p = multiprocessing.Process(target=python_repl.worker, args=("print(1)", {}, {}, queue))# start itp.start()# wait for the process to finish or kill it after timeout secondsp.join(10)print(f"res {queue.get()}")

此时能够正常运行

  1. 进一步封装成方法
class TableAsyncService:@staticmethoddef run_command(command:str,queue: multiprocessing.Queue):from langchain_experimental.utilities import PythonREPLpython_repl = PythonREPL()p = multiprocessing.Process(target=python_repl.worker, args=("print(1)", {}, {}, queue))# start itp.start()# wait for the process to finish or kill it after timeout secondsp.join(10)print("return")return queue.get()@staticmethoddef sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:"""同步更新表格信息"""queue: multiprocessing.Queue = multiprocessing.Queue()res = TableAsyncService.run_command("print(1)",queue)print(f"res {res}")

此时也可以正常运行

其他方案:
尝试使用spawn+multiprocessing queue

class TableAsyncService:@staticmethoddef sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:"""同步更新表格信息"""from gevent import spawn# queue: multiprocessing.Queue = multiprocessing.Queue()# res = TableAsyncService.run_command("print(1)",queue)# print(f"res {res}")from langchain_experimental.utilities import PythonREPLpython_repl = PythonREPL()queue = multiprocessing.Queue()command = "print(1)"greelet = spawn(python_repl.worker,command,{},{},queue)greelet.join(10)result = queue.get()logger.info(f"res {result}")

能够正常运行

测试发现,如果queue在内置里初始化,return queue.get无法输出

两种方式的代码集合:

class TableAsyncService:@staticmethoddef run_command(command:str,queue: multiprocessing.Queue):from langchain_experimental.utilities import PythonREPLpython_repl = PythonREPL()p = multiprocessing.Process(target=python_repl.worker, args=(command, {}, {}, queue))# start itp.start()# wait for the process to finish or kill it after timeout secondsp.join(10)return queue.get()@staticmethoddef sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:"""同步更新表格信息"""# queue: multiprocessing.Queue = multiprocessing.Queue()# res = TableAsyncService.run_command("print(1)",queue)# print(f"res {res}")from langchain_experimental.utilities import PythonREPLfrom gevent import spawnpython_repl = PythonREPL()queue = multiprocessing.Queue()command = "print(1)"greelet = spawn(python_repl.worker,command,{},{},queue)greelet.join(10)result = queue.get()logger.info(f"res {result}")
import logging
import time
import datetime
from celery import shared_task
from services.nltable.table_async_service import TableAsyncServicelogger = logging.getLogger(__name__)@shared_task(queue='nltable')
def async_read_table_info_task(enterprise_id: str, table_id: str):"""异步添加读取表格信息任务到队列Usage: async_read_table_info_task.delay(enterprise_id, table_id)"""start_at = time.perf_counter()current_time = datetime.datetime.now()formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")logger.info(f"开始添加读取表格信息任务到队列, [{formatted_time}] enterprise_id: {enterprise_id}, table_id: {table_id} ")TableAsyncService.sync_update_table_infos(enterprise_id, table_id)end_at = time.perf_counter()logger.info(f"读取表格信息任务添加到队列成功, 耗时: {end_at - start_at:.2f}秒")

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/859238.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

HuntBack(反击狩猎):用于攻防演练中,防守方对恶意ip进行web指纹扫描与识别

#溯源 #攻防演练 HuntBack(反击狩猎),用于攻防演练中,防守方对恶意ip进行web指纹扫描与识别 应用场景 在蓝队职守中,安全设备爆出恶意攻击ip地址,如果对方使用的是自己的服务器,并且搭建了一些安全业务,可使用本工具对目前已知工具进行探测。 功能 1.红队指纹识别 2.ipwh…

代码随想录——贪心23监控二叉树

思路 这道题目首先要想,如何放置,才能让摄像头最小的呢? 从题目中示例,其实可以得到启发,我们发现题目示例中的摄像头都没有放在叶子节点上! 这是很重要的一个线索,摄像头可以覆盖上中下三层,如果把摄像头放在叶子节点上,就浪费的一层的覆盖。 所以把摄像头放在叶子节…

CH585 CH584 I2C时钟配置(超1MHz 最高1.8MHz)

I2C的R16_I2C_CTRL2寄存器描述:[5:0] FREQ :允许的范围在2~36MHz之间。必须设置在000010b 到100100b 之间RB_I2C_FREQ决定SCL的建立时间和SDA的保持时间,理论上I2C 频率可以达到一分频,实际频率可在36MHz之上,可使用最高80MHz。 I2C的R16_I2C_CKCFGR寄存器描述:[11:0] CC…

JMeter JDBC 请求实战宝典

《JMeter JDBC 请求实战宝典》 宝子们,今天咱就来唠唠 JMeter 里超厉害的 JDBC 请求,这玩意儿就像是数据库世界的神奇魔杖,能帮咱把数据库里的各种秘密(数据)都挖出来,还能对这些数据进行各种操作,不管是查查看、改一改,还是加点新东西、删点旧东西,它都能轻松搞定,而…

STM32-I2C软件模拟

1.I2C介绍 I2C是一种多主机、两线制、低速串行通信总线,广泛用于微控制器和各种外围设备之间的通信。它使用两条线路:串行数据线(SDA)和串行时钟线(SCL)进行双向传输。2.时序启动条件:SCL高电平时、SDA由高电平变为低电平 停止条件:SCL高电平时、SDA由低电平变为高电平…

系统攻防-WindowsLinux远程探针本地自检任意代码执行权限提升入口点

Windows&Linux&远程探针&本地自检&任意代码执行&权限提升&入口点知识点: 1、远程漏扫-Nessus&Nexpose&Goby 2、本地漏扫(提权)-Wesng&Tiquan&Suggester 3、利用场景-远程利用&本地利用&利用条件一、演示案例-操作系统-远程漏…

新能源汽车门店管理:项目管理工具的高效运用策略

使用项目管理工具进行新能源汽车门店管理可以显著提高门店的运营效率和团队协作能力。通过明确项目目标和需求、选择合适的项目管理工具、实施项目管理以及持续优化与改进等步骤,新能源汽车门店可以更好地应对市场变化,提升竞争力。使用项目管理工具进行新能源汽车门店管理,…

C5GAME 游戏饰品交易平台借助 RocketMQ Serverless 保障千万级玩家流畅体验

C5GAME 通过采用云消息队列 RocketMQ 版 Serverless 系列,有效解决了现有架构中存在的性能瓶颈,极大增强了交易系统的灵活性和稳定性,有效实现了流量的削峰填谷,显著提升了整体运维效率,确保了千万级玩家能够享受到流畅的游戏交易体验。作者:邹星宇、刘尧 C5GAME:安全便…

SQL语言1-MySQL

1. SQL语言 1.1 关系型数据库的常见组件数据库:database 表的集合,物理上表现为一个目录 表:table,行:row 列:column 索引:index 视图:view,虚拟的表 存储过程:procedure 存储函数:function 触发器:trigger 事件调度器:event scheduler,任务计划 用户:user 权限…

虚引用

当被应用的强引用失去后,强引用关联的这个引用也就是虚引用相关的分配就会被回收以ThreadLocal<T>为例1.进行塞值 public void set(T value) { Thread t = Thread.currentThread(); ThreadLocalMap map = getMap(t); if (map != null) { …

容器环境的MySQL、canal、Elasticsearch数据同步测试

回顾一次容器环境的MySQL、canal、Elasticsearch数据同步 MySQL和Elasticsearch安装初始化就不展示了,版本如下:sql表关键字段如下: CREATE TABLE `fault_code` (`title` varchar(255) CHARACTER SET utf8mb4 DEFAULT NULL,`description` varchar(512) CHARACTER SET utf8mb…

从全球到本地:跨境电商如何提升供应链管理效率

一、引言 随着全球化的推进与互联网技术的飞速发展,跨境电商已成为全球贸易的重要组成部分。跨境电商平台通过缩短国际贸易的链条,打破了传统贸易壁垒,使消费者能够方便快捷地购买来自世界各地的商品。然而,跨境电商的成功不仅仅依赖于商品的丰富性和价格竞争力,背后更为复…