多进程与进程间通信
# 创建进程
# 创建进程的方式
- 使用方法如下,调用
start()
方法后Process会再次运行该Python脚本的代码,另外附带执行指定的函数
# 导入多进程模块
from multiprocessing import Process
# 实例化一个进程对象
prc = Process(target=函数, args=(实参1, 实参2...))
# 启动创建一个进程并执行指定函数
prc.start()
1
2
3
4
5
6
2
3
4
5
6
- 也可以不指定执行函数,也就是不附带执行指定函数,例如:
prc = Process()
prc.start()
1
2
2
# 例子
import time
from multiprocessing import Process
def task(name):
print("你的名字是: {}".format(name))
time.sleep(3)
print("{} is over.".format(name))
print("outside")
if __name__ == "__main__":
prc = Process(target=task, args=('cris',))
prc.start()
print('test')
输出 >> outside
输出 >> test
输出 >> outside
输出 >> 你的名字是: cris
输出 >> cris is over.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 注意事项
- windows下,创建进程的代码一定要书写在
main
内创建,linux则中不用,但还是建议写在main
之中以提高跨平台性if __name__ == "__main__": ... ...
- 因为windows下创建进程类似于模块导入的方式,会从上往下依次执行代码,而在执行创建进程的代码时,会再次读取并执行python脚本中的代码,如果此时不判断是不是在main内,那么等再次读取到创建进程的代码时,又会去创建进程
- 这就会进入一个死循环的状态,所以会报错
# 进程总结
- 创建进程就是再内存中申请一块内存空间,将需要运行的代码丢进去运行
- 一个进程对应在内存中就是一块独立的内存空间
- 多个进程对应在内存中就是多块独立的内存空间
- 进程与进程之间的数据默认情况下是相互隔离的,如果想要交互可以借助第三方工具/模块
- 子进程的数据,是与主进程和其他子进程隔离的
# 进程对象的join方法
# join方法
[进程对象].join()
- join方法是让主进程等待指定的子进程的代码运行结束之后,再继续运行
- 但其他已经在运行中的子进程不受影响
# 注意事项
- 需在
start()
之后调用 join()
是会阻塞主进程,直到子进程子进程运行完
# 例子
import time
from multiprocessing import Process
def task(name, t):
print("{}进程运行中...".format(name))
time.sleep(t)
print("{}进程运行结束!".format(name))
if __name__ == "__main__":
prc1 = Process(target=task, args=('01', 1))
prc2 = Process(target=task, args=('02', 2))
start = time.time()
prc1.start()
prc2.start()
prc1.join()
prc2.join()
# 执行到prc2.join()时,prc2已经运行了1秒,再过1秒就会继续执行print(),所以执行完只需要2秒
print("主 " + str(time.time() - start))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# 进程对象及其方法
# 系统区分进程的方式
- 系统会给每一个运行的进程分配一个PID号,便于区分与管理不同的进程
- Python第一种查看方式(推荐)
# 导入os模块
import os
# 获取当前进程的PID号
os.getpid()
# 获取当前进程的父进程的PID号
os.getppid()
1
2
3
4
5
6
2
3
4
5
6
- Python第二种查看方式
from multiprocecssing import current_process
# 获取当前进程的PID号
current_process().pid
1
2
3
2
3
- windows系统查看方式
- cmd中执行
tasklist
命令
- cmd中执行
- linux系统查看方式
- shell中执行
ps aux
命令
- shell中执行
# 杀死进程对象的进程
[进程对象].terminate()
- 告诉操作系统去杀死进程对象的进程,但杀死需要一定的时间,而代码的运行速度比之更快
- 所以可能需要
sleep(0.1)
一些时间,再去判断线程是否存活,结果才会准确
# 判断进程对象的进程是否存活
[进程对象].is_alive()
# 僵尸进程与孤儿进程
# 僵尸进程
- 死了但还没有死透的进程
- 当子进程运行完后,也就是子进程死后,子进程所占用的进程号不会立马释放
- 这是为了让父进程能够查看到子进程的一些基本信息,如: 占用的pid号,运行时间等等
- 所有的进程都会短暂的成为僵尸进程
- 异常情况
- 父进程不死,并且在无限制的创建子进程并且子进程也不结束
- 这就会越来越多的占用系统的pid号
- pid号什么时候回收
- 1.父进程等待子进程运行结束
- 2.父进程调用join方法
# 孤儿进程
- 父进程意外死亡,但子进程还存活
- 操作系统会自行去管理孤儿进程,回收相关资源等
# 守护进程
# 介绍
- 守护着某个进程的进程,被守护的进程活,那么守护进程也活。被守护的进程死,则守护进程也死
- 当主进程死后,子进程也会跟着结束运行
# 实现方式
[进程对象].daemon = True
[进程对象].start()
1
2
2
# 例子
import time
from multiprocessing import Process
def task(name, t):
print("{}进程运行中...".format(name))
time.sleep(t)
print("{}进程运行结束!".format(name))
if __name__ == "__main__":
prc1 = Process(target=task, args=('01', 2))
prc1.daemon = True
prc1.start()
time.sleep(1)
print("主")
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
- 子进程后面的那句话不会执行,是因为子进程已经跟着主进程死了,主进程执行完print就死了
# 互斥锁
# 介绍
- 针对多个进程,操作同一份数据的时候,会出现数据错乱的问题
- 针对上述问题,解决方式就是加锁处理
- 加锁的作用是,将并发变成串行,牺牲效率,但是保证了数据的安全
# 注意事项
- 锁不要轻易地使用,容易造成 死锁 现象 (写代码时一般不会用到锁,都是内部封装好的)
- 锁应当只在处理数据的部分加,以来保证数据的安全可靠性,因为加的锁越多,效率越低
# 使用方法
# 先导入 Process 和 Lock
from multiprocessing import Process, Lock
def task(mutex):
search()
# 在遇到mutex.acquire()时,子进程需要抢到锁才会继续执行,否则会一直等待
mutex.acquire()
buy()
# 执行完操作后释放锁,不然其他子进程会一直等待锁释放
mutex.release()
if __name__ == '__main__':
# 在主进程中生成一把锁,让所有的子进程抢,谁先抢到谁先执行
mutex = Lock()
# 在创建子进程对象时,将锁作为参数传给子进程
p1 = Process(target=task, args=(mutex,))
p2 = Process(target=task, args=(mutex,))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
- 总而言之就是,一个子进程拿到锁之后,其他有着相同锁的子进程,除非等抢到锁的进程释放锁,否者会一直等待,直到锁释放后然后抢锁,接着运行
# 使用例子
- data.json文件
{"ticket": 1}
1
- mutex.py文件
import json
from multiprocessing import Process, Lock
def search(name):
# 查票
with open('data.json', 'rt', encoding='utf-8') as f:
data = json.load(f)
print('用户: {}, 查询余票: {}'.format(name, data.get('ticket')))
def buyticket(name):
# 查票
with open('data.json', 'rt', encoding='utf-8') as f:
data = json.load(f)
# 买票
if data.get('ticket') > 0:
data['ticket'] -= 1
with open('data.json', 'wt', encoding='utf-8') as f:
json.dump(data, f)
print('用户: {}, 购票成功!'.format(name))
else:
print('用户: {}, 购票失败,余票不足!'.format(name))
def run(name, mutex):
search(name)
mutex.acquire()
buyticket(name)
mutex.release()
if __name__ == '__main__':
mutex = Lock()
pr1 = Process(target=run, args=('cecilia',mutex))
pr2 = Process(target=run, args=('anli',mutex))
pr1.start()
pr2.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 队列
# 介绍
- 队列
- 先进先出
- 管道 + 锁
- 堆栈
- 先进后出
# queue模块
- 导入队列模块
import queue
Queue
队列:类用于创建队列对象que = queue.Queue(maxsize=0)
- maxsize参数
- 指定队列可以存放的最大数据量,默认0,表示无限制大小
- maxsize参数
put()
方法:队列插入元素que.put(item, block=True, timeout=None)
- 默认情况下,当队列数据存满了之后,如果还往里存数据,则程序会进入阻塞状态,直到有位置让出来
- item参数
- 要插入的元素
- block参数
- 如果block为True,则在队列存满时,进入阻塞状态,可通过timeout控制阻塞时长
- timeout参数
- timeout用于控制block为true时阻塞的时长,超过设定的时间没有空位置供插入元素则报错,None为一直阻塞
get()
方法:队列弹出元素que.get(block=True, timeout=None)
- 默认情况下,当队列数据为空之后,如果还执行弹出操作,则程序会进入阻塞状态,直到有数据可以弹出
- block参数
- 如果block为True,则在队列为空时,进入阻塞状态,可通过timeout控制阻塞时长
- timeout参数
- timeout用于控制block为true时阻塞的时长,超过设定的时间队列还没有元素则报错,None为一直阻塞
que.full()
、que.empty()
、que.get_nowait()
方法- 这些方法在多进程中,可能是不准的,除非加锁,可能会出现前面的进程刚判断,后面的进程就已经更该了操作,这样就会造成不准的情况
- 所以一般用put()和get()就行
# IPC (进程间通信)
# IPC介绍
- 利用队列来使进程与进程之间进行通信
- 如: 两个进程共用一个队列,然后一个进程put,另一个进程get,这就实现了进程间的通信
# 例如
- 子进程和主进程交互
from multiprocessing import Process, Queue
def producer(que):
print('hello girl!')
que.put('你好!帅哥')
if __name__ == '__main__':
que = Queue()
pcs = Process(target=producer, args=[que])
pcs.start()
print(que.get())
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
- 子进程和子进程交互
from multiprocessing import Process, Queue
def producer(que):
print('生产!')
que.put('消费!')
def consumer(que):
print(que.get())
if __name__ == '__main__':
que = Queue()
pcs1 = Process(target=producer, args=[que])
pcs2 = Process(target=consumer, args=[que])
pcs1.start()
pcs2.start()
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
- 注意:子进程传入的队列对象,需要是multiprocessing中的Queue类实例化得来的,否则无法正常启动子进程
# 生产者/消费者模型
# 生产者
- 生产、制作消息,并往队列中插入消息
# 消息队列
- 作为消费者和生产者交互的媒介,用于存储消息
# 消费者
- 从队列中取出消息,并消费、处理消息
# 使用例子
from multiprocessing import Process, Queue
def producer(que):
for i in range(0, 10):
que.put(i)
print("produce: {}".format(i))
def consumer(que):
while True:
data = que.get()
if data is None:
print('consume done!')
break
else:
print('consume: {}'.format(data))
if __name__ == '__main__':
que = Queue()
producer_process = Process(target=producer, args=(que,))
consumer01_process = Process(target=consumer, args=(que,))
consumer02_process = Process(target=consumer, args=(que,))
producer_process.start()
consumer01_process.start()
consumer02_process.start()
producer_process.join()
que.put(None)
que.put(None)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
- 等生产者都生产完了,就插入一个None,而消费者就判断如果取到None就结束
- 有几个消费者就要put个None
# JoinableQueue队列
# 导入模块
from multiprocessing import JoinableQueue
# 介绍
- 与Queue的区别是,每当往该类创建的队列实例里插入put数据时,内部就会有一个计数器+1
- 当调用task_done时,计数器就会-1
# joinablequeue.join()方法
- 一般用在主进程中,当计数器为0时,才往后运行,也就是都消费完了才继续执行
- 一般与daemon一同使用,使主进程结束时,子进程跟着结束