网络IO模型与协程
# 网络IO模型介绍
# 介绍
- 主机A
- 由于应用程序是不能直接操作硬件的,所以在进程想要发送数据到其他主机时
- 就需要将应用数据拷贝给内核,再由内核,去进行各种协议的封装,最后通过网络发送出去(copy datagram)
- 主机B
- 发送系统调用,等待数据准备,根据使用的IO模型不同,接收数据的方式也不同
- 操作系统接收到数据并处理之后,会将数据从内核拷贝到对应的进程中(copy datagram)
- 然后应用程序再对接收到的数据进行处理
# 五种网络IO模型
阻塞IO
(blocking IO)非阻塞IO
(nonblocking IO)IO多路复用
(IO multiplexing)信号驱动IO
(signal driven IO) 一般用不上异步IO
(asynchronous IO)
# IO模型对比
- 异步IO > IO多路复用 > 非阻塞IO > 阻塞IO
# 阻塞IO
# 介绍
- 一直IO阻塞,直到接收到数据为止,比如
accept、recv
等等
# 非阻塞IO
# 介绍
- 系统调用之后,无论是否有数据,都会立刻获得一个结果
- 它会将所有的阻塞操作变成非阻塞,收到响应后,即可以选择再次系统调用获取数据,也可以做其他操作
- 如果系统调用时,内核已经获取到了数据,则内核会将数据拷贝到进程
# 例子
import socket
server = socket.socket()
server.bind(('0.0.0.0', 8080))
server.listen(5)
server.setblocking(False)
conn_list = []
while 1:
try:
conn, addr = server.accept()
conn_list.append(conn)
print(f'接收到客户端[{addr}]的连接!')
except BlockingIOError:
pass
# 通过循环和切换来处理连接和消息
for conn in conn_list.copy():
try:
data = conn.recv(1024)
# 如果连接断开
if len(data) == 0:
conn.close()
# 删除无用的连接
conn_list.remove(conn)
print(f'客户端[{addr}]断开连接!')
conn.send(data.upper())
except BlockingIOError:
continue
except ConnectionResetError:
conn.close()
conn_list.remove(conn)
print(f'客户端[{addr}]断开连接!')
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 总结
- 虽然非阻塞IO看上去厉害,但是该模型会长时间占用CPU,并且不干活,类似于while死循环
- 实际应用中,也不会考虑直接使用非阻塞IO模型
# IO多路复用
# 介绍
- 多个进程的IO注册到同一个管道上,这个管道会统一和内核进行交互,也就是内核用一条管道来帮你监管多个socket对象和conn对象,多个IO对象只需要阻塞一次
- 操作系统会轮询的去检查对象是否被触发
- 只要被监管的对象被触发,就会返回被监管对象自身,进程就可以调用该对象的方法获取数据
- 相当于由IO多路复用模型,来帮你判断被监管对象是否有收到数据
- 监管机制(如
select、poll、epoll
等),是操作系统提供的,python如果想要使用,可以使用它们的模块
# 注意
- 被监管的IO需要是非阻塞的,阻塞的任务将由内核来完成 (IO多路复用)
- 当监管的对象只有一个的时候,IO多路复用的效率甚至不如阻塞IO,但是IO多路复用可以一次性监管很多个对象,且多个对象用一条管道,可以提高效率
# select函数
- 介绍
- 多个进程的IO可以注册到同一个
select
上,当用户进程调用该select
,select
会监听所有注册好的IO- 这里的IO复用模型,并没有向内核注册信号处理函数,所以他是阻塞的。但被监管的对象需要是非阻塞
- 在被监听的IO所需的数据还没有准备好时,
select
调用进程会阻塞,直到任意一个IO对象接收到数据 - 当任意一个IO接收到数据,
select
调用就会返回IO对象自身,进程就可以通过调用该对象的非阻塞IO方法来获取数据,比如accept、recv
等等
- 多个进程的IO可以注册到同一个
- 使用方法
# 导入模块
import select
# 监听非阻塞对象,需要将对象放到列表里
read_list = [server]
r_list, w_list, x_list = select.select(read_list, [], [])
1
2
3
4
5
2
3
4
5
- 例子
from os import read
import select
import socket
server = socket.socket()
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)
read_list = [server]
while 1:
r_list, w_list, x_list = select.select(read_list, [], [])
for i in r_list:
# 如果接收到数据的对象是server
if i is server:
# 调用接收到的server对象,其accept方法
# 因为前面的阻塞已经监测到有数据,所以此处一定能获取到数据
conn, addr = i.accept()
# 将conn对象,也添加到监听对象列表
read_list.append(conn)
# 如果接收到数据的对象是客户端的conn
else:
try:
res = i.recv(1024)
# 如果被关闭连接,则回收连接资源并从监管列表中移除
if (len(res)) == 0:
i.close()
read_list.remove(i)
print(res)
i.send(res.upper())
except ConnectionResetError:
continue
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# select机制 & poll机制 & epoll机制
select特点
- select在windows和linux都可以使用
- 单个进程监控的文件描述符有限,通常为1024*8个文件描述符
- 决定着最多能监管多少个对象
- 文件描述符数量越多,性能越差
- 内核/用户数据拷贝频繁,操作复杂
- select在调用之前,需要在程序里将要监控的对象添加到fed_set集合中,然后加载到内核进行监控
- 当内核事件发生,再将fed_set集合中没有发生的文件描述符清空,然后拷贝到用户区,和数组中的文件描述符进行比对
- 再调用selecct也是如此,每次调用,都需要了来回拷贝
- 轮询时间效率低,需要遍历整个数组才知道谁发生了变化,轮询代价大,同时可能会造成延迟
poll特点
- poll只能在linux使用
- poll基于链表来存储,没有最大轮询数量的限制
- 文件描述符数量越多,性能越差
- 除以上之外,和select基本没有差别
epoll特点
- epoll只能在linux使用
- epoll会给每一个监管对象,绑定一个回调机制,一旦有响应,回调机制立即发起提醒
- 共同点
- 本质上都是同步I/O,因为他们都需要在读写事件就绪后由进程负责进行读写,也就是说这个读写过程是阻塞的
# selectors模块
- 该模块会自动针对不同操作系统,选择不同的监管机制
select/poll/epoll
# 异步IO
# 介绍
- 异步IO模型是所有模型中,效率最高的,也是使用最广泛的
- 提交任务后,可以做自己的事,直到有收到通知触发回调函数
- 相关模块:
asyncio
- 相关框架:
sanic、tronado、twisted
# 协程
# 介绍
- 协程是人的设计思想,计算机并没有协程的概念,协程可以使单个线程可以实现并发的效果
- 实现原理
- 在代码层面对程序的IO操作进行切换处理,一旦遇到IO操作,我们就通过代码来完成切换
- 使系统认为我们的程序一直在运行,没有进行IO,从而欺骗操作系统
# gevent模块(不推荐)
# 介绍
- 核心就是遇到IO操作,会自动切换到其他协程
- 如果其他协程也在IO,则可以再进行切换,同时通过协程间切换的动作也可以避开阻塞态
- 如果正在执行的协程没有遇到IO,则会将该协程先执行完,然后才会切换到其他的协程
- 只在遇到阻塞的时候切换,没有轮询和线程开销
# 使用方法
# 从gevent中导入猴子补丁,必须打补丁,才能正常使用,它会将python标准库中的一些阻塞操作变为非阻塞,比如:它会将python内置的socket换成封装了IO多路复用的socket
# 使用猴子补丁monkey.patch_all()可以写在第一行
from gevent import monkey;monkey.patch_all()
# 导入gevent库中需要用到的模块,spawn类用于创建协程对象
from gevent import spawn
# 创建协程对象,并异步执行
# 传入函数作为协程的任务,后面的都将作为参数,它会监控任务中的IO操作,一旦遇到IO操作就会切换到其他协程
g = spawn(func, var1, var2...)
# 阻塞,直到协程对象执行完毕
g1.join()
1
2
3
4
5
6
7
8
9
10
11
12
13
2
3
4
5
6
7
8
9
10
11
12
13
# 使用例子
from gevent import monkey;monkey.patch_all()
from gevent import spawn
# 通过切换协程,实现了避免阻塞的效果
import time
def a():
print('a-1')
time.sleep(2)
print('a-2')
def b():
print('b-1')
time.sleep(1)
print('b-2')
starttime = time.time()
g1 = spawn(a)
g2 = spawn(b)
g1.join()
g2.join()
print('运行时长:{}'.format(time.time() - starttime))
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# asyncio模块(推荐)
# 介绍
- asyncio的编程模型就是一个消息循环,我们从asyncio模块中获取一个EventLoop池
- 然后把需要执行的协程扔到EventLoop中执行,就实现了异步IO
# 使用例子1
# 导入asyncio模块
import asyncio
# 把一个生成器标记为coroutine类型,然后把这个coroutine扔到EventLoop中执行
# @asyncio.coroutine他会将函数装饰成一个协程对象
@asyncio.coroutine
def hello():
print("Hello world!")
# 异步调用asyncio.sleep(1)
r = yield from asyncio.sleep(1)
print("Hello again!")
# 获取一个EventLoop池
loop = asyncio.get_event_loop()
# 异步执行EventLoop池中的协程
loop.run_until_complete(hello())
# 关闭EventLoop池
loop.close()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
- hello()会首先打印出Hello world!
- 然后yield from语法会挂起当前协程,去执行其他协程,并等待该语法所在的协程返回值,再继续执行
- asyncio.sleep()是一个coroutine,所以线程不会真的等待asyncio.sleep(),而是直接中断并执行下一个协程
- asyncio.sleep(1)是一个耗时1秒的IO操作,但在此期间,主线程并未等待而是去执行EventLoop中其他可以执行的协程了,也就是说实现了并发执行
- 直到asyncio.sleep()返回时,线程从yield from拿到返回值后,才会接着执行下一行语句 print("Hello again!")
# 语法糖
- 基本用法
- 在普通的函数前面加
async
关键字 - 在需要等待的函数调用签名加
await
关键字 - 使用
asyncio.run(task)
运行程序
- 在普通的函数前面加
- await注意
await
表示在这个地方等待执行完再往下执行,它会挂起自己的协程,并等待另一个协程完成,也就是变成串行await
只能在带有async
关键字的函数中运行await
后面的对象需要是一个Awaitable
,或者实现了相关的协议
- 语法糖说明
async def hello()
同等于
@asyncio.coroutine
def hello()
---
r = await asyncio.sleep(1)
同等于
r = yield from asyncio.sleep(1)
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 使用例子2
# 导入asyncio模块
import asyncio
# 将hello函数装饰成一个协程对象
async def hello():
print('Hello world!')
# 执行到此处时,会去执行当前hello协程之外的其他协程,直到asyncio.sleep()结束并返回值
await asyncio.sleep(1)
print('Hello again!')
# 使用wait收集两个hello协程对象到池中
tasks = asyncio.wait([asyncio.create_task(hello()), asyncio.create_task(hello())])
# 异步执行EventLoop池中的协程,执行完毕后,会将任务执行的结果返回,如:tasks = asyncio.run(asyncio.wait(task_list))
asyncio.run(tasks)
print("Done")
for task in tasks:
for i in task:
# 输出任务执行结果
print(i.result())
# 可在池任务运行时,添加任务
asyncio.create_task(任务函数())
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21