02.28 Linux select 一網打盡

奇技指南


select, 你可以不用它,但你不能不瞭解它。通過閱讀本文,可以幫你理清select的來龍去脈, 讓你從中瞭解到:


我們常說的select的1024限制指的是什麼 ?怎麼會有這樣的限制?

都說select效率不高,是這樣嗎?為什麼 ?

select使用中有坑嗎?


注:本文的所有內容均指針對 Linux Kernel, 當前使用的源碼版本是 5.3.0


原型

<code>int select (int __nfds, fd_set *__restrict __readfds, fd_set *__restrict __writefds, fd_set *__restrict __exceptfds, struct timeval *__restrict __timeout);/<code>

select是IO多種複用的一種實現,它將需要監控的fd分為讀、寫、異常三類,使用fd_set表示,當其返回時要麼是超時,要麼是有至少一種讀、寫或異常事件發生。


相關數據結構

FD_SET

FD_SET是select最重要的數據結構了,其在內核中的定義如下:

<code>typedef __kernel_fd_set fd_set;#undef __FD_SETSIZE#define __FD_SETSIZE 1024typedef struct { unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];} __kernel_fd_set;/<code>

我們來簡化下,fd_set是一個struct, 其內部只有一個由16個元素組成的unsigned long數組,這個數組一共可以表示16 × 64 = 1024位, 每一位用來表示一個 fd, 這也就是 select針對讀、寫或異常每一類最多隻能有 1024個fd 限制的由來。


相關宏

下面這些宏定義在內核代碼fs/select.c中

FDS_BITPERLONG: 返回每個long有多少位,通常是64bits

<code>#define FDS_BITPERLONG (8*sizeof(long))/<code>FDS_LONGS(nr): 獲取 nr 個fd 需要用幾個long來表示

<code>#define FDS_LONGS(nr) (((nr)+FDS_BITPERLONG-1)/FDS_BITPERLONG)/<code>FD_BYTES(nr): 獲取 nr 個fd 需要用 多少個字節來表示

<code>#define FDS_BYTES(nr) (FDS_LONGS(nr)*sizeof(long))/<code>

下面這些宏可以在gcc源碼中找到

FD_ZERO: 初始化一個fd_set

<code>#define __FD_ZERO(s) \\ do { \\ unsigned int __i; \\ fd_set *__arr = (s); \\ for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i) \\ __FDS_BITS (__arr)[__i] = 0; \\ } while (0)/<code>

將上面所說的由16個元素組成的unsigned long數組每一個元素都設為 0;

__FD_SET(d, s): 將一個fd 賦值到 一個 fd_set

<code>#define __FD_SET(d, s) \\ ((void) (__FDS_BITS (s)[__FD_ELT(d)] |= __FD_MASK(d)))/<code>


分三步:

1、__FD_ELT(d):確定賦值到數組的哪一個元素

<code> #define __FD_ELT(d) \\ __extension__ \\ ({ long int __d = (d); \\ (__builtin_constant_p (__d) \\ ? (0 <= __d && __d < __FD_SETSIZE \\ ? (__d / __NFDBITS) \\ : __fdelt_warn (__d)) \\ : __fdelt_chk (__d)); })/<code>

其中 #define __NFDBITS (8 * (int) sizeof (__fd_mask)) , 即__NFDBITS = 64

這裡實現使用了__builtin_constant_p針對常量作了優化,我也沒有太理解常量與非常量實現方案有什麼不同,我們暫時忽略這個細節看本質。

本質就是 一個 unsigned long有64位,直接 __d / __NFDBITS取模就可以確定用數組的哪一個元素了;

2、__FD_MASK(d):確定賦值到一個 unsigned long的哪一位

<code>#define __FD_MASK(d) ((__fd_mask) (1UL << ((d) % __NFDBITS)))/<code>

直接 (d) % __NFDBITS)取餘後作為 1 左移的位數即可

3、|= :用位或賦值即可


在內核中的實現

調用層級


系統調用入口位置

位於fs/select.c中

<code>SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp, fd_set __user *, exp, struct timeval __user *, tvp){ return kern_select(n, inp, outp, exp, tvp);}/<code>

入口函數 kern_select

<code>static int kern_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timeval __user *tvp){ struct timespec64 end_time, *to = NULL; struct timeval tv; int ret; if (tvp) { if (copy_from_user(&tv, tvp, sizeof(tv))) return -EFAULT; to = &end_time; if (poll_select_set_timeout(to, tv.tv_sec + (tv.tv_usec / USEC_PER_SEC), (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC)) return -EINVAL; } ret = core_sys_select(n, inp, outp, exp, to); return poll_select_finish(&end_time, tvp, PT_TIMEVAL, ret);}/<code>

做三件事:

a. 如果設置了超時,首先準備時間戳 timespec64;

b. 調用 core_sys_select,這個是具體的實現,我們下面會重點介紹

c. poll_select_finish:做的主要工作就是更新用戶調用select時傳進來的超時參數tvp,我列一下關鍵代碼:

<code> ktime_get_ts64(&rts); rts = timespec64_sub(*end_time, rts); if (rts.tv_sec < 0) rts.tv_sec = rts.tv_nsec = 0;... struct timeval rtv; if (sizeof(rtv) > sizeof(rtv.tv_sec) + sizeof(rtv.tv_usec)) memset(&rtv, 0, sizeof(rtv)); rtv.tv_sec = rts.tv_sec; rtv.tv_usec = rts.tv_nsec / NSEC_PER_USEC; if (!copy_to_user(p, &rtv, sizeof(rtv))) return ret;/<code>

可以看到先獲取當前的時間戳,然後通過timespec64_sub和傳入的時間戳(接中傳入的是超時時間,實現時會轉化為時間戳)求出差值,將此差值傳回給用戶,即返回了剩餘的超時時間。所以這個地方是個小陷阱,用戶在調用select時,需要每次重新初始化這個超時時間。

通過 core_sys_select 實現

這個函數主要功能是在實現真正的select功能前,準備好 fd_set ,即從用戶空間將所需的三類 fd_set 複製到內核空間。從下面的代碼中你會看到對於每次的 select系統調用,都需要從用戶空間將所需的三類 fd_set 複製到內核空間,這裡存在性能上的損耗。

<code>int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp, fd_set __user *exp, struct timespec64 *end_time){ fd_set_bits fds; void *bits; int ret, max_fds; size_t size, alloc_size; struct fdtable *fdt; /* Allocate small arguments on the stack to save memory and be faster */ long stack_fds[SELECT_STACK_ALLOC/sizeof(long)]; ret = -EINVAL; if (n < 0) goto out_nofds; /* max_fds can increase, so grab it once to avoid race */ rcu_read_lock(); fdt = files_fdtable(current->files); max_fds = fdt->max_fds; rcu_read_unlock(); if (n > max_fds) n = max_fds; /* * We need 6 bitmaps (in/out/ex for both incoming and outgoing), * since we used fdset we need to allocate memory in units of * long-words. */ size = FDS_BYTES(n); bits = stack_fds; if (size > sizeof(stack_fds) / 6) { /* Not enough space in on-stack array; must use kmalloc */ ret = -ENOMEM; if (size > (SIZE_MAX / 6)) goto out_nofds; alloc_size = 6 * size; bits = kvmalloc(alloc_size, GFP_KERNEL); if (!bits) goto out_nofds; } fds.in = bits; fds.out = bits + size; fds.ex = bits + 2*size; fds.res_in = bits + 3*size; fds.res_out = bits + 4*size; fds.res_ex = bits + 5*size; if ((ret = get_fd_set(n, inp, fds.in)) || (ret = get_fd_set(n, outp, fds.out)) || (ret = get_fd_set(n, exp, fds.ex))) goto out; zero_fd_set(n, fds.res_in); zero_fd_set(n, fds.res_out); zero_fd_set(n, fds.res_ex); ret = do_select(n, &fds, end_time); if (ret < 0) goto out; if (!ret) { ret = -ERESTARTNOHAND; if (signal_pending(current)) goto out; ret = 0; } if (set_fd_set(n, inp, fds.res_in) || set_fd_set(n, outp, fds.res_out) || set_fd_set(n, exp, fds.res_ex)) ret = -EFAULT;out: if (bits != stack_fds) kvfree(bits);out_nofds: return ret;}/<code>

代碼中的註釋很清晰,我們這裡簡單過一下:


分五步:

1、規範化select系統調用傳入的第一個參數 n

<code>/* max_fds can increase, so grab it once to avoid race */ rcu_read_lock(); fdt = files_fdtable(current->files); max_fds = fdt->max_fds; rcu_read_unlock(); if (n > max_fds) n = max_fds;/<code>

這個n是三類不同的fd_set中所包括的fd數值的最大值 + 1, linux task打開句柄從0開始,不加1的話可能會少監控fd.

用戶在使用時可以有個偷懶的做法,就是將這個n設置為 FD_SETSIZE,通常是1024,這將監控的範圍擴大到了上限,但實際上遠沒有這麼多fd需要監控,浪費資源。

linux man中的解釋如下:

nfds should be set to the highest-numbered file descriptor in any of the three sets, plus 1. The indicated file descriptors in each set are checked, up to this limit (but see BUGS).


2、計算內核空間所需要的fd_set的空間, 內核態需要三個fd_set來容納用戶態傳遞過來的參數,還需要三個fd_set來容納select調用返回後生成的三灰fd_set, 即一共是6個fd_set

<code> long stack_fds[SELECT_STACK_ALLOC/sizeof(long)]; . . . size = FDS_BYTES(n); bits = stack_fds; if (size > sizeof(stack_fds) / 6) { /* Not enough space in on-stack array; must use kmalloc */ ret = -ENOMEM; if (size > (SIZE_MAX / 6)) goto out_nofds; alloc_size = 6 * size; bits = kvmalloc(alloc_size, GFP_KERNEL); if (!bits) goto out_nofds; } fds.in = bits; fds.out = bits + size; fds.ex = bits + 2*size; fds.res_in = bits + 3*size; fds.res_out = bits + 4*size; fds.res_ex = bits + 5*size;/<code>

這裡有個小技巧,先從內核棧上分配空間,如果不夠用,才使用 kvmalloc分配。

通過 size = FDS_BYTES(n);計算出單一一種fd_set所需字節數,

再能過 alloc_size = 6 * size; 即可計算出所需的全部字節數。

3、初始化用作參數的和用作返回值的兩類fd_set

<code>if ((ret = get_fd_set(n, inp, fds.in)) || (ret = get_fd_set(n, outp, fds.out)) || (ret = get_fd_set(n, exp, fds.ex))) goto out; zero_fd_set(n, fds.res_in); zero_fd_set(n, fds.res_out); zero_fd_set(n, fds.res_ex);/<code>

4、真正實現部分 do_select, 我們在下面詳講

5、返回結果複製回用戶空間

<code>if (set_fd_set(n, inp, fds.res_in) || set_fd_set(n, outp, fds.res_out) || set_fd_set(n, exp, fds.res_ex)) ret = -EFAULT;/<code>

這裡又多了一次內核空間到用戶空間的copy,而且我們看到返回值也是用fd_set結構來表示,這意味著我們在用戶空間處理裡也需要遍歷每一位。


精華所在 do_select

wait queue

這裡用到了Linux裡一個很重要的數據結構 wait queue, 我們暫不打算展開來講,先簡單來說下其用法,比如我們在進程中read時經常要等待數據準備好,我們用偽碼來寫些流程:

<code>// Read 代碼for (true) { if 數據準備好 { 拷貝數據到用戶空間buffer return } else { 創建一個 wait_queue_entry_t wait_entry; wait_entry.func = 自定義函數,被喚醒時會調用 wait_entry.private = 自定義的數據結構 將此 wait_entry 加入要讀取數據的 設備的等待隊列 set_current_state(TASK_INTERRUPTIBLE) // 將當前進程狀態設置為 TASK_INTERRUPTIBLE schedule() // 將當前進程調度走,進行進程切換 }}// 設備驅動端代碼if 設備有數據可讀 { for ( 遍歷其wait_queue ) { 喚醒 每一個 wait_queue_entry 調用 wait_entry.func { 將上面讀取進程狀態設置為 TASK_RUNNING,並加入CPU核的運行隊列,被再次調度後,將讀取到數據 } }}/<code>

do_select源碼走讀

獲取當前三類fd_set中最大的fd

<code> rcu_read_lock(); retval = max_select_fd(n, fds); rcu_read_unlock(); n = retval;/<code>

上面 n = retval中的 n, 即為三類fd_set中最大的fd, 也是下面要介紹的循環體的上限

初始化用作wait queue中的 wait entry 的private數據結構

<code>poll_initwait(&table);/<code>核心循環體

我們講註釋寫在這個代碼塊裡

<code>// 最外面是一個無限循環,它只有在poll到有效的事件,或者超時,或者有中斷髮生時,才會退出for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; bool can_busy_loop = false; // 首先獲取需要監控的三類fd_set, 其實是 unsigned long 數組 inp = fds->in; outp = fds->out; exp = fds->ex; // 初始化用於保存返回值的三類 fd_set對應的unsigned long 數組 rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; // 開始循環遍歷覆蓋的所有fd, 以上面得到的 n 上限 for (i = 0; i < n; ++rinp, ++routp, ++rexp) { unsigned long in, out, ex, all_bits, bit = 1, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; __poll_t mask; in = *inp++; out = *outp++; ex = *exp++; all_bits = in | out | ex; if (all_bits == 0) { // 如果走到這裡,說明在三類fd_set的數組中,與當前下標對應的三個unsigned long的每一位均為0, 即當前 // 不存在任何監控的fd, 開始下一次循環 i += BITS_PER_LONG; continue; } // 前面介紹過 fd_set的數組元素是 unsigned long, 即一個元素可表示64個fd, 這裡依次遍歷這64個bits for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) { struct fd f; if (i >= n) break; if (!(bit & all_bits)) continue; // 走到這裡,說明當前bit位上有監控的fd f = fdget(i); if (f.file) { // 針對當前fd, 設置其需要監控的事件 wait_key_set(wait, in, out, bit, busy_flag); // vfs_poll 是重中之重,我們下一節會單獨講解,這裡先說明它所作的事 // 1. 初始化wait entry, 將其加入到這個fd對應的socket的等待隊列中 // 2. 獲取當前socket是否有讀,寫,異常等事件並返回 mask = vfs_poll(f.file, wait); fdput(f); // 按位與,看是否有相關事件 if ((mask & POLLIN_SET) && (in & bit)) { res_in |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; retval++; wait->_qproc = NULL; } /* got something, stop busy polling */ if (retval) { can_busy_loop = false; busy_flag = 0; /* * only remember a returned * POLL_BUSY_LOOP if we asked for it */ } else if (busy_flag & mask) can_busy_loop = true; } } // 按unsigned long賦值給返回值數組元素 if (res_in) *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; // 這裡有兩層for循環,這裡主動出讓CPU, 進行一次調度 cond_resched(); } wait->_qproc = NULL; // 四種情況下會返回 // 1. 任意監控的fd上有事件發生 // 2. 超時 // 3. 有中斷髮生 // 4. wait queue相關操作發生錯誤 if (retval || timed_out || signal_pending(current)) break; if (table.error) { retval = table.error; break; } ...... // 當前監控的fd上沒有事件發生,也沒有超時或中斷髮生, // 將當前進程設置為 TASK_INTERRUPTIBLE, 並調用 schedule // 等待事件發生時,對應的socket將當前進程喚醒後,從 這裡 // 繼續運行 if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; }/<code>

簡單總結如下:

a. 循環遍歷每一個監控的fd;

b. 有下列情況之一則返回:

<code> 1. 任意監控的fd上有事件發生 2. 超時 3. 有中斷髮生 4. wait queue相關操作發生錯誤/<code>

c. 如查無上述情況,將當前進程設置為 TASK_INTERRUPTIBLE, 並調用 schedule作進程切換;

d. 等待socket 事件發生,對應的socket將當前進程喚醒後,當前進程被再次調度切換回來,繼續運行;

細心的你可能已經發現,這個有個影響效率的問題:即使只有一個監控中的fd有事件發生,當前進程就會被喚醒,然後要將所有監控的fd都遍歷一邊,依次調用vfs_poll來獲取其有效事件,好麻煩啊~~~

vfs_poll 講解

調用層級


作用:

<code>1. 初始化wait entry, 將其加入到這個fd對應的socket的等待隊列中2. 獲取當前socket是否有讀,寫,異常等事件並返回/<code>加入等待隊列時,最終會調用 fs_select.c中的 __pollwait

<code>static void __pollwait(struct file *filp, wait_queue_head_t *wait_address, poll_table *p){ struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt); struct poll_table_entry *entry = poll_get_entry(pwq); if (!entry) return; entry->filp = get_file(filp); entry->wait_address = wait_address; entry->key = p->_key; init_waitqueue_func_entry(&entry->wait, pollwake); entry->wait.private = pwq; // 加入到socket的等待隊列 add_wait_queue(wait_address, &entry->wait);}/<code>


總結

select調用中每類fd_set中最多容納1024個fd;每次調用select都需要將三類fd_set從用戶空間複製到內核空間;wait queue是個好東西,select會被當前進程task加入到每一個監控的socket的等待隊列;select進程被喚醒後即使只有一個被監控的fd有事件發生,也會再次將所有的監控fd遍歷一次;在遍歷fd的過程中會調用cond_resched()來主動出讓CPU, 作進程切換;


關於360技術:360技術是360技術團隊打造的技術分享公眾號,每天推送技術乾貨內容,更多技術信息歡迎關注“360技術”微信公眾號