Python基础之多进程

news/2024/7/7 7:35:22/文章来源:https://www.cnblogs.com/jingzh/p/18276358

目录
  • 1 多进程
    • 1.1 简介
    • 1.2 Linux下多进程
    • 1.3 multiprocessing
    • 1.4 Pool
    • 1.5 进程间通信
    • 1.6 分布式进程

1 多进程

1.1 简介

要让Python程序实现多进程(multiprocessing),我们先了解操作系统的相关知识。
Unix/Linux操作系统提供了一个fork()系统调用,它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后,分别在父进程和子进程内返回。

子进程永远返回0,而父进程返回子进程的ID。这样做的理由是,一个父进程可以fork出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID。

1.2 Linux下多进程

Python的os模块封装了常见的系统调用,其中就包括fork,可以在Python程序中轻松创建子进程:

# multiprocessing.py
import osprint ('Process (%s) start...' % os.getpid())
pid = os.fork()
if pid==0:print ('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
运行结果如下:Process (876) start...
I (876) just created a child process (877).
I am child process (877) and my parent is 876.

由于Windows没有fork调用,上面的代码在Windows上无法运行。由于Mac系统是基于BSD(Unix的一种)内核,所以,在Mac下运行是没有问题的

有了fork调用,一个进程在接到新任务时就可以复制出一个子进程来处理新任务,常见的Apache服务器就是由父进程监听端口,每当有新的http请求时,就fork出子进程来处理新的http请求。

1.3 multiprocessing

如果打算编写多进程的服务程序,Unix/Linux无疑是正确的选择。由于Windows没有fork调用,难道在Windows上无法用Python编写多进程的程序?
由于Python是跨平台的,自然也应该提供一个跨平台的多进程支持。multiprocessing模块就是跨平台版本的多进程模块。

multiprocessing模块提供了一个Process类来代表一个进程对象,下面的例子演示了启动一个子进程并等待其结束:

from multiprocessing import Process
import os# 子进程要执行的代码
def run_proc(name):print ('Run child process %s (%s)...' % (name, os.getpid()))if __name__=='__main__':print ('Parent process %s.' % os.getpid())p = Process(target=run_proc, args=('test',))print ('Process will start.')p.start()p.join()print 'Process end.'执行结果如下:
Parent process 928.
Process will start.
Run child process test (929)...
Process end.

创建子进程时,只需要传入一个执行函数和函数的参数,创建一个Process实例,用start()方法启动,这样创建进程比fork()还要简单。
join()方法可以等待子进程结束后再继续往下运行,通常用于进程间的同步。

1.4 Pool

如果要启动大量的子进程,可以用进程池的方式批量创建子进程:

from multiprocessing import Pool
import os, time, random
def long_time_task(name):print ('Run task %s (%s)...' % (name, os.getpid()))start = time.time()time.sleep(random.random() * 3)end = time.time()print ('Task %s runs %0.2f seconds.' % (name, (end - start)))if __name__=='__main__':print 'Parent process %s.' % os.getpid()p = Pool()for i in range(5):p.apply_async(long_time_task, args=(i,))print ('Waiting for all subprocesses done...')p.close()p.join()print 'All subprocesses done.'执行结果如下:
Parent process 669.
Waiting for all subprocesses done...
Run task 0 (671)...
Run task 1 (672)...
Run task 2 (673)...
Run task 3 (674)...
Task 2 runs 0.14 seconds.
Run task 4 (673)...
Task 1 runs 0.27 seconds.
Task 3 runs 0.86 seconds.
Task 0 runs 1.41 seconds.
Task 4 runs 1.91 seconds.
All subprocesses done.

代码解读:

  • 对Pool对象调用join()方法会等待所有子进程执行完毕,调用join()之前必须先调用close(),调用close()之后就不能继续添加新的Process了。
  • 注意输出的结果,task 0,1,2,3是立刻执行的,而task 4要等待前面某个task完成后才执行,这是因为Pool的默认大小在电脑上是4,因此,最多同时执行4个进程。这是Pool有意设计的限制,并不是操作系统的限制。如果改成:p = Pool(5),就可以同时跑5个进程。
  • 由于Pool的默认大小是CPU的核数,如果拥有8核CPU,那么要提交至少9个子进程才能看到上面的等待效果。

1.5 进程间通信

Process之间肯定是需要通信的,操作系统提供了很多机制来实现进程间的通信。Pythonmultiprocessing模块包装了底层的机制,提供了Queue、Pipes等多种方式来交换数据。

我们以 Queue 为例,在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

from multiprocessing import Process, Queue
import os, time, random# 写数据进程执行的代码:
def write(q):for value in ['A', 'B', 'C']:print ('Put %s to queue...' % value)q.put(value)time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):while True:value = q.get(True)print ('Get %s from queue.' % value)if __name__=='__main__':# 父进程创建Queue,并传给各个子进程:q = Queue()pw = Process(target=write, args=(q,))pr = Process(target=read, args=(q,))# 启动子进程pw,写入:pw.start()# 启动子进程pr,读取:pr.start()# 等待pw结束:pw.join()# pr进程里是死循环,无法等待其结束,只能强行终止:pr.terminate()运行结果如下:Put A to queue...
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

Unix/Linux下,multiprocessing模块封装了fork()调用,使我们不需要关注fork()的细节。由于Windows没有fork调用,因此,multiprocessing需要“模拟”出fork的效果,父进程所有Python对象都必须通过pickle序列化再传到子进程去,所以,如果multiprocessingWindows下调用失败了,要先考虑是不是pickle失败了。

1.6 分布式进程

Pythonmultiprocessing模块不但支持多进程,其中managers子模块还支持把多进程分布到多台机器上。一个服务进程可以作为调度者,将任务分布到其他多个进程中,依靠网络通信。

举个例子:如果我们已经有一个通过Queue通信的多进程程序在同一台机器上运行,现在,由于处理任务的进程任务繁重,希望把发送任务的进程和处理任务的进程分布到两台机器上。怎么用分布式进程实现?
原有的Queue可以继续使用,但是,通过managers模块把Queue通过网络暴露出去,就可以让其他机器的进程访问Queue了。
我们先看服务进程,服务进程负责启动Queue,把Queue注册到网络上,然后往Queue里面写入任务:

# taskmanager.pyimport random, time, Queue
from multiprocessing.managers import BaseManager# 发送任务的队列:
task_queue = Queue.Queue()
# 接收结果的队列:
result_queue = Queue.Queue()# 从BaseManager继承的QueueManager:
class QueueManager(BaseManager):pass# 把两个Queue都注册到网络上, callable参数关联了Queue对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 启动Queue:
manager.start()
# 获得通过网络访问的Queue对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):n = random.randint(0, 10000)print('Put task %d...' % n)task.put(n)
# 从result队列读取结果:
print('Try get results...')
for i in range(10):r = result.get(timeout=10)print('Result: %s' % r)
# 关闭:
manager.shutdown()

请注意,当我们在一台机器上写多进程程序时,创建的Queue可以直接拿来用,但是,在分布式多进程环境下,添加任务到Queue不可以直接对原始的task_queue进行操作,那样就绕过了QueueManager的封装,必须通过manager.get_task_queue()获得的Queue接口添加。
然后,在另一台机器上启动任务进程(本机上启动也可以):

# taskworker.pyimport time, sys, Queue
from multiprocessing.managers import BaseManager# 创建类似的QueueManager:
class QueueManager(BaseManager):pass# 由于这个QueueManager只从网络上获取Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')# 连接到服务器,也就是运行taskmanager.py的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与taskmanager.py设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 从网络连接:
m.connect()
# 获取Queue的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从task队列取任务,并把结果写入result队列:
for i in range(10):try:n = task.get(timeout=1)print('run task %d * %d...' % (n, n))r = '%d * %d = %d' % (n, n, n*n)time.sleep(1)result.put(r)except Queue.Empty:print('task queue is empty.')
# 处理结束:
print('worker exit.')

任务进程要通过网络连接到服务进程,所以要指定服务进程的IP。

现在,可以试试分布式进程的工作效果了。先启动taskmanager.py服务进程:

$ python taskmanager.py
Put task 3411...
Put task 1605...
Put task 1398...
Put task 4729...
Put task 5300...
Put task 7471...
Put task 68...
Put task 4219...
Put task 339...
Put task 7866...
Try get results...
taskmanager进程发送完任务后,开始等待result队列的结果。现在启动taskworker.py进程:$ python taskworker.py 127.0.0.1
Connect to server 127.0.0.1...
run task 3411 * 3411...
run task 1605 * 1605...
run task 1398 * 1398...
run task 4729 * 4729...
run task 5300 * 5300...
run task 7471 * 7471...
run task 68 * 68...
run task 4219 * 4219...
run task 339 * 339...
run task 7866 * 7866...
worker exit.

taskworker进程结束,在taskmanager进程中会继续打印出结果:

Result: 3411 * 3411 = 11634921
Result: 1605 * 1605 = 2576025
Result: 1398 * 1398 = 1954404
Result: 4729 * 4729 = 22363441
Result: 5300 * 5300 = 28090000
Result: 7471 * 7471 = 55815841
Result: 68 * 68 = 4624
Result: 4219 * 4219 = 17799961
Result: 339 * 339 = 114921
Result: 7866 * 7866 = 61873956

这个简单的Manager/Worker模型有什么用?其实这就是一个简单但真正的分布式计算,把代码稍加改造,启动多个worker,就可以把任务分布到几台甚至几十台机器上,比如把计算n*n的代码换成发送邮件,就实现了邮件队列的异步发送。

Queue对象存储在哪?注意到taskworker.py中根本没有创建Queue的代码,所以,Queue对象存储在taskmanager.py进程中:
在这里插入图片描述
Queue之所以能通过网络访问,就是通过QueueManager实现的。由于QueueManager管理的不止一个Queue,所以,要给每个Queue的网络调用接口起个名字,比如get_task_queue

authkey:是为了保证两台机器正常通信,不被其他机器恶意干扰。如果taskworker.py的authkey和taskmanager.py的authkey不一致,肯定连接不上。

注意Queue的作用是用来传递任务接收结果,每个任务的描述数据量要尽量小。比如发送一个处理日志文件的任务,就不要发送几百兆的日志文件本身,而是发送日志文件存放的完整路径,由Worker进程再去共享的磁盘上读取文件。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/734173.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[LeetCode] 169. Majority Element

排序,返回中值。class Solution:def majorityElement(self, nums: List[int]) -> int:#always existsnums.sort()return nums[len(nums)//2]

BUUCTF---childRSA(费马引理)

题目点击查看代码 from random import choice from Crypto.Util.number import isPrime, sieve_base as primes from flag import flagdef getPrime(bits):while True:n = 2while n.bit_length() < bits:n *= choice(primes)if isPrime(n + 1):return n + 1e = 0x10001 m = …

Python 使用__slots__来限制实例动态添加属性

在Python中,是可以随便在对象实例中动态添加属性的。那么,怎么样可以防止其他人在调用类实例的时候胡乱添加属性和方法?使用 __slots__ 属性,来限制 class 实例能添加的属性也就是说,只有在 __slots__ 变量中的属性才能被动态添加,否则会添加失败。例如,创建一个 Person …

[python] Python日志记录库loguru使用指北

Loguru是一个功能强大且易于使用的开源Python日志记录库。它建立在Python标准库中的logging模块之上,并提供了更加简洁直观、功能丰富的接口。Logging模块的使用见:Python日志记录库logging总结。Loguru官方仓库见:loguru,loguru官方文档见: loguru-doc。 Loguru的主要特点…

Codeforces Round 955 (Div. 2, with prizes from NEAR!) codeforces div2 955

A. Soccer ------------------------题解--------------- 给你开始比分和结束比分问你中间两队比分有没有相等过有可能就是YES不可能就是NO 结束时两队比分肯定>=各自队伍开始时比分,我们只需要让开始时大的先到达结束比分,再让开始时落后的比分到达结束时比分,只需要在心…

PTA题目集7~8的总结

PTA题目集7~8的总结 一、前言 第七次题目集为家居强电电路模拟程序3。本题模拟的控制设备包括:开关、互斥开关、分档调速器、连续调速器。模拟的受控设备包括:灯、风扇、受控窗帘。两种设备都有两根引脚,通过两根引脚电压的电压差驱动设备工作。输入信息有设备信息、连接信息…

STM32F4驱动USB实现虚拟串口

实现目的 使用Dap-link和stlink的时候,就发现这些仿真器上并没有USB转TTL芯片,就可以实现USB转串口,实现虚拟串口,非常方便。这里实测得出,使用USB虚拟串口,可以轻松达到921600波特率,接近1M/s,因为这个虚拟串口实际就是USB通讯,使用USB通讯,模拟COM类通讯端口协议,…

使用clion向STM32H7外置flash下载代码

简介 根据安福莱的STM32H7教程,H7单片机的QSPI外设是直接连到芯片内核上的,地址是0X90000000;那么就可以通过QSPI外设,将外置flash内存映射,并由此执行代码。相关操作在keil5上比较简单,配置点东西就行;可以参考安福莱教程。 这里要介绍的是在linux环境下没有keil5 IDE的…

Windows和Ubuntu网络文件共享

Windows11访问Ubuntu22.04共享文件夹 ubuntu配置右键选中需要共享的文件夹,选择属性->本地网络共享;选中共享此目录、允许其他人创建和删除这个文件夹里的文件、允许匿名登陆;并设置共享名配置好之后,点击修改共享。如果没有安装过samba服务,则会提升安装samba相关依赖…

Linux下使用clion+cubemx+openocd开发stm32

简述 后面需要在linux下学习驱动开发,然后不想玩虚拟机,就直接安装了双系统。用kde桌面玩的蛮开心的,就索性把win下的开发内容都搬过来吧 在Linux下开发STM32,使用Clion和Cubemx开发,openocd调试,关于芯片下载,编译器选项,代码起始内容都需要自行通过工程的配置文件修改…

银行视频监控智能分析

视频监控智能分析银行系统通过安装在银行的营业厅、取款机处或者银行柜台以及银行门口等区域的各大品牌的终端监控摄像头,视频监控智能分析在系统后台软件上的视频画面内设置智能分析区域,通过上面的操作实现对银行的7*24小时的智能视频监控分析报警,对进出人员行为进行智能…

智慧港口视频智能分析系统解决方案

港口视频监控智能分析系统是对港口各处的监控回传的视频流利用视频监控智能分析技术进行智能分析处理,视频监控智能分析系统将处理结果然后传送到中心管理服务器或者流媒体服务器进行有效管理。监控中心设立在港口控制中心,主要用于相关工作人员进行远程监控和管理。港口监控…

电力视频监控系统

电力视频监控系统通过对电力工程建设领域利用电力视频监控系统进行违规违章操作检测及其他安全区域监测,电力视频监控系统可以降低或减少安全事故造成的人员伤害和设备损害,提升公司社会形象,杜绝违规行为的发生。视频智能分析系统可以进一步强化安全管理,能够降低电力安全…

2024 最新上海市提取公积金缴纳房租 All In One

2024 最新上海市提取公积金缴纳房租 All In One 提取公积金 图解教程2024 最新上海市提取公积金缴纳房租 All In One最新版(亲测可用 ✅)随申办市民云 Apphttps://apps.apple.com/cn/app/随申办市民云/id732618720 提取公积金 图解教程步骤 图解 备注1. 打开随申办 App / 小程序…

Windows上使用VTune分析PyTorchExtension调用的Cpp程序

概述 最近在实现一个通过PyTorch Extension扩展PyTorch算子的C++算法,需要分析代码的运行瓶颈进行针对性优化。Intel VTune就是一个能从汇编级和源码级分析CPU运行瓶颈的工具。由于不明原因我没在服务器上跑通命令行版的VTune,所以把程序搬到Windows下分析了,因此记录一下Wi…

[LeetCode] 80. Remove Duplicates from Sorted Array II

原来leetcode使用Count也不需要import collections class Solution:def removeDuplicates(self, nums: List[int]) -> int:# len =0if len(nums) == 0:return 0# elsecountList = Counter(nums)countModify = {key:min(value,2) for key,value in countList.items()}ret = […

个人标语

从来不觉得自己比任何人差,讨厌懒惰,骄傲自满与妄自菲薄。

IDER如何生成数据库的ER图?

1.打开 IDEA,并连接到 MySQL数据库这里连接到你的数据库 2.在菜单栏中选择"View"->"Tool windows"->"Database" 3.在"Database" 工具窗口中,展开你想要査看的数据库,并选择"ER Diagrams"。 4.右键单击"ER Dia…

服务器系统瘫痪系统损坏数据恢复

故障服务器数据恢复环境: 一台故障服务器;共有4块146G SAS硬盘组成的RAID5;故障服务器分析检测: 服务器在运行过程中系统瘫痪,重装系统后系统损坏。系统中有数据库、网站程序与网页。 服务器数据恢复过程: 1、对全盘reiserfs树节点之间的关联确定原来的reiserfs分区位置镜…

服务器瘫痪,里面存有mysql数据库表结构

最上层,大多数基于网络服务器的工具或服务都有类似架构。 第二层,大多数MySQL的核心服务,包括查询解析、分析、优化、缓存以及所有的内置函数,所有跨存储引擎的功能都在这一层实现:存储过程、触发器、视图等。 第三层,存储引擎负责MySQL中数据的存储和提取。服务器通过AP…