Dubbo同步调用和超时源码

同步调用

同步调用是一种阻塞式的调用方式,即 Consumer 端代码一直阻塞等待,直到 Provider 端返回为止;

dubbo默认的协议是netty, Netty 是NIO 异步通讯机制,那么服务调用是怎么转化为同步的呢?

下面看源码:

省略一部分调用链,最终会来到这里 DubboInvoker


<code>protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation)invocation; ... try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, "timeout", 1000); //忽略返回值 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); //异步调用 } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); RpcContext.getContext().setFuture(new FutureAdapter<object>(future)); return new RpcResult(); //同步调用 } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } }}/<object>/<code>

接着我们看同步调用部分,(Result) currentClient.request(inv, timeout).get();

关于上面这句代码,它包含两个动作:先调用currentClient.request方法,通过Netty发送请求数据;然后调用其返回值的get方法,来获取返回值。

1、发送请求

这一步主要是将请求方法封装成Request对象,通过Netty将数据发送到服务端,然后返回一个DefaultFuture对象。 com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel的Request方法:

<code>public ResponseFuture request(Object request, int timeout) throws RemotingException { //如果客户端已断开连接 if (closed) { throw new RemotingException("......."); } //封装请求信息 Request req = new Request(); req.setVersion("2.0.0"); req.setTwoWay(true); req.setData(request); //构建DefaultFuture对象 DefaultFuture future = new DefaultFuture(channel, req, timeout); try { //通过Netty发送网络数据 channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future;}/<code>

如上代码,逻辑很清晰。关于看它的返回值是一个DefaultFuture对象,我们再看它的构造方法。

<code>public DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter("timeout", 1000); //当前Future和请求信息的映射 FUTURES.put(id, this); //当前Channel和请求信息的映射 CHANNELS.put(id, channel);}/<code>

同时类加载的时候会启动一个超时扫描线程:

<code>static { Thread th = new Thread(new RemotingInvocationTimeoutScan(), "DubboResponseTimeoutScanTimer"); th.setDaemon(true); th.start();}/<code>

看看线程扫描啥:

<code>private static class RemotingInvocationTimeoutScan implements Runnable { @Override public void run() { while (true) { try { // 就是去扫描DefaultFuture列表 for (DefaultFuture future : FUTURES.values()) { if (future == null || future.isDone()) { continue; } // 如果future未完成,且超时 if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) { // 创建一个异常的Response Response timeoutResponse = new Response(future.getId()); // set timeout status. timeoutResponse.setStatus(future.isSent() ? Response.SERVER_TIMEOUT : Response.CLIENT_TIMEOUT); timeoutResponse.setErrorMessage(future.getTimeoutMessage(true)); // 处理异常 DefaultFuture.received(future.getChannel(), timeoutResponse); } } Thread.sleep(30); } catch (Throwable e) { logger.error("Exception when scan the timeout invocation of remoting.", e); } } } }/<code>

2、获取返回值

我们接着看get方法。

<code>public Object get(int timeout) throws RemotingException { //设置默认超时时间 if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } //判断 如果操作未完成 if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { //通过加锁、等待 while (!isDone()) { done.await(timeout, TimeUnit.MILLISECONDS); if (isDone() || System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } //返回数据 return returnFromResponse();} //获取返回值responseprivate Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null"); } // 正常返回,返回 Result 对象 if (res.getStatus() == Response.OK) { return res.getResult(); } // 超时处理 if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { // 重新抛出异常 throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); } throw new RemotingException(channel, res.getErrorMessage());}/<code>

其中的isDone()方法为:

<code> public boolean isDone() { return this.response != null; }/<code>

我们总结下它的运行流程:

判断超时时间,小于0则设置默认值判断操作是否已完成,即response是否为空;如果已完成,获取返回值,并返回如果操作未完成,加锁、等待;获得通知后,再次判断操作是否完成。若完成,获取返回值,并返回。

其中等待用的是 done.await(timeout, TimeUnit.MILLISECONDS);

done为private final Condition done;

因此调用的是Condition的await方法,会释放锁和资源,singl方法会将其唤醒。

get方法后面的returnFromResponse()方法可以看到超时处理,也就是之前那个超时扫描线程对status进行赋值后,returnFromResponse()方法里面对超时扫描线程赋值的status值进行判断是否超时,如果超时就抛出TimeoutException异常

response在哪里被赋值、await在哪里被通知?

在Netty读取到网络数据后,其中会调用到HeaderExchangeHandler中的方法,我们来看一眼就明白了。

<code>public class HeaderExchangeHandler implements ChannelHandlerDelegate { //处理返回信息 static void handleResponse(Channel channel, Response response) throws RemotingException { if (response != null && !response.isHeartbeat()) { DefaultFuture.received(channel, response); } }}/<code>

如果response 不为空,并且不是心跳数据,就调用DefaultFuture.received,在这个方法里面,主要就是根据返回信息的ID找到对应的Future,然后通知。

<code>public static void received(Channel channel, Response response) try { //根据返回信息中的ID找到对应的Future DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { //通知方法 future.doReceived(response); } else { logger.warn("......"); } } finally { //处理完成,删除Future CHANNELS.remove(response.getId()); }}/<code>

future.doReceived(response);就很简单了,它就回答了我们上面的那两个小问题。赋值response和await通知。

<code>private void doReceived(Response res) { lock.lock(); try { //赋值response response = res; if (done != null) { //通知方法 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); }}/<code>

异步调用的话用到了RpcContext.

RpcContext是Dubbo中的一个上下文信息,它是一个 ThreadLocal 的临时状态记录器