Reactor Java NIO 的實現
從Java 1.4 開始,提供了NIO(非阻塞IO)。NIO 提供了 SocketChannel 和 ServerSocketChannel 兩種不同的套接字實現。這兩種新增的通道都支持阻塞和非阻塞兩種模式。當然我們通常使用非阻塞的 NIO 來處理網絡請求響應。NIO 裡面新增的 Selector 等相關的API 可以提供給我們實現 Reactor 模式的網絡請求響應模型。
Server 端實現
首先對於 Server 端的實現時序圖如下所示:
(1)首先打開 ServerSocketChannel,用於監聽客戶端的連接。
(2)綁定監聽端口,設置為非阻塞模式。
(3)創建 Reactor 線程,創建多路複用器並啟動線程。
(4)將 ServerSocketChannel 註冊到 Reactor 線程的多路複用器 Selector 上,監聽 ACCEPT 事件。
(5)多路複用器在線程中,無限循環準備就緒的 Key。
(6)多路複用器監聽到有新的客戶端接入,則處理新的接入請求,完成TCP 3次握手,建立物理連接。
(7)設置客戶端鏈路為非阻塞。
(8)將新接入的客戶端連接註冊到 Reactor 線程的多路複用器上,監聽讀操作,用來讀取客戶端發送的消息。
(9)異步讀取客戶端消息到 ByteBuffer 中。
(10)對ByteBuffer 進行解碼,讀取對應的消息數據。
其中,Server 端的代碼實現如下:
MultiplexerTimeServer
public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel serverChannel; private volatile boolean stop; public MultiplexerTimeServer(int port) { try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port), 1024); serverChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } @Override public void run() { while (!stop) { try { selector.select(1000); Set selectedKeys = selector.selectedKeys(); Iterator iterator = selectedKeys.iterator(); SelectionKey key = null; while (iterator.hasNext()) { key = iterator.next(); iterator.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Exception e) { e.printStackTrace(); } } if (selector != null) { try { selector.close(); } catch (Exception e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws Exception { if (key.isValid()) { // 處理新接入的請求消息 if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // read data SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else { // 忽略 } } } } private void doWrite(SocketChannel channel, String response) throws Exception { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
TimeServer
public class TimeServer { public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (Exception e) { } } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start(); } }
Client 端實現
Client 端的時序圖如下所示:
(1)首先打開SocketChannel。
(2)設置SocketChannel為非阻塞,同時設置客戶端連接的TCP 參數。
(3)異步連接服務端。
(4)判斷是否連接成功,如果連接成功,直接註冊讀事件到Selector 中;如果沒有連接成功,則註冊連接事件,重新連接服務端。
(5)向Reactor 線程的多路複用器註冊 OP_CONNECT 狀態位,監聽服務端的TCP 應答。
(6)創建Reactor 線程,創建多路複用器並啟動線程。
(7)多路複用器在線程中,無限循環準備就緒的 Key。
(8)接收connect 事件進行處理。
(9)判斷連接結果,如果連接成功,則註冊讀事件到Selector 上面。
(10)讀取服務端消息到ByteBuffer中。
(11)對ByteBuffer中的數據進行解碼操作。
其中,Client 端的代碼如下所示:
TimeClientHandle
public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host, int port) { this.host = host == null ? "127.0.0.1" : host; this.port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (Exception e) { e.printStackTrace(); System.exit(1); } } @Override public void run() { try { doConnect(); } catch (Exception e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { selector.select(1000); Set selectionKeys = selector.selectedKeys(); Iterator iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); iterator.remove(); try { handleInput(key); } catch (Exception e1) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } // 多路複用器關閉後, 所有註冊在上面的Channel 和 Pipe 等資源都會自動去註冊並關閉, 所以不需要重複釋放資源 if (selector != null) { try { selector.close(); } catch (Exception e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws Exception { if (key.isValid()) { // 判斷是否連接成功 SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); } doWrite(sc); } else { System.exit(1); } if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); this.stop = true; } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else { // 忽略 } } } } private void doConnect() throws Exception { // 如果連接成功, 則註冊到多路複用器上, 發送請求消息, 讀取應答消息 if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else { socketChannel.register(selector, SelectionKey.OP_CONNECT); } } private void doWrite(SocketChannel socketChannel) throws Exception { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); socketChannel.write(writeBuffer); if (!writeBuffer.hasRemaining()) { System.out.println("Send order 2 server succeed."); } } }
TimeClient
public class TimeClient { public static void main(String[] args) { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (Exception e) { } } new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start(); } }
Reactor 模式的三種實現
Reactor 模式可以大致分為:單Reactor單線程、單Reactor多線程、多Reactor多進程。上面我們用Java NIO實現的Reactor 模式中,在Server端多路複用器輪詢到網絡事件後,使用的當前線程處理的業務邏輯。此時就是前面提到的 單Reactor單線程。
單Reactor單線程
優點:模型簡單,沒有多線程之間的競爭問題。
缺點:只有一個線程,無法發揮多核CPU的硬件優勢;在Handle 處理某個事件時候,無法同時處理其他就緒的事件。
單Reactor多線程
優點:能夠充分利用多核CPU的硬件能力。
缺點:多線程之間的競爭問題提高了實現的複雜度;單Reactor 在高併發的場景下可能會是網絡處理的瓶頸。
多Reactor多進程
主進程和子進程的職責非常明確,主進程只負責接收新連接,子進程負責完成後續的業務處理;主進程和子進程的交互很簡單,主進程只需要把新的連接傳遞給子進程,子進程無需返回數據;子進程之間是相互獨立的,無需同步共享之類的處理(這裡僅限於網絡模型相關的 select,read,send等無須同步共享,"業務處理"還是有可能需要同步共享的)。
關鍵字: public sc ByteBuffer