多线程与线程间通信
# 线程理论
# 什么是线程
- 进程和线程都是虚拟单位,只是为了我们更加方便的进行描述
- 进程
- 进程是资源单位,起一个进程仅仅只是在内存空间中,开辟出一个的独立的空间
- 用于存放资源,和给线程提供所需资源 (如变量等)
- 每一个进程必然自带一个线程
- 线程
- 进程是执行单位,真正被CPU执行的其实是进程里面的线程,线程指的就是代码的执行过程
- 执行代码所需要使用到的资源 (如变量等),都是找所在的进程去获取
# 为什么要有线程
- 多进程和多线程都能实现并行和并发,但开设一个进程需要申请内存空间、拷贝代码。而在进程内开设更多线程,却无需再次申请内存空间和拷贝代码的操作
- 所以开设线程的开销,要远远小于开设进程的开销,并且同一进程下的多个线程之间数据是共享的
- 例如:
from threading import Thread
def one():
global a
a = 666
def two():
global a
print(a)
thr1 = Thread(target=one)
thr2 = Thread(target=two)
thr1.start()
thr1.join()
thr2.start()
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 什么时候用线程
- 多个功能之间需要数据共享的时候,开多线程处理更加合适
# 创建线程
# 第一种创建方式
- 指定函数的实现方法
# 导入线程模块
from threading import Thread
# 实例化一个线程对象
thr = Thread(target=函数, args=(实参1, 实参2...))
# 告诉操作系统帮你创建一个线程(线程启动时只会执行指定的函数)
thr.start()
1
2
3
4
5
6
2
3
4
5
6
- 例如
from threading import Thread
def task(var):
print(var)
thr = Thread(target=task,args=['子'])
thr.start()
print('主')
1
2
3
4
5
6
2
3
4
5
6
# 第二种创建方式
- 实现方法
# 导入线程模块
from threading import Thread
# 继承Thread类
class MyThread(Thread):
def __init__(self,var):
super().__init__()
self.var = var
# 使用继承类的方法时,需要在类中定义run方法
def run(self):
print(self.var)
# 实例化对象
thr = MyThread()
# 告诉操作系统帮你创建一个线程(线程启动时,只会运行run方法中的代码)
thr.start()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
- 例如
from threading import Thread
class MyThread(Thread):
def __init__(self,var):
super().__init__()
self.var = var
def run(self):
print(self.var)
thr = MyThread('子')
thr.start()
print('主')
1
2
3
4
5
6
7
8
9
10
11
2
3
4
5
6
7
8
9
10
11
# 线程对象的join方法
# join方法
Thread.join()
- thread对象的join方法,可以使主线程等待子线程执行完毕,然后再继续执行,和进程的join方法差不多
# 例子
import time
from threading import Thread
def test():
time.sleep(3)
print('子')
thr = Thread(target=test)
thr.start()
# 执行join()方法后,主线程会等待子线程执行完再继续执行后面的代码
thr.join()
print('主')
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 线程对象及其他方法
# 获取当前线程的线程名
current_thread().name
- 获取当前线程的线程名
- 例如
from threading import Thread, current_thread
def task():
print('子线程名: ' + current_thread().name)
thr = Thread(target=task)
print('主线程名: ' + current_thread().name)
thr.start()
1
2
3
4
5
6
2
3
4
5
6
# 获取当前正在活跃的线程数
active_count()
- 会获取当前活跃的主线程数和子线程数的和
- 例如
import time
from threading import Thread, active_count
def task():
time.sleep(2)
print('子线程执行完毕')
thr = Thread(target=task)
thr.start()
print('活跃线程数: {}'.format(active_count()))
thr.join()
print('活跃线程数: {}'.format(active_count()))
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
# 守护线程
# 介绍
- 主线程的代码运行结束之后不会立刻结束进程,而是会等待所有其他非守护线程运行结束才会结束
- 就是说,主线程如果要结束,不会管守护线程是否执行完,只会管非守护线程是否执行完
- 但在主线程等待非守护线程执行完毕的期间,守护线程仍然会在执行
# 实现方式
[线程对象].daemon = True
[线程对象].start()`
1
2
2
# 例子
import time
from threading import Thread
def task(i):
print('子线程执行开始')
time.sleep(i)
print('子线程执行完毕')
thr1 = Thread(target=task,args=(2,))
thr2 = Thread(target=task,args=(1,))
thr1.daemon = 1
thr1.start()
thr2.start()
print('主')
1
2
3
4
5
6
7
8
9
10
11
12
2
3
4
5
6
7
8
9
10
11
12
# 线程互斥锁
# 介绍
- 同进程的互斥锁一样,也是将并发变成串行,牺牲效率,以保证数据安全性的
# 使用例子
import time
from threading import Thread, Lock
money = 20
mutex = Lock()
def task(mutex):
mutex.acquire()
global money
tmp = money
time.sleep(1)
money = tmp - 1
mutex.release()
t_list = []
for t in range(10):
t = Thread(target=task,args=(mutex,))
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(money)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# GIL全局解释器锁
# 介绍
- GIL不是Python的特点,而是CPython解释器的特点
- GIL是一把互斥锁,用来阻止同一进程下的多个线程的同时执行
- 也就是同一进程下的多个线程无法利用多核优势
- 也就是CPython解释器不支持多线程并发
- 如果要实现并发,可以使用多进程来实现并发
- GIL是用来保证解释器级别的数据的安全
- GIL是解释型语言的通病,因为解释型语言是一行一行执行的。而编译型是事先编译,事先就已经规避了
# 原因
- CPython中的内存管理(垃圾回收机制) 不是线程安全的
- 线程不安全就是指在多线程写操作时,可能出现数据写入不正常的情况
- 反向推理为什么不安全
- CPython的垃圾回收使用的是引用计数等,假如CPython支持多线程同时执行
- 则垃圾回收线程作为一个线程,会一直去扫描内存中没有被名称引用的值
- 如果此时其他线程刚申请到一个值,但是还没来得及赋值给名称
- 而这个还没来的及赋值给名称的值,被垃圾回收扫描到了,则会直接回收掉它
- 这就是垃圾回收在多线程情况下的线程不安全
# 实现方式
- GIL锁是加在CPython解释器上的,在我们线程通过CPython解释器去执行代码的时候,就需要线程抢到了锁,它才能执行
- 至于执行时看起来像是同时执行,是因为线程之间的保持状态和切换,所以是多线程并行
# 检验方法
import time
from threading import Thread
money = 20
def task():
global money
tmp = money
money = tmp - 1
t_list = []
for t in range(10):
t = Thread(target=task)
t.start()
t_list.append(t)
for t in t_list:
t.join()
print(money)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 注意
- 由于GIL锁的原因,多个线程之间并非同时执行,也就是说在无延迟的情况下,GIL锁可以保证一定的数据安全,所以执行结果会为正确的10
- 这种情况是理想情况,实际使用时几乎不可能无延迟,所以只要有一点延迟,就需要我们自己加锁来保证数据的安全性
# 进程池和线程池
# 介绍
- 我们必须保证计算机的硬件不会崩溃,在使其正常工作的情况下,最大限度的利用它
- 如:线程或进程过多,就可能导致程序或计算机卡死、崩溃等
- 池的是用来保证计算机硬件安全,它使我们能够最大限度的利用计算机,它降低了程序的运行效率,但是保证了程序的运行稳定性
- 所谓进程池和线程池,就是用来管理限制进程和线程的,限制进程数或线程数等
# 线程池方法
导入线程池类
from concurrent.futures import ThreadPoolExecutor
创建线程池
thread_pool = ThreadPoolExecutor()
- 创建线程池,同时指定max_workers固定线程数,为该线程池固定线程数,以限制线程数量,如果为None,则默认为"逻辑CPU数 + 4"
- 线程池创建后,线程池内会在生成同等于max_workers值的固定线程数
- 这些被创建出来的线程是固定不变的,每一次执行任务,都是这些线程在执行,不会再出现线程的创建和销毁的过程,这样可以节省资源,且提高效率
提交线程任务
thread_pool.submit(task, var1, var2...)
- 朝线程池中,以异步的方式提交线程任务,需要传入要执行的函数名,后面跟需要传入的参数
submit()
提交任务后会返回一个Future对象,可用于获取运行状态、返回结果等
获取任务执行的返回结果
future.result()
- 获取任务执行的返回结果,调用submit返回的对象的result方法可手动获取任务的返回结果
- 调用该方法时,如果任务还没有执行完,则会阻塞直到任务执行完毕,然后再返回结果
绑定回调函数
thread_pool.submit(task).add_done_callback(call_back)
- 线程池绑定回调函数,当任务执行完时,会将submit生成的future对象,传入回调函数中
关闭线程池
thread_pool.shutdown(wait=True, cancel_futures=False)
- 阻塞到线程池中的所有任务执行完毕,然后关闭线程池
- 默认会等到线程池中的任务全部执行完,再执行关闭线程池操作,关闭后就不能再往里面提交任务了,需要重新创建线程池
wait
表示是否阻塞到线程池中的任务全部执行完后,再执行关闭操作cancel_futures
表示是否直接清除所以线程任务
# 进程池方法
与线程池方法基本一致
导入进程池类
from concurrent.futures import ProcessPoolExecutor
创建进程池
thread_pool = ProcessPoolExecutor()
- max_workers指固定进程池,与线程池不同的是,进程池如果为None,则默认为逻辑CPU数
提交进程任务
process_pool.submit(task, var1, var2...)
绑定回调函数
process_pool.submit(task).add_done_callback(call_back)
获取任务执行的返回结果
future.result()
关闭进程池
process_pool.shutdown(wait=True, cancel_futures=False)
# 回调机制
- 在提交异步任务时,给每个异步提交的任务绑定一个回调函数,当任务执行完后,会将其结果传入回调函数
- 例如
# 定义一个回调函数
def call_back(future):
# 当任务执行完时,会将submit生成的future对象,传入回调函数中,回调函数就可以获取其返回值进行处理
print(future.result())
# 提交线程或进程的任务时,绑定回调函数
thread_pool.submit(task).add_done_callback(call_back)
1
2
3
4
5
6
2
3
4
5
6
# Python多进程和多线程的选择
# 单核情况下
- 单核一律使用多线程即可,因为单核开多进程不会提高效率,却会消耗额外资源
# 多核情况下
- IO密集型(IO工作比较多)
- 多进程
- 运算效率不会提升,却会消耗额外资源
- 同时进程创建时也会有时间开销
- 多线程 (推荐)
- 可以节省资源,不会有额外的资源开销
- 多进程
- 计算密集型(CPU工作比较多)
- 多进程 (推荐)
- 虽然会消耗额外资源,但会大幅提高运算效率
- 多线程
- 虽然不会消耗额外资源,但也不会提高运算效率
- 多进程 (推荐)
# 实际对比
- IO密集型使用多进程测试
import time
from multiprocessing import Process
def task():
time.sleep(2)
if __name__ == '__main__':
start_time = time.time()
# 测试2核所以起2个进程
p_list = []
for i in range(2):
p = Process(target=task)
p.start()
p_list.append(p)
for p in p_list:
p.join()
run_time = time.time() - start_time
# 执行耗时: 2.117844820022583 秒
print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- IO密集型使用多线程测试
import time
from threading import Thread
def task():
time.sleep(2)
if __name__ == '__main__':
start_time = time.time()
# 测试2核所以起2个进程
p_list = []
for i in range(2):
p = Thread(target=task)
p.start()
p_list.append(p)
for p in p_list:
p.join()
run_time = time.time() - start_time
# 执行耗时: 2.0063345432281494 秒
print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- IO密集型测试结论
- 多进程没有提高效率,而且会消耗额外资源,所以多线程胜出
- 计算密集型使用多进程测试
import time
from multiprocessing import Process
def task():
a = 0
for i in range(100000000):
a = a * i
if __name__ == '__main__':
start_time = time.time()
# 测试2核所以起2个进程
p_list = []
for i in range(2):
p = Process(target=task)
p.start()
p_list.append(p)
for p in p_list:
p.join()
run_time = time.time() - start_time
# 执行耗时: 4.36698579788208 秒
print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
- 计算密集型使用多线程测试
import time
from threading import Thread
def task():
a = 0
for i in range(100000000):
a = a * i
if __name__ == '__main__':
start_time = time.time()
# 测试2核所以起2个进程
p_list = []
for i in range(2):
p = Thread(target=task)
p.start()
p_list.append(p)
for p in p_list:
p.join()
run_time = time.time() - start_time
# 执行耗时: 8.072399139404297 秒
print('执行耗时: {} 秒'.format(run_time))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
- 计算密集型测试结论
- 多进程虽然会消耗额外资源,但是会大幅提高运算效率,所以多进程胜出
# 总结
- 多进程和多线程都有各自的优势,一般写程序时,我们会在多进程下再开设多线程,这样我们既可以利用多核优势,也可以节省资源消耗