02.28 Linux select 一網打盡

奇技指南


select, 你可以不用它,但你不能不瞭解它。通過閱讀本文,可以幫你理清select的來龍去脈, 讓你從中瞭解到:


我們常說的select的1024限制指的是什麼 ?怎麼會有這樣的限制?

都說select效率不高,是這樣嗎?為什麼 ?

select使用中有坑嗎?


注:本文的所有內容均指針對 Linux Kernel, 當前使用的源碼版本是 5.3.0


Linux select 一網打盡

原型

<code>int select (int __nfds, fd_set *__restrict __readfds,           fd_set *__restrict __writefds,           fd_set *__restrict __exceptfds,           struct timeval *__restrict __timeout);/<code>

select是IO多種複用的一種實現,它將需要監控的fd分為讀、寫、異常三類,使用fd_set表示,當其返回時要麼是超時,要麼是有至少一種讀、寫或異常事件發生。


相關數據結構

FD_SET

FD_SET是select最重要的數據結構了,其在內核中的定義如下:

<code>typedef __kernel_fd_set     fd_set;#undef __FD_SETSIZE#define __FD_SETSIZE    1024typedef struct {    unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(long))];} __kernel_fd_set;/<code>

我們來簡化下,fd_set是一個struct, 其內部只有一個由16個元素組成的unsigned long數組,這個數組一共可以表示16 × 64 = 1024位, 每一位用來表示一個 fd, 這也就是 select針對讀、寫或異常每一類最多隻能有 1024個fd 限制的由來。


相關宏

下面這些宏定義在內核代碼fs/select.c中

  • FDS_BITPERLONG: 返回每個long有多少位,通常是64bits
<code>#define FDS_BITPERLONG    (8*sizeof(long))/<code>
  • FDS_LONGS(nr): 獲取 nr 個fd 需要用幾個long來表示
<code>#define FDS_LONGS(nr) (((nr)+FDS_BITPERLONG-1)/FDS_BITPERLONG)/<code>
  • FD_BYTES(nr): 獲取 nr 個fd 需要用 多少個字節來表示
<code>#define FDS_BYTES(nr) (FDS_LONGS(nr)*sizeof(long))/<code>

下面這些宏可以在gcc源碼中找到

  • FD_ZERO: 初始化一個fd_set
<code>#define __FD_ZERO(s) \\  do {                                          \\    unsigned int __i;                               \\    fd_set *__arr = (s);                            \\    for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i)        \\      __FDS_BITS (__arr)[__i] = 0;                          \\  } while (0)/<code>

將上面所說的由16個元素組成的unsigned long數組每一個元素都設為 0;

  • __FD_SET(d, s): 將一個fd 賦值到 一個 fd_set
<code>#define __FD_SET(d, s) \\  ((void) (__FDS_BITS (s)[__FD_ELT(d)] |= __FD_MASK(d)))/<code>


分三步:

1、__FD_ELT(d):確定賦值到數組的哪一個元素

<code> #define   __FD_ELT(d) \\   __extension__                                   \\   ({ long int __d = (d);                          \\      (__builtin_constant_p (__d)                      \\       ? (0 <= __d && __d < __FD_SETSIZE                       \\    ? (__d / __NFDBITS)                            \\    : __fdelt_warn (__d))                          \\       : __fdelt_chk (__d)); })/<code>

其中 #define __NFDBITS (8 * (int) sizeof (__fd_mask)) , 即__NFDBITS = 64

這裡實現使用了__builtin_constant_p針對常量作了優化,我也沒有太理解常量與非常量實現方案有什麼不同,我們暫時忽略這個細節看本質。

本質就是 一個 unsigned long有64位,直接 __d / __NFDBITS取模就可以確定用數組的哪一個元素了;

2、__FD_MASK(d):確定賦值到一個 unsigned long的哪一位

<code>#define   __FD_MASK(d)    ((__fd_mask) (1UL << ((d) % __NFDBITS)))/<code>

直接 (d) % __NFDBITS)取餘後作為 1 左移的位數即可

3、|= :用位或賦值即可


在內核中的實現

調用層級

Linux select 一網打盡


系統調用入口位置

位於fs/select.c中

<code>SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,        fd_set __user *, exp, struct timeval __user *, tvp){    return kern_select(n, inp, outp, exp, tvp);}/<code>

入口函數 kern_select

<code>static int kern_select(int n, fd_set __user *inp, fd_set __user *outp,               fd_set __user *exp, struct timeval __user *tvp){    struct timespec64 end_time, *to = NULL;    struct timeval tv;    int ret;    if (tvp) {        if (copy_from_user(&tv, tvp, sizeof(tv)))            return -EFAULT;        to = &end_time;        if (poll_select_set_timeout(to,                tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),                (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))            return -EINVAL;    }    ret = core_sys_select(n, inp, outp, exp, to);    return poll_select_finish(&end_time, tvp, PT_TIMEVAL, ret);}/<code>

做三件事:

a. 如果設置了超時,首先準備時間戳 timespec64;

b. 調用 core_sys_select,這個是具體的實現,我們下面會重點介紹

c. poll_select_finish:做的主要工作就是更新用戶調用select時傳進來的超時參數tvp,我列一下關鍵代碼:

<code>    ktime_get_ts64(&rts);    rts = timespec64_sub(*end_time, rts);    if (rts.tv_sec < 0)        rts.tv_sec = rts.tv_nsec = 0;...    struct timeval rtv;            if (sizeof(rtv) > sizeof(rtv.tv_sec) + sizeof(rtv.tv_usec))                memset(&rtv, 0, sizeof(rtv));            rtv.tv_sec = rts.tv_sec;            rtv.tv_usec = rts.tv_nsec / NSEC_PER_USEC;            if (!copy_to_user(p, &rtv, sizeof(rtv)))                return ret;/<code>

可以看到先獲取當前的時間戳,然後通過timespec64_sub和傳入的時間戳(接中傳入的是超時時間,實現時會轉化為時間戳)求出差值,將此差值傳回給用戶,即返回了剩餘的超時時間。所以這個地方是個小陷阱,用戶在調用select時,需要每次重新初始化這個超時時間。

通過 core_sys_select 實現

這個函數主要功能是在實現真正的select功能前,準備好 fd_set ,即從用戶空間將所需的三類 fd_set 複製到內核空間。從下面的代碼中你會看到對於每次的 select系統調用,都需要從用戶空間將所需的三類 fd_set 複製到內核空間,這裡存在性能上的損耗。

<code>int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,               fd_set __user *exp, struct timespec64 *end_time){    fd_set_bits fds;    void *bits;    int ret, max_fds;    size_t size, alloc_size;    struct fdtable *fdt;    /* Allocate small arguments on the stack to save memory and be faster */    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];    ret = -EINVAL;    if (n < 0)        goto out_nofds;    /* max_fds can increase, so grab it once to avoid race */    rcu_read_lock();    fdt = files_fdtable(current->files);    max_fds = fdt->max_fds;    rcu_read_unlock();    if (n > max_fds)        n = max_fds;    /*     * We need 6 bitmaps (in/out/ex for both incoming and outgoing),     * since we used fdset we need to allocate memory in units of     * long-words.     */    size = FDS_BYTES(n);    bits = stack_fds;    if (size > sizeof(stack_fds) / 6) {        /* Not enough space in on-stack array; must use kmalloc */        ret = -ENOMEM;        if (size > (SIZE_MAX / 6))            goto out_nofds;        alloc_size = 6 * size;        bits = kvmalloc(alloc_size, GFP_KERNEL);        if (!bits)            goto out_nofds;    }    fds.in      = bits;    fds.out     = bits +   size;    fds.ex      = bits + 2*size;    fds.res_in  = bits + 3*size;    fds.res_out = bits + 4*size;    fds.res_ex  = bits + 5*size;    if ((ret = get_fd_set(n, inp, fds.in)) ||        (ret = get_fd_set(n, outp, fds.out)) ||        (ret = get_fd_set(n, exp, fds.ex)))        goto out;    zero_fd_set(n, fds.res_in);    zero_fd_set(n, fds.res_out);    zero_fd_set(n, fds.res_ex);    ret = do_select(n, &fds, end_time);    if (ret < 0)        goto out;    if (!ret) {        ret = -ERESTARTNOHAND;        if (signal_pending(current))            goto out;        ret = 0;    }    if (set_fd_set(n, inp, fds.res_in) ||        set_fd_set(n, outp, fds.res_out) ||        set_fd_set(n, exp, fds.res_ex))        ret = -EFAULT;out:    if (bits != stack_fds)        kvfree(bits);out_nofds:    return ret;}/<code>

代碼中的註釋很清晰,我們這裡簡單過一下:


分五步:

1、規範化select系統調用傳入的第一個參數 n

<code>/* max_fds can increase, so grab it once to avoid race */  rcu_read_lock();  fdt = files_fdtable(current->files);  max_fds = fdt->max_fds;  rcu_read_unlock();  if (n > max_fds)      n = max_fds;/<code>

這個n是三類不同的fd_set中所包括的fd數值的最大值 + 1, linux task打開句柄從0開始,不加1的話可能會少監控fd.

用戶在使用時可以有個偷懶的做法,就是將這個n設置為 FD_SETSIZE,通常是1024,這將監控的範圍擴大到了上限,但實際上遠沒有這麼多fd需要監控,浪費資源。

linux man中的解釋如下:

nfds should be set to the highest-numbered file descriptor in any of the three sets, plus 1. The indicated file descriptors in each set are checked, up to this limit (but see BUGS).


2、計算內核空間所需要的fd_set的空間, 內核態需要三個fd_set來容納用戶態傳遞過來的參數,還需要三個fd_set來容納select調用返回後生成的三灰fd_set, 即一共是6個fd_set

<code>   long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];   . . .   size = FDS_BYTES(n);  bits = stack_fds;  if (size > sizeof(stack_fds) / 6) {      /* Not enough space in on-stack array; must use kmalloc */      ret = -ENOMEM;      if (size > (SIZE_MAX / 6))          goto out_nofds;      alloc_size = 6 * size;      bits = kvmalloc(alloc_size, GFP_KERNEL);      if (!bits)          goto out_nofds;  }  fds.in      = bits;  fds.out     = bits +   size;  fds.ex      = bits + 2*size;  fds.res_in  = bits + 3*size;  fds.res_out = bits + 4*size;  fds.res_ex  = bits + 5*size;/<code>

這裡有個小技巧,先從內核棧上分配空間,如果不夠用,才使用 kvmalloc分配。

通過 size = FDS_BYTES(n);計算出單一一種fd_set所需字節數,

再能過 alloc_size = 6 * size; 即可計算出所需的全部字節數。

3、初始化用作參數的和用作返回值的兩類fd_set

<code>if ((ret = get_fd_set(n, inp, fds.in)) ||      (ret = get_fd_set(n, outp, fds.out)) ||      (ret = get_fd_set(n, exp, fds.ex)))      goto out;  zero_fd_set(n, fds.res_in);  zero_fd_set(n, fds.res_out);  zero_fd_set(n, fds.res_ex);/<code>

4、真正實現部分 do_select, 我們在下面詳講

5、返回結果複製回用戶空間

<code>if (set_fd_set(n, inp, fds.res_in) ||      set_fd_set(n, outp, fds.res_out) ||      set_fd_set(n, exp, fds.res_ex))      ret = -EFAULT;/<code>

這裡又多了一次內核空間到用戶空間的copy,而且我們看到返回值也是用fd_set結構來表示,這意味著我們在用戶空間處理裡也需要遍歷每一位。


  • 精華所在 do_select

wait queue

這裡用到了Linux裡一個很重要的數據結構 wait queue, 我們暫不打算展開來講,先簡單來說下其用法,比如我們在進程中read時經常要等待數據準備好,我們用偽碼來寫些流程:

<code>// Read 代碼for (true) {    if 數據準備好 {        拷貝數據到用戶空間buffer        return    } else {       創建一個 wait_queue_entry_t  wait_entry;       wait_entry.func = 自定義函數,被喚醒時會調用       wait_entry.private = 自定義的數據結構       將此 wait_entry 加入要讀取數據的 設備的等待隊列       set_current_state(TASK_INTERRUPTIBLE) // 將當前進程狀態設置為 TASK_INTERRUPTIBLE       schedule() // 將當前進程調度走,進行進程切換    }}// 設備驅動端代碼if 設備有數據可讀 {   for ( 遍歷其wait_queue ) {      喚醒 每一個 wait_queue_entry      調用 wait_entry.func {           將上面讀取進程狀態設置為 TASK_RUNNING,並加入CPU核的運行隊列,被再次調度後,將讀取到數據      }   }}/<code>

do_select源碼走讀

  • 獲取當前三類fd_set中最大的fd
<code>    rcu_read_lock();  retval = max_select_fd(n, fds);  rcu_read_unlock();    n = retval;/<code>

上面 n = retval中的 n, 即為三類fd_set中最大的fd, 也是下面要介紹的循環體的上限

  • 初始化用作wait queue中的 wait entry 的private數據結構
<code>poll_initwait(&table);/<code>
  • 核心循環體

我們講註釋寫在這個代碼塊裡

<code>// 最外面是一個無限循環,它只有在poll到有效的事件,或者超時,或者有中斷髮生時,才會退出for (;;) {      unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;      bool can_busy_loop = false;       // 首先獲取需要監控的三類fd_set, 其實是 unsigned long 數組      inp = fds->in; outp = fds->out; exp = fds->ex;       // 初始化用於保存返回值的三類 fd_set對應的unsigned long 數組      rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;        // 開始循環遍歷覆蓋的所有fd, 以上面得到的 n 上限      for (i = 0; i < n; ++rinp, ++routp, ++rexp) {          unsigned long in, out, ex, all_bits, bit = 1, j;          unsigned long res_in = 0, res_out = 0, res_ex = 0;          __poll_t mask;                      in = *inp++; out = *outp++; ex = *exp++;          all_bits = in | out | ex;          if (all_bits == 0) {                //  如果走到這裡,說明在三類fd_set的數組中,與當前下標對應的三個unsigned long的每一位均為0, 即當前                // 不存在任何監控的fd, 開始下一次循環              i += BITS_PER_LONG;              continue;          }            // 前面介紹過 fd_set的數組元素是 unsigned long, 即一個元素可表示64個fd, 這裡依次遍歷這64個bits          for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit <<= 1) {              struct fd f;              if (i >= n)                  break;              if (!(bit & all_bits))                  continue;                // 走到這裡,說明當前bit位上有監控的fd                              f = fdget(i);              if (f.file) {                    // 針對當前fd, 設置其需要監控的事件                  wait_key_set(wait, in, out, bit,                           busy_flag);                                        // vfs_poll 是重中之重,我們下一節會單獨講解,這裡先說明它所作的事                    // 1.  初始化wait entry, 將其加入到這個fd對應的socket的等待隊列中                    // 2.  獲取當前socket是否有讀,寫,異常等事件並返回                  mask = vfs_poll(f.file, wait);                  fdput(f);                    // 按位與,看是否有相關事件                  if ((mask & POLLIN_SET) && (in & bit)) {                      res_in |= bit;                      retval++;                      wait->_qproc = NULL;                  }                  if ((mask & POLLOUT_SET) && (out & bit)) {                      res_out |= bit;                      retval++;                      wait->_qproc = NULL;                  }                  if ((mask & POLLEX_SET) && (ex & bit)) {                      res_ex |= bit;                      retval++;                      wait->_qproc = NULL;                  }                                      /* got something, stop busy polling */                  if (retval) {                      can_busy_loop = false;                      busy_flag = 0;                  /*                   * only remember a returned                   * POLL_BUSY_LOOP if we asked for it                   */                  } else if (busy_flag & mask)                      can_busy_loop = true;              }          }                        // 按unsigned long賦值給返回值數組元素          if (res_in)              *rinp = res_in;          if (res_out)              *routp = res_out;          if (res_ex)              *rexp = res_ex;                        // 這裡有兩層for循環,這裡主動出讓CPU, 進行一次調度          cond_resched();      }      wait->_qproc = NULL;            // 四種情況下會返回       // 1.  任意監控的fd上有事件發生       //  2. 超時       //  3. 有中斷髮生       //  4. wait queue相關操作發生錯誤      if (retval || timed_out || signal_pending(current))          break;      if (table.error) {          retval = table.error;          break;      }    ......                //  當前監控的fd上沒有事件發生,也沒有超時或中斷髮生,       //   將當前進程設置為 TASK_INTERRUPTIBLE, 並調用 schedule       //   等待事件發生時,對應的socket將當前進程喚醒後,從 這裡      //   繼續運行      if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE,                     to, slack))          timed_out = 1;  }/<code> 

簡單總結如下:

a. 循環遍歷每一個監控的fd;

b. 有下列情況之一則返回:

<code>       1.  任意監控的fd上有事件發生       2. 超時       3. 有中斷髮生       4. wait queue相關操作發生錯誤/<code>

c. 如查無上述情況,將當前進程設置為 TASK_INTERRUPTIBLE, 並調用 schedule作進程切換;

d. 等待socket 事件發生,對應的socket將當前進程喚醒後,當前進程被再次調度切換回來,繼續運行;

細心的你可能已經發現,這個有個影響效率的問題:即使只有一個監控中的fd有事件發生,當前進程就會被喚醒,然後要將所有監控的fd都遍歷一邊,依次調用vfs_poll來獲取其有效事件,好麻煩啊~~~

vfs_poll 講解

  • 調用層級
Linux select 一網打盡


  • 作用:
<code>1.  初始化wait entry, 將其加入到這個fd對應的socket的等待隊列中2.  獲取當前socket是否有讀,寫,異常等事件並返回/<code>
  • 加入等待隊列時,最終會調用 fs_select.c中的 __pollwait
<code>static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,              poll_table *p){  struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);  struct poll_table_entry *entry = poll_get_entry(pwq);  if (!entry)      return;  entry->filp = get_file(filp);  entry->wait_address = wait_address;  entry->key = p->_key;  init_waitqueue_func_entry(&entry->wait, pollwake);  entry->wait.private = pwq;        // 加入到socket的等待隊列  add_wait_queue(wait_address, &entry->wait);}/<code>


總結

  • select調用中每類fd_set中最多容納1024個fd;
  • 每次調用select都需要將三類fd_set從用戶空間複製到內核空間;
  • wait queue是個好東西,select會被當前進程task加入到每一個監控的socket的等待隊列;
  • select進程被喚醒後即使只有一個被監控的fd有事件發生,也會再次將所有的監控fd遍歷一次;
  • 在遍歷fd的過程中會調用cond_resched()來主動出讓CPU, 作進程切換;


關於360技術:360技術是360技術團隊打造的技術分享公眾號,每天推送技術乾貨內容,更多技術信息歡迎關注“360技術”微信公眾號


分享到:


相關文章: