epoll 原理(三)poll 實現

針對 select 系統調用的三個不足,poll 解決的是第一個、最多 1024 個 FD 限制的問題。

其實現思路是:

1. 不再使用位圖來傳遞事件和結果,而是使用 pollfd 。 結構體數組來傳遞。

2. 在內部實現時,以 poll_list 鏈表的形式來分批次保存 pollfd 。不像 select 那樣一次申請完整的一大塊內存。

3. 通過從進程的信號量裡獲取能打開的最大文件數量,解決 1024 個限制的問題。

0. 基本數據結構

// 源碼位置:include/uapi/asm-generic/poll.h

struct pollfd {

int fd; // FD

short events; // 輸入的敢興趣事件

short revents; // 輸出的結果

};

// 源碼位置:fs/select.c

struct poll_list {

struct poll_list *next;

// entries 指向的數組裡 pollfd 的數量

int len;

// 指向 pollfd 數組的指針

struct pollfd entries[0];

};

pollfd 結構體用來傳遞單個FD的輸入事件、輸出結果。

poll_list 是一個鏈表,其節點指向 pollfd 結構體的數組,這個數組要麼是在棧上預分配、要麼是按內存頁分配(保持頁對齊)。

1. poll 系統調用主邏輯

源碼位置:fs/select.c

SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigned int, nfds,

int, timeout_msecs)

{

struct timespec64 end_time, *to = NULL;

int ret;

if (timeout_msecs >= 0) {

to = &end_time;

poll_select_set_timeout(to, timeout_msecs / MSEC_PER_SEC,

NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));

}

ret = do_sys_poll(ufds, nfds, to);

if (ret == -EINTR) {

struct restart_block *restart_block;

restart_block = &current->restart_block;

restart_block->fn = do_restart_poll;

restart_block->poll.ufds = ufds;

restart_block->poll.nfds = nfds;

if (timeout_msecs >= 0) {

restart_block->poll.tv_sec = end_time.tv_sec;

restart_block->poll.tv_nsec = end_time.tv_nsec;

restart_block->poll.has_timeout = 1;

} else

restart_block->poll.has_timeout = 0;

ret = -ERESTART_RESTARTBLOCK;

}

return ret;

}

static int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,

struct timespec64 *end_time)

{

struct poll_wqueues table;

int err = -EFAULT, fdcount, len, size;

/* Allocate small arguments on the stack to save memory and be

faster - use long to make sure the buffer is aligned properly

on 64 bit archs to avoid unaligned access */

// 創建 256 個字節大小的數組

long stack_pps[POLL_STACK_ALLOC/sizeof(long)];

struct poll_list *const head = (struct poll_list *)stack_pps;

struct poll_list *walk = head;

unsigned long todo = nfds;

// 如果超過能打開的最大文件數則返回

// 這個 rlimit 通過判等進程信息裡的信號量來實現的

// 因此修改文件能打開的最大文件數不需要重新編譯,可以實時修改

if (nfds > rlimit(RLIMIT_NOFILE))

return -EINVAL;

// N_STACK_PPS 是用於計算 stack_pps 裡能存放多少個 pollfd 結構體

len = min_t(unsigned int, nfds, N_STACK_PPS);

// 從用戶空間拷貝 pollfd 數組到內核空間

// 分批拷貝,不同批次之間用 poll_list 鏈表維護起來

for (;;) {

walk->next = NULL;

walk->len = len;

if (!len)

break;

if (copy_from_user(walk->entries, ufds + nfds-todo,

sizeof(struct pollfd) * walk->len))

goto out_fds;

todo -= walk->len;

if (!todo)

break;

// POLLFD_PER_PAGE 是一個頁面能存放多少個 pollfd 結構體

len = min(todo, POLLFD_PER_PAGE);

size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;

walk = walk->next = kmalloc(size, GFP_KERNEL);

if (!walk) {

err = -ENOMEM;

goto out_fds;

}

}

// 初始化 poll_wqueues,設置隊列處理函數等

poll_initwait(&table);

// 主邏輯:調用目標文件的 poll 函數

fdcount = do_poll(head, &table, end_time);

// 刪除主邏輯裡添加到目標文件的等待節點

poll_freewait(&table);

// 把結果拷貝到用戶空間

for (walk = head; walk; walk = walk->next) {

struct pollfd *fds = walk->entries;

int j;

for (j = 0; j < walk->len; j++, ufds++)

if (__put_user(fds[j].revents, &ufds->revents))

goto out_fds;

}

err = fdcount;

out_fds:

// 釋放申請的內存

walk = head->next;

while (walk) {

struct poll_list *pos = walk;

walk = walk->next;

kfree(pos);

}

return err;

}

static int do_poll(struct poll_list *list, struct poll_wqueues *wait,

struct timespec64 *end_time)

{

poll_table* pt = &wait->pt;

ktime_t expire, *to = NULL;

int timed_out = 0, count = 0;

u64 slack = 0;

unsigned int busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0;

unsigned long busy_start = 0;

/* Optimise the no-wait case */

if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {

pt->_qproc = NULL;

timed_out = 1;

}

if (end_time && !timed_out)

slack = select_estimate_accuracy(end_time);

for (;;) {

struct poll_list *walk;

bool can_busy_loop = false;

// 遍歷鏈表

for (walk = list; walk != NULL; walk = walk->next) {

struct pollfd * pfd, * pfd_end;

pfd = walk->entries;

pfd_end = pfd + walk->len;

// 遍歷節點裡的數組

for (; pfd != pfd_end; pfd++) {

/*

* Fish for events. If we found one, record it

* and kill poll_table->_qproc, so we don't

* needlessly register any other waiters after

* this. They'll get immediately deregistered

* when we break out and return.

*/

// 處理單個 pollfd

if (do_pollfd(pfd, pt, &can_busy_loop,

busy_flag)) {

// 有事件發生了

count++;

// 後續的文件即使沒有事件發生也不需要等待了。

pt->_qproc = NULL;

/* found something, stop busy polling */

busy_flag = 0;

can_busy_loop = false;

}

}

}

// 注意:上面的遍歷循環裡,並沒有像 select 那樣,在小批次poll後進行睡眠。

/*

* All waiters have already been registered, so don't provide

* a poll_table->_qproc to them on the next loop iteration.

*/

pt->_qproc = NULL;

if (!count) {

count = wait->error;

if (signal_pending(current))

count = -EINTR;

}

if (count || timed_out)

break;

/* only if found POLL_BUSY_LOOP sockets && not out of time */

if (can_busy_loop && !need_resched()) {

if (!busy_start) {

busy_start = busy_loop_current_time();

continue;

}

if (!busy_loop_timeout(busy_start))

continue;

}

busy_flag = 0;

/*

* If this is the first loop and we have a timeout

* given, then we convert to ktime_t and set the to

* pointer to the expiry value.

*/

if (end_time && !to) {

expire = timespec64_to_ktime(*end_time);

to = &expire;

}

// 睡眠等待直至超時或被喚醒

if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack))

timed_out = 1;

}

return count;

}

static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait,

bool *can_busy_poll,

unsigned int busy_flag)

{

unsigned int mask;

int fd;

mask = 0;

fd = pollfd->fd;

if (fd >= 0) {

struct fd f = fdget(fd);

mask = POLLNVAL;

if (f.file) {

mask = DEFAULT_POLLMASK;

if (f.file->f_op->poll) {

pwait->_key = pollfd->events|POLLERR|POLLHUP;

pwait->_key |= busy_flag;

mask = f.file->f_op->poll(f.file, pwait);

if (mask & busy_flag)

*can_busy_poll = true;

}

/* Mask out unneeded events. */

// 清除不需要的事件

mask &= pollfd->events | POLLERR | POLLHUP;

fdput(f);

}

}

pollfd->revents = mask;

return mask;

由於採用 pollfd 結構體來傳遞文件的事件,do_poll 遍歷所有文件時的邏輯更清晰些、有層次,對某個 pollfd 的處理提取成 do_pollfd 函數。

2. 等待與喚醒

poll 系統調用的等待與喚醒邏輯與 select 系統調用是一樣的,調用的代碼是一樣的。

3. 小結

  1. poll 系統調用解決了 select 系統調用最多 1024 個文件的限制。
  2. 每次調用仍然需要拷貝所有文件的事件。
  3. 其實現上,每次喚醒後仍然需要 poll 所有文件,效率依賴於 poll 的文件數量。在網絡的服務端應用裡,要監聽的 socket 連接的事件是比較穩定的、其數量也是比較穩定的,不會每次 select/poll 都會發生變化,因此第2、3點的問題有很大的優化空間,這在 epoll 系統調用裡解決。


分享到:


相關文章: