Java NIO系列 之Selector(選擇器)

一 Selector(選擇器)介紹

Selector 一般稱 為選擇器 ,當然你也可以翻譯為 多路複用器 。它是Java NIO核心組件中的一個,用於檢查一個或多個NIO Channel(通道)的狀態是否處於可讀、可寫。如此可以實現單線程管理多個channels,也就是可以管理多個網絡鏈接。

使用Selector的好處在於: 使用更少的線程來就可以來處理通道了, 相比使用多個線程,避免了線程上下文切換帶來的開銷。

二 Selector(選擇器)的使用方法介紹

1. Selector的創建

通過調用Selector.open()方法創建一個Selector對象,如下:

  1. Selector selector = Selector.open();

這裡需要說明一下

2. 註冊Channel到Selector

  1. channel.configureBlocking(false);
  2. SelectionKey key = channel.register(selector, Selectionkey.OP_READ);

Channel必須是非阻塞的。 所以FileChannel不適用Selector,因為FileChannel不能切換為非阻塞模式,更準確的來說是因為FileChannel沒有繼承SelectableChannel。Socket channel可以正常使用。

SelectableChannel抽象類 有一個 configureBlocking() 方法用於使通道處於阻塞模式或非阻塞模式。

  1. abstract SelectableChannel configureBlocking(boolean block)

注意:

SelectableChannel抽象類configureBlocking() 方法是由 AbstractSelectableChannel抽象類實現的,SocketChannel、ServerSocketChannel、DatagramChannel都是直接繼承了 AbstractSelectableChannel抽象類 。 大家有興趣可以看看NIO的源碼,各種抽象類和抽象類上層的抽象類。我本人暫時不準備研究NIO源碼,因為還有很多事情要做,需要研究的同學可以自行看看。

register() 方法的第二個參數。這是一個“ interest集合 ”,意思是在通過Selector監聽Channel時對什麼事件感興趣。可以監聽四種不同類型的事件:

  • Connect
  • Accept
  • Read
  • Write

通道觸發了一個事件意思是該事件已經就緒。比如某個Channel成功連接到另一個服務器稱為“ 連接就緒 ”。一個Server Socket Channel準備好接收新進入的連接稱為“ 接收就緒”。一個有數據可讀的通道可以說是“ 讀就緒 ”。等待寫數據的通道可以說是“ 寫就緒

”。

這四種事件用SelectionKey的四個常量來表示:

  1. SelectionKey.OP_CONNECT
  2. SelectionKey.OP_ACCEPT
  3. SelectionKey.OP_READ
  4. SelectionKey.OP_WRITE

如果你對不止一種事件感興趣,使用或運算符即可,如下:

  1. int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;

3. SelectionKey介紹

一個SelectionKey鍵表示了一個特定的通道對象和一個特定的選擇器對象之間的註冊關係。

  1. key.attachment(); //返回SelectionKey的attachment,attachment可以在註冊channel的時候指定。
  2. key.channel(); // 返回該SelectionKey對應的channel。
  3. key.selector(); // 返回該SelectionKey對應的Selector。
  4. key.interestOps(); //返回代表需要Selector監控的IO操作的bit mask
  5. key.readyOps(); // 返回一個bit mask,代表在相應channel上可以進行的IO操作。

key.interestOps():

我們可以通過以下方法來判斷Selector是否對Channel的某種事件感興趣

  1. int interestSet = selectionKey.interestOps();
  2. boolean isInterestedInAccept = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;
  3. boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
  4. boolean isInterestedInRead = interestSet & SelectionKey.OP_READ;
  5. boolean isInterestedInWrite = interestSet & SelectionKey.OP_WRITE;

key.readyOps()

ready 集合是通道已經準備就緒的操作的集合。JAVA中定義以下幾個方法用來檢查這些操作是否就緒.

Java NIO系列 之Selector(選擇器)

  1. //創建ready集合的方法
  2. int readySet = selectionKey.readyOps();
  3. //檢查這些操作是否就緒的方法
  4. key.isAcceptable();//是否可讀,是返回 true
  5. boolean isWritable()://是否可寫,是返回 true
  6. boolean isConnectable()://是否可連接,是返回 true
  7. boolean isAcceptable()://是否可接收,是返回 true

從SelectionKey訪問Channel和Selector很簡單。如下:

  1. Channel channel = key.channel();
  2. Selector selector = key.selector();
  3. key.attachment();

可以將一個對象或者更多信息附著到SelectionKey上,這樣就能方便的識別某個給定的通道。例如,可以附加 與通道一起使用的Buffer,或是包含聚集數據的某個對象。使用方法如下:

  1. key.attach(theObject);
  2. Object attachedObj = key.attachment();

還可以在用register()方法向Selector註冊Channel的時候附加對象。如:

  1. SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

4. 從Selector中選擇channel(Selecting Channels via a Selector)

選擇器維護註冊過的通道的集合,並且這種註冊關係都被封裝在SelectionKey當中.

Selector維護的三種類型SelectionKey集合:

  • 已註冊的鍵的集合(Registered key set)
  • 所有與選擇器關聯的通道所生成的鍵的集合稱為已經註冊的鍵的集合。並不是所有註冊過的鍵都仍然有效。這個集合通過 keys() 方法返回,並且可能是空的。這個已註冊的鍵的集合不是可以直接修改的;試圖這麼做的話將引發java.lang.UnsupportedOperationException。
  • 已選擇的鍵的集合(Selected key set)
  • 所有與選擇器關聯的通道所生成的鍵的集合稱為已經註冊的鍵的集合。並不是所有註冊過的鍵都仍然有效。這個集合通過 keys() 方法返回,並且可能是空的。這個已註冊的鍵的集合不是可以直接修改的;試圖這麼做的話將引發java.lang.UnsupportedOperationException。
  • 已取消的鍵的集合(Cancelled key set)
  • 已註冊的鍵的集合的子集,這個集合包含了 cancel() 方法被調用過的鍵(這個鍵已經被無效化),但它們還沒有被註銷。這個集合是選擇器對象的私有成員,因而無法直接訪問。
  • 注意:當鍵被取消( 可以通過isValid( ) 方法來判斷)時,它將被放在相關的選擇器的已取消的鍵的集合裡。註冊不會立即被取消,但鍵會立即失效。當再次調用
    select( ) 方法時(或者一個正在進行的select()調用結束時),已取消的鍵的集合中的被取消的鍵將被清理掉,並且相應的註銷也將完成。通道會被註銷,而新的SelectionKey將被返回。當通道關閉時,所有相關的鍵會自動取消(記住,一個通道可以被註冊到多個選擇器上)。當選擇器關閉時,所有被註冊到該選擇器的通道都將被註銷,並且相關的鍵將立即被無效化(取消)。一旦鍵被無效化,調用它的與選擇相關的方法就將拋出CancelledKeyException。

select()方法介紹:

在剛初始化的Selector對象中,這三個集合都是空的。 通過Selector的select()方法可以選擇已經準備就緒的通道 (這些通道包含你感興趣的的事件)。比如你對讀就緒的通道感興趣,那麼select()方法就會返回讀事件已經就緒的那些通道。下面是Selector幾個重載的select()方法:

  • int select():阻塞到至少有一個通道在你註冊的事件上就緒了。
  • int select(long timeout):和select()一樣,但最長阻塞時間為timeout毫秒。
  • int selectNow():非阻塞,只要有通道就緒就立刻返回。

select()方法返回的int值表示有多少通道已經就緒,是自上次調用select()方法後有多少通道變成就緒狀態。之前在select()調用時進入就緒的通道不會在本次調用中被記入,而在前一次select()調用進入就緒但現在已經不在處於就緒的通道也不會被記入。例如:首次調用select()方法,如果有一個通道變成就緒狀態,返回了1,若再次調用select()方法,如果另一個通道就緒了,它會再次返回1。如果對第一個就緒的channel沒有做任何操作,現在就有兩個就緒的通道,但在每次select()方法調用之間,只有一個通道就緒了。

一旦調用select()方法,並且返回值不為0時,則 可以通過調用Selector的selectedKeys()方法來訪問已選擇鍵集合

Java NIO系列 之Selector(選擇器)

如下: Set selectedKeys=selector.selectedKeys(); 進而可以放到和某SelectionKey關聯的Selector和Channel。如下所示:

  1. Set selectedKeys = selector.selectedKeys();
  2. Iterator keyIterator = selectedKeys.iterator();
  3. while(keyIterator.hasNext()) {
  4. SelectionKey key = keyIterator.next();
  5. if(key.isAcceptable()) {
  6. // a connection was accepted by a ServerSocketChannel.
  7. } else if (key.isConnectable()) {
  8. // a connection was established with a remote server.
  9. } else if (key.isReadable()) {
  10. // a channel is ready for reading
  11. } else if (key.isWritable()) {
  12. // a channel is ready for writing
  13. }
  14. keyIterator.remove();
  15. }

5. 停止選擇的方法

選擇器執行選擇的過程,系統底層會依次詢問每個通道是否已經就緒,這個過程可能會造成調用線程進入阻塞狀態,那麼我們有以下三種方式可以喚醒在select()方法中阻塞的線程。

  • wakeup()方法 :通過調用Selector對象的wakeup()方法讓處在阻塞狀態的select()方法立刻返回 該方法使得選擇器上的第一個還沒有返回的選擇操作立即返回。如果當前沒有進行中的選擇操作,那麼下一次對select()方法的一次調用將立即返回。
  • close()方法 :通過close()方法關閉Selector, 該方法使得任何一個在選擇操作中阻塞的線程都被喚醒(類似wakeup()),同時使得註冊到該Selector的所有Channel被註銷,所有的鍵將被取消,但是Channel本身並不會關閉。
Java NIO系列 之Selector(選擇器)

三 模板代碼

一個服務端的模板代碼:

有了模板代碼我們在編寫程序時,大多數時間都是在模板代碼中添加相應的業務代碼

  1. ServerSocketChannel ssc = ServerSocketChannel.open();
  2. ssc.socket().bind(new InetSocketAddress("localhost", 8080));
  3. ssc.configureBlocking(false);
  4. Selector selector = Selector.open();
  5. ssc.register(selector, SelectionKey.OP_ACCEPT);
  6. while(true) {
  7. int readyNum = selector.select();
  8. if (readyNum == 0) {
  9. continue;
  10. }
  11. Set selectedKeys = selector.selectedKeys();
  12. Iterator it = selectedKeys.iterator();
  13. while(it.hasNext()) {
  14. SelectionKey key = it.next();
  15. if(key.isAcceptable()) {
  16. // 接受連接
  17. } else if (key.isReadable()) {
  18. // 通道可讀
  19. } else if (key.isWritable()) {
  20. // 通道可寫
  21. }
  22. it.remove();
  23. }
  24. }

四 客戶端與服務端簡單交互實例

服務端:

  1. package selector;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SelectionKey;
  6. import java.nio.channels.Selector;
  7. import java.nio.channels.ServerSocketChannel;
  8. import java.nio.channels.SocketChannel;
  9. import java.util.Iterator;
  10. import java.util.Set;
  11. public class WebServer {
  12. public static void main(String[] args) {
  13. try {
  14. ServerSocketChannel ssc = ServerSocketChannel.open();
  15. ssc.socket().bind(new InetSocketAddress("127.0.0.1", 8000));
  16. ssc.configureBlocking(false);
  17. Selector selector = Selector.open();
  18. // 註冊 channel,並且指定感興趣的事件是 Accept
  19. ssc.register(selector, SelectionKey.OP_ACCEPT);
  20. ByteBuffer readBuff = ByteBuffer.allocate(1024);
  21. ByteBuffer writeBuff = ByteBuffer.allocate(128);
  22. writeBuff.put("received".getBytes());
  23. writeBuff.flip();
  24. while (true) {
  25. int nReady = selector.select();
  26. Set keys = selector.selectedKeys();
  27. Iterator it = keys.iterator();
  28. while (it.hasNext()) {
  29. SelectionKey key = it.next();
  30. it.remove();
  31. if (key.isAcceptable()) {
  32. // 創建新的連接,並且把連接註冊到selector上,而且,
  33. // 聲明這個channel只對讀操作感興趣。
  34. SocketChannel socketChannel = ssc.accept();
  35. socketChannel.configureBlocking(false);
  36. socketChannel.register(selector, SelectionKey.OP_READ);
  37. }
  38. else if (key.isReadable()) {
  39. SocketChannel socketChannel = (SocketChannel) key.channel();
  40. readBuff.clear();
  41. socketChannel.read(readBuff);
  42. readBuff.flip();
  43. System.out.println("received : " + new String(readBuff.array()));
  44. key.interestOps(SelectionKey.OP_WRITE);
  45. }
  46. else if (key.isWritable()) {
  47. writeBuff.rewind();
  48. SocketChannel socketChannel = (SocketChannel) key.channel();
  49. socketChannel.write(writeBuff);
  50. key.interestOps(SelectionKey.OP_READ);
  51. }
  52. }
  53. }
  54. } catch (IOException e) {
  55. e.printStackTrace();
  56. }
  57. }
  58. }

客戶端:

  1. package selector;
  2. import java.io.IOException;
  3. import java.net.InetSocketAddress;
  4. import java.nio.ByteBuffer;
  5. import java.nio.channels.SocketChannel;
  6. public class WebClient {
  7. public static void main(String[] args) throws IOException {
  8. try {
  9. SocketChannel socketChannel = SocketChannel.open();
  10. socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000));
  11. ByteBuffer writeBuffer = ByteBuffer.allocate(32);
  12. ByteBuffer readBuffer = ByteBuffer.allocate(32);
  13. writeBuffer.put("hello".getBytes());
  14. writeBuffer.flip();
  15. while (true) {
  16. writeBuffer.rewind();
  17. socketChannel.write(writeBuffer);
  18. readBuffer.clear();
  19. socketChannel.read(readBuffer);
  20. }
  21. } catch (IOException e) {
  22. }
  23. }
  24. }

運行結果:

先運行服務端,再運行客戶端,服務端會不斷收到客戶端發送過來的消息。


分享到:


相關文章: