A Look at SkyWalking's dubbo-2.7.x-plugin

This article walks through the source of SkyWalking's dubbo-2.7.x-plugin.



skywalking-plugin.def

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/dubbo-2.7.x-plugin/src/main/resources/skywalking-plugin.def

<code>dubbo=org.apache.skywalking.apm.plugin.asf.dubbo.DubboInstrumentation</code>
  • SkyWalking's dubbo-plugin registers DubboInstrumentation as its enhancement definition under the plugin name dubbo
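
To make the name=class format of the definition file concrete, here is a minimal sketch of how such a line could be read. It is not SkyWalking's own loader: the class PluginDefSketch and its parse method are hypothetical, and skipping blank lines and lines starting with # is an assumption of this sketch.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of SkyWalking.
public class PluginDefSketch {

    /**
     * Parses "name=className" lines into (name, className) pairs.
     * Assumption of this sketch: blank lines and lines starting with '#' are skipped.
     */
    public static List<Map.Entry<String, String>> parse(String defContent) {
        List<Map.Entry<String, String>> plugins = new ArrayList<>();
        for (String rawLine : defContent.split("\\R")) {
            String line = rawLine.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue;
            }
            int eq = line.indexOf('=');
            if (eq <= 0 || eq == line.length() - 1) {
                throw new IllegalArgumentException("Malformed plugin definition: " + line);
            }
            String name = line.substring(0, eq).trim();
            String className = line.substring(eq + 1).trim();
            plugins.add(new AbstractMap.SimpleImmutableEntry<>(name, className));
        }
        return plugins;
    }

    public static void main(String[] args) {
        String def = "dubbo=org.apache.skywalking.apm.plugin.asf.dubbo.DubboInstrumentation\n";
        for (Map.Entry<String, String> plugin : parse(def)) {
            System.out.println(plugin.getKey() + " -> " + plugin.getValue());
        }
    }
}
```

The sketch returns a list rather than a map so that one plugin name could be paired with several definition classes.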

DubboInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/dubbo-2.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/asf/dubbo/DubboInstrumentation.java

<code>public class DubboInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

  private static final String ENHANCE_CLASS = "org.apache.dubbo.monitor.support.MonitorFilter";

  private static final String INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.asf.dubbo.DubboInterceptor";

  @Override
  protected ClassMatch enhanceClass() {
      return NameMatch.byName(ENHANCE_CLASS);
  }

  @Override
  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
      return null;
  }

  @Override
  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
      return new InstanceMethodsInterceptPoint[] {
          new InstanceMethodsInterceptPoint() {
              @Override
              public ElementMatcher<MethodDescription> getMethodsMatcher() {
                  return named("invoke");
              }

              @Override
              public String getMethodsInterceptor() {
                  return INTERCEPT_CLASS;
              }

              @Override
              public boolean isOverrideArgs() {
                  return false;
              }
          }
      };
  }
}</code>
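  • DubboInstrumentation extends ClassInstanceMethodsEnhancePluginDefine. It matches org.apache.dubbo.monitor.support.MonitorFilter by name and enhances its invoke method with org.apache.skywalking.apm.plugin.asf.dubbo.DubboInterceptor; it declares no constructor intercept points and does not override the method arguments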
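
The idea of wrapping one named method with before, after, and exception hooks can be sketched with a JDK dynamic proxy. This is only an illustration of the around-interception pattern. It says nothing about how SkyWalking actually applies the enhancement at the bytecode level. Filter, AroundInterceptor, RecordingInterceptor, and enhance are all hypothetical names, and calling the after-hook in a finally block is a design choice of this sketch.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of around-interception; not SkyWalking code.
public class AroundInterceptSketch {

    /** Hypothetical stand-in for the class whose invoke method is enhanced. */
    public interface Filter {
        String invoke(String request);
    }

    /** Hypothetical, simplified version of the three around-hooks. */
    public interface AroundInterceptor {
        void beforeMethod(Method method, Object[] args);
        Object afterMethod(Method method, Object[] args, Object ret);
        void handleMethodException(Method method, Object[] args, Throwable t);
    }

    /** Records which hooks fired, so the call order can be inspected. */
    public static class RecordingInterceptor implements AroundInterceptor {
        public final List<String> events = new ArrayList<>();

        @Override
        public void beforeMethod(Method method, Object[] args) {
            events.add("before " + method.getName());
        }

        @Override
        public Object afterMethod(Method method, Object[] args, Object ret) {
            events.add("after " + method.getName());
            return ret;
        }

        @Override
        public void handleMethodException(Method method, Object[] args, Throwable t) {
            events.add("exception " + t.getMessage());
        }
    }

    /** Wraps target so that calls to methodName pass through the interceptor. */
    public static Filter enhance(Filter target, String methodName, AroundInterceptor interceptor) {
        InvocationHandler handler = (proxy, method, args) -> {
            if (!method.getName().equals(methodName)) {
                return call(target, method, args); // not a matched method: pass through
            }
            interceptor.beforeMethod(method, args);
            Object ret = null;
            try {
                ret = call(target, method, args);
            } catch (Throwable t) {
                interceptor.handleMethodException(method, args, t);
                throw t;
            } finally {
                // Runs on both paths; ret is still null when the call failed.
                ret = interceptor.afterMethod(method, args, ret);
            }
            return ret;
        };
        return (Filter) Proxy.newProxyInstance(
            Filter.class.getClassLoader(), new Class<?>[] {Filter.class}, handler);
    }

    /** Invokes the real method and unwraps the reflection wrapper exception. */
    private static Object call(Object target, Method method, Object[] args) throws Throwable {
        try {
            return method.invoke(target, args);
        } catch (InvocationTargetException e) {
            throw e.getCause();
        }
    }

    public static void main(String[] args) {
        RecordingInterceptor recorder = new RecordingInterceptor();
        Filter enhanced = enhance(request -> "pong:" + request, "invoke", recorder);
        System.out.println(enhanced.invoke("ping"));
        System.out.println(recorder.events);
    }
}
```

Because the after-hook in this sketch also runs when the call throws, it can receive a null return value, which is one reason an after-hook would guard against null before inspecting the result.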

DubboInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/dubbo-2.7.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/asf/dubbo/DubboInterceptor.java

<code>public class DubboInterceptor implements InstanceMethodsAroundInterceptor {
  /**
    * Consumer: The serialized trace context data will
    * inject to the {@link RpcContext#attachments} for transport to provider side.
    *
    * Provider: The serialized trace context data will extract from
    * {@link RpcContext#attachments}. current trace segment will ref if the serialize context data is not null.
    */
  @Override
  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
      Class<?>[] argumentsTypes, MethodInterceptResult result) throws Throwable {
      Invoker invoker = (Invoker)allArguments[0];
      Invocation invocation = (Invocation)allArguments[1];
      RpcContext rpcContext = RpcContext.getContext();
      boolean isConsumer = rpcContext.isConsumerSide();
      URL requestURL = invoker.getUrl();

      AbstractSpan span;

      final String host = requestURL.getHost();
      final int port = requestURL.getPort();
      if (isConsumer) {
          final ContextCarrier contextCarrier = new ContextCarrier();
          span = ContextManager.createExitSpan(generateOperationName(requestURL, invocation), contextCarrier, host + ":" + port);
          //invocation.getAttachments().put("contextData", contextDataStr);
          //@see https://github.com/alibaba/dubbo/blob/dubbo-2.5.3/dubbo-rpc/dubbo-rpc-api/src/main/java/com/alibaba/dubbo/rpc/RpcInvocation.java#L154-L161
          CarrierItem next = contextCarrier.items();
          while (next.hasNext()) {
              next = next.next();
              rpcContext.getAttachments().put(next.getHeadKey(), next.getHeadValue());
          }
      } else {
          ContextCarrier contextCarrier = new ContextCarrier();
          CarrierItem next = contextCarrier.items();
          while (next.hasNext()) {
              next = next.next();
              next.setHeadValue(rpcContext.getAttachment(next.getHeadKey()));
          }

          span = ContextManager.createEntrySpan(generateOperationName(requestURL, invocation), contextCarrier);
      }

      Tags.URL.set(span, generateRequestURL(requestURL, invocation));
      span.setComponent(ComponentsDefine.DUBBO);
      SpanLayer.asRPCFramework(span);
  }

  @Override
  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments,
      Class<?>[] argumentsTypes, Object ret) throws Throwable {
      Result result = (Result)ret;
      if (result != null && result.getException() != null) {
          dealException(result.getException());
      }

      ContextManager.stopSpan();
      return ret;
  }

  @Override
  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
      Class<?>[] argumentsTypes, Throwable t) {
      dealException(t);
  }

  /**
    * Log the throwable, which occurs in Dubbo RPC service.
    */
  private void dealException(Throwable throwable) {
      AbstractSpan span = ContextManager.activeSpan();
      span.errorOccurred();
      span.log(throwable);
  }

  /**
    * Format operation name. e.g. org.apache.skywalking.apm.plugin.test.Test.test(String)
    *
    * @return operation name.
    */
  private String generateOperationName(URL requestURL, Invocation invocation) {
      StringBuilder operationName = new StringBuilder();
      operationName.append(requestURL.getPath());
      operationName.append("." + invocation.getMethodName() + "(");
      for (Class<?> classes : invocation.getParameterTypes()) {
          operationName.append(classes.getSimpleName() + ",");
      }

      if (invocation.getParameterTypes().length > 0) {
          operationName.delete(operationName.length() - 1, operationName.length());
      }

      operationName.append(")");

      return operationName.toString();
  }

  /**
    * Format request url.
    * e.g. dubbo://127.0.0.1:20880/org.apache.skywalking.apm.plugin.test.Test.test(String).
    *
    * @return request url.
    */
  private String generateRequestURL(URL url, Invocation invocation) {
      StringBuilder requestURL = new StringBuilder();
      requestURL.append(url.getProtocol() + "://");
      requestURL.append(url.getHost());
      requestURL.append(":" + url.getPort() + "/");
      requestURL.append(generateOperationName(url, invocation));
      return requestURL.toString();
  }
}</code>
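  • DubboInterceptor implements the InstanceMethodsAroundInterceptor interface. On the consumer side, beforeMethod creates an exit span and copies every item of contextCarrier.items() into the RpcContext attachments so that it travels to the provider. On the provider side, it fills contextCarrier.items() from the RpcContext attachments and creates an entry span. In both cases it then sets the URL tag, sets the component to ComponentsDefine.DUBBO, and calls SpanLayer.asRPCFramework(span). afterMethod calls dealException when result.getException() is not null, then calls ContextManager.stopSpan(). handleMethodException delegates to dealException, which calls span.errorOccurred() and span.log(throwable)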
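
The propagation step and the two name formats can be tried out without Dubbo or SkyWalking on the classpath. The sketch below uses a plain Map in place of the RPC attachments and mirrors the string formats of generateOperationName and generateRequestURL. The class name, the HEAD_KEY value, and the serialized context string are hypothetical; in the plugin, the real key comes from CarrierItem#getHeadKey().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; a Map stands in for the RPC attachments.
public class AttachmentPropagationSketch {

    // Hypothetical header key, chosen for this sketch only.
    static final String HEAD_KEY = "trace-context";

    /** Consumer side: copy the serialized context into the outgoing attachments. */
    public static void inject(String serializedContext, Map<String, String> attachments) {
        attachments.put(HEAD_KEY, serializedContext);
    }

    /** Provider side: read the serialized context back; null means no upstream trace. */
    public static String extract(Map<String, String> attachments) {
        return attachments.get(HEAD_KEY);
    }

    /** Same format as generateOperationName: path.method(SimpleType,SimpleType). */
    public static String operationName(String path, String methodName, Class<?>... parameterTypes) {
        StringBuilder name = new StringBuilder(path).append('.').append(methodName).append('(');
        for (int i = 0; i < parameterTypes.length; i++) {
            if (i > 0) {
                name.append(',');
            }
            name.append(parameterTypes[i].getSimpleName());
        }
        return name.append(')').toString();
    }

    /** Same format as generateRequestURL: protocol://host:port/operationName. */
    public static String requestUrl(String protocol, String host, int port, String operationName) {
        return protocol + "://" + host + ":" + port + "/" + operationName;
    }

    public static void main(String[] args) {
        // Consumer writes the context; the attachments travel with the call.
        Map<String, String> attachments = new HashMap<>();
        inject("example-serialized-context", attachments);

        // Provider reads it back before creating its entry span.
        System.out.println(extract(attachments));

        String operation = operationName("org.apache.skywalking.apm.plugin.test.Test", "test", String.class);
        System.out.println(operation);
        System.out.println(requestUrl("dubbo", "127.0.0.1", 20880, operation));
    }
}
```

Building the parameter list with a separator check avoids the append-then-delete step used in the original method while producing the same string, including the empty () form for methods without parameters.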

Summary

DubboInstrumentation extends ClassInstanceMethodsEnhancePluginDefine and uses org.apache.skywalking.apm.plugin.asf.dubbo.DubboInterceptor to enhance the invoke method of org.apache.dubbo.monitor.support.MonitorFilter.

doc

  • DubboInstrumentation
  • DubboInterceptor

