网络并发模型
IO并发模型
IO概述
什么是IO
- 在程序中存在读写数据操作行为的事件均是IO行为,比如终端输入输出,文件读写,数据库修改和网络消息收发等
程序分类
- IO密集型程序:在程序执行中有大量IO操作,而运算操作较少。消耗cpu较少,耗时长
- 计算密集型程序:程序运行中运算较多,IO操作相对较少。cpu消耗多,执行速度快,几乎没有阻塞
- IO分类:阻塞IO ,非阻塞IO,IO多路复用等
阻塞IO
- 定义:在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态
- 效率:阻塞IO效率很低。但是由于逻辑简单所以是默认IO行为
阻塞情况
- 因为某种执行条件没有满足造成的函数阻塞
accept
,input
,recv
- 处理IO的时间较长产生的阻塞状态 网络传输 大文件读写
非阻塞IO
定义:通过修改IO属性行为,使原本阻塞的IO变为非阻塞的状态
# 设置套接字为非阻塞IO
sockfd.setblocking(bool)
功能:设置套接字为非阻塞IO
参数:默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞
# 超时检测 :设置一个最长阻塞时间,超过该时间后则不再阻塞等待。
sockfd.settimeout(sec)
功能:设置套接字的超时时间
参数:设置的时间
- 非阻塞IO示例
"""
设置非阻塞的套接字
"""
from socket import *
from time import sleep, ctime
# 日志文件模拟与网络无关IO
file = open("my.log", "a")
# 创建tcp套接字
sock = socket()
sock.bind(("127.0.0.1", 8888))
sock.listen(5)
# 设置为非阻塞
# sock.setblocking(False)
# 设置超时事件
sock.settimeout(3)
# 循环处理客户端连接
while True:
try:
connfd, addr = sock.accept()
print("Connect from", addr)
except timeout as e:
# 模拟一个与accept 无关的事件
msg = "%s : %s\n" % (ctime(), e)
file.write(msg)
except BlockingIOError as e:
# 模拟一个与accept 无关的事件
msg = "%s : %s\n" % (ctime(), e)
file.write(msg)
sleep(2)
else:
# accept 正常执行
data = connfd.recv(1024)
print(data.decode())
IO多路复用
定义
- 同时监控多个IO事件,当哪个IO事件准备就绪就执行哪个IO事件。以此形成可以同时处理多个IO的行为,避免一个IO阻塞造成其他IO均无法执行,提高了IO执行效率。
具体方案
- select方法:Windows,Linux,Unix
- epoll方法:Linux
- select 方法
rs, ws, xs=select(rlist, wlist, xlist[, timeout])
功能: 监控IO事件,阻塞等待IO发生
参数: rlist 列表 读IO列表,添加等待发生的或者可读的IO事件
wlist 列表 写IO列表,存放要可以主动处理的或者可写的IO事件
xlist 列表 异常IO列表,存放出现异常要处理的IO事件
timeout 超时时间
返回值: rs 列表 rlist中准备就绪的IO
ws 列表 wlist中准备就绪的IO
xs 列表 xlist中准备就绪的IO
- select 方法示例
"""
IO多路复用 基础演示 select
"""
from select import select
from socket import *
# 创建几个IO对象
tcp_sock = socket()
tcp_sock.bind(("0.0.0.0",8888))
tcp_sock.listen(5)
file = open("my.log",'r')
udp_sock = socket(AF_INET,SOCK_DGRAM)
print("开始监控IO")
rs,ws,xs = select([file,udp_sock],[file,udp_sock],[])
print("rlist:",rs)
print("wlist:",ws)
print("xlist:",xs)
- epoll方法
ep = select.epoll()
功能 : 创建epoll对象
返回值: epoll对象
ep.register(fd,event)
功能: 注册关注的IO事件
参数:fd 要关注的IO
event 要关注的IO事件类型
常用类型EPOLLIN 读IO事件(rlist)
EPOLLOUT 写IO事件 (wlist)
EPOLLERR 异常IO (xlist)
e.g. ep.register(sockfd,EPOLLIN|EPOLLERR)
ep.unregister(fd)
功能:取消对IO的关注
参数:IO对象或者IO对象的fileno
events = ep.poll()
功能: 阻塞等待监控的IO事件发生
返回值: 返回发生的IO
events格式 [(fileno,event),()....]
每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型
- epoll方法示例
"""
IO多路复用 基础演示 epoll
"""
from select import *
from socket import *
# 创建几个IO对象
tcp_sock = socket()
tcp_sock.bind(("0.0.0.0",8888))
tcp_sock.listen(5)
file = open("my.log",'r+')
udp_sock = socket(AF_INET,SOCK_DGRAM)
# 创建epoll对象
ep = epoll()
# 关注IO对象
ep.register(tcp_sock,EPOLLIN)
ep.register(udp_sock,EPOLLOUT|EPOLLERR)
# 建立查找字典
map = {
tcp_sock.fileno():tcp_sock,
udp_sock.fileno():udp_sock,
}
print("开始监控IO")
events = ep.poll()
print(events) # 就绪的IO
# 不再关注
ep.unregister(udp_sock)
del map[udp_sock.fileno()]
- select 方法与epoll方法对比
- select 跨平台好,同时关注1024个IO
- epoll 效率比select要高
- epoll 只支持Linux,同时监控IO数量比select要多,同时关注IO数量无上限
- epoll 支持EPOLLET触发方式(边缘触发)
- 水平触发:当有IO事件发生时如果没有处理则一直返回提醒
- 边缘触发:当有IO事件发生时如果没有处理则不会一直提醒,直到下次再有事件发生再次提醒处理
IO多路复用并发模型
利用IO多路复用等技术,同时处理多个客户端IO请求
- 优点:资源消耗少,能同时高效处理多个IO行为
- 缺点:只针对处理并发产生的IO事件
- 适用情况:HTTP请求,网络传输等都是IO行为,可以通过IO多路复用监控多个客户端的IO请求
网络并发服务实现过程
- 将套接字对象设置为关注的IO,通常设置为非阻塞状态
- 通过IO多路复用方法提交,进行IO监控
- 阻塞等待,当监控的IO有事件发生时结束阻塞
- 遍历返回值列表,确定就绪的IO事件类型
- 处理发生的IO事件
- 继续循环监控IO发生
- IO多路复用并发模型
################################# select 方法 ####################################
"""
基于select的并发服务模型
使用函数完成
"""
from select import select
from socket import *
# 服务器地址
HOST = "0.0.0.0"
PORT = 8888
ADDR = (HOST,PORT)
# 监控列表
rlist = []
wlist = []
xlist = []
# 处理客户端连接
def connect_client(sock):
connfd, addr = sock.accept()
print("Connect from", addr)
connfd.setblocking(False)
rlist.append(connfd) # 增加关注对象
# 处理客户端消息
def handle_client(connfd):
data = connfd.recv(1024)
# 处理客户端退出
if not data:
rlist.remove(connfd) # 不再关注
connfd.close()
return
print(data.decode())
connfd.send(b"Thanks")
def main():
# 创建监听套接字
sock = socket()
sock.bind(ADDR)
sock.listen(3)
# 配合非阻塞IO防止网络中断带来的内部阻塞
sock.setblocking(False)
rlist.append(sock) # 初始监控的IO对象
# 循环监控关注的IO发生
while True:
rs,ws,xs = select(rlist,wlist,xlist)
for r in rs:
if r is sock:
connect_client(r) # 连接客户端
else:
handle_client(r) # 处理客户端消息
if __name__ == '__main__':
main()
################################ epoll 方法 ################################
"""
基于epoll的并发服务模型
使用类实现
"""
from select import *
from socket import *
class EpollServer:
def __init__(self, host="", port=0):
self.host = host
self.port = port
self.address = (host, port)
self.sock = self._create_socket()
self.ep = epoll()
self.map = {} # 查找字典
def _create_socket(self):
sock = socket()
sock.bind(self.address)
sock.setblocking(False)
return sock
# 处理客户端连接
def _connect_client(self, fd):
connfd, addr = self.map[fd].accept()
print("Connect from", addr)
connfd.setblocking(False)
# 增加关注对象,设置边缘触发
self.ep.register(connfd, EPOLLIN | EPOLLET)
self.map[connfd.fileno()] = connfd # 维护字典
# 处理客户端消息
def _handle_client(self, fd):
data = self.map[fd].recv(1024)
# 处理客户端退出
if not data:
self.ep.unregister(fd) # 不再关注
self.map[fd].close()
del self.map[fd] # 从字典删除
return
print(data.decode())
self.map[fd].send(b"Thanks")
# 启动服务
def serve_forever(self):
self.sock.listen(3)
print("Listen the port %d" % self.port)
self.ep.register(self.sock, EPOLLIN) # 设置关注
self.map[self.sock.fileno()] = self.sock
while True:
events = self.ep.poll()
# 循环查看哪个IO发生就处理哪个
for fd, event in events:
if fd == self.sock.fileno():
self._connect_client(fd)
elif event == EPOLLIN:
self._handle_client(fd)
if __name__ == '__main__':
ep = EpollServer(host="0.0.0.0", port=8888)
ep.serve_forever() # 启动服务
最后一次更新于2022-12-12 15:48
0 条评论