Dubbo同步調用和超時源碼

同步調用

同步調用是一種阻塞式的調用方式,即 Consumer 端代碼一直阻塞等待,直到 Provider 端返回為止;

dubbo默認的協議是netty, Netty 是NIO 異步通訊機制,那麼服務調用是怎麼轉化為同步的呢?

下面看源碼:

省略一部分調用鏈,最終會來到這裡 DubboInvoker

Dubbo同步調用和超時源碼


<code>protected Result doInvoke(final Invocation invocation) throws Throwable {     RpcInvocation inv = (RpcInvocation)invocation;        ...        try {        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);        int timeout = getUrl().getMethodParameter(methodName, "timeout", 1000);                //忽略返回值        if (isOneway) {            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);            currentClient.send(inv, isSent);            RpcContext.getContext().setFuture(null);            return new RpcResult();        //異步調用        } else if (isAsync) {            ResponseFuture future = currentClient.request(inv, timeout);            RpcContext.getContext().setFuture(new FutureAdapter<object>(future));            return new RpcResult();        //同步調用        } else {            RpcContext.getContext().setFuture(null);            return (Result) currentClient.request(inv, timeout).get();        }    }}/<object>/<code>

接著我們看同步調用部分,(Result) currentClient.request(inv, timeout).get();

關於上面這句代碼,它包含兩個動作:先調用currentClient.request方法,通過Netty發送請求數據;然後調用其返回值的get方法,來獲取返回值。

1、發送請求

這一步主要是將請求方法封裝成Request對象,通過Netty將數據發送到服務端,然後返回一個DefaultFuture對象。 com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel的Request方法:

<code>public ResponseFuture request(Object request, int timeout) throws RemotingException {     //如果客戶端已斷開連接    if (closed) {        throw new RemotingException(".......");    }    //封裝請求信息    Request req = new Request();    req.setVersion("2.0.0");    req.setTwoWay(true);    req.setData(request);        //構建DefaultFuture對象    DefaultFuture future = new DefaultFuture(channel, req, timeout);    try {        //通過Netty發送網絡數據        channel.send(req);    } catch (RemotingException e) {        future.cancel();        throw e;    }    return future;}/<code>

如上代碼,邏輯很清晰。關於看它的返回值是一個DefaultFuture對象,我們再看它的構造方法。

<code>public DefaultFuture(Channel channel, Request request, int timeout) {    this.channel = channel;    this.request = request;    this.id = request.getId();    this.timeout = timeout > 0 ? timeout :                 channel.getUrl().getPositiveParameter("timeout", 1000);    //當前Future和請求信息的映射    FUTURES.put(id, this);    //當前Channel和請求信息的映射    CHANNELS.put(id, channel);}/<code>

同時類加載的時候會啟動一個超時掃描線程:

<code>static {    Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer");    th.setDaemon(true);    th.start();}/<code>

看看線程掃描啥:

<code>private static class RemotingInvocationTimeoutScan implements Runnable {         @Override        public void run() {            while (true) {                try {                    // 就是去掃描DefaultFuture列表                    for (DefaultFuture future : FUTURES.values()) {                        if (future == null || future.isDone()) {                            continue;                        }                        // 如果future未完成,且超時                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {                            // 創建一個異常的Response                            Response timeoutResponse = new Response(future.getId());                            // set timeout status.                            timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT);                            timeoutResponse.setErrorMessage(future.getTimeoutMessage(true));                            // 處理異常                            DefaultFuture.received(future.getChannel(), timeoutResponse);                        }                    }                    Thread.sleep(30);                } catch (Throwable e) {                    logger.error("Exception when scan the timeout invocation of remoting.", e);                }            }        }    }/<code>

2、獲取返回值

我們接著看get方法。

<code>public Object get(int timeout) throws RemotingException {    //設置默認超時時間    if (timeout <= 0) {        timeout = Constants.DEFAULT_TIMEOUT;    }    //判斷 如果操作未完成    if (!isDone()) {        long start = System.currentTimeMillis();        lock.lock();        try {            //通過加鎖、等待            while (!isDone()) {                done.await(timeout, TimeUnit.MILLISECONDS);                if (isDone() || System.currentTimeMillis() - start > timeout) {                    break;                }            }        } catch (InterruptedException e) {            throw new RuntimeException(e);        } finally {            lock.unlock();        }        if (!isDone()) {            throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));        }    }    //返回數據    return returnFromResponse();} //獲取返回值responseprivate Object returnFromResponse() throws RemotingException {    Response res = response;    if (res == null) {        throw new IllegalStateException("response cannot be null");    }    // 正常返回,返回 Result 對象    if (res.getStatus() == Response.OK) {        return res.getResult();    }    // 超時處理    if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {        // 重新拋出異常        throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());    }    throw new RemotingException(channel, res.getErrorMessage());}/<code>

其中的isDone()方法為:

<code>    public boolean isDone() {        return this.response != null;    }/<code>

我們總結下它的運行流程:

  • 判斷超時時間,小於0則設置默認值
  • 判斷操作是否已完成,即response是否為空;如果已完成,獲取返回值,並返回
  • 如果操作未完成,加鎖、等待;獲得通知後,再次判斷操作是否完成。若完成,獲取返回值,並返回。

其中等待用的是 done.await(timeout, TimeUnit.MILLISECONDS);

done為private final Condition done;

因此調用的是Condition的await方法,會釋放鎖和資源,singl方法會將其喚醒。

get方法後面的returnFromResponse()方法可以看到超時處理,也就是之前那個超時掃描線程對status進行賦值後,returnFromResponse()方法裡面對超時掃描線程賦值的status值進行判斷是否超時,如果超時就拋出TimeoutException異常

response在哪裡被賦值、await在哪裡被通知?

在Netty讀取到網絡數據後,其中會調用到HeaderExchangeHandler中的方法,我們來看一眼就明白了。

<code>public class HeaderExchangeHandler implements ChannelHandlerDelegate {        //處理返回信息    static void handleResponse(Channel channel, Response response) throws RemotingException {        if (response != null && !response.isHeartbeat()) {            DefaultFuture.received(channel, response);        }    }}/<code>

如果response 不為空,並且不是心跳數據,就調用DefaultFuture.received,在這個方法裡面,主要就是根據返回信息的ID找到對應的Future,然後通知。

<code>public static void received(Channel channel, Response response)         try {        //根據返回信息中的ID找到對應的Future        DefaultFuture future = FUTURES.remove(response.getId());        if (future != null) {            //通知方法            future.doReceived(response);        } else {            logger.warn("......");        }    } finally {        //處理完成,刪除Future        CHANNELS.remove(response.getId());    }}/<code>

future.doReceived(response);就很簡單了,它就回答了我們上面的那兩個小問題。賦值response和await通知。

<code>private void doReceived(Response res) {    lock.lock();    try {        //賦值response        response = res;        if (done != null) {            //通知方法            done.signal();        }    } finally {        lock.unlock();    }    if (callback != null) {        invokeCallback(callback);    }}/<code>

異步調用的話用到了RpcContext.

RpcContext是Dubbo中的一個上下文信息,它是一個 ThreadLocal 的臨時狀態記錄器


分享到:


相關文章: