1. 多进程相关理论
1.1 什么是进程
进程是一个正在执行的任务或程序
负责执行任务的是CPU
(1)单任务单核CPU+多道技术 实现多个进程的伪并发
(2)多任务多个任务并发执行
1.2 进程和程序的区别
程序是代码的集合体
进程是程序的执行过程
1.3 进程的调度算法
(1)先来先服务算法
FCFS算法既可用于作业调度,也可用于进程调度
该算法有利于长作业(进程),不利于短作业(进程)
适合于CPU繁忙型作业(进程),不利于I/O繁忙型作业(进程)
(2)短作业优先算法
短作业优先(Shortest Job First,SJF)或短进程优先(Shortest Process First,SPF)
作业的长短只是被估算出来的
(3)时间片轮转法
Round Robin算法的思路是让每个进程在就绪队列中的等待时间与服务的时间成正比
该算法中,将CPU的处理时间分成固定大小的时间片,几十毫秒至几百毫秒
如果一个进程在被调度选中之后用完了系统规定的时间片,但又未完成要求的任务,则它自行释放自己所占有的CPU而排到就绪队列的末尾,等待下一次调度。
(4)多级反馈队列
按顺序调度队列
仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;
仅当第1~(i-1)队列均空时,才会调度第i队列中的进程运行。
如果处理机正在第i队列中为某进程服务时,又有新进程进入优先权较高的队列(第1~(i-1)中的任何一个队列)
则此时新进程将抢占正在运行进程的处理机
即由调度程序把正在运行的进程放回到第i队列的末尾
把处理机分配给新到的高优先权进程。
1.4 并发与并行
(1)概念
无论是并发还是并行,在用户看来都是“同时”运行的
并发:
一种伪并行,在同一个时间段内,多个任务交替进行,表面上看起来是同时进行的,实际上并不是同时进行
并行:
同时运行,只有具备多个CPU才能实现并行
CPU为单核时:利用多道技术实现并行,多核中的每个核也可以利用多道技术(多道技术是针对单核而言的)
(2) 多道技术
内存中同时存入多道(多个)程序,CPU从一个进程快速切换到另外一个,使每个进程各自运行几十或几百毫秒
因此,虽然在某个瞬间一个CPU只能执行一个任务,但是在1秒内CPU却可以运行多个进程,这就给用户产生了并行的错觉,即伪并发
以此来区分多CPU操作系统的真正硬件并行(多个CPU共享同一个物理内存)
(3) 并行、并发总结
并行肯定算并发
单核的计算机不能实现并行,但是可以实现并发
1.5 同步、异步 阻塞、非阻塞
(1) 同步
在运行一个程序的时候,没有得到结果就不会运行第二个程序
(2)异步
在运行一个程序的时候,不需要等待运行结果,就可以执行第二个程序
(3)阻塞
在线程返回结果之前,当前线程会被挂起(在有io操作的情况下)
(4) 非阻塞
不需要等待结果,直接运行下一个
(5) 总结
同步与异步针对的是函数/任务的调用方式
同步调用:当一个进程发起一个函数(任务)调用的时候,该调用一直等待直到函数(任务)完成,并未阻塞
异步调用:当一个进程发起一个函数(任务)调用的时候,不会等待函数返回,而是继续往下执行,函数返回时通过状态、通知、事件等方式通知进程任务完成
阻塞与非阻塞针对的是进程或线程
阻塞:当请求不满足时将进程挂起
非阻塞:当请求不满足时不会阻塞当前的进程
阻塞调用:使用socket建立tcp连接后,只要recv没有接收到数据就会一直处于阻塞的状态,直到有数据为止
1.6 进程的创建与终止
1.6.1 引入
计算机中的硬件都需要操作系统去管理,只要存在操作系统,就有进程的概念
1.6.2 系统通用的4种创建进程的方式
(1)系统初始化系统启动的时候,操作系统会自动启动一些内置的进程
Windows中任务管理器查看进程,Linux中ps命令查看
(2)进程中开启子进程例如开启Pycharm之后在里面运行py文件
(3)交互式请求双击启动一个应用程序
(4)批处理作业的初始化只在大型的批处理系统中应用
1.6.3 不同系统中创建进程的方式
(1)unixfork创建一个父进程一样的副本
(2)windowsCreateProcess既处理进程的创建,也负责把正确的程序装入新进程
1.6.4 进程的终止
(1)正常退出
自行关闭软件
(2)出错退出
在程序运行过程中发生错误,导致程序退出
(3)严重错误
执行错误指令,如引用不存在的内存,1/0等
(4)被其它进程终止
使用命令kill -9在Linux中强制终止进程
1.7 进程的状态
(1) 五态模型
创建态:进程正在被创建,但还没有完成创建的过程,双击应用程序图标正在读取内存数据的状态
就绪态:进程具备运行的条件,但还没有分配到CPU,处于等待CPU分配的时间段
阻塞态:进程正在执行某个操作,但是需要等待某个事件的完成,比如读写文件
执行态:进程正在执行,此时CPU正在执行该进程的指令
终止态:进程正在被销毁,此时操作系统会回收该进程所占用的资源
(2)三态模型
运行态:进程在运行过程中的状态
就绪态:进程具备运行的条件,但还没有分配到CPU,处于等待CPU分配的时间段
阻塞态:进程正在执行某个操作,但是需要等待某个事件的完成,比如等待用户输入数据 / 读写文件
2. 多进程相关操作
2.1 多进程模块(multiprocessing)与进程类(Process)
multiprocessing模块用来开启子进程,并且在子进程中执行定制的任务(比如函数)
import multiprocessingp1 = multiprocessing.Process() # Process类创建子进程对象
Process类的基类BaseProcess中有以下参数
group:当前的进程组,值始终为None
target:表示当前需要创建子进程的函数对象
name:当前子进程的名称,默认不修改
args:在调用子进程函数时需要传递的参数,按照位置传递
kwargs:在调用子进程函数时需要传递的参数,按照关键字传递
daemon:守护进程是否开启
子进程的操作方法
p1.start() # 启动子进程,并调用该子进程中的p1.run()
p1.run() # 进程启动时运行的方法,正是它去调用target指定的函数,自定义的类中一定要实现该方法
p1.join() # 主进程等待所有子进程结束后再结束
p1.is_alive() # 判断子进程的状态
p1.terminate() # 强制终止进程p1,不会进行任何清理操作;如果p1创建了子进程,该子进程就成了僵尸进程;如果p1还保存了一个锁那么也将不会被释放,进而导致死锁
p1.close() # 终止子进程
子进程的属性
p1.daemon # 默认是False,当前进程是否开启守护进程,如果设置为True,p1不能创建自身的新进程,且必须在p1.start()之前设置
p1.name # 子进程的名称
p1.pid # 子进程的pid
p1.exitcode # 退出状态码,进程在运行时为None、如果为–N,表示被信号N结束(了解)
p1.authkey # 进程的身份验证键,默认是32位的16进制数
2.2 Process类的使用
2.2.1 提示
在windows中Process()必须放到 if __name == 'main__':下
Since Windows has no fork, the multiprocessing module starts a new Python process **and** imports the calling module.
If Process() gets called upon import, then this sets off an infinite succession of new processes (**or** until your machine runs out of resources).
This **is** the reason **for** hiding calls to Process() inside**if name == "main"**
since statements inside this **if**-statement will **not** get called upon import.
由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。
如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。
这是隐藏对Process()内部调用的原,使用if **name** == “**main** ”,这个if语句中的语句将不会在导入时被调用。
2.2.2 创建子进程的两种方式
(1)方法一:使用Process方法
from multiprocessing import Process
import random
import time# 1.定义子进程函数
def work(name):rest = random.randint(1, 3)print(f'{name}开始休息{rest}秒')time.sleep(rest)print(f'{name}结束休息')# 2.产生子进程对象并启动子进程
def create_process():# 产生两个子进程对象p1 = Process(target=work, # 目标子进程函数,给的是函数名(内存地址)args=('one',) # 向目标函数传递的位置参数,是一个元组
)p2 = Process(target=work,args=('two',))# 启动子进程:start触发run的运行,分别用start和run启动
p1.start()p2.start()if __name__ == '__main__':print('主程序入口开始')create_process()print('主程序入口结束')主程序入口开始
主程序入口结束
one开始休息2秒
two开始休息2秒
one结束休息
two结束休息
from multiprocessing import Process
import random
import time
# 1.定义子进程函数
def work(name):rest = random.randint(1, 3)print(f'{name}开始休息{rest}秒')time.sleep(rest)print(f'{name}结束休息')# 2.产生子进程对象并启动子进程
def create_process():# 产生两个子进程对象p1 = Process(target=work, # 目标子进程函数,给的是函数名(内存地址)args=('one',) # 向目标函数传递的位置参数,是一个元组
)p2 = Process(target=work,args=('two',))# 启动子进程:start触发run的运行,分别用start和run启动
p1.run()p2.run()if __name__ == '__main__':print('主程序入口开始')create_process()print('主程序入口结束')主程序入口开始
one开始休息3秒
one结束休息
two开始休息1秒
two结束休息
主程序入口结束
以上开启多进程是为了观察几个进程同时运行时的现象
run( )函数的效果是多个进程串行
(2)方法二:继承Process类,重写run方法
from multiprocessing import Process
import random
import timeclass NewProcess(Process):def __init__(self, name):self.name = name # 接收传给类的参数super().__init__() # 要将基类Process的基类BaseProcess中的__init__继承过来,没该函数下的参数当前代码会报错def run(self):rest = random.randint(1, 3)print(f'{self.name}开始休息{rest}秒')time.sleep(rest)print(f'{self.name}结束休息')def create_process():# 1.获得两个子进程对象p1 = NewProcess(name='one')p2 = NewProcess(name='two')# 2.启动子进程
p1.start()p2.start()if __name__ == '__main__':print('主程序入口开始')create_process()print('主程序入口结束')
主程序入口开始
主程序入口结束
NewProcess-1开始休息1秒
NewProcess-2开始休息1秒
NewProcess-1结束休息
NewProcess-2结束休息
2.2.3 进程之间的数据是隔离的
from multiprocessing import Processmoney = 99def change_money():global moneyprint(f'修改之前的余额为{money}')money += 1print(f'修改之后的余额为{money}')def create_process():task_list = []for i in range(3):p = Process(target=change_money)task_list.append(p) # 将遍历得到的3个子进程对象添加到任务列表中[p2.start() for p2 in task_list]if __name__ == '__main__':create_process()print(f'所有子进程修改后的money为{money}')所有子进程修改后的money为99
修改之前的余额为99
修改之后的余额为100
修改之前的余额为99
修改之后的余额为100
修改之前的余额为99
修改之后的余额为100
每一个子进程修改后得到money都是100
因为每一个子进程都会将主进程的所有数据拷贝一份
修改的数据是自身拷贝出来的那一份,而原本的全局不变
2.2.4 多进程实现TCP服务端并发
(1)TCP服务端串行
# 服务端
import socketserver = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
server.bind(('127.0.0.1', 9000))
server.listen(5)
while True:conn, addr = server.accept()while True:info_from_client = conn.recv(1024)if not info_from_client:conn.close()breakprint(f'客户端发来的消息为{info_from_client.decode()}')info_send_to = info_from_client.decode().upper()conn.send(info_send_to.encode())
from socket import socket, AF_INET, SOCK_STREAMclient = socket(family=AF_INET, type=SOCK_STREAM)client.connect(('127.0.0.1', 9696))while True:msg = input('请输入要发送的消息:')if not msg: continueclient.send(msg.encode('utf-8'))data_from_server = client.recv(1024)print(f"服务器返回的数据:{data_from_server.decode('utf-8')}")
串行最多可以连接监听数+1个客户端
串行只允许第一个连接的客户端与服务端交互;关闭第一个客户端,则允许最近的一个客户端交互
(2)TCP服务端并发
import multiprocessing
import socket# 定义子进程函数
def work(conn):while True:info_from_client = conn.recv(1024)if not info_from_client:conn.close()breakprint(f'客户端发来的信息为{info_from_client.decode("utf-8")}')info_to_client = info_from_client.decode("utf-8").upper()conn.send(info_to_client.encode("utf-8"))def main():server = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 防止端口被占用server.bind(('127.0.0.1', 9020))server.listen(5)while True:conn, addr = server.accept() # 接收每一个客户端的连接对象p = multiprocessing.Process(target=work, args=(conn,)) # 将连接对象所要执行的操作创建成一个子进程p.start() # 子进程启动if __name__ == '__main__':main()
# 客户端
import socketclient = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
client.connect(('127.0.0.1', 9020))
while True:info_to_server = input('请输入发送给服务端的信息:')if not info_to_server:continueclient.send(info_to_server.encode('utf-8'))info_from_server = client.recv(1024).decode('utf-8')print(f'服务端返回的信息为{info_from_server}')
服务端开启多进程并发时,客户端的连接数量不受监听限制
并发允许所有连接的客户端与服务端交互
2.2.5 Process对象(子进程)的join方法
将并行转换为串行;主进程等待子进程结束而结束
(1)非join方法 并行
import random
import time
from multiprocessing import Process# 定义计算函数运行时间的装饰器
def outer(func):def inner(*args, **kwargs):start = time.time()res = func(*args, **kwargs)end = time.time()print(f'函数{func.__name__}运行时长为{end - start}秒')return resreturn inner# 定义子进程工作函数
def work(name):rest = random.randint(1, 3)print(f'{name}开始休息{rest}秒')time.sleep(rest)print(f'{name}休息结束')# 定义产生子进程及运行子进程函数
@outer
def create_process():sp_list = [Process(target=work, kwargs={'name': f'子进程-{i}'}) for i in range(5)] # 参数也可以写成args=(f'子进程-{i}',)[p.start() for p in sp_list]if __name__ == '__main__':print('主函数开始运行')create_process()print('主函数结束运行')# 主函数开始运行
# 函数create_process运行时长为0.03391122817993164秒
# 主函数结束运行
# 子进程-0开始休息2秒
# 子进程-1开始休息2秒
# 子进程-2开始休息2秒
# 子进程-3开始休息3秒
# 子进程-4开始休息1秒
# 子进程-4休息结束
# 子进程-0休息结束
# 子进程-1休息结束
# 子进程-2休息结束
# 子进程-3休息结束
主进程不会等待所有子进程结束再结束
(2)一个子进程start之后直接join 串行
import random
import time
from multiprocessing import Process# 定义计算函数运行时间的装饰器
def outer(func):def inner(*args, **kwargs):start = time.time()res = func(*args, **kwargs)end = time.time()print(f'函数{func.__name__}运行时长为{end - start}秒')return resreturn inner# 定义子进程工作函数
def work(name):rest = random.randint(1, 3)print(f'{name}开始休息{rest}秒')time.sleep(rest)print(f'{name}休息结束')# 定义产生子进程及运行子进程函数
@outer
def create_process_wait1():sp_list = [Process(target=work, kwargs={'name': f'子进程-{i}'}) for i in range(5)] # 参数也可以写成args=(f'子进程-{i}',)for p in sp_list:p.start()p.join()if __name__ == '__main__':print('主函数开始运行')create_process_wait1()print('主函数结束运行')# 主函数开始运行
# 子进程-0开始休息3秒
# 子进程-0休息结束
# 子进程-1开始休息3秒
# 子进程-1休息结束
# 子进程-2开始休息2秒
# 子进程-2休息结束
# 子进程-3开始休息3秒
# 子进程-3休息结束
# 子进程-4开始休息1秒
# 子进程-4休息结束
# 函数create_process_wait1运行时长为12.343429327011108秒
# 主函数结束运行
子进程逐个运行,前提是拿到上一个子进程运行的结果
主函数的运行时长是所有子进程与产生子进程函数运行时间的总和
(3)所有子进程start之后再逐个join 并行
import random
import time
from multiprocessing import Process# 定义计算函数运行时间的装饰器
def outer(func):def inner(*args, **kwargs):start = time.time()res = func(*args, **kwargs)end = time.time()print(f'函数{func.__name__}运行时长为{end - start}秒')return resreturn inner# 定义子进程工作函数
def work(name):rest = random.randint(1, 3)print(f'{name}开始休息{rest}秒')time.sleep(rest)print(f'{name}休息结束')# 定义产生子进程及运行子进程函数
@outer
def create_process_wait2():sp_list = [Process(target=work, kwargs={'name': f'子进程-{i}'}) for i in range(5)] # 参数也可以写成args=(f'子进程-{i}',)[p.start() for p in sp_list] # 将当前所有的子进程的状态改为就绪态[p.join() for p in sp_list] # 整合所有子进程,进行进程间的调度if __name__ == '__main__':print('主函数开始运行')create_process_wait2()print('主函数结束运行')# 主函数开始运行
# 子进程-0开始休息3秒
# 子进程-1开始休息1秒
# 子进程-2开始休息2秒
# 子进程-3开始休息3秒
# 子进程-4开始休息2秒
# 子进程-1休息结束
# 子进程-2休息结束
# 子进程-4休息结束
# 子进程-0休息结束
# 子进程-3休息结束
# 函数create_process_wait2运行时长为3.104586124420166秒
# 主函数结束运行
并行就是所有进程同时执行;结束时间取决于时间最长的那个进程
(4)为什么集体join变并行
start()方法与join()方法调用顺序对程序执行流程影响的一种理解尝试。
【1】多进程 start() 后紧跟 join() 是“串行”行为的理解
在Python中,如果你在一个循环中创建了多个子进程,并且每个子进程创建后立即调用了join()方法,这实际上并不是典型的并行执行模式。
这里的“串行”是一种相对的说法,指的是逻辑上的执行流程给人的直观感受,并不是指CPU调度层面真的变为串行。
在这个例子中,每个子进程确实是在其前一个子进程结束(join()完成)后才开始。 # 同步
这是因为join()会阻塞主进程,直到相应的子进程执行完毕。
因此,尽管多个子进程可能在不同CPU核心上并行执行,但从主进程的角度来看,它们是按顺序等待的,形成了“串行”的效果。
【2】先start()后添加到列表中再挨个join()是“并行”行为的理解
如果改变策略,先启动所有子进程,然后在外部循环中逐个调用join(),情况就有所不同:
在这种情况下,所有子进程几乎同时启动(受到系统资源限制),然后每个进程并行执行。
当到达第一个join()循环时,主进程会等待每个子进程完成,但它并不阻止其他子进程的同时执行。
也就是说,虽然主进程自身仍然是串行等待每个子进程结束,但实际上所有子进程是并行执行的。
这种方式最大化了并行潜力,因为子进程之间的执行不依赖于彼此完成的顺序。
【3】结论
“串行”和“并行”的概念在这里更多地是从程序逻辑控制流的角度来区分的。
直接在每个start()之后调用join()看起来像是串行,因为主进程的执行路径是按顺序等每个任务完成。 # 同步执行
而先全部start()再集体join()则更接近并行处理,因为所有子任务被同时发起,尽管最终的join()调用还是按照顺序进行。 # 异步执行
真正的并行性体现在子进程层面,它们可以在不同的处理器上同时运行,不受主进程控制流程的影响。
'''
2.2.6 Process对象(子进程)的其它属性
[1]进程号的概念
计算机会给每一个运行的进程分配一个PID号
[2]系统中查看进程列表
Windows中 tasklist
Linux中 ps aux
[3]根据PID号查看指定进程具体信息
Windows中 tasklist |findstr PID号
Linux中 ps aux|grep PID号
[4]查看进程号的方法:current_process().pid os.getpid()
current_process().pid与os.getpid()用法一致
import os
from multiprocessing import Process, current_processdef work(name):print(f'子进程{name}的pid为{current_process().pid}')print(f'子进程{name}的pid为{os.getpid()}')if __name__ == '__main__':p1 = Process(target=work, args=(1,))p1.start()p2 = Process(target=work, args=(2,))p2.start()print(f'主程序的pid为{current_process().pid}')# 主程序的pid为10128
# 子进程1的pid为5508
# 子进程1的pid为5508
# 子进程2的pid为10692
# 子进程2的pid为10692
[5]查看父进程号的方法:os.getppid()
import os
from multiprocessing import Process, current_processdef work(name):print(f'子进程{name}的进程号为{current_process().pid}')print(f'子进程{name}的父进程号为{os.getppid()}')if __name__ == '__main__':p = Process(target=work, args=(1,))p.start()print(f'主程序的进程号为{current_process().pid}')print(f'主程序的父进程号为{os.getppid()}')
主程序的进程号为3460
主程序的父进程号为11928
子进程1的进程号为7312
子进程1的父进程号为3460
[6]查看进程状态方法p.is_alive(),终止进程方法p.terminate()
(1)没有sleep情况下,terminate也会终止进程,但是is_alive识别仍为True
from multiprocessing import Process, current_processdef work(name):print(f'子进程{name}的进程号为{current_process().pid}')def create_process():sp_list = []for i in range(5):p = Process(target=work, args=(i,))print(f'{p.pid}') # 子进程生成但未启动无法获取到子进程号p.start() # 将当前所有的子进程状态改为就绪态if i == 3:print(f'进程号{p.pid} is alive? {p.is_alive()}')p.terminate()print(f'进程号{p.pid} is alive? {p.is_alive()}')sp_list.append(p)[p.join() for p in sp_list]if __name__ == '__main__':print(f'主程序开始运行')create_process()print(f'主程序结束运行')
# 主程序开始运行
# None 子进程生成但未启动无法获取到进程号
# None
# None
# None
# 进程号11312 is alive? True
# 进程号11312 is alive? True 虽然判断仍为True,但后续代码未打印子进程3的进程号
# None
# 子进程0的进程号为6136
# 子进程1的进程号为2904
# 子进程2的进程号为1644
# 子进程4的进程号为144
# 主程序结束运行
(2)有sleep的情况下,terminate会终止进程,is_alive识别为False
import time
from multiprocessing import Process, current_processdef work(name):print(f'子进程{name}的进程号为{current_process().pid}')def create_process():sp_list = []for i in range(5):p = Process(target=work, args=(i,))print(f'{p.pid}') # 子进程生成但未启动无法获取到子进程号p.start() # 将当前所有的子进程状态改为就绪态if i == 3:print(f'进程号{p.pid} is alive? {p.is_alive()}')p.terminate()time.sleep(2)print(f'进程号{p.pid} is alive? {p.is_alive()}')sp_list.append(p)[p.join() for p in sp_list]if __name__ == '__main__':print(f'主程序开始运行')create_process()print(f'主程序结束运行')
# 主程序开始运行
# None 子进程生成但未启动无法获取到子进程号
# None
# None
# None
# 进程号1640 is alive? True
# 子进程0的进程号为10060
# 子进程1的进程号为15000
# 子进程2的进程号为5236
# 进程号1640 is alive? False 程序休息2秒之后判断为False
# None
# 子进程4的进程号为4172
# 主程序结束运行