ThankNeko's Blog ThankNeko's Blog
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)

Hoshinozora

尽人事,听天命。
首页
  • 操作系统

    • Linux基础
    • Linux服务
    • WindowsServer笔记
    • Ansible笔记
    • Shell笔记
  • 容器服务

    • Docker笔记
    • Kubernetes笔记
    • Git笔记
  • 数据库服务

    • MySQL笔记
    • ELK笔记
    • Redis笔记
  • 监控服务

    • Zabbix笔记
  • Web服务

    • Nginx笔记
    • Tomcat笔记
  • 数据处理

    • Kettle笔记
  • Python笔记
  • Bootstrap笔记
  • C笔记
  • C++笔记
  • Arduino笔记
  • 分类
  • 标签
  • 归档
  • 随笔
  • 关于
GitHub (opens new window)
  • Python笔记

    • 基础知识

    • 类与面向对象

    • 并发编程

      • 并发相关介绍
      • 多进程与进程间通信
      • 多线程与线程间通信
        • 线程理论
        • 创建线程
        • 线程对象的join方法
        • 线程对象及其他方法
        • 守护线程
        • 线程互斥锁
        • GIL全局解释器锁
        • 进程池和线程池
        • Python多进程和多线程的选择
      • 其他锁与队列
      • 网络IO模型与协程
    • Web编程

    • 模块笔记

    • 其他

  • C笔记

  • C++笔记

  • Arduino笔记

  • Dev
  • Python笔记
  • 并发编程
Hoshinozora
2023-02-25
目录

多线程与线程间通信

# 线程理论

# 什么是线程

  • 进程和线程都是虚拟单位,只是为了我们更加方便的进行描述
  • 进程
    • 进程是资源单位,起一个进程仅仅只是在内存空间中,开辟出一个的独立的空间
    • 用于存放资源,和给线程提供所需资源 (如变量等)
    • 每一个进程必然自带一个线程
  • 线程
    • 进程是执行单位,真正被CPU执行的其实是进程里面的线程,线程指的就是代码的执行过程
    • 执行代码所需要使用到的资源 (如变量等),都是找所在的进程去获取

# 为什么要有线程

  • 多进程和多线程都能实现并行和并发,但开设一个进程需要申请内存空间、拷贝代码。而在进程内开设更多线程,却无需再次申请内存空间和拷贝代码的操作
  • 所以开设线程的开销,要远远小于开设进程的开销,并且同一进程下的多个线程之间数据是共享的
  • 例如:
from threading import Thread
def one():
    global a
    a = 666
def two():
    global a
    print(a)
thr1 = Thread(target=one)
thr2 = Thread(target=two)
thr1.start()
thr1.join()
thr2.start()
1
2
3
4
5
6
7
8
9
10
11
12

# 什么时候用线程

  • 多个功能之间需要数据共享的时候,开多线程处理更加合适


# 创建线程

# 第一种创建方式

  • 指定函数的实现方法
# 导入线程模块
from threading import Thread
# 实例化一个线程对象
thr = Thread(target=函数, args=(实参1, 实参2...))
# 告诉操作系统帮你创建一个线程(线程启动时只会执行指定的函数)
thr.start()
1
2
3
4
5
6
  • 例如
from threading import Thread
def task(var):
    print(var)
thr = Thread(target=task,args=['子'])
thr.start()
print('主')
1
2
3
4
5
6

# 第二种创建方式

  • 实现方法
# 导入线程模块
from threading import Thread
# 继承Thread类
class MyThread(Thread):
    def __init__(self,var):
        super().__init__()
        self.var = var
    # 使用继承类的方法时,需要在类中定义run方法
    def run(self):
        print(self.var)

# 实例化对象
thr = MyThread()
# 告诉操作系统帮你创建一个线程(线程启动时,只会运行run方法中的代码)
thr.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
  • 例如
from threading import Thread
class MyThread(Thread):
    def __init__(self,var):
        super().__init__()
        self.var = var
    def run(self):
        print(self.var)

thr = MyThread('子')
thr.start()
print('主')
1
2
3
4
5
6
7
8
9
10
11


# 线程对象的join方法

# join方法

  • Thread.join()
  • thread对象的join方法,可以使主线程等待子线程执行完毕,然后再继续执行,和进程的join方法差不多

# 例子

import time
from threading import Thread
def test():
    time.sleep(3)
    print('子')
thr = Thread(target=test)
thr.start()
# 执行join()方法后,主线程会等待子线程执行完再继续执行后面的代码
thr.join()
print('主')
1
2
3
4
5
6
7
8
9
10


# 线程对象及其他方法

# 获取当前线程的线程名

  • current_thread().name
    • 获取当前线程的线程名
  • 例如
from threading import Thread, current_thread
def task():
    print('子线程名: ' + current_thread().name)
thr = Thread(target=task)
print('主线程名: ' + current_thread().name)
thr.start()
1
2
3
4
5
6

# 获取当前正在活跃的线程数

  • active_count()
    • 会获取当前活跃的主线程数和子线程数的和
  • 例如
import time
from threading import Thread, active_count
def task():
    time.sleep(2)
    print('子线程执行完毕')
thr = Thread(target=task)
thr.start()
print('活跃线程数: {}'.format(active_count()))
thr.join()
print('活跃线程数: {}'.format(active_count()))
1
2
3
4
5
6
7
8
9
10


# 守护线程

# 介绍

  • 主线程的代码运行结束之后不会立刻结束进程,而是会等待所有其他非守护线程运行结束才会结束
  • 就是说,主线程如果要结束,不会管守护线程是否执行完,只会管非守护线程是否执行完
  • 但在主线程等待非守护线程执行完毕的期间,守护线程仍然会在执行

# 实现方式

[线程对象].daemon = True
[线程对象].start()`
1
2

# 例子

import time
from threading import Thread
def task(i):
    print('子线程执行开始')
    time.sleep(i)
    print('子线程执行完毕')
thr1 = Thread(target=task,args=(2,))
thr2 = Thread(target=task,args=(1,))
thr1.daemon = 1
thr1.start()
thr2.start()
print('主')
1
2
3
4
5
6
7
8
9
10
11
12


# 线程互斥锁

# 介绍

  • 同进程的互斥锁一样,也是将并发变成串行,牺牲效率,以保证数据安全性的

# 使用例子

import time
from threading import Thread, Lock
money = 20
mutex = Lock()
def task(mutex):
    mutex.acquire()
    global money
    tmp = money
    time.sleep(1)
    money = tmp - 1
    mutex.release()
t_list = []
for t in range(10):
    t = Thread(target=task,args=(mutex,))
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(money)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


# GIL全局解释器锁

# 介绍

  • GIL不是Python的特点,而是CPython解释器的特点
  • GIL是一把互斥锁,用来阻止同一进程下的多个线程的同时执行
    • 也就是同一进程下的多个线程无法利用多核优势
    • 也就是CPython解释器不支持多线程并发
    • 如果要实现并发,可以使用多进程来实现并发
  • GIL是用来保证解释器级别的数据的安全
  • GIL是解释型语言的通病,因为解释型语言是一行一行执行的。而编译型是事先编译,事先就已经规避了

# 原因

  • CPython中的内存管理(垃圾回收机制) 不是线程安全的
    • 线程不安全就是指在多线程写操作时,可能出现数据写入不正常的情况
  • 反向推理为什么不安全
    • CPython的垃圾回收使用的是引用计数等,假如CPython支持多线程同时执行
    • 则垃圾回收线程作为一个线程,会一直去扫描内存中没有被名称引用的值
    • 如果此时其他线程刚申请到一个值,但是还没来得及赋值给名称
    • 而这个还没来的及赋值给名称的值,被垃圾回收扫描到了,则会直接回收掉它
    • 这就是垃圾回收在多线程情况下的线程不安全

# 实现方式

  • GIL锁是加在CPython解释器上的,在我们线程通过CPython解释器去执行代码的时候,就需要线程抢到了锁,它才能执行
  • 至于执行时看起来像是同时执行,是因为线程之间的保持状态和切换,所以是多线程并行

# 检验方法

import time
from threading import Thread
money = 20
def task():
    global money
    tmp = money
    money = tmp - 1
t_list = []
for t in range(10):
    t = Thread(target=task)
    t.start()
    t_list.append(t)
for t in t_list:
    t.join()
print(money)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

# 注意

  • 由于GIL锁的原因,多个线程之间并非同时执行,也就是说在无延迟的情况下,GIL锁可以保证一定的数据安全,所以执行结果会为正确的10
  • 这种情况是理想情况,实际使用时几乎不可能无延迟,所以只要有一点延迟,就需要我们自己加锁来保证数据的安全性




# 进程池和线程池

# 介绍

  • 我们必须保证计算机的硬件不会崩溃,在使其正常工作的情况下,最大限度的利用它
    • 如:线程或进程过多,就可能导致程序或计算机卡死、崩溃等
  • 池的是用来保证计算机硬件安全,它使我们能够最大限度的利用计算机,它降低了程序的运行效率,但是保证了程序的运行稳定性
  • 所谓进程池和线程池,就是用来管理限制进程和线程的,限制进程数或线程数等

# 线程池方法

  • 导入线程池类

    • from concurrent.futures import ThreadPoolExecutor
  • 创建线程池

    • thread_pool = ThreadPoolExecutor()
    • 创建线程池,同时指定max_workers固定线程数,为该线程池固定线程数,以限制线程数量,如果为None,则默认为"逻辑CPU数 + 4"
    • 线程池创建后,线程池内会在生成同等于max_workers值的固定线程数
    • 这些被创建出来的线程是固定不变的,每一次执行任务,都是这些线程在执行,不会再出现线程的创建和销毁的过程,这样可以节省资源,且提高效率
  • 提交线程任务

    • thread_pool.submit(task, var1, var2...)
    • 朝线程池中,以异步的方式提交线程任务,需要传入要执行的函数名,后面跟需要传入的参数
    • submit()提交任务后会返回一个Future对象,可用于获取运行状态、返回结果等
  • 获取任务执行的返回结果

    • future.result()
    • 获取任务执行的返回结果,调用submit返回的对象的result方法可手动获取任务的返回结果
    • 调用该方法时,如果任务还没有执行完,则会阻塞直到任务执行完毕,然后再返回结果
  • 绑定回调函数

    • thread_pool.submit(task).add_done_callback(call_back)
    • 线程池绑定回调函数,当任务执行完时,会将submit生成的future对象,传入回调函数中
  • 关闭线程池

    • thread_pool.shutdown(wait=True, cancel_futures=False)
    • 阻塞到线程池中的所有任务执行完毕,然后关闭线程池
    • 默认会等到线程池中的任务全部执行完,再执行关闭线程池操作,关闭后就不能再往里面提交任务了,需要重新创建线程池
    • wait表示是否阻塞到线程池中的任务全部执行完后,再执行关闭操作
    • cancel_futures表示是否直接清除所以线程任务

# 进程池方法

  • 与线程池方法基本一致

  • 导入进程池类

    • from concurrent.futures import ProcessPoolExecutor
  • 创建进程池

    • thread_pool = ProcessPoolExecutor()
    • max_workers指固定进程池,与线程池不同的是,进程池如果为None,则默认为逻辑CPU数
  • 提交进程任务

    • process_pool.submit(task, var1, var2...)
  • 绑定回调函数

    • process_pool.submit(task).add_done_callback(call_back)
  • 获取任务执行的返回结果

    • future.result()
  • 关闭进程池

    • process_pool.shutdown(wait=True, cancel_futures=False)

# 回调机制

  • 在提交异步任务时,给每个异步提交的任务绑定一个回调函数,当任务执行完后,会将其结果传入回调函数
  • 例如
# 定义一个回调函数
def call_back(future):
	# 当任务执行完时,会将submit生成的future对象,传入回调函数中,回调函数就可以获取其返回值进行处理
    print(future.result())
# 提交线程或进程的任务时,绑定回调函数
thread_pool.submit(task).add_done_callback(call_back)
1
2
3
4
5
6


# Python多进程和多线程的选择

# 单核情况下

  • 单核一律使用多线程即可,因为单核开多进程不会提高效率,却会消耗额外资源

# 多核情况下

  • IO密集型(IO工作比较多)
    • 多进程
      • 运算效率不会提升,却会消耗额外资源
      • 同时进程创建时也会有时间开销
    • 多线程 (推荐)
      • 可以节省资源,不会有额外的资源开销
  • 计算密集型(CPU工作比较多)
    • 多进程 (推荐)
      • 虽然会消耗额外资源,但会大幅提高运算效率
    • 多线程
      • 虽然不会消耗额外资源,但也不会提高运算效率

# 实际对比

  • IO密集型使用多进程测试
import time
from multiprocessing import Process
def task():
    time.sleep(2)
if __name__ == '__main__':
    start_time = time.time()
    # 测试2核所以起2个进程
    p_list = []
    for i in range(2):
        p = Process(target=task)
        p.start()
        p_list.append(p)
    
    for p in p_list:
        p.join()
    run_time = time.time() - start_time
    # 执行耗时: 2.117844820022583 秒
    print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  • IO密集型使用多线程测试
import time
from threading import Thread
def task():
    time.sleep(2)
if __name__ == '__main__':
    start_time = time.time()
    # 测试2核所以起2个进程
    p_list = []
    for i in range(2):
        p = Thread(target=task)
        p.start()
        p_list.append(p)
    
    for p in p_list:
        p.join()
    run_time = time.time() - start_time
    # 执行耗时: 2.0063345432281494 秒
    print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  • IO密集型测试结论
    • 多进程没有提高效率,而且会消耗额外资源,所以多线程胜出
  • 计算密集型使用多进程测试
import time
from multiprocessing import Process
def task():
    a = 0
    for i in range(100000000):
        a = a * i
if __name__ == '__main__':
    start_time = time.time()
    # 测试2核所以起2个进程
    p_list = []
    for i in range(2):
        p = Process(target=task)
        p.start()
        p_list.append(p)
    
    for p in p_list:
        p.join()
    run_time = time.time() - start_time
    # 执行耗时: 4.36698579788208 秒
    print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
  • 计算密集型使用多线程测试
import time
from threading import Thread
def task():
    a = 0
    for i in range(100000000):
        a = a * i
if __name__ == '__main__':
    start_time = time.time()
    # 测试2核所以起2个进程
    p_list = []
    for i in range(2):
        p = Thread(target=task)
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
    run_time = time.time() - start_time
    # 执行耗时: 8.072399139404297 秒
    print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
  • 计算密集型测试结论
    • 多进程虽然会消耗额外资源,但是会大幅提高运算效率,所以多进程胜出

# 总结

  • 多进程和多线程都有各自的优势,一般写程序时,我们会在多进程下再开设多线程,这样我们既可以利用多核优势,也可以节省资源消耗
#线程理论#线程使用#线程对象方法#守护线程#互斥锁#GIL全局解释器锁#进程池#线程池
多进程与进程间通信
其他锁与队列

← 多进程与进程间通信 其他锁与队列→

最近更新
01
二〇二五年四月十七日随笔
04-17
02
二〇二五年四月十六日随笔
04-16
03
二〇二五年四月九日随笔
04-09
更多文章>
Theme by Vdoing | Copyright © 2022-2025 Hoshinozora | MIT License
湘ICP备2022022820号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式