ThankNeko's Blog ThankNeko's Blog
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)

Hoshinozora

尽人事,听天命。
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)
  • Python笔记

    • 基础知识

    • 类与面向对象

    • 并发编程

      • 并发相关介绍
      • 多进程与进程间通信
        • 创建进程
        • 进程对象的join方法
        • 进程对象及其方法
        • 僵尸进程与孤儿进程
        • 守护进程
        • 互斥锁
        • 队列
        • IPC (进程间通信)
        • 生产者/消费者模型
        • JoinableQueue队列
      • 多线程与线程间通信
      • 其他锁与队列
      • 网络IO模型与协程
    • Web编程

    • 模块笔记

    • 其他

  • C笔记

  • C++笔记

  • Arduino笔记

  • Dev
  • Python笔记
  • 并发编程
Hoshinozora
2023-02-25
目录

多进程与进程间通信

# 创建进程

# 创建进程的方式

  • 使用方法如下,调用start()方法后Process会再次运行该Python脚本的代码,另外附带执行指定的函数
# 导入多进程模块
from multiprocessing import Process
# 实例化一个进程对象
prc = Process(target=函数, args=(实参1, 实参2...))
# 启动创建一个进程并执行指定函数
prc.start()
1
2
3
4
5
6
  • 也可以不指定执行函数,也就是不附带执行指定函数,例如:
prc = Process()
prc.start()
1
2

# 例子

import time
from multiprocessing import Process

def task(name):
    print("你的名字是: {}".format(name))
    time.sleep(3)
    print("{} is over.".format(name))

print("outside")

if __name__ == "__main__":
    prc = Process(target=task, args=('cris',))
    prc.start()
    print('test')

输出 >> outside
输出 >> test
输出 >> outside
输出 >> 你的名字是: cris
输出 >> cris is over.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 注意事项

  • windows下,创建进程的代码一定要书写在main内创建,linux则中不用,但还是建议写在main之中以提高跨平台性
    • if __name__ == "__main__": ... ...
  • 因为windows下创建进程类似于模块导入的方式,会从上往下依次执行代码,而在执行创建进程的代码时,会再次读取并执行python脚本中的代码,如果此时不判断是不是在main内,那么等再次读取到创建进程的代码时,又会去创建进程
    • 这就会进入一个死循环的状态,所以会报错

# 进程总结

  • 创建进程就是再内存中申请一块内存空间,将需要运行的代码丢进去运行
  • 一个进程对应在内存中就是一块独立的内存空间
  • 多个进程对应在内存中就是多块独立的内存空间
  • 进程与进程之间的数据默认情况下是相互隔离的,如果想要交互可以借助第三方工具/模块
    • 子进程的数据,是与主进程和其他子进程隔离的


# 进程对象的join方法

# join方法

  • [进程对象].join()
  • join方法是让主进程等待指定的子进程的代码运行结束之后,再继续运行
    • 但其他已经在运行中的子进程不受影响

# 注意事项

  • 需在start()之后调用
  • join()是会阻塞主进程,直到子进程子进程运行完

# 例子

import time
from multiprocessing import Process

def task(name, t):
    print("{}进程运行中...".format(name))
    time.sleep(t)
    print("{}进程运行结束!".format(name))

if __name__ == "__main__":
    prc1 = Process(target=task, args=('01', 1))
    prc2 = Process(target=task, args=('02', 2))
    start = time.time()
    prc1.start()
    prc2.start()
    prc1.join()
    prc2.join()
    # 执行到prc2.join()时,prc2已经运行了1秒,再过1秒就会继续执行print(),所以执行完只需要2秒
    print("主 " + str(time.time() - start))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18


# 进程对象及其方法

# 系统区分进程的方式

  • 系统会给每一个运行的进程分配一个PID号,便于区分与管理不同的进程
  • Python第一种查看方式(推荐)
# 导入os模块
import os
# 获取当前进程的PID号
os.getpid()
# 获取当前进程的父进程的PID号
os.getppid()
1
2
3
4
5
6
  • Python第二种查看方式
from multiprocecssing import current_process
# 获取当前进程的PID号
current_process().pid
1
2
3
  • windows系统查看方式
    • cmd中执行tasklist命令
  • linux系统查看方式
    • shell中执行ps aux命令

# 杀死进程对象的进程

  • [进程对象].terminate()
  • 告诉操作系统去杀死进程对象的进程,但杀死需要一定的时间,而代码的运行速度比之更快
  • 所以可能需要sleep(0.1)一些时间,再去判断线程是否存活,结果才会准确

# 判断进程对象的进程是否存活

  • [进程对象].is_alive()


# 僵尸进程与孤儿进程

# 僵尸进程

  • 死了但还没有死透的进程
  • 当子进程运行完后,也就是子进程死后,子进程所占用的进程号不会立马释放
  • 这是为了让父进程能够查看到子进程的一些基本信息,如: 占用的pid号,运行时间等等
  • 所有的进程都会短暂的成为僵尸进程
  • 异常情况
    • 父进程不死,并且在无限制的创建子进程并且子进程也不结束
    • 这就会越来越多的占用系统的pid号
  • pid号什么时候回收
    • 1.父进程等待子进程运行结束
    • 2.父进程调用join方法

# 孤儿进程

  • 父进程意外死亡,但子进程还存活
  • 操作系统会自行去管理孤儿进程,回收相关资源等


# 守护进程

# 介绍

  • 守护着某个进程的进程,被守护的进程活,那么守护进程也活。被守护的进程死,则守护进程也死
  • 当主进程死后,子进程也会跟着结束运行

# 实现方式

[进程对象].daemon = True
[进程对象].start()
1
2

# 例子

import time
from multiprocessing import Process
def task(name, t):
    print("{}进程运行中...".format(name))
    time.sleep(t)
    print("{}进程运行结束!".format(name))
if __name__ == "__main__":
    prc1 = Process(target=task, args=('01', 2))
    prc1.daemon = True
    prc1.start()
    time.sleep(1)
    print("主")
1
2
3
4
5
6
7
8
9
10
11
12
  • 子进程后面的那句话不会执行,是因为子进程已经跟着主进程死了,主进程执行完print就死了


# 互斥锁

# 介绍

  • 针对多个进程,操作同一份数据的时候,会出现数据错乱的问题
  • 针对上述问题,解决方式就是加锁处理
  • 加锁的作用是,将并发变成串行,牺牲效率,但是保证了数据的安全

# 注意事项

  • 锁不要轻易地使用,容易造成 死锁 现象 (写代码时一般不会用到锁,都是内部封装好的)
  • 锁应当只在处理数据的部分加,以来保证数据的安全可靠性,因为加的锁越多,效率越低

# 使用方法

# 先导入 Process 和 Lock
from multiprocessing import Process, Lock

def task(mutex):
    search()
    # 在遇到mutex.acquire()时,子进程需要抢到锁才会继续执行,否则会一直等待
    mutex.acquire()
    buy()
    # 执行完操作后释放锁,不然其他子进程会一直等待锁释放
    mutex.release()

if __name__ == '__main__':
    # 在主进程中生成一把锁,让所有的子进程抢,谁先抢到谁先执行
    mutex = Lock()
    # 在创建子进程对象时,将锁作为参数传给子进程
    p1 = Process(target=task, args=(mutex,))
    p2 = Process(target=task, args=(mutex,))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
  • 总而言之就是,一个子进程拿到锁之后,其他有着相同锁的子进程,除非等抢到锁的进程释放锁,否者会一直等待,直到锁释放后然后抢锁,接着运行

# 使用例子

  • data.json文件
{"ticket": 1}
1
  • mutex.py文件
import json
from multiprocessing import Process, Lock

def search(name):
    # 查票
    with open('data.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
    print('用户: {}, 查询余票: {}'.format(name, data.get('ticket')))

def buyticket(name):
    # 查票
    with open('data.json', 'rt', encoding='utf-8') as f:
        data = json.load(f)
    # 买票
    if data.get('ticket') > 0:
        data['ticket'] -= 1
        with open('data.json', 'wt', encoding='utf-8') as f:
            json.dump(data, f)
        print('用户: {}, 购票成功!'.format(name))
    else:
        print('用户: {}, 购票失败,余票不足!'.format(name))

def run(name, mutex):
    search(name)
    mutex.acquire()
    buyticket(name)
    mutex.release()

if __name__ == '__main__':
    mutex = Lock()
    pr1 = Process(target=run, args=('cecilia',mutex))
    pr2 = Process(target=run, args=('anli',mutex))
    pr1.start()
    pr2.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34


# 队列

# 介绍

  • 队列
    • 先进先出
    • 管道 + 锁
  • 堆栈
    • 先进后出

# queue模块

  • 导入队列模块
    • import queue
  • Queue队列:类用于创建队列对象
    • que = queue.Queue(maxsize=0)
      • maxsize参数
        • 指定队列可以存放的最大数据量,默认0,表示无限制大小
  • put()方法:队列插入元素
    • que.put(item, block=True, timeout=None)
      • 默认情况下,当队列数据存满了之后,如果还往里存数据,则程序会进入阻塞状态,直到有位置让出来
      • item参数
        • 要插入的元素
      • block参数
        • 如果block为True,则在队列存满时,进入阻塞状态,可通过timeout控制阻塞时长
      • timeout参数
        • timeout用于控制block为true时阻塞的时长,超过设定的时间没有空位置供插入元素则报错,None为一直阻塞
  • get()方法:队列弹出元素
    • que.get(block=True, timeout=None)
      • 默认情况下,当队列数据为空之后,如果还执行弹出操作,则程序会进入阻塞状态,直到有数据可以弹出
      • block参数
        • 如果block为True,则在队列为空时,进入阻塞状态,可通过timeout控制阻塞时长
      • timeout参数
        • timeout用于控制block为true时阻塞的时长,超过设定的时间队列还没有元素则报错,None为一直阻塞
  • que.full()、que.empty()、que.get_nowait()方法
    • 这些方法在多进程中,可能是不准的,除非加锁,可能会出现前面的进程刚判断,后面的进程就已经更该了操作,这样就会造成不准的情况
    • 所以一般用put()和get()就行


# IPC (进程间通信)

# IPC介绍

  • 利用队列来使进程与进程之间进行通信
  • 如: 两个进程共用一个队列,然后一个进程put,另一个进程get,这就实现了进程间的通信

# 例如

  • 子进程和主进程交互
from multiprocessing import Process, Queue
def producer(que):
    print('hello girl!')
    que.put('你好!帅哥')
if __name__ == '__main__':
    que = Queue()
    pcs = Process(target=producer, args=[que])
    pcs.start()
    print(que.get())
1
2
3
4
5
6
7
8
9
  • 子进程和子进程交互
from multiprocessing import Process, Queue
def producer(que):
    print('生产!')
    que.put('消费!')
def consumer(que):
    print(que.get())
if __name__ == '__main__':
    que = Queue()
    pcs1 = Process(target=producer, args=[que])
    pcs2 = Process(target=consumer, args=[que])
    pcs1.start()
    pcs2.start()
1
2
3
4
5
6
7
8
9
10
11
12
  • 注意:子进程传入的队列对象,需要是multiprocessing中的Queue类实例化得来的,否则无法正常启动子进程


# 生产者/消费者模型

# 生产者

  • 生产、制作消息,并往队列中插入消息

# 消息队列

  • 作为消费者和生产者交互的媒介,用于存储消息

# 消费者

  • 从队列中取出消息,并消费、处理消息

# 使用例子

from multiprocessing import Process, Queue

def producer(que):
    for i in range(0, 10):
        que.put(i)
        print("produce: {}".format(i))

def consumer(que):
    while True:
        data = que.get()
        if data is None:
            print('consume done!')
            break
        else:
            print('consume: {}'.format(data))

if __name__ == '__main__':
    que = Queue()
    producer_process = Process(target=producer, args=(que,))
    consumer01_process = Process(target=consumer, args=(que,))
    consumer02_process = Process(target=consumer, args=(que,))
    producer_process.start()
    consumer01_process.start()
    consumer02_process.start()
    producer_process.join()
    que.put(None)
    que.put(None)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
  • 等生产者都生产完了,就插入一个None,而消费者就判断如果取到None就结束
  • 有几个消费者就要put个None


# JoinableQueue队列

# 导入模块

  • from multiprocessing import JoinableQueue

# 介绍

  • 与Queue的区别是,每当往该类创建的队列实例里插入put数据时,内部就会有一个计数器+1
  • 当调用task_done时,计数器就会-1

# joinablequeue.join()方法

  • 一般用在主进程中,当计数器为0时,才往后运行,也就是都消费完了才继续执行
  • 一般与daemon一同使用,使主进程结束时,子进程跟着结束
#多进程使用#进程对象方法#僵尸进程#孤儿进程#守护进程#互斥锁#队列#IPC#生产者消费者模型
并发相关介绍
多线程与线程间通信

← 并发相关介绍 多线程与线程间通信→

最近更新
01
二〇二五年四月十七日随笔
04-17
02
二〇二五年四月十六日随笔
04-16
03
二〇二五年四月九日随笔
04-09
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Hoshinozora | MIT License
湘ICP备2022022820号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式