Kafka網絡模型基礎-Reactor(下)

Kafka網絡模型基礎-Reactor(下)

Reactor Java NIO 的實現

從Java 1.4 開始,提供了NIO(非阻塞IO)。NIO 提供了 SocketChannel 和 ServerSocketChannel 兩種不同的套接字實現。這兩種新增的通道都支持阻塞和非阻塞兩種模式。當然我們通常使用非阻塞的 NIO 來處理網絡請求響應。NIO 裡面新增的 Selector 等相關的API 可以提供給我們實現 Reactor 模式的網絡請求響應模型。

Server 端實現

首先對於 Server 端的實現時序圖如下所示:

Kafka網絡模型基礎-Reactor(下)

(1)首先打開 ServerSocketChannel,用於監聽客戶端的連接。

(2)綁定監聽端口,設置為非阻塞模式。

(3)創建 Reactor 線程,創建多路複用器並啟動線程。

(4)將 ServerSocketChannel 註冊到 Reactor 線程的多路複用器 Selector 上,監聽 ACCEPT 事件。

(5)多路複用器在線程中,無限循環準備就緒的 Key。

(6)多路複用器監聽到有新的客戶端接入,則處理新的接入請求,完成TCP 3次握手,建立物理連接。

(7)設置客戶端鏈路為非阻塞。

(8)將新接入的客戶端連接註冊到 Reactor 線程的多路複用器上,監聽讀操作,用來讀取客戶端發送的消息。

(9)異步讀取客戶端消息到 ByteBuffer 中。

(10)對ByteBuffer 進行解碼,讀取對應的消息數據。

其中,Server 端的代碼實現如下:

MultiplexerTimeServer

public class MultiplexerTimeServer implements Runnable {
 private Selector selector;
 private ServerSocketChannel serverChannel;
 private volatile boolean stop;
 public MultiplexerTimeServer(int port) {
 try {
 selector = Selector.open();
 serverChannel = ServerSocketChannel.open();
 serverChannel.configureBlocking(false);
 serverChannel.socket().bind(new InetSocketAddress(port), 1024);
 serverChannel.register(selector, SelectionKey.OP_ACCEPT);
 System.out.println("The time server is start in port : " + port);
 } catch (Exception e) {
 e.printStackTrace();
 System.exit(1);
 }
 }
 public void stop() {
 this.stop = true;
 }
 @Override
 public void run() {
 while (!stop) {
 try {
 selector.select(1000);
 Set selectedKeys = selector.selectedKeys();
 Iterator iterator = selectedKeys.iterator();
 SelectionKey key = null;
 while (iterator.hasNext()) {
 key = iterator.next();
 iterator.remove();
 try {
 handleInput(key);
 } catch (Exception e) {
 if (key != null) {
 key.cancel();
 if (key.channel() != null) {
 key.channel().close();
 }
 }
 }
 }
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 if (selector != null) {
 try {
 selector.close();
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 }
 private void handleInput(SelectionKey key) throws Exception {
 if (key.isValid()) {
 // 處理新接入的請求消息
 if (key.isAcceptable()) {
 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
 SocketChannel sc = ssc.accept();
 sc.configureBlocking(false);
 sc.register(selector, SelectionKey.OP_READ);
 }
 if (key.isReadable()) {
 // read data
 SocketChannel sc = (SocketChannel) key.channel();
 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
 int readBytes = sc.read(readBuffer);
 if (readBytes > 0) {
 readBuffer.flip();
 byte[] bytes = new byte[readBuffer.remaining()];
 readBuffer.get(bytes);
 String body = new String(bytes, "UTF-8");
 System.out.println("The time server receive order : " + body);
 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER";
 doWrite(sc, currentTime);
 } else if (readBytes < 0) {
 // 對端鏈路關閉
 key.cancel();
 sc.close();
 } else {
 // 忽略
 }
 }
 }
 }
 private void doWrite(SocketChannel channel, String response) throws Exception {
 if (response != null && response.trim().length() > 0) {
 byte[] bytes = response.getBytes();
 ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
 writeBuffer.put(bytes);
 writeBuffer.flip();
 channel.write(writeBuffer);
 }
 }
}

TimeServer

public class TimeServer {
 public static void main(String[] args) throws Exception {
 int port = 8080;
 if (args != null && args.length > 0) {
 try {
 port = Integer.valueOf(args[0]);
 } catch (Exception e) {
 }
 }
 MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
 new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
 }
}

Client 端實現

Client 端的時序圖如下所示:

Kafka網絡模型基礎-Reactor(下)

(1)首先打開SocketChannel。

(2)設置SocketChannel為非阻塞,同時設置客戶端連接的TCP 參數。

(3)異步連接服務端。

(4)判斷是否連接成功,如果連接成功,直接註冊讀事件到Selector 中;如果沒有連接成功,則註冊連接事件,重新連接服務端。

(5)向Reactor 線程的多路複用器註冊 OP_CONNECT 狀態位,監聽服務端的TCP 應答。

(6)創建Reactor 線程,創建多路複用器並啟動線程。

(7)多路複用器在線程中,無限循環準備就緒的 Key。

(8)接收connect 事件進行處理。

(9)判斷連接結果,如果連接成功,則註冊讀事件到Selector 上面。

(10)讀取服務端消息到ByteBuffer中。

(11)對ByteBuffer中的數據進行解碼操作。

其中,Client 端的代碼如下所示:

TimeClientHandle

public class TimeClientHandle implements Runnable {
 private String host;
 private int port;
 private Selector selector;
 private SocketChannel socketChannel;
 private volatile boolean stop;
 public TimeClientHandle(String host, int port) {
 this.host = host == null ? "127.0.0.1" : host;
 this.port = port;
 try {
 selector = Selector.open();
 socketChannel = SocketChannel.open();
 socketChannel.configureBlocking(false);
 } catch (Exception e) {
 e.printStackTrace();
 System.exit(1);
 }
 }
 @Override
 public void run() {
 try {
 doConnect();
 } catch (Exception e) {
 e.printStackTrace();
 System.exit(1);
 }
 while (!stop) {
 try {
 selector.select(1000);
 Set selectionKeys = selector.selectedKeys();
 Iterator iterator = selectionKeys.iterator();
 while (iterator.hasNext()) {
 SelectionKey key = iterator.next();
 iterator.remove();
 try {
 handleInput(key);
 } catch (Exception e1) {
 if (key != null) {
 key.cancel();
 if (key.channel() != null) {
 key.channel().close();
 }
 }
 }
 }
 } catch (Exception e) {
 e.printStackTrace();
 System.exit(1);
 }
 }
 // 多路複用器關閉後, 所有註冊在上面的Channel 和 Pipe 等資源都會自動去註冊並關閉, 所以不需要重複釋放資源
 if (selector != null) {
 try {
 selector.close();
 } catch (Exception e) {
 e.printStackTrace();
 }
 }
 }
 private void handleInput(SelectionKey key) throws Exception {
 if (key.isValid()) {
 // 判斷是否連接成功
 SocketChannel sc = (SocketChannel) key.channel();
 if (key.isConnectable()) {
 if (sc.finishConnect()) {
 sc.register(selector, SelectionKey.OP_READ);
 }
 doWrite(sc);
 } else {
 System.exit(1);
 }
 if (key.isReadable()) {
 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
 int readBytes = sc.read(readBuffer);
 if (readBytes > 0) {
 readBuffer.flip();
 byte[] bytes = new byte[readBuffer.remaining()];
 readBuffer.get(bytes);
 String body = new String(bytes, "UTF-8");
 System.out.println("Now is : " + body);
 this.stop = true;
 } else if (readBytes < 0) {
 // 對端鏈路關閉
 key.cancel();
 sc.close();
 } else {
 // 忽略
 }
 }
 }
 }
 private void doConnect() throws Exception {
 // 如果連接成功, 則註冊到多路複用器上, 發送請求消息, 讀取應答消息
 if (socketChannel.connect(new InetSocketAddress(host, port))) {
 socketChannel.register(selector, SelectionKey.OP_READ);
 doWrite(socketChannel);
 } else {
 socketChannel.register(selector, SelectionKey.OP_CONNECT);
 }
 }
 private void doWrite(SocketChannel socketChannel) throws Exception {
 byte[] req = "QUERY TIME ORDER".getBytes();
 ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
 writeBuffer.put(req);
 writeBuffer.flip();
 socketChannel.write(writeBuffer);
 if (!writeBuffer.hasRemaining()) {
 System.out.println("Send order 2 server succeed.");
 }
 }
}
 

TimeClient

public class TimeClient {
 public static void main(String[] args) {
 int port = 8080;
 if (args != null && args.length > 0) {
 try {
 port = Integer.valueOf(args[0]);
 } catch (Exception e) {
 }
 }
 new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();
 }
}

Reactor 模式的三種實現

Reactor 模式可以大致分為:單Reactor單線程、單Reactor多線程、多Reactor多進程。上面我們用Java NIO實現的Reactor 模式中,在Server端多路複用器輪詢到網絡事件後,使用的當前線程處理的業務邏輯。此時就是前面提到的 單Reactor單線程。

單Reactor單線程

Kafka網絡模型基礎-Reactor(下)

優點:模型簡單,沒有多線程之間的競爭問題。

缺點:只有一個線程,無法發揮多核CPU的硬件優勢;在Handle 處理某個事件時候,無法同時處理其他就緒的事件。

單Reactor多線程

Kafka網絡模型基礎-Reactor(下)

優點:能夠充分利用多核CPU的硬件能力。

缺點:多線程之間的競爭問題提高了實現的複雜度;單Reactor 在高併發的場景下可能會是網絡處理的瓶頸。

多Reactor多進程

Kafka網絡模型基礎-Reactor(下)

主進程和子進程的職責非常明確,主進程只負責接收新連接,子進程負責完成後續的業務處理;主進程和子進程的交互很簡單,主進程只需要把新的連接傳遞給子進程,子進程無需返回數據;子進程之間是相互獨立的,無需同步共享之類的處理(這裡僅限於網絡模型相關的 select,read,send等無須同步共享,"業務處理"還是有可能需要同步共享的)。


分享到:


相關文章: