聊聊skywalking的kafka-plugin

本文主要研究一下skywalking的kafka-plugin


聊聊skywalking的kafka-plugin


skywalking-plugin.def

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/resources/skywalking-plugin.def

<code>kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.CallbackInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaConsumerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaProducerMapInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateInstrumentation
kafka-0.11.x/1.x/2.x=org.apache.skywalking.apm.plugin.kafka.define.KafkaTemplateCallbackInstrumentation/<code>
  • skywalking的kafka-plugin提供了CallbackInstrumentation、KafkaConsumerInstrumentation、KafkaProducerInstrumentation、KafkaProducerMapInstrumentation、KafkaTemplateInstrumentation、KafkaTemplateCallbackInstrumentation这几个增强

AbstractKafkaInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaInstrumentation.java

<code>public abstract class AbstractKafkaInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

  @Override protected String[] witnessClasses() {
      return new String[]{"org.apache.kafka.clients.ApiVersions"};
  }
}/<code>
  • AbstractKafkaInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其witnessClasses为org.apache.kafka.clients.ApiVersions

AbstractKafkaTemplateInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/AbstractKafkaTemplateInstrumentation.java

<code>public abstract class AbstractKafkaTemplateInstrumentation extends ClassInstanceMethodsEnhancePluginDefine {

  @Override protected String[] witnessClasses() {
      return new String[]{"org.springframework.kafka.core.KafkaTemplate"};
  }
}/<code>
  • AbstractKafkaTemplateInstrumentation继承了ClassInstanceMethodsEnhancePluginDefine,其witnessClasses为org.springframework.kafka.core.KafkaTemplate

CallbackInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/CallbackInstrumentation.java

<code>public class CallbackInstrumentation extends AbstractKafkaInstrumentation {

  public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.Callback";
  public static final String ENHANCE_METHOD = "onCompletion";
  public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor";

  @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
      return new ConstructorInterceptPoint[0];
  }

  @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
      return new InstanceMethodsInterceptPoint[] {
          new InstanceMethodsInterceptPoint() {
              @Override public ElementMatcher<methoddescription> getMethodsMatcher() {
                  return named(ENHANCE_METHOD);
              }

              @Override public String getMethodsInterceptor() {
                  return INTERCEPTOR_CLASS;
              }

              @Override public boolean isOverrideArgs() {
                  return false;
              }
          }
      };
  }

  @Override protected ClassMatch enhanceClass() {
      return byHierarchyMatch(new String[] {ENHANCE_CLASS});
  }
}/<methoddescription>/<code>
  • CallbackInstrumentation继承了AbstractKafkaInstrumentation,它使用org.apache.skywalking.apm.plugin.kafka.CallbackInterceptor增强实现了org.apache.kafka.clients.producer.Callback接口的类的onCompletion方法

CallbackInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackInterceptor.java

<code>public class CallbackInterceptor implements InstanceMethodsAroundInterceptor {

  @Override
  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
                            MethodInterceptResult result) throws Throwable {
      CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
      if (null != cache) {
          ContextSnapshot snapshot = getSnapshot(cache);
          RecordMetadata metadata = (RecordMetadata) allArguments[0];
          AbstractSpan activeSpan = ContextManager.createLocalSpan("Kafka/Producer/Callback");
          activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);
          Tags.MQ_TOPIC.set(activeSpan, metadata.topic());
          ContextManager.continued(snapshot);
      }
  }

  @Override
  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
                            Object ret) throws Throwable {
      CallbackCache cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
      if (null != cache) {
          ContextSnapshot snapshot = getSnapshot(cache);
          if (null != snapshot) {
              Exception exceptions = (Exception) allArguments[1];
              if (exceptions != null) {
                  ContextManager.activeSpan().errorOccurred().log(exceptions);
              }
              ContextManager.stopSpan();
          }
      }
      return ret;
  }

  @Override
  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                    Class>[] argumentsTypes, Throwable t) {
      ContextManager.activeSpan().errorOccurred().log(t);
  }

  private ContextSnapshot getSnapshot(CallbackCache cache) {
      ContextSnapshot snapshot = cache.getSnapshot();
      if (snapshot == null) {
          snapshot = ((CallbackCache) ((EnhancedInstance) cache.getCallback()).getSkyWalkingDynamicField()).getSnapshot();
      }
      return snapshot;
  }
}/<code>
  • CallbackInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法设置Tags.MQ_TOPIC;其afterMethod方法有异常时执行ContextManager.activeSpan().errorOccurred().log(exceptions),然后执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)

KafkaConsumerInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaConsumerInstrumentation.java

<code>public class KafkaConsumerInstrumentation extends AbstractKafkaInstrumentation {

  public static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerConfig";
  public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ConsumerConstructorInterceptor";
  public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor";
  public static final String ENHANCE_METHOD = "pollOnce";
  public static final String ENHANCE_COMPATIBLE_METHOD = "pollForFetches";
  public static final String ENHANCE_CLASS = "org.apache.kafka.clients.consumer.KafkaConsumer";
  public static final String SUBSCRIBE_METHOD = "subscribe";
  public static final String SUBSCRIBE_INTERCEPT_TYPE = "org.apache.kafka.clients.consumer.ConsumerRebalanceListener";
  public static final String SUBSCRIBE_INTERCEPT_CLASS = "org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor";

  @Override public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
      return new ConstructorInterceptPoint[] {
          new ConstructorInterceptPoint() {
              @Override public ElementMatcher<methoddescription> getConstructorMatcher() {
                  return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
              }

              @Override public String getConstructorInterceptor() {
                  return CONSTRUCTOR_INTERCEPTOR_CLASS;
              }
          }
      };
  }

  @Override public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
      return new InstanceMethodsInterceptPoint[] {
          new InstanceMethodsInterceptPoint() {
              @Override public ElementMatcher<methoddescription> getMethodsMatcher() {
                  return named(ENHANCE_METHOD).or(named(ENHANCE_COMPATIBLE_METHOD));
              }

              @Override public String getMethodsInterceptor() {
                  return INTERCEPTOR_CLASS;
              }

              @Override public boolean isOverrideArgs() {
                  return false;
              }
          },
          new InstanceMethodsInterceptPoint() {
              @Override public ElementMatcher<methoddescription> getMethodsMatcher() {
                  return named(SUBSCRIBE_METHOD).and(takesArgumentWithType(1, SUBSCRIBE_INTERCEPT_TYPE));

              }

              @Override public String getMethodsInterceptor() {
                  return SUBSCRIBE_INTERCEPT_CLASS;
              }

              @Override public boolean isOverrideArgs() {
                  return false;
              }
          }
      };
  }

  @Override protected ClassMatch enhanceClass() {
      return byName(ENHANCE_CLASS);
  }
}/<methoddescription>/<methoddescription>/<methoddescription>/<code>
  • KafkaConsumerInstrumentation继承了AbstractKafkaInstrumentation,它增强的是org.apache.kafka.clients.consumer.KafkaConsumer类;它使用org.apache.skywalking.apm.plugin.kafka.ConsumerConstructorInterceptor增强其参数为org.apache.kafka.clients.consumer.ConsumerConfig的构造器;它使用org.apache.skywalking.apm.plugin.kafka.KafkaConsumerInterceptor增强pollOnce、pollForFetches方法;它使用org.apache.skywalking.apm.plugin.kafka.SubscribeMethodInterceptor增强第二个参数为org.apache.kafka.clients.consumer.ConsumerRebalanceListener的subscribe方法

ConsumerConstructorInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ConsumerConstructorInterceptor.java

<code>public class ConsumerConstructorInterceptor implements InstanceConstructorInterceptor {

  @Override public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
      ConsumerConfig config = (ConsumerConfig)allArguments[0];
      // set the bootstrap server address
      ConsumerEnhanceRequiredInfo requiredInfo = new ConsumerEnhanceRequiredInfo();
      requiredInfo.setBrokerServers(config.getList("bootstrap.servers"));
      requiredInfo.setGroupId(config.getString("group.id"));
      objInst.setSkyWalkingDynamicField(requiredInfo);
  }
}/<code>
  • ConsumerConstructorInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法创建ConsumerEnhanceRequiredInfo并设置到objInst的skyWalkingDynamicField

KafkaConsumerInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaConsumerInterceptor.java

<code>public class KafkaConsumerInterceptor implements InstanceMethodsAroundInterceptor {

  public static final String OPERATE_NAME_PREFIX = "Kafka/";
  public static final String CONSUMER_OPERATE_NAME = "/Consumer/";

  @Override
  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
      MethodInterceptResult result) throws Throwable {
      ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
      requiredInfo.setStartTime(System.currentTimeMillis());
  }

  @Override
  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
      Object ret) throws Throwable {
      Map<topicpartition>>> records = (Map<topicpartition>>>)ret;
      //
      // The entry span will only be created when the consumer received at least one message.
      //
      if (records.size() > 0) {
          ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
          AbstractSpan activeSpan = ContextManager.createEntrySpan(OPERATE_NAME_PREFIX + requiredInfo.getTopics() + CONSUMER_OPERATE_NAME + requiredInfo.getGroupId(), null).start(requiredInfo.getStartTime());

          activeSpan.setComponent(ComponentsDefine.KAFKA_CONSUMER);
          SpanLayer.asMQ(activeSpan);
          Tags.MQ_BROKER.set(activeSpan, requiredInfo.getBrokerServers());
          Tags.MQ_TOPIC.set(activeSpan, requiredInfo.getTopics());

          for (List<consumerrecord>> consumerRecords : records.values()) {
              for (ConsumerRecord, ?> record : consumerRecords) {
                  ContextCarrier contextCarrier = new ContextCarrier();

                  CarrierItem next = contextCarrier.items();
                  while (next.hasNext()) {
                      next = next.next();
                      Iterator<header> iterator = record.headers().headers(next.getHeadKey()).iterator();
                      if (iterator.hasNext()) {
                          next.setHeadValue(new String(iterator.next().value()));
                      }
                  }
                  ContextManager.extract(contextCarrier);
              }
          }
          ContextManager.stopSpan();
      }
      return ret;
  }

  @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
      Class>[] argumentsTypes, Throwable t) {
      ContextManager.activeSpan().errorOccurred().log(t);
  }
}/<header>/<consumerrecord>/<topicpartition>/<topicpartition>/<code>
  • KafkaConsumerInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法获取ConsumerEnhanceRequiredInfo并设置其startTime;其afterMethod方法在records.size()大于0的时候设置Tags.MQ_BROKER、Tags.MQ_TOPIC,然后从ConsumerRecord提取contextCarrier.items()指定的header,然后执行ContextManager.extract(contextCarrier),最后执行ContextManager.stopSpan();其handleMethodException方法执行ContextManager.activeSpan().errorOccurred().log(t)

SubscribeMethodInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/SubscribeMethodInterceptor.java

<code>public class SubscribeMethodInterceptor implements InstanceMethodsAroundInterceptor {
  @Override
  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
      MethodInterceptResult result) throws Throwable {
      ConsumerEnhanceRequiredInfo requiredInfo = (ConsumerEnhanceRequiredInfo)objInst.getSkyWalkingDynamicField();
      requiredInfo.setTopics((Collection<string>)allArguments[0]);

      objInst.setSkyWalkingDynamicField(requiredInfo);
  }

  @Override
  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
      Object ret) throws Throwable {
      return ret;
  }

  @Override public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
      Class>[] argumentsTypes, Throwable t) {
      ContextManager.activeSpan().errorOccurred().log(t);
  }
}/<string>/<code>
  • SubscribeMethodInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法创建ConsumerEnhanceRequiredInfo并设置到objInst的skyWalkingDynamicField;其handleMethodException执行ContextManager.activeSpan().errorOccurred().log(t)

KafkaProducerInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerInstrumentation.java

<code>public class KafkaProducerInstrumentation extends AbstractKafkaInstrumentation {

  public static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaProducerInterceptor";
  public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer";
  public static final String ENHANCE_METHOD = "doSend";
  public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ProducerConstructorInterceptor";
  public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "org.apache.kafka.clients.producer.ProducerConfig";

  @Override
  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
      return new ConstructorInterceptPoint[]{
          new ConstructorInterceptPoint() {
              @Override
              public ElementMatcher<methoddescription> getConstructorMatcher() {
                  return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
              }

              @Override
              public String getConstructorInterceptor() {
                  return CONSTRUCTOR_INTERCEPTOR_CLASS;
              }
          }
      };
  }

  @Override
  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
      return new InstanceMethodsInterceptPoint[]{
          new InstanceMethodsInterceptPoint() {
              @Override
              public ElementMatcher<methoddescription> getMethodsMatcher() {
                  return named(ENHANCE_METHOD);
              }

              @Override
              public String getMethodsInterceptor() {
                  return INTERCEPTOR_CLASS;
              }

              @Override
              public boolean isOverrideArgs() {
                  return false;
              }
          }
      };
  }

  @Override
  protected ClassMatch enhanceClass() {

      return byName(ENHANCE_CLASS);
  }
}/<methoddescription>/<methoddescription>/<code>
  • KafkaProducerInstrumentation继承了AbstractKafkaInstrumentation,它增强的是org.apache.kafka.clients.producer.KafkaProducer类;它使用org.apache.skywalking.apm.plugin.kafka.ProducerConstructorInterceptor增强了其参数为org.apache.kafka.clients.producer.ProducerConfig的构造器;它使用org.apache.skywalking.apm.plugin.kafka.KafkaProducerInterceptor增强其doSend方法

ProducerConstructorInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorInterceptor.java

<code>public class ProducerConstructorInterceptor implements InstanceConstructorInterceptor {

  @Override
  public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
      ProducerConfig config = (ProducerConfig) allArguments[0];
      objInst.setSkyWalkingDynamicField(StringUtil.join(';', config.getList("bootstrap.servers").toArray(new String[0])));
  }
}/<code>
  • ProducerConstructorInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct将bootstrap.servers信息设置到objInst的skyWalkingDynamicField

KafkaProducerInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaProducerInterceptor.java

<code>public class KafkaProducerInterceptor implements InstanceMethodsAroundInterceptor {

  public static final String OPERATE_NAME_PREFIX = "Kafka/";
  public static final String PRODUCER_OPERATE_NAME_SUFFIX = "/Producer";

  @Override
  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
                            MethodInterceptResult result) throws Throwable {

      ContextCarrier contextCarrier = new ContextCarrier();

      ProducerRecord record = (ProducerRecord) allArguments[0];
      String topicName = record.topic();
      AbstractSpan activeSpan = ContextManager.createExitSpan(OPERATE_NAME_PREFIX + topicName + PRODUCER_OPERATE_NAME_SUFFIX, contextCarrier, (String) objInst.getSkyWalkingDynamicField());

      Tags.MQ_BROKER.set(activeSpan, (String) objInst.getSkyWalkingDynamicField());
      Tags.MQ_TOPIC.set(activeSpan, topicName);

      SpanLayer.asMQ(activeSpan);
      activeSpan.setComponent(ComponentsDefine.KAFKA_PRODUCER);

      CarrierItem next = contextCarrier.items();
      while (next.hasNext()) {
          next = next.next();
          record.headers().add(next.getHeadKey(), next.getHeadValue().getBytes());
      }
      EnhancedInstance callbackInstance = (EnhancedInstance) allArguments[1];
      if (null != callbackInstance) {
          ContextSnapshot snapshot = ContextManager.capture();
          if (null != snapshot) {
              CallbackCache cache = new CallbackCache();
              cache.setSnapshot(snapshot);
              callbackInstance.setSkyWalkingDynamicField(cache);
          }
      }
  }

  @Override
  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes,
                            Object ret) throws Throwable {
      ContextManager.stopSpan();
      return ret;
  }

  @Override
  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments,
                                    Class>[] argumentsTypes, Throwable t) {

  }
}/<code>
  • KafkaProducerInterceptor实现了InstanceMethodsAroundInterceptor接口,其beforeMethod方法设置Tags.MQ_BROKER、Tags.MQ_TOPIC,然后将contextCarrier.items()的信息设置到ProducerRecord的header中,另外若ContextSnapshot不为null则执行callbackInstance.setSkyWalkingDynamicField(cache);其afterMethod方法执行ContextManager.stopSpan()

KafkaProducerMapInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaProducerMapInstrumentation.java

<code>public class KafkaProducerMapInstrumentation extends AbstractKafkaInstrumentation {

  public static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer";
  public static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.ProducerConstructorMapInterceptor";
  public static final String CONSTRUCTOR_INTERCEPTOR_FLAG = "java.util.Map";

  @Override
  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
      return new ConstructorInterceptPoint[]{
          new ConstructorInterceptPoint() {
              @Override
              public ElementMatcher<methoddescription> getConstructorMatcher() {
                  return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPTOR_FLAG);
              }

              @Override
              public String getConstructorInterceptor() {
                  return CONSTRUCTOR_INTERCEPTOR_CLASS;
              }
          }
      };
  }

  @Override
  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
      return new InstanceMethodsInterceptPoint[0];
  }

  @Override
  protected ClassMatch enhanceClass() {
      return byName(ENHANCE_CLASS);
  }
}/<methoddescription>/<code>
  • KafkaProducerMapInstrumentation继承了AbstractKafkaInstrumentation,它使用org.apache.skywalking.apm.plugin.kafka.ProducerConstructorMapInterceptor增强了org.apache.kafka.clients.producer.KafkaProducer参数为java.util.Map的构造器

ProducerConstructorMapInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/ProducerConstructorMapInterceptor.java

<code>public class ProducerConstructorMapInterceptor implements InstanceConstructorInterceptor {

  @Override
  public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
      Map<string> config = (Map<string>) allArguments[0];
      // prevent errors caused by secondary interception in kafkaTemplate
      if (objInst.getSkyWalkingDynamicField() == null) {
          objInst.setSkyWalkingDynamicField(StringUtil.join(';', ((String) config.get("bootstrap.servers")).split(",")));
      }
  }
}/<string>/<string>/<code>
  • ProducerConstructorMapInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法将bootstrap.servers信息设置到objInst的skyWalkingDynamicField

KafkaTemplateInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateInstrumentation.java

<code>public class KafkaTemplateInstrumentation extends AbstractKafkaTemplateInstrumentation {

  private static final String ENHANCE_CLASS = "org.springframework.kafka.core.KafkaTemplate";
  private static final String ENHANCE_METHOD = "buildCallback";
  private static final String INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.KafkaTemplateCallbackInterceptor";


  @Override
  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
      return new ConstructorInterceptPoint[0];
  }

  @Override
  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
      return new InstanceMethodsInterceptPoint[]{
          new InstanceMethodsInterceptPoint() {
              @Override
              public ElementMatcher<methoddescription> getMethodsMatcher() {
                  return named(ENHANCE_METHOD);
              }

              @Override
              public String getMethodsInterceptor() {
                  return INTERCEPTOR_CLASS;
              }

              @Override
              public boolean isOverrideArgs() {
                  return false;
              }
          }
      };
  }

  @Override
  protected ClassMatch enhanceClass() {
      return byName(ENHANCE_CLASS);
  }
}/<methoddescription>/<code>
  • KafkaTemplateInstrumentation继承了AbstractKafkaTemplateInstrumentation;它使用org.apache.skywalking.apm.plugin.kafka.KafkaTemplateCallbackInterceptor增强了org.springframework.kafka.core.KafkaTemplate的buildCallback方法

KafkaTemplateCallbackInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/KafkaTemplateCallbackInterceptor.java

<code>public class KafkaTemplateCallbackInterceptor implements InstanceMethodsAroundInterceptor {
  @Override
  public void beforeMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, MethodInterceptResult result) throws Throwable {

  }

  @Override
  public Object afterMethod(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Object ret) throws Throwable {
      return new CallbackAdapter((org.apache.kafka.clients.producer.Callback) ret, objInst);
  }

  @Override
  public void handleMethodException(EnhancedInstance objInst, Method method, Object[] allArguments, Class>[] argumentsTypes, Throwable t) {

  }
}/<code>
  • KafkaTemplateCallbackInterceptor实现了InstanceMethodsAroundInterceptor接口,其afterMethod方法将org.apache.kafka.clients.producer.Callback包装为CallbackAdapter返回

KafkaTemplateCallbackInstrumentation

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/define/KafkaTemplateCallbackInstrumentation.java

<code>public class KafkaTemplateCallbackInstrumentation extends AbstractKafkaTemplateInstrumentation {

  private static final String ENHANCE_CLASS = "org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback";
  private static final String CONSTRUCTOR_INTERCEPT_TYPE = "org.apache.kafka.clients.producer.Callback";
  private static final String CONSTRUCTOR_INTERCEPTOR_CLASS = "org.apache.skywalking.apm.plugin.kafka.CallbackConstructorInterceptor";

  @Override
  public ConstructorInterceptPoint[] getConstructorsInterceptPoints() {
      return new ConstructorInterceptPoint[]{
          new ConstructorInterceptPoint() {
              @Override
              public ElementMatcher<methoddescription> getConstructorMatcher() {
                  return takesArgumentWithType(0, CONSTRUCTOR_INTERCEPT_TYPE);
              }

              @Override
              public String getConstructorInterceptor() {
                  return CONSTRUCTOR_INTERCEPTOR_CLASS;

              }
          }
      };
  }

  @Override
  public InstanceMethodsInterceptPoint[] getInstanceMethodsInterceptPoints() {
      return new InstanceMethodsInterceptPoint[0];
  }

  @Override
  protected ClassMatch enhanceClass() {
      return byName(ENHANCE_CLASS);
  }
}/<methoddescription>/<code>
  • KafkaTemplateCallbackInstrumentation继承了AbstractKafkaTemplateInstrumentation;它使用org.apache.skywalking.apm.plugin.kafka.CallbackConstructorInterceptor增强了org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback的参数为org.apache.kafka.clients.producer.Callback的构造器

CallbackConstructorInterceptor

skywalking-6.6.0/apm-sniffer/apm-sdk-plugin/kafka-plugin/src/main/java/org/apache/skywalking/apm/plugin/kafka/CallbackConstructorInterceptor.java

<code>public class CallbackConstructorInterceptor implements InstanceConstructorInterceptor {

  @Override
  public void onConstruct(EnhancedInstance objInst, Object[] allArguments) {
      Callback callback = (Callback) allArguments[0];
      CallbackCache cache;
      if (null != objInst.getSkyWalkingDynamicField()) {
          cache = (CallbackCache) objInst.getSkyWalkingDynamicField();
      } else {
          cache = new CallbackCache();
      }
      cache.setCallback(callback);
      objInst.setSkyWalkingDynamicField(cache);
  }
}/<code>
  • CallbackConstructorInterceptor实现了InstanceConstructorInterceptor接口,其onConstruct方法将CallbackCache设置到objInst的skyWalkingDynamicField

小结

skywalking的kafka-plugin提供了CallbackInstrumentation、KafkaConsumerInstrumentation、KafkaProducerInstrumentation、KafkaProducerMapInstrumentation、KafkaTemplateInstrumentation、KafkaTemplateCallbackInstrumentation这几个增强

doc

  • AbstractKafkaInstrumentation
  • AbstractKafkaTemplateInstrumentation
  • CallbackInstrumentation
  • KafkaConsumerInstrumentation
  • KafkaProducerInstrumentation
  • KafkaProducerMapInstrumentation
  • KafkaTemplateInstrumentation
  • KafkaTemplateCallbackInstrumentation


分享到:


相關文章: