漫谈IO高复用和高性能服务框架

  • instance referred to by epfd. The event is ignored and can be
  • NULL (but see BUGS below).
  • 我们需要重点关注epoll_event

    typedef union epoll_data {

    void *ptr;

    int fd;

    uint32_t u32;

    uint64_t u64;

    } epoll_data_t;

    struct epoll_event {

    uint32_t events; /* Epoll events */

    epoll_data_t data; /* User data variable */

    };

    epoll中的events和poll中的没有区别,只是在前面加了个E就可以了,两个额外的,我会在后面介绍。

    在epoll_data中我们可以指定fd,也可以用ptr成员来指定一个fd相关的数据,但是我们又不能弃用fd,但我们可以将ptr和fd进行绑定,即ptr所指向的数据中包含fd。

    然后我们就可以用epoll_wait来监听事件集合了。

    #include

    int epoll_wait(int epfd, struct epoll_event *events,

    int maxevents, int timeout);

    下面我们来说说特殊的LT/ET问题

    ​ 简而言之就是在LT(默认工作模式)下,epoll_wait检测到有事件发生并将事件通知应用程序后,若应用程序未能来得及处理这个事件,当应用程序下次调用epoll_wait的时候,epoll_wait还会向应用程序通告这个事件。但是当处于ET模式(event |= EPOLLET),应用程序必须立即处理该事件,因为后续epoll_wait不在会通告这个事件。

    假设我们读取一个很大的buf,我们用代码就能看出其中的不同

    LT:

    if (events[i].events & EPOLLIN)

    {

    printf("event trigger once\n");

    memset(buff, 0, BUFFER_SIZE);

    int ret = recv(sockfd, buff, BUFFER_SIZE, 0);

    if (ret <= 0)

    {

    close(sockfd);

    continue;

    }

    printf("get %d bytes of content : %s\n", ret, buff);

    }

    ET:

    if (events[i].events & EPOLLIN)

    {

    printf("event trigger once\n");

    while(1)

    {

    memset(buff, 0, BUFFER_SIZE);

    int ret = recv(sockfd, buff, BUFFER_SIZE, 0);

    if (ret < 0)

    {

    if ((errno == EAGAIN) || (errno == EWOULDBLOCK))

    {

    printf("read later\n");

    break;

    }

    close(sockfd);

    break;

    }

    else if (ret == 0)

    {

    close(sockfd);

    }

    else

    {

    printf("get %d bytes of content : %s\n", ret, buff);

    }

    }

    }

    ​ 可见ET模式很大程度上降低了同一个事件的重复触发,执行效率要比ET高一些。

    EPOLLONESHOP事件

    ​ 即使我们使用了ET模式,同一个fd可以触发多个事件,这里就会有race condition,假如一个线程正在读取信息,另一个线程却准备关掉链接,这就会非常糟糕,我们期望一个socket在任何时刻只有一个线程处理它。这一点我们就可以用EPOLLONESHOP实现。

    ​ 对于注册了EPOLLONESHOP的文件描述符,操作系统最多触发一次其上注册的可读、可写或者异常事件,且中触发一次,除非我们重置EPOLLONESHOP事件。

    漫谈IO高复用和高性能服务框架

    三.统一事件源

    ​ 信号是一个异步的事件,它的处理函数和主循环是分开的,但是信号要越早处理越好,以确保该信号不被屏蔽(为了避免一些竞争条件,信号在处理期间,提供不会再次触发它。我们可以将其和其他IO事件统一起来处理,即统一事件源。

    ​ 主要思想是把信号的处理函数放在程序的主循环中,当信号被触发时候,它只是简单通知循环程序接受信号,然后信号处理函数将信号通过管道发送给主循环,主循环在接受信号执行相应的逻辑代码。

    ​ 信号回调函数主要是将信号发送给主循环。

    void sig_handler(int sig)

    {

    int save_errno = errno;

    int msg = sig;

    send(pipefd[1], (char *)&msg, 1, 0);    //信号的回调函数将信号发送给主循环

    errno = save_errno;               //保证函数的可重入性

    }

    int addsig(int sig)

    {

    struct sigaction sa;

    memset(&sa, 0, sizeof(sa));

    sa.sa_handler = sig_handler; //注册回调函数

    sa.sa_flags |= SA_RESTART;        

    sigfillset(&sa.sa_mask);

    assert(sigaction(sig, &sa, NULL) != -1);

    }

    ​ 主函数部分

    int main(int argc, char *argv[])

    {

    ...

    int epollfd = epoll_create(5);

    ...

    int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); //创建管道

    setnoblocking(pipefd[1]);

    addfd(epollfd, pipefd[0]);    //注册管道的可读事件

     

    addsig(SIGHUP);     //设置信号

    addsig(SIGCHLD);

    addsig(SIGTERM);

    addsig(SIGINT);

    ...

    while (1) //只是为了逻辑简单

    {

    int number = epoll_wait(epollfd, events, MAX_EVENT_NUM, -1);

    ...

    if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN))

    {

    int sig;

    char signals[ERROR_SIZE];

    ret = recv(pipefd[0], signals, ERROR_SIZE-1, 0);

    if (ret == -1)

    {

    continue;

    }

    else if(ret == 0)

    {

    continue;

    }

    else

    {

    for (int i = 0; i < ret ; ++i) //对接收到信号进行处理

    {

    switch (signals[i]) {

    case SIGCHLD:

    case SIGHUP:

    {

    continue;

    }

    case SIGTERM:

    case SIGINT:

    {

    stop_server = true;

    break;

    }

    }

    }

    }

    }

     }

    }

    四.Reactor模型

    ​ 此处参考博客两种高效的服务器设计模型:Reactor和Proactor模型

    Reactor模型

    ​ Reactor模式是处理并发I/O比较常见的一种模式,用于同步I/O,中心思想是将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/O事件到来或是准备就绪(文件描述符或socket可读、写),多路复用器返回并将事先注册的相应I/O事件分发到对应的处理器中。

    ​ Reactor是一种事件驱动机制,和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的事件发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。用“好莱坞原则”来形容Reactor再合适不过了:不要打电话给我们,我们会打电话通知你。

    ​ 当我们使用同步IO模型对其进行实现Reactor工作流程是:

    1. 主线程像epoll内核事件表注册socket读就绪事件
    2. 主线程调用epol_wait等待可读事件
    3. 当socket上出现可读事件,epoll_waiit通知主线程,主线程将socket可读事件放入请求队列
    4. 睡眠在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上写就绪事件
    5. 主线程调用epol_wait等待可写事件
    6. 当socket可写的时候,epoll_wait通知主线程,主线程间socket可写事件放到请求队列
    7. 睡眠在请求队列上的某个工作线程被唤醒,它往socket上写入服务器处理客户请求的结果
    8. 在Reactor模式中,有5个关键的参与者:
    • 描述符(handle):由操作系统提供的资源,用于识别每一个事件,如Socket描述符、文件描述符、信号的值等。在Linux中,它用一个整数来表示。事件可以来自外部,如来自客户端的连接请求、数据等。事件也可以来自内部,如信号、定时器事件。
    • 同步事件多路分离器(event demultiplexer):事件的到来是随机的、异步的,无法预知程序何时收到一个客户连接请求或收到一个信号。所以程序要循环等待并处理事件,这就是事件循环。在事件循环中,等待事件一般使用I/O复用技术实现。在linux系统上一般是select、poll、epol_waitl等系统调用,用来等待一个或多个事件的发生。I/O框架库一般将各种I/O复用系统调用封装成统一的接口,称为事件多路分离器。调用者会被阻塞,直到分离器分离的描述符集上有事件发生。
    • 事件处理器
      (event handler):I/O框架库提供的事件处理器通常是由一个或多个模板函数组成的接口。这些模板函数描述了和应用程序相关的对某个事件的操作,用户需要继承它来实现自己的事件处理器,即具体事件处理器。因此,事件处理器中的回调函数一般声明为虚函数,以支持用户拓展。
    • 具体的事件处理器(concrete event handler):是事件处理器接口的实现。它实现了应用程序提供的某个服务。每个具体的事件处理器总和一个描述符相关。它使用描述符来识别事件、识别应用程序提供的服务。
    • Reactor 管理器(reactor):定义了一些接口,用于应用程序控制事件调度,以及应用程序注册、删除事件处理器和相关的描述符。它是事件处理器的调度核心。 Reactor管理器使用同步事件分离器来等待事件的发生。一旦事件发生,Reactor管理器先是分离每个事件,然后调度事件处理器,最后调用相关的模 板函数来处理这个事件。
    1. ​ 可以看出,是Reactor管理器并不是应用程序
      负责等待事件、分离事件和调度事件。Reactor并没有被具体的事件处理器调度,而是管理器调度具体的事件处理器,由事件处理器对发生的事件作出处理,这就是Hollywood原则。应用程序要做的仅仅是实现一个具体的事件处理器,然后把它注册到Reactor管理器中。接下来的工作由管理器来完成:如果有相应的事件发生,Reactor会主动调用具体的事件处理器,由事件处理器对发生的事件作出处理。
    2. ps:除此之外还有proacotr模型,暂时没有见到过,就不写了。

    五.有限状态机

    ​ 有的应用层协议包含数据类型的字段,每个类型可以映射为逻辑单元的一种可执行状态,服务器可以根据它来编写处理逻辑。

    STATE_MACHINE(Packge _pack)

    {

    Package _type = _pack.GetType();

    swtich(_type)

    {

    case type_A:

    process_package_A(_pack);

    break;

    case type_B:

    process_package_B(_pack);

    break;

    }

    }

    ​ 当状态机的各个状态不再像上面那样独立,而是可以相互转移,我们也可以用状态机实现

    STATE_MACHINE(Packge _pack)

    {

    type _type = type_A;

    while(_type != type_C)

    {

    Package _pack = _pack.GetType();

    swtich(_type)

    {

    case type_A:

    process_package_A(_pack);

    _type = type_B;

    break;

    case type_B:

    process_package_B(_pack);

    _type = type_C;

    break;

    }

    }

    }

    漫谈IO高复用和高性能服务框架


    分享到:


    相關文章: