dubbo源碼分析——遠程通信 netty

dubbo 做為 RPC 框架,需要進行跨 JVM 通信,要保證高性、穩定的進行遠程通信。dubbo 底層通信選擇了 netty 這個 nio 框架做為默認的網絡通信框架並且通過自定義協議進行通信。dubbo 支持以下網絡通信框架:

  • Netty(默認)
  • Mina
  • Grizzly

1、 netty 簡介

在網絡編碼領悟,Netty 是 Java 的卓越框架。它封裝了 Java NIO 操作的複雜性,使得開發者能夠使用其提供的簡易 API 就能夠開發出高效的網絡程序。

首先我們來看一下 netty 裡面的關鍵特性:

分類Netty的特性設計統一的 API ,支持多種傳輸類型,阻塞和非阻塞的簡單
簡單而強大的線程模型
真正的無連接數據報套接字支持
鏈接邏輯組件以支持複用易於使用詳實的 javadoc 和大量的示例集
不需要超過 JDK 1.6 的依賴性能擁有比 Java 核心 API 更高的吞吐量以及更低的延遲
得益於池化和複用,擁有更低的資源消耗
最少的內存複製健壯性不會因為慢速、快速或者超載的連接而導致 OutOfMemoryError
消除在高速網絡中 NIO 應用程序常見的不公平讀/寫比率安全性完整的 SSL/TLS 以及 StartTLS 支持
可用於受限環境下,如 Applet 和 OSGI社區驅動發佈快速而頻繁

很多公司包含:Apple、Twitter、Facebook、Google 都在使用 Netty,並且很多流行的開源項目也在使用 Netty如:Vert.x 、Apache Cassandra 和 Elasticsearch,它們所有的核心代碼都是利用了 Netty 強大的網絡抽象。

2、netty 的核心組件

在 netty 中主要包括以下主要的核心構件塊:

  • Channel
  • EventLoop
  • ChannelHandler
  • ChannelFuture

2.1 Channel

Channel 是 Java NIO 裡面的一個基本構造。可以把它看做是傳入或傳出數據的載體,然後通過 NIO 裡面的 Buffer 來進行數據傳遞。

dubbo源碼分析——遠程通信 netty

而在 Netty 中自己也定義了一個 Channel,它的主要作用是:

>連接到網絡套接字或能夠進行輸入輸出的組件的連接操作,如讀、寫、連接和綁定。

通道提供給用戶以下功能:

  • 通道的當前狀態(例如,open?connected?)
  • 通道的 ChannelConfig 配置參數(例如,接收緩衝區大小)
  • 通道支持的輸入/輸出操作(如讀、寫、連接和綁定)
  • 通道處理與通道相關聯的所有輸入/輸出事件和請求

具體的方法可以參看 Netty 的 javadoc 中的 Channel

2.2 EventLoop

EventLoop 是 Netty 的核心概念, netty 裡面的網絡編程處理都是基於事件回調。下面的圖說明了 Channel、EventLoop、Thread 以及 EventLoopGroup 之間的關係

dubbo源碼分析——遠程通信 netty

它們的關係是:

  • 一個 EventLoopGroup 包含一個或者多個 EventLoop;
  • 一個 EventLoop 在它的生命週期內只和一個 Thread 綁定;
  • 所有的 EventLoop 處理的 I/O 事件都將在它專門的 Thread 被處理;
  • 一個 Channel 在它的生命週期內只註冊於一個 EventLoop;
  • 一個 EventLoop 可能會被分配一個或者多個 Channel;

netty 這樣的設計,一個操作 I/O 的 Channel 都會由相同的 Thread 執行的,這樣就能夠消除線程之間的同步。所以 netty 內部是通過回調來處理事件。當一個回調被觸發時相關的事件。

2.3 ChannelHandler

ChannelHandler 在 Netty 裡面的地位,大家可以理解成 Bean 在 Spring 裡面的地址。Spring 把需要操作的 POJO 抽象成 Bean,然後對於對象的管理都是可以通過 spring bean 的生命週期來進行管理。同樣的,我們在操作 Netty 的時候,其實就是操作的就是 ChannelHandler。

在上一個小節闡述了 EventLoop 在一個線程裡面管理 I/O 操作 Channel 的生命週期。而 netty 的內部是通過回調來處理相關的事件。理解 ChannelHandler 其實可以理解成 Java Servlet 裡面的 Filter.

dubbo源碼分析——遠程通信 netty

而 Netty 在處理 Channel 的時候也是類似的。只不過它區別得更加明確, 入站的時候使用是 ChannelHandler 的子接口 ChannelInboundHandler,而出站使用的 ChannelHandler 的子接口 ChannelOutboundHandler,然後 ChannelPipeline 提供了 ChannelHandler 鏈的容器。

dubbo源碼分析——遠程通信 netty

具體使用 Netty 的時候可以註冊哪些回調事件,大家可以參看以下接口對應的 API:

  • ChannelHandler:ChannelHandler 的註冊與刪除
  • ChannelInboundHandler:ChannelHandler 的入站操作
  • ChannelOutboundHandler:ChannelHandler 的出站操作

2.4 ChannelFuture

因為 Netty 裡面的操作都是異步的,所以一個操作可能不會立即返回。這樣我們就需要一種用於在之後的某個時間點確定其結果的方法。因此, Netty 提供了 ChannelFuture 接口。它提供了 addListener() 方法來註冊一個 ChannelFutureListener,以便在某個操作完成時(無論是否成功) 得到通知。

> 可以將 ChannelFuture 看做是將來要執行的操作的結果的佔位符。它究竟什麼時候被執行則可能取決於若干因素,因此不可能準確的預測,但是可以肯定的是它將會被執行。並且,所有屬於同一個 Channel 的操作都被保證其將以它們被調用的順序被執行

3、Netty Demo

在上面的章節介紹了 Netty 裡面的核心概念,下面我們就來編寫一個 Hello World 來加深一下對上面概念的理解。

3.1 Netty Server

編寫 Server 端業務處理類,因為需要響應客戶端傳入的信息,所以需要實現 ChannelInboundHandler 接口。作為一個簡單的 hello world 程序,我們可以繼承 ChannelInboundHandlerAdapter 它提供了 ChannelInboundHandler 的默認實現,我們只需要重寫需要關注的方法就行了。

> EchoServerHandler

@ChannelHandler.Sharablepublic class EchoServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 將接收到的消息寫給發送者,而不沖刷出站消息 ByteBuf in = (ByteBuf) msg; System.out.println("Server received : " + in.toString(CharsetUtil.UTF_8)); ctx.write(in); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // 將未決消息沖刷到遠程節點,並且關閉該 Channel ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 打印異常堆棧 cause.printStackTrace(); // 關閉該 Channel ctx.close();}

下面就需要創建引導服務器,綁定服務器並且監聽並且接收傳入連接請求的端口,配置 Channel ,用來將傳入的入站消息通知給 EchoServerHandler 實例。

> EchoServer

public class EchoServer { private final int port; public EchoServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { new EchoServer(8080).start(); } public void start() throws Exception { final EchoServerHandler serverHandler = new EchoServerHandler(); EventLoopGroup boss = new NioEventLoopGroup(); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .localAddress(new InetSocketAddress(port)) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(serverHandler); } }); ChannelFuture future = bootstrap.bind().sync(); future.channel().closeFuture().sync(); } finally { boss.shutdownGracefully().sync(); worker.shutdownGracefully().sync(); } }}

上面的服務主要做了以下幾件事:

  • main() 方法引導了服務器,引導過程中所需要以下步驟
  • 創建一個 ServerBootstrap 的實例用於引導和綁定服務
  • 創建兩個 EventLoopGroup 實例,boss 用於接收請求,而 worker 用於真正的請求處理
  • 指定服務器綁定到本地的 InetSocketAddress
  • 使用一個 EventLoopGroup 的實例初始化每一個新的 Channel
  • 調用 ServerBootstrap.bind() 方法綁定服務器

這個時候服務器已經初始化好了,可以 接收客戶端發送過來的請求了。

3.2 Netty Client

首先編碼客戶端處理類,客戶端處理類需要發送一個或者多個消息到服務器。並且在發送消息之後接收服務器發回的響應,最後關閉連接。

> EchoClientHandler

@ChannelHandler.Sharablepublic class EchoClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { // 當被通知 Channel 活躍的時候,發送消息 ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rock ! ", CharsetUtil.UTF_8)); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 打印服務端響應的信息 ByteBuf in = (ByteBuf) msg; System.out.println("Client received : " + in.toString(CharsetUtil.UTF_8)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // 打印異常,並關閉 Channel cause.printStackTrace(); ctx.close(); }}

下面我們就需要編寫客戶端引導類,用於連接服務端並且與服務端進行通信。

> EchoClient

public class EchoClient { private final String host; private final int prot; public EchoClient(String host, int prot) { this.host = host; this.prot = prot; } public static void main(String[] args) throws Exception { new EchoClient("localhost", 8080).start(); } public void start() throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(host, prot)) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new EchoClientHandler()); } }); ChannelFuture future = bootstrap.connect().sync(); future.channel().closeFuture().sync(); } finally { group.shutdownGracefully().sync(); } }}

以上代碼做的事如下:

  • 創建一個 Bootstrap 實例以初始化客戶端
  • 創建一個 EventLoopGroup 實例處理創建新的連接以及處理入站和出站數據
  • 為連接服務器創建一個 InetSocketAddress 實例
  • 當連接被建立時,一個 EchoClientHandler 實例會被安裝到 ChannelPipeline 中
  • 一切設置完成後,調用 Bootstrap.connect() 方法連接到遠程節點

下面我們就可以測試程序的正確性了。

3.3 Test

運行服務端引導類 EchoServer,然後運行客戶端引導類 EchoClient。此時在服務端的控制檯打印以下結果:

dubbo源碼分析——遠程通信 netty

接著在客戶端的控制檯打印以下結果:

dubbo源碼分析——遠程通信 netty

是不是很簡單 :)

dubbo源碼分析——遠程通信 netty


分享到:


相關文章: