我们需要重点关注epoll_event
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
epoll中的events和poll中的没有区别,只是在前面加了个E就可以了,两个额外的,我会在后面介绍。
在epoll_data中我们可以指定fd,也可以用ptr成员来指定一个fd相关的数据,但是我们又不能弃用fd,但我们可以将ptr和fd进行绑定,即ptr所指向的数据中包含fd。
然后我们就可以用epoll_wait来监听事件集合了。
#include
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
下面我们来说说特殊的LT/ET问题
简而言之就是在LT(默认工作模式)下,epoll_wait检测到有事件发生并将事件通知应用程序后,若应用程序未能来得及处理这个事件,当应用程序下次调用epoll_wait的时候,epoll_wait还会向应用程序通告这个事件。但是当处于ET模式(event |= EPOLLET),应用程序必须立即处理该事件,因为后续epoll_wait不在会通告这个事件。
假设我们读取一个很大的buf,我们用代码就能看出其中的不同
LT:
if (events[i].events & EPOLLIN)
{
printf("event trigger once\n");
memset(buff, 0, BUFFER_SIZE);
int ret = recv(sockfd, buff, BUFFER_SIZE, 0);
if (ret <= 0)
{
close(sockfd);
continue;
}
printf("get %d bytes of content : %s\n", ret, buff);
}
ET:
if (events[i].events & EPOLLIN)
{
printf("event trigger once\n");
while(1)
{
memset(buff, 0, BUFFER_SIZE);
int ret = recv(sockfd, buff, BUFFER_SIZE, 0);
if (ret < 0)
{
if ((errno == EAGAIN) || (errno == EWOULDBLOCK))
{
printf("read later\n");
break;
}
close(sockfd);
break;
}
else if (ret == 0)
{
close(sockfd);
}
else
{
printf("get %d bytes of content : %s\n", ret, buff);
}
}
}
可见ET模式很大程度上降低了同一个事件的重复触发,执行效率要比ET高一些。
EPOLLONESHOP事件
即使我们使用了ET模式,同一个fd可以触发多个事件,这里就会有race condition,假如一个线程正在读取信息,另一个线程却准备关掉链接,这就会非常糟糕,我们期望一个socket在任何时刻只有一个线程处理它。这一点我们就可以用EPOLLONESHOP实现。
对于注册了EPOLLONESHOP的文件描述符,操作系统最多触发一次其上注册的可读、可写或者异常事件,且中触发一次,除非我们重置EPOLLONESHOP事件。
三.统一事件源
信号是一个异步的事件,它的处理函数和主循环是分开的,但是信号要越早处理越好,以确保该信号不被屏蔽(为了避免一些竞争条件,信号在处理期间,提供不会再次触发它。我们可以将其和其他IO事件统一起来处理,即统一事件源。
主要思想是把信号的处理函数放在程序的主循环中,当信号被触发时候,它只是简单通知循环程序接受信号,然后信号处理函数将信号通过管道发送给主循环,主循环在接受信号执行相应的逻辑代码。
信号回调函数主要是将信号发送给主循环。
void sig_handler(int sig)
{
int save_errno = errno;
int msg = sig;
send(pipefd[1], (char *)&msg, 1, 0); //信号的回调函数将信号发送给主循环
errno = save_errno; //保证函数的可重入性
}
int addsig(int sig)
{
struct sigaction sa;
memset(&sa, 0, sizeof(sa));
sa.sa_handler = sig_handler; //注册回调函数
sa.sa_flags |= SA_RESTART;
sigfillset(&sa.sa_mask);
assert(sigaction(sig, &sa, NULL) != -1);
}
主函数部分
int main(int argc, char *argv[])
{
...
int epollfd = epoll_create(5);
...
int ret = socketpair(PF_UNIX, SOCK_STREAM, 0, pipefd); //创建管道
setnoblocking(pipefd[1]);
addfd(epollfd, pipefd[0]); //注册管道的可读事件
addsig(SIGHUP); //设置信号
addsig(SIGCHLD);
addsig(SIGTERM);
addsig(SIGINT);
...
while (1) //只是为了逻辑简单
{
int number = epoll_wait(epollfd, events, MAX_EVENT_NUM, -1);
...
if ((sockfd == pipefd[0]) && (events[i].events & EPOLLIN))
{
int sig;
char signals[ERROR_SIZE];
ret = recv(pipefd[0], signals, ERROR_SIZE-1, 0);
if (ret == -1)
{
continue;
}
else if(ret == 0)
{
continue;
}
else
{
for (int i = 0; i < ret ; ++i) //对接收到信号进行处理
{
switch (signals[i]) {
case SIGCHLD:
case SIGHUP:
{
continue;
}
case SIGTERM:
case SIGINT:
{
stop_server = true;
break;
}
}
}
}
}
}
}
四.Reactor模型
此处参考博客两种高效的服务器设计模型:Reactor和Proactor模型
Reactor模型
Reactor模式是处理并发I/O比较常见的一种模式,用于同步I/O,中心思想是将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/O事件到来或是准备就绪(文件描述符或socket可读、写),多路复用器返回并将事先注册的相应I/O事件分发到对应的处理器中。
Reactor是一种事件驱动机制,和普通函数调用的不同之处在于:应用程序不是主动的调用某个API完成处理,而是恰恰相反,Reactor逆置了事件处理流程,应用程序需要提供相应的接口并注册到Reactor上,如果相应的事件发生,Reactor将主动调用应用程序注册的接口,这些接口又称为“回调函数”。用“好莱坞原则”来形容Reactor再合适不过了:不要打电话给我们,我们会打电话通知你。
当我们使用同步IO模型对其进行实现Reactor工作流程是:
- 主线程像epoll内核事件表注册socket读就绪事件
- 主线程调用epol_wait等待可读事件
- 当socket上出现可读事件,epoll_waiit通知主线程,主线程将socket可读事件放入请求队列
- 睡眠在请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上写就绪事件
- 主线程调用epol_wait等待可写事件
- 当socket可写的时候,epoll_wait通知主线程,主线程间socket可写事件放到请求队列
- 睡眠在请求队列上的某个工作线程被唤醒,它往socket上写入服务器处理客户请求的结果
- 在Reactor模式中,有5个关键的参与者:
- 描述符(handle):由操作系统提供的资源,用于识别每一个事件,如Socket描述符、文件描述符、信号的值等。在Linux中,它用一个整数来表示。事件可以来自外部,如来自客户端的连接请求、数据等。事件也可以来自内部,如信号、定时器事件。
- 同步事件多路分离器(event demultiplexer):事件的到来是随机的、异步的,无法预知程序何时收到一个客户连接请求或收到一个信号。所以程序要循环等待并处理事件,这就是事件循环。在事件循环中,等待事件一般使用I/O复用技术实现。在linux系统上一般是select、poll、epol_waitl等系统调用,用来等待一个或多个事件的发生。I/O框架库一般将各种I/O复用系统调用封装成统一的接口,称为事件多路分离器。调用者会被阻塞,直到分离器分离的描述符集上有事件发生。
- 事件处理器 (event handler):I/O框架库提供的事件处理器通常是由一个或多个模板函数组成的接口。这些模板函数描述了和应用程序相关的对某个事件的操作,用户需要继承它来实现自己的事件处理器,即具体事件处理器。因此,事件处理器中的回调函数一般声明为虚函数,以支持用户拓展。
- 具体的事件处理器(concrete event handler):是事件处理器接口的实现。它实现了应用程序提供的某个服务。每个具体的事件处理器总和一个描述符相关。它使用描述符来识别事件、识别应用程序提供的服务。
- Reactor 管理器(reactor):定义了一些接口,用于应用程序控制事件调度,以及应用程序注册、删除事件处理器和相关的描述符。它是事件处理器的调度核心。 Reactor管理器使用同步事件分离器来等待事件的发生。一旦事件发生,Reactor管理器先是分离每个事件,然后调度事件处理器,最后调用相关的模 板函数来处理这个事件。
- 可以看出,是Reactor管理器并不是应用程序 负责等待事件、分离事件和调度事件。Reactor并没有被具体的事件处理器调度,而是管理器调度具体的事件处理器,由事件处理器对发生的事件作出处理,这就是Hollywood原则。应用程序要做的仅仅是实现一个具体的事件处理器,然后把它注册到Reactor管理器中。接下来的工作由管理器来完成:如果有相应的事件发生,Reactor会主动调用具体的事件处理器,由事件处理器对发生的事件作出处理。
- ps:除此之外还有proacotr模型,暂时没有见到过,就不写了。
五.有限状态机
有的应用层协议包含数据类型的字段,每个类型可以映射为逻辑单元的一种可执行状态,服务器可以根据它来编写处理逻辑。
STATE_MACHINE(Packge _pack)
{
Package _type = _pack.GetType();
swtich(_type)
{
case type_A:
process_package_A(_pack);
break;
case type_B:
process_package_B(_pack);
break;
}
}
当状态机的各个状态不再像上面那样独立,而是可以相互转移,我们也可以用状态机实现
STATE_MACHINE(Packge _pack)
{
type _type = type_A;
while(_type != type_C)
{
Package _pack = _pack.GetType();
swtich(_type)
{
case type_A:
process_package_A(_pack);
_type = type_B;
break;
case type_B:
process_package_B(_pack);
_type = type_C;
break;
}
}
}
閱讀更多 程序猿的內心獨白 的文章