网络并发模型

IO并发模型

IO概述

什么是IO

  • 在程序中存在读写数据操作行为的事件均是IO行为,比如终端输入输出,文件读写,数据库修改和网络消息收发等

程序分类

  • IO密集型程序:在程序执行中有大量IO操作,而运算操作较少。消耗cpu较少,耗时长
  • 计算密集型程序:程序运行中运算较多,IO操作相对较少。cpu消耗多,执行速度快,几乎没有阻塞
  • IO分类:阻塞IO ,非阻塞IO,IO多路复用等

阻塞IO

  • 定义:在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态
  • 效率:阻塞IO效率很低。但是由于逻辑简单所以是默认IO行为

阻塞情况

  • 因为某种执行条件没有满足造成的函数阻塞accept,input,recv
  • 处理IO的时间较长产生的阻塞状态 网络传输 大文件读写

非阻塞IO

定义:通过修改IO属性行为,使原本阻塞的IO变为非阻塞的状态

# 设置套接字为非阻塞IO
sockfd.setblocking(bool)
功能:设置套接字为非阻塞IO
参数:默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞


# 超时检测 :设置一个最长阻塞时间,超过该时间后则不再阻塞等待。
sockfd.settimeout(sec)
功能:设置套接字的超时时间
参数:设置的时间
  • 非阻塞IO示例
"""
设置非阻塞的套接字
"""
from socket import *
from time import sleep, ctime

# 日志文件模拟与网络无关IO
file = open("my.log", "a")

# 创建tcp套接字
sock = socket()
sock.bind(("127.0.0.1", 8888))
sock.listen(5)

# 设置为非阻塞
# sock.setblocking(False)

# 设置超时事件
sock.settimeout(3)

# 循环处理客户端连接
while True:
  try:
      connfd, addr = sock.accept()
      print("Connect from", addr)
  except timeout as e:
      # 模拟一个与accept 无关的事件
      msg = "%s : %s\n" % (ctime(), e)
      file.write(msg)
  except BlockingIOError as e:
      # 模拟一个与accept 无关的事件
      msg = "%s : %s\n" % (ctime(), e)
      file.write(msg)
      sleep(2)
  else:
      # accept 正常执行
      data = connfd.recv(1024)
      print(data.decode())

IO多路复用

定义

  • 同时监控多个IO事件,当哪个IO事件准备就绪就执行哪个IO事件。以此形成可以同时处理多个IO的行为,避免一个IO阻塞造成其他IO均无法执行,提高了IO执行效率。

具体方案

  • select方法:Windows,Linux,Unix
  • epoll方法:Linux

  • select 方法
rs, ws, xs=select(rlist, wlist, xlist[, timeout])
功能: 监控IO事件,阻塞等待IO发生
参数: rlist  列表  读IO列表,添加等待发生的或者可读的IO事件
      wlist  列表  写IO列表,存放要可以主动处理的或者可写的IO事件
      xlist  列表 异常IO列表,存放出现异常要处理的IO事件
      timeout  超时时间

返回值: rs 列表  rlist中准备就绪的IO
        ws 列表  wlist中准备就绪的IO
          xs 列表  xlist中准备就绪的IO
  • select 方法示例
"""
IO多路复用 基础演示 select
"""
from select import select
from socket import *

# 创建几个IO对象
tcp_sock = socket()
tcp_sock.bind(("0.0.0.0",8888))
tcp_sock.listen(5)

file = open("my.log",'r')

udp_sock = socket(AF_INET,SOCK_DGRAM)

print("开始监控IO")
rs,ws,xs = select([file,udp_sock],[file,udp_sock],[])
print("rlist:",rs)
print("wlist:",ws)
print("xlist:",xs)
  • epoll方法
ep = select.epoll()
功能 : 创建epoll对象
返回值: epoll对象

ep.register(fd,event)   
功能: 注册关注的IO事件
参数:fd  要关注的IO
    event  要关注的IO事件类型
    常用类型EPOLLIN  读IO事件(rlist)
    EPOLLOUT 写IO事件 (wlist)
    EPOLLERR 异常IO  (xlist)
    e.g. ep.register(sockfd,EPOLLIN|EPOLLERR)

ep.unregister(fd)
功能:取消对IO的关注
参数:IO对象或者IO对象的fileno

events = ep.poll()
功能: 阻塞等待监控的IO事件发生
返回值: 返回发生的IO
        events格式  [(fileno,event),()....]
        每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型
  • epoll方法示例
"""
IO多路复用 基础演示 epoll
"""
from select import *
from socket import *

# 创建几个IO对象
tcp_sock = socket()
tcp_sock.bind(("0.0.0.0",8888))
tcp_sock.listen(5)

file = open("my.log",'r+')

udp_sock = socket(AF_INET,SOCK_DGRAM)


# 创建epoll对象
ep = epoll()

# 关注IO对象
ep.register(tcp_sock,EPOLLIN)
ep.register(udp_sock,EPOLLOUT|EPOLLERR)

# 建立查找字典
map = {
    tcp_sock.fileno():tcp_sock,
    udp_sock.fileno():udp_sock,
}

print("开始监控IO")
events = ep.poll()
print(events) # 就绪的IO

# 不再关注
ep.unregister(udp_sock)
del map[udp_sock.fileno()]
  • select 方法与epoll方法对比
  • select 跨平台好,同时关注1024个IO
  • epoll 效率比select要高
  • epoll 只支持Linux,同时监控IO数量比select要多,同时关注IO数量无上限
  • epoll 支持EPOLLET触发方式(边缘触发)
  • 水平触发:当有IO事件发生时如果没有处理则一直返回提醒
  • 边缘触发:当有IO事件发生时如果没有处理则不会一直提醒,直到下次再有事件发生再次提醒处理

IO多路复用并发模型

利用IO多路复用等技术,同时处理多个客户端IO请求

  • 优点:资源消耗少,能同时高效处理多个IO行为
  • 缺点:只针对处理并发产生的IO事件
  • 适用情况:HTTP请求,网络传输等都是IO行为,可以通过IO多路复用监控多个客户端的IO请求

网络并发服务实现过程

  1. 将套接字对象设置为关注的IO,通常设置为非阻塞状态
  2. 通过IO多路复用方法提交,进行IO监控
  3. 阻塞等待,当监控的IO有事件发生时结束阻塞
  4. 遍历返回值列表,确定就绪的IO事件类型
  5. 处理发生的IO事件
  6. 继续循环监控IO发生
  • IO多路复用并发模型

################################# select 方法 ####################################
"""
基于select的并发服务模型
使用函数完成
"""
from select import select
from socket import *

# 服务器地址
HOST = "0.0.0.0"
PORT = 8888
ADDR = (HOST,PORT)

# 监控列表
rlist = []
wlist = []
xlist = []

# 处理客户端连接
def connect_client(sock):
    connfd, addr = sock.accept()
    print("Connect from", addr)
    connfd.setblocking(False)
    rlist.append(connfd)  # 增加关注对象

# 处理客户端消息
def handle_client(connfd):
    data = connfd.recv(1024)
    # 处理客户端退出
    if not data:
        rlist.remove(connfd) # 不再关注
        connfd.close()
        return
    print(data.decode())
    connfd.send(b"Thanks")


def main():
    # 创建监听套接字
    sock = socket()
    sock.bind(ADDR)
    sock.listen(3)
    # 配合非阻塞IO防止网络中断带来的内部阻塞
    sock.setblocking(False)
    rlist.append(sock) #  初始监控的IO对象

    # 循环监控关注的IO发生
    while True:
        rs,ws,xs = select(rlist,wlist,xlist)
        for r in rs:
            if r is sock:
                connect_client(r) # 连接客户端
            else:
                handle_client(r) # 处理客户端消息

if __name__ == '__main__':
    main()



################################ epoll 方法 ################################
"""
基于epoll的并发服务模型
使用类实现
"""
from select import *
from socket import *


class EpollServer:
    def __init__(self, host="", port=0):
        self.host = host
        self.port = port
        self.address = (host, port)
        self.sock = self._create_socket()
        self.ep = epoll()
        self.map = {} # 查找字典

    def _create_socket(self):
        sock = socket()
        sock.bind(self.address)
        sock.setblocking(False)
        return sock

    # 处理客户端连接
    def _connect_client(self, fd):
        connfd, addr = self.map[fd].accept()
        print("Connect from", addr)
        connfd.setblocking(False)
        # 增加关注对象,设置边缘触发
        self.ep.register(connfd, EPOLLIN | EPOLLET)
        self.map[connfd.fileno()] = connfd  # 维护字典

    # 处理客户端消息
    def _handle_client(self, fd):
        data = self.map[fd].recv(1024)
        # 处理客户端退出
        if not data:
            self.ep.unregister(fd)  # 不再关注
            self.map[fd].close()
            del self.map[fd]  # 从字典删除
            return
        print(data.decode())
        self.map[fd].send(b"Thanks")

    # 启动服务
    def serve_forever(self):
        self.sock.listen(3)
        print("Listen the port %d" % self.port)
        self.ep.register(self.sock, EPOLLIN)  # 设置关注
        self.map[self.sock.fileno()] = self.sock

        while True:
            events = self.ep.poll()
            # 循环查看哪个IO发生就处理哪个
            for fd, event in events:
                if fd == self.sock.fileno():
                    self._connect_client(fd)
                elif event == EPOLLIN:
                    self._handle_client(fd)


if __name__ == '__main__':
    ep = EpollServer(host="0.0.0.0", port=8888)
    ep.serve_forever()  # 启动服务