问题场景:上传文件
调用上传文件接口后,由 Celery 异步任务解析文件;解析过程中需要执行 Python 代码,此时任务会出现阻塞。
启动celery命令
celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q nltable
代码:
import logging
import time
import datetime
from celery import shared_task
from services.nltable.table_async_service import TableAsyncService
import asyncio

logger = logging.getLogger(__name__)


@shared_task(queue='nltable')
def async_read_table_info_task(enterprise_id: str, table_id: str):
    """Run the table-info update for one table and log how long it took.

    Usage: async_read_table_info_task.delay(enterprise_id, table_id)
    """
    started = time.perf_counter()
    stamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    logger.info(f"开始添加读取表格信息任务到队列, [{stamp}] enterprise_id: {enterprise_id}, table_id: {table_id} ")

    # The actual work is delegated to the service layer.
    TableAsyncService.sync_update_table_infos(enterprise_id, table_id)

    elapsed = time.perf_counter() - started
    logger.info(f"读取表格信息任务添加到队列成功, 耗时: {elapsed:.2f}秒")
class TableAsyncService:
    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """Synchronously update the table information.

        In this reproduction the body only runs ``print(1)`` through
        ``PythonREPL`` and logs the output; ``enterprise_id`` and ``table_id``
        are not used.
        """
        # The marker log lines bracket each step so the worker log shows
        # exactly where execution stops.
        logger.info("start")
        from langchain_experimental.utilities import PythonREPL
        logger.info("2222")
        repl = PythonREPL()
        logger.info("111")
        output = repl.run("print(1)")
        logger.info(output)
执行结果
[2024-12-25 18:17:32,517: INFO/MainProcess] Task tasks.read_table_tasks.async_read_table_info_task[92ae44b2-d791-4706-960a-477ef70206d3] received
[2024-12-25 18:17:32,518: INFO/MainProcess] 开始添加读取表格信息任务到队列, [2024-12-25 18:17:32] enterprise_id: 1750727272898039810, table_id: eb9d87a6-6bac-47ec-8e7d-5598259aa760
[2024-12-25 18:17:32,518: INFO/MainProcess] start
[2024-12-25 18:17:32,518: INFO/MainProcess] 2222
[2024-12-25 18:17:32,518: INFO/MainProcess] 111
[2024-12-25 18:17:32,519: WARNING/MainProcess] Python REPL can execute arbitrary code. Use with caution.
参考说明文档
https://docs.celeryq.dev/en/stable/userguide/application.html
@classmethod
def worker(
    cls,
    command: str,
    globals: Optional[Dict],
    locals: Optional[Dict],
    queue: multiprocessing.Queue,
) -> None:
    """Execute ``command`` and put whatever it printed onto ``queue``.

    ``sys.stdout`` is redirected to an in-memory buffer while the command
    runs. On success the captured text is put on ``queue``; if an
    ``Exception`` is raised, ``repr`` of that exception is put instead.

    Args:
        command: Source code to execute (passed through ``sanitize_input``).
        globals: Global namespace handed to ``exec``.
        locals: Local namespace handed to ``exec``.
        queue: Any object with a ``put`` method that receives the result.
    """
    old_stdout = sys.stdout
    sys.stdout = mystdout = StringIO()
    try:
        try:
            logger.info("self.worker")
            cleaned_command = cls.sanitize_input(command)
            # SECURITY: exec() runs arbitrary code; only pass trusted input.
            exec(cleaned_command, globals, locals)
        finally:
            # Restore stdout on every exit path, including BaseException
            # (e.g. SystemExit raised by the executed code).
            sys.stdout = old_stdout
        output = mystdout.getvalue()
        logger.info(f"sys.stdout {sys.stdout}")
        logger.info(f"{output}")
        queue.put(output)
        logger.info("put")
    except Exception as e:
        queue.put(repr(e))


def run(self, command: str, timeout: Optional[int] = None) -> str:
    """Run command with own globals/locals and return anything printed.

    Args:
        command: Source code to execute.
        timeout: Seconds to wait before giving up. ``None`` runs the command
            in the current process without a time limit.

    Returns:
        The captured output (or the ``repr`` of the raised exception), or
        ``"Execution timed out"`` when the child process outlives ``timeout``.
    """
    # Imported locally under an alias so it cannot be confused with the
    # ``queue`` parameter name used by ``worker``.
    from queue import Queue as LocalQueue

    # Warn against dangers of PythonREPL
    warn_once()

    # A multiprocessing.Queue is only needed to cross a process boundary. It
    # hands items to a background feeder thread that writes them to a pipe,
    # so the in-process path uses a plain queue.Queue instead.
    # NOTE(review): the feeder thread is the likely reason the no-timeout
    # path hung under the gevent pool; confirm on the affected worker.
    result_queue = multiprocessing.Queue() if timeout is not None else LocalQueue()
    logger.info(f"langchain 123 {timeout} {result_queue}")

    # Only use multiprocessing if we are enforcing a timeout
    if timeout is not None:
        p = multiprocessing.Process(
            target=self.worker,
            args=(command, self.globals, self.locals, result_queue),
        )
        p.start()
        # Wait for the process to finish or kill it after timeout seconds.
        p.join(timeout)
        if p.is_alive():
            p.terminate()
            return "Execution timed out"
    else:
        self.worker(command, self.globals, self.locals, result_queue)

    # Fetch the result exactly once: the worker puts a single item, so a
    # second get() would block forever on the now-empty queue.
    result = result_queue.get()
    logger.info(f"queue.get {result}")
    return result
执行结果
[2024-12-26 09:29:57,229: INFO/MainProcess] langchain 123 None <multiprocessing.queues.Queue object at 0x0000021FB5C92BD0>
[2024-12-26 09:29:57,229: INFO/MainProcess] self.worker
[2024-12-26 09:29:57,230: INFO/MainProcess] sys.stdout <celery.utils.log.LoggingProxy object at 0x0000021FB5B37E50>
[2024-12-26 09:29:57,230: INFO/MainProcess] 1[2024-12-26 09:29:57,230: INFO/MainProcess] put
日志输出 put 之后,run 方法末尾的以下语句没有执行(没有对应的日志输出):
logger.info(f"queue.get {queue.get()}")return queue.get()
- 尝试设置超时时间
res = python_repl.run("print(1)",timeout=10)logger.info(f"res {res}")
此时能执行run方法中的日志
[2024-12-26 09:57:00,765: INFO/MainProcess] queue.get 1
但 return 语句出现阻塞:队列中唯一的结果已被上一行日志中的 queue.get() 取走,return queue.get() 再次读取空队列,因此一直等待
- 测试不调用 run 方法,直接在任务中创建子进程执行 worker
class TableAsyncService:
    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """Synchronously update the table information.

        In this experiment the body runs ``print(1)`` through
        ``PythonREPL.worker`` in a child process and prints the queued
        output; ``enterprise_id`` and ``table_id`` are not used.
        """
        from langchain_experimental.utilities import PythonREPL

        queue: multiprocessing.Queue = multiprocessing.Queue()
        python_repl = PythonREPL()
        p = multiprocessing.Process(target=python_repl.worker, args=("print(1)", {}, {}, queue))
        p.start()
        # Wait for the process to finish or kill it after the timeout, so the
        # queue.get() below cannot block on a child that is still running.
        p.join(10)
        if p.is_alive():
            p.terminate()
            print("res Execution timed out")
            return None
        print(f"res {queue.get()}")
此时能够正常运行
- 进一步封装成方法
class TableAsyncService:
    @staticmethod
    def run_command(command: str, queue: multiprocessing.Queue):
        """Run ``command`` via ``PythonREPL.worker`` in a child process.

        Args:
            command: Source code to execute.
            queue: Queue the child process puts its result on.

        Returns:
            The item read from ``queue``, or ``"Execution timed out"`` when
            the child is still alive after 10 seconds.
        """
        from langchain_experimental.utilities import PythonREPL

        python_repl = PythonREPL()
        # Pass the caller's command through instead of a hard-coded snippet.
        p = multiprocessing.Process(target=python_repl.worker, args=(command, {}, {}, queue))
        p.start()
        # Wait for the process to finish or kill it after the timeout.
        p.join(10)
        if p.is_alive():
            p.terminate()
            return "Execution timed out"
        print("return")
        return queue.get()

    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """Synchronously update the table information.

        In this experiment the body only runs ``print(1)`` through
        ``run_command`` and prints the result.
        """
        queue: multiprocessing.Queue = multiprocessing.Queue()
        res = TableAsyncService.run_command("print(1)", queue)
        print(f"res {res}")
此时也可以正常运行
其他方案:
尝试使用 gevent.spawn + multiprocessing.Queue
class TableAsyncService:
    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """Synchronously update the table information.

        In this experiment the body runs ``print(1)`` through
        ``PythonREPL.worker`` inside a gevent greenlet and logs the queued
        output; ``enterprise_id`` and ``table_id`` are not used.
        """
        from gevent import spawn
        from langchain_experimental.utilities import PythonREPL

        repl = PythonREPL()
        result_queue = multiprocessing.Queue()
        source = "print(1)"

        # Run the worker in a greenlet and wait up to 10 seconds for it.
        worker_greenlet = spawn(repl.worker, source, {}, {}, result_queue)
        worker_greenlet.join(10)

        output = result_queue.get()
        logger.info(f"res {output}")
能够正常运行
测试发现:如果 queue 是在库内置的 run 方法内部初始化的,return queue.get() 会阻塞,无法返回结果
两种方式的代码集合:
class TableAsyncService:
    @staticmethod
    def run_command(command: str, queue: multiprocessing.Queue):
        """Run ``command`` via ``PythonREPL.worker`` in a child process.

        Args:
            command: Source code to execute.
            queue: Queue the child process puts its result on.

        Returns:
            The item read from ``queue``, or ``"Execution timed out"`` when
            the child is still alive after 10 seconds.
        """
        from langchain_experimental.utilities import PythonREPL

        python_repl = PythonREPL()
        p = multiprocessing.Process(target=python_repl.worker, args=(command, {}, {}, queue))
        p.start()
        # Wait for the process to finish or kill it after the timeout, so the
        # queue.get() below cannot block on a child that is still running.
        p.join(10)
        if p.is_alive():
            p.terminate()
            return "Execution timed out"
        return queue.get()

    @staticmethod
    def sync_update_table_infos(enterprise_id: str, table_id: str) -> Optional[Table]:
        """Synchronously update the table information.

        In this experiment the body runs ``print(1)`` through
        ``PythonREPL.worker`` inside a gevent greenlet and logs the queued
        output; ``enterprise_id`` and ``table_id`` are not used.
        """
        # Approach 1 (child process via run_command), kept for reference:
        # queue: multiprocessing.Queue = multiprocessing.Queue()
        # res = TableAsyncService.run_command("print(1)",queue)
        # print(f"res {res}")
        from langchain_experimental.utilities import PythonREPL
        from gevent import spawn

        # Approach 2: run the worker in a greenlet and wait up to 10 seconds.
        python_repl = PythonREPL()
        queue = multiprocessing.Queue()
        command = "print(1)"
        greenlet = spawn(python_repl.worker, command, {}, {}, queue)
        greenlet.join(10)
        result = queue.get()
        logger.info(f"res {result}")
import logging
import time
import datetime
from celery import shared_task
from services.nltable.table_async_service import TableAsyncService

logger = logging.getLogger(__name__)


@shared_task(queue='nltable')
def async_read_table_info_task(enterprise_id: str, table_id: str):
    """Run the table-info update for one table and log the elapsed time.

    Usage: async_read_table_info_task.delay(enterprise_id, table_id)
    """
    t0 = time.perf_counter()
    now_text = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    logger.info(f"开始添加读取表格信息任务到队列, [{now_text}] enterprise_id: {enterprise_id}, table_id: {table_id} ")

    # The actual work is delegated to the service layer.
    TableAsyncService.sync_update_table_infos(enterprise_id, table_id)

    duration = time.perf_counter() - t0
    logger.info(f"读取表格信息任务添加到队列成功, 耗时: {duration:.2f}秒")