Python 并发编程实战:优雅地使用 concurrent.futures

news/2024/12/25 0:38:05/文章来源:https://www.cnblogs.com/piperliu/p/18628980

在 Python 多线程编程中,concurrent.futures 模块提供了一个高层的接口来异步执行可调用对象。今天,我们将通过一个循序渐进的案例,深入了解如何使用这个强大的工具。

从一个模拟场景开始

假设我们需要处理一批网络请求。为了模拟这个场景,我们使用 sleep 来代表耗时操作:

import time
import randomdef slow_operation(task_id):"""模拟一个耗时的网络请求"""sleep_time = random.uniform(0.5, 2)time.sleep(sleep_time)return f"Task {task_id} completed in {sleep_time:.2f} seconds"# 串行处理
def process_serial():start = time.perf_counter()results = []for i in range(10):result = slow_operation(i)results.append(result)end = time.perf_counter()print(f"串行处理总耗时:{end - start:.2f} 秒")return results# 运行示例
if __name__ == '__main__':results = process_serial()for r in results:print(r)
串行处理总耗时:11.75 秒
Task 0 completed in 1.27 seconds
Task 1 completed in 1.10 seconds
Task 2 completed in 1.35 seconds
Task 3 completed in 1.36 seconds
Task 4 completed in 1.42 seconds
Task 5 completed in 1.55 seconds
Task 6 completed in 0.74 seconds
Task 7 completed in 0.55 seconds
Task 8 completed in 1.40 seconds
Task 9 completed in 0.97 seconds

运行这段代码,你会发现处理 10 个任务需要大约 10-15 秒。这显然不够高效。

使用传统的 threading 模块

让我们先看看使用传统的 threading 模块如何改进:

import threading
from queue import Queuedef slow_operation(task_id):"""模拟一个耗时的网络请求"""sleep_time = random.uniform(0.5, 2)time.sleep(sleep_time)return f"Task {task_id} completed in {sleep_time:.2f} seconds"def process_threading():start = time.perf_counter()results = []work_queue = Queue()lock = threading.Lock()# 填充工作队列for i in range(10):work_queue.put(i)def worker():while True:try:task_id = work_queue.get_nowait()result = slow_operation(task_id)with lock:results.append(result)work_queue.task_done()except Queue.Empty:breakthreads = []for _ in range(4):  # 使用4个线程t = threading.Thread(target=worker)t.start()threads.append(t)for t in threads:t.join()end = time.perf_counter()print(f"多线程处理总耗时:{end - start:.2f} 秒")return results
多线程处理总耗时:3.24 秒

这个版本使用了多线程,性能确实提升了,但代码比较复杂,需要手动管理线程、锁和队列。

concurrent.futures 的优雅解决方案

现在,让我们看看如何使用 concurrent.futures 来简化代码:

import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completeddef slow_operation(task_id):"""模拟一个耗时的网络请求"""sleep_time = random.uniform(0.5, 2)time.sleep(sleep_time)return f"Task {task_id} completed in {sleep_time:.2f} seconds"def process_concurrent():start = time.perf_counter()results = []# 创建线程池,设置最大线程数为4with ThreadPoolExecutor(max_workers=4) as executor:# 提交任务到线程池future_to_id = {executor.submit(slow_operation, i): i for i in range(10)}# 获取结果for future in as_completed(future_to_id):results.append(future.result())end = time.perf_counter()print(f"concurrent.futures 处理总耗时:{end - start:.2f} 秒")return resultsprocess_concurrent()
concurrent.futures 处理总耗时:3.54 秒

这里我们用到了几个关键概念:

  1. ThreadPoolExecutor :线程池执行器,用于管理一组工作线程。创建时可以指定最大线程数。

  2. executor.submit() :向线程池提交一个任务。返回 Future 对象,代表将来某个时刻会完成的操作。

  3. as_completed() :返回一个迭代器,在 Future 完成时产生对应的 Future 对象。这意味着结果是按照完成顺序而不是提交顺序返回的。

Future 对象的高级用法

Future 对象提供了多个有用的方法,让我们通过实例来了解:

import time
import random
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETEDdef slow_operation(task_id):"""模拟一个耗时的网络请求"""sleep_time = random.uniform(0.5, 2)time.sleep(sleep_time)return f"Task {task_id} completed in {sleep_time:.2f} seconds"def demonstrate_future_features():with ThreadPoolExecutor(max_workers=4) as executor:# 提交任务并获取 Future 对象futures = [executor.submit(slow_operation, i) for i in range(10)]# 1. done() 检查任务是否完成print("检查第一个任务是否完成:", futures[0].done())# 2. 使用 wait() 等待部分任务完成done, not_done = wait(futures, return_when=FIRST_COMPLETED)print(f"完成的任务数: {len(done)}, 未完成的任务数: {len(not_done)}")# 3. 获取结果时设置超时try:result = futures[0].result(timeout=1.0)print("获取到结果:", result)except TimeoutError:print("获取结果超时")# 4. cancel() 取消未开始的任务for f in not_done:cancelled = f.cancel()print(f"取消任务: {'成功' if cancelled else '失败'}")demonstrate_future_features()
检查第一个任务是否完成: False
完成的任务数: 1, 未完成的任务数: 9
获取到结果: Task 0 completed in 1.07 seconds
取消任务: 失败
取消任务: 成功
取消任务: 成功
取消任务: 失败
取消任务: 失败
取消任务: 失败
取消任务: 失败
取消任务: 成功
取消任务: 失败

线程/进程池还是异步 IO?

IO 密集型任务:优先选择 asyncio

为什么选择 asyncio

  1. 更低的资源开销asyncio 使用协程,不需要创建额外的线程或进程
  2. 更高的并发量:单线程可以轻松处理数千个并发任务
  3. 没有 GIL 的限制:协程在单线程内切换,完全规避了 GIL 的影响

让我们通过一个网络请求的例子来对比:

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor# 模拟网络请求
def sync_request(url):time.sleep(1)  # 模拟网络延迟return f"Response from {url}"async def async_request(url):await asyncio.sleep(1)  # 模拟网络延迟return f"Response from {url}"# 使用线程池
def thread_pool_example():urls = [f"http://example.com/{i}" for i in range(100)]start = time.perf_counter()with ThreadPoolExecutor(max_workers=20) as executor:results = list(executor.map(sync_request, urls))end = time.perf_counter()print(f"ThreadPoolExecutor 耗时: {end - start:.2f} 秒")return results# 使用 asyncio
async def asyncio_example():urls = [f"http://example.com/{i}" for i in range(100)]start = time.perf_counter()tasks = [async_request(url) for url in urls]results = await asyncio.gather(*tasks)end = time.perf_counter()print(f"asyncio 耗时: {end - start:.2f} 秒")return resultsif __name__ == '__main__':# 运行线程池版本thread_results = thread_pool_example()# 运行 asyncio 版本asyncio_results = asyncio.run(asyncio_example())
ThreadPoolExecutor 耗时: 5.03 秒
asyncio 耗时: 1.00 秒

在这个例子中, asyncio 版本通常会表现出更好的性能,尤其是在并发量大的情况下。

CPU 密集型任务:使用 ProcessPoolExecutor

为什么选择多进程?

  1. 绕过 GIL:每个进程都有自己的 Python 解释器和 GIL
  2. 充分利用多核性能:可以真正实现并行计算
  3. 适合计算密集型任务:如数据处理、图像处理等

来看一个计算密集型任务的对比:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutordef cpu_intensive_task(n):"""计算密集型任务:计算大量浮点数运算"""result = 0for i in range(n):result += i ** 2 / 3.14return resultdef compare_performance():numbers = [10**6] * 20  # 20个大规模计算任务# 使用线程池start = time.perf_counter()with ThreadPoolExecutor(max_workers=4) as executor:thread_results = list(executor.map(cpu_intensive_task, numbers))thread_time = time.perf_counter() - startprint(f"线程池耗时: {thread_time:.2f} 秒")# 使用进程池start = time.perf_counter()with ProcessPoolExecutor(max_workers=4) as executor:process_results = list(executor.map(cpu_intensive_task, numbers))process_time = time.perf_counter() - startprint(f"进程池耗时: {process_time:.2f} 秒")if __name__ == '__main__':compare_performance()
线程池耗时: 4.61 秒
进程池耗时: 1.34 秒

在这种场景下, ProcessPoolExecutor 的性能明显优于 ThreadPoolExecutor

混合型任务:ThreadPoolExecutor 的优势

为什么有时候选择线程池?

  1. 更容易与现有代码集成:大多数 Python 库都是基于同步设计的
  2. 资源开销比进程池小:线程共享内存空间
  3. 适合 IO 和 CPU 混合的场景:当任务既有 IO 操作又有计算时

示例场景:

import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutordef mixed_task(task_id):"""混合型任务:既有 IO 操作又有计算"""# IO 操作time.sleep(0.5)# CPU 计算result = sum(i * i for i in range(10**5))# 再次 IO 操作time.sleep(0.5)return f"Task {task_id}: {result}"def demonstrate_mixed_workload():tasks = range(10)# 使用线程池start = time.perf_counter()with ThreadPoolExecutor(max_workers=4) as executor:thread_results = list(executor.map(mixed_task, tasks))thread_time = time.perf_counter() - startprint(f"线程池处理混合任务耗时: {thread_time:.2f} 秒")# 使用进程池start = time.perf_counter()with ProcessPoolExecutor(max_workers=4) as executor:process_results = list(executor.map(mixed_task, tasks))process_time = time.perf_counter() - startprint(f"进程池处理混合任务耗时: {process_time:.2f} 秒")if __name__ == '__main__':demonstrate_mixed_workload()
线程池处理混合任务耗时: 3.05 秒
进程池处理混合任务耗时: 3.11 秒

选择建议的决策树

在选择并发方案时,可以参考以下决策流程:

  1. 首先判断任务类型

    • 如果是纯 IO 密集型(网络请求、文件操作),优先选择 asyncio
    • 如果是纯 CPU 密集型(大量计算),优先选择 ProcessPoolExecutor
    • 如果是混合型任务,考虑使用 ThreadPoolExecutor
  2. 考虑其他因素

    • 现有代码是否易于改造为异步?
    • 是否需要与同步代码交互?
    • 并发量有多大?
    • 是否需要跨进程通信?
def choose_concurrency_model(task_type, concurrent_count,legacy_code=False,need_shared_memory=False):"""帮助选择并发模型的示例函数"""if task_type == "IO":if legacy_code or need_shared_memory:return "ThreadPoolExecutor"else:return "asyncio"elif task_type == "CPU":if need_shared_memory:return "ThreadPoolExecutor"else:return "ProcessPoolExecutor"else:  # mixedif concurrent_count > 1000:return "asyncio"else:return "ThreadPoolExecutor"

性能对比总结

方案 IO密集型 CPU密集型 混合型 资源开销 代码复杂度
asyncio 最佳 较差 最低 较高
ThreadPoolExecutor 较差 较好
ProcessPoolExecutor 一般 最佳 一般

总的来说,选择合适的并发方案需要综合考虑任务特性、性能需求、代码复杂度等多个因素。在实际应用中,有时候甚至可以混合使用多种方案,以达到最优的性能表现。

实用技巧总结

  1. 控制线程池大小
def demonstrate_pool_sizing():# CPU 核心数cpu_count = os.cpu_count()# IO 密集型任务,线程数可以设置为核心数的 1-4 倍io_bound_workers = cpu_count * 2# CPU 密集型任务,线程数不应超过核心数cpu_bound_workers = cpu_countprint(f"推荐的线程数:")print(f"IO 密集型任务:{io_bound_workers}")print(f"CPU 密集型任务:{cpu_bound_workers}")
  1. 批量提交任务
def demonstrate_batch_submit():with ThreadPoolExecutor(max_workers=4) as executor:results_ordered = list(executor.map(slow_operation, range(5)))futures = [executor.submit(slow_operation, i) for i in range(5)]results_completion = [f.result() for f in as_completed(futures)]return results_ordered, results_completion
  1. 错误处理
def demonstrate_error_handling():def faulty_operation(task_id):if task_id == 3:raise ValueError(f"Task {task_id} failed")return slow_operation(task_id)with ThreadPoolExecutor(max_workers=4) as executor:futures = [executor.submit(faulty_operation, i) for i in range(5)]for future in as_completed(futures):try:result = future.result()print(f"成功:{result}")except Exception as e:print(f"错误:{str(e)}")

总结

concurrent.futures 模块为 Python 并发编程提供了一个优雅的高级接口。相比传统的 threading / multiprocessing 模块,它具有以下优势:

  1. 使用线程池自动管理线程的生命周期
  2. 提供简洁的接口提交任务和获取结果
  3. 支持超时和错误处理
  4. 代码更加 Pythonic 和易于维护

希望这篇文章能帮助你更好地理解和使用 Python 的并发编程工具!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/858438.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LangChain简单大模型应用

LangChain官方示例教程(Build a Simple LLM Application)。重新组织顺序及说明方式,更加适合新手阅读。LangChain官方示例教程(Build a Simple LLM Application):https://python.langchain.com/docs/tutorials/llm_chain/将该官方示例教程适当调整及优化依赖pip install lang…

Elasticsearch filter context 的实践案例

知识背景 在 ES 查询优化的建议里,很多时候为了避免算分逻辑和利用缓存逻辑,Elastic 会建议大家使用 filter 条件。 filter 的使用条件和原理具体可以参照之前写的博文《Elasticsearch filter context 的使用原理》 这里我们来研究 2 个实用案例,具体的感受一下 filter cont…

产品发展的六阶段

一个成熟的互联网产品从最初的创意到完全成熟,通常需要经历以下几个主要阶段。每个阶段都有特定的目标、核心任务和关键成果,以下是详细的解析。 一、创意阶段 1、创意阶段的核心目标 1.1 识别用户需求 创意阶段的首要任务是发现并明确目标用户的痛点和需求。通过市场调研、…

【unity]学习制作类银河恶魔城游戏-2-

导入新资产切割新资产切割完成修改大小和清晰度球体已经设置了刚体和碰撞体积,直接应用给人物,改名circle为player中心点问题 因为切割的原因,碰撞模型的中心点和人物的中心点不相吻合解决:在子路径下渲染人物图片,将二者的中心点手动对齐手动对齐保存更改更改碰撞体模型运…

DevNow x Notion

DevNow x Notion: DevNow 支持了 Notion 作为其文档系统,可以帮助用户在 Notion 更高效地管理文档,实现文档的集中管理和协作。前言 Notion 应该是目前用户量比较大的一个在线笔记软件,它的文档系统也非常完善,支持多种文档格式,如 Markdown、富文本、表格、公式等。 早期…

平安夜吃苹果

祝大家平安夜快乐 有一棵特殊的苹果树,一连 n 天,每天都可以长出若干个苹果。在第 i 天,树上会长出 apples[i] 个苹果,这些苹果将会在 days[i] 天后(也就是说,第 i + days[i] 天时)腐烂,变得无法食用。也可能有那么几天,树上不会长出新的苹果,此时用 apples[i] == 0 …

基于Three.js的大屏3D地图(一)

依赖安装 yarn add three yarn add @types/three yarn add d3-geothree库安装后在node_modules下其还包含核心three/src和插件three/example/jsm的源码,在开发调试时可以直接查阅。使用Three.js过程中会涉及到许多的类、方法及参数配置,所以建议安装@types/three库;不仅能提…

Java 变量和运算符

Java 变量和运算符1. 变量(Variable)1.1 何为变量 1.2 数据类型(Data Types)1.2.1 整型:byte、short、int、long 1.2.2 浮点类型:float、double 1.2.3 字符类型:char 1.2.4 布尔类型:boolean1.3 变量的使用1.3.1 步骤1:变量的声明 1.3.2 步骤2:变量的赋值1.4. 基本数…

k8s阶段10 k8s指标流水线, 自定义流水线和HPA

1 Kubernetes指标流水线 资源指标Kubernetes有一些依赖于指标数据的组件,例如HPA和VPA等Kubernetes使用Metrics API暴露系统指标给这些组件 #只暴露nodes和pods上的内存,CPU指标该API仅提供CPU和内存相关的指标数据负责支撑Metrics API、生成并提供指标数据的组件,成为核心指标…