同步調用
同步調用是一種阻塞式的調用方式,即 Consumer 端代碼一直阻塞等待,直到 Provider 端返回為止;
dubbo默認的協議是netty, Netty 是NIO 異步通訊機制,那麼服務調用是怎麼轉化為同步的呢?
下面看源碼:
省略一部分調用鏈,最終會來到這裡 DubboInvoker
<code>protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation)invocation; ... try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, "timeout", 1000); //忽略返回值 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); //異步調用 } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<object>(future)); return new RpcResult(); //同步調用 } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } }}/<object>/<code>
接著我們看同步調用部分,(Result) currentClient.request(inv, timeout).get();
關於上面這句代碼,它包含兩個動作:先調用currentClient.request方法,通過Netty發送請求數據;然後調用其返回值的get方法,來獲取返回值。
1、發送請求
這一步主要是將請求方法封裝成Request對象,通過Netty將數據發送到服務端,然後返回一個DefaultFuture對象。 com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel的Request方法:
<code>public ResponseFuture request(Object request, int timeout) throws RemotingException { //如果客戶端已斷開連接 if (closed) { throw new RemotingException("......."); } //封裝請求信息 Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); //構建DefaultFuture對象 DefaultFuture future = new DefaultFuture(channel, req, timeout); try { //通過Netty發送網絡數據 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future;}/<code>
如上代碼,邏輯很清晰。關於看它的返回值是一個DefaultFuture對象,我們再看它的構造方法。
<code>public DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter("timeout", 1000); //當前Future和請求信息的映射 FUTURES.put(id, this); //當前Channel和請求信息的映射 CHANNELS.put(id, channel);}/<code>
同時類加載的時候會啟動一個超時掃描線程:
<code>static { Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer"); th.setDaemon(true); th.start();}/<code>
看看線程掃描啥:
<code>private static class RemotingInvocationTimeoutScan implements Runnable { @Override public void run() { while (true) { try { // 就是去掃描DefaultFuture列表 for (DefaultFuture future : FUTURES.values()) { if (future == null || future.isDone()) { continue; } // 如果future未完成,且超時 if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) { // 創建一個異常的Response Response timeoutResponse = new Response(future.getId()); // set timeout status. timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // 處理異常 DefaultFuture.received(future.getChannel(), timeoutResponse); } } Thread.sleep(30); } catch (Throwable e) { logger.error("Exception when scan the timeout invocation of remoting.", e); } } } }/<code>
2、獲取返回值
我們接著看get方法。
<code>public Object get(int timeout) throws RemotingException { //設置默認超時時間 if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } //判斷 如果操作未完成 if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { //通過加鎖、等待 while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } //返回數據 return returnFromResponse();} //獲取返回值responseprivate Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null"); } // 正常返回,返回 Result 對象 if (res.getStatus() == Response.OK) { return res.getResult(); } // 超時處理 if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // 重新拋出異常 throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); } throw new RemotingException(channel, res.getErrorMessage());}/<code>
其中的isDone()方法為:
<code> public boolean isDone() { return this.response != null; }/<code>
我們總結下它的運行流程:
- 判斷超時時間,小於0則設置默認值
- 判斷操作是否已完成,即response是否為空;如果已完成,獲取返回值,並返回
- 如果操作未完成,加鎖、等待;獲得通知後,再次判斷操作是否完成。若完成,獲取返回值,並返回。
其中等待用的是 done.await(timeout, TimeUnit.MILLISECONDS);
done為private final Condition done;
因此調用的是Condition的await方法,會釋放鎖和資源,singl方法會將其喚醒。
get方法後面的returnFromResponse()方法可以看到超時處理,也就是之前那個超時掃描線程對status進行賦值後,returnFromResponse()方法裡面對超時掃描線程賦值的status值進行判斷是否超時,如果超時就拋出TimeoutException異常
response在哪裡被賦值、await在哪裡被通知?
在Netty讀取到網絡數據後,其中會調用到HeaderExchangeHandler中的方法,我們來看一眼就明白了。
<code>public class HeaderExchangeHandler implements ChannelHandlerDelegate { //處理返回信息 static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }}/<code>
如果response 不為空,並且不是心跳數據,就調用DefaultFuture.received,在這個方法裡面,主要就是根據返回信息的ID找到對應的Future,然後通知。
<code>public static void received(Channel channel, Response response) try { //根據返回信息中的ID找到對應的Future DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { //通知方法 future.doReceived(response); } else { logger.warn("......"); } } finally { //處理完成,刪除Future CHANNELS.remove(response.getId()); }}/<code>
future.doReceived(response);就很簡單了,它就回答了我們上面的那兩個小問題。賦值response和await通知。
<code>private void doReceived(Response res) { lock.lock(); try { //賦值response response = res; if (done != null) { //通知方法 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); }}/<code>
異步調用的話用到了RpcContext.
RpcContext是Dubbo中的一個上下文信息,它是一個 ThreadLocal 的臨時狀態記錄器
閱讀更多 java高端開發 的文章
關鍵字: DefaultFuture Response 源碼