A look at SkyWalking's jvm-receiver-plugin

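This article walks through the source of SkyWalking's jvm-receiver-plugin, the OAP server module that receives JVM metrics reported by agents.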



JVMModuleProvider

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/JVMModuleProvider.java

<code>public class JVMModuleProvider extends ModuleProvider {

  @Override public String name() {
      return "default";
  }

  @Override public Class<? extends ModuleDefine> module() {
      return JVMModule.class;
  }

  @Override public ModuleConfig createConfigBeanIfAbsent() {
      return null;
  }

  @Override public void prepare() {
  }

  @Override public void start() {
      GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
      grpcHandlerRegister.addHandler(new JVMMetricsServiceHandler(getManager()));
      grpcHandlerRegister.addHandler(new JVMMetricReportServiceHandler(getManager()));
  }

  @Override public void notifyAfterCompleted() {

  }

  @Override public String[] requiredModules() {
      return new String[] {CoreModule.NAME, SharingServerModule.NAME};
  }
}</code>
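  • JVMModuleProvider extends ModuleProvider. It declares CoreModule and SharingServerModule as required modules, and its start method looks up the GRPCHandlerRegister from SharingServerModule and registers two handlers on it: JVMMetricsServiceHandler and JVMMetricReportServiceHandler.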
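The registration step can be pictured without any SkyWalking dependency. The following is only a minimal sketch: `Handler`, `HandlerRegister`, `ServiceLocator` and `JvmReceiverSketch` are hypothetical stand-ins invented for illustration, not SkyWalking classes. It shows the shape of start(): look up a register service owned by another module, then add one handler per protocol.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a gRPC handler; only exposes the service it serves.
interface Handler {
    String serviceName();
}

// Hypothetical stand-in for GRPCHandlerRegister: collects handlers for a shared server.
class HandlerRegister {
    private final List<Handler> handlers = new ArrayList<>();

    void addHandler(Handler handler) {
        handlers.add(handler);
    }

    List<String> registeredServices() {
        List<String> names = new ArrayList<>();
        for (Handler handler : handlers) {
            names.add(handler.serviceName());
        }
        return names;
    }
}

// Hypothetical stand-in for the module manager: looks up services by type.
class ServiceLocator {
    private final Map<Class<?>, Object> services = new HashMap<>();

    <T> void register(Class<T> type, T service) {
        services.put(type, service);
    }

    <T> T getService(Class<T> type) {
        Object service = services.get(type);
        if (service == null) {
            throw new IllegalStateException("required service missing: " + type.getName());
        }
        return type.cast(service);
    }
}

class JvmReceiverSketch {
    // Same shape as start() above: find the register, add both handlers.
    static void start(ServiceLocator locator) {
        HandlerRegister register = locator.getService(HandlerRegister.class);
        register.addHandler(() -> "JVMMetricsService");       // language-agent protocol
        register.addHandler(() -> "JVMMetricReportService");  // language-agent-v2 protocol
    }

    public static void main(String[] args) {
        ServiceLocator locator = new ServiceLocator();
        locator.register(HandlerRegister.class, new HandlerRegister());
        start(locator);
        System.out.println(locator.getService(HandlerRegister.class).registeredServices());
    }
}
```

The lookup throws if the register is missing, which is roughly why a provider would list the owning module among its required modules before relying on its services.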

JVMMetricsService.proto

skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/language-agent/JVMMetricsService.proto

<code>syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.language.agent";
option csharp_namespace = "SkyWalking.NetworkProtocol";

import "language-agent/Downstream.proto";
import "common/JVM.proto";

service JVMMetricsService {
  rpc collect (JVMMetrics) returns (Downstream) {
  }
}

message JVMMetrics {
  repeated JVMMetric metrics = 1;
  int32 applicationInstanceId = 2;
}</code>
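  • JVMMetricsService.proto defines the JVMMetricsService service. Its single collect method takes a JVMMetrics message (a list of JVMMetric entries plus an applicationInstanceId) and returns Downstream.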

JVMMetricsServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMMetricsServiceHandler.java

<code>public class JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase implements GRPCHandler {

  private static final Logger logger = LoggerFactory.getLogger(JVMMetricsServiceHandler.class);

  private final JVMSourceDispatcher jvmSourceDispatcher;

  public JVMMetricsServiceHandler(ModuleManager moduleManager) {
      this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager);
  }

  @Override public void collect(JVMMetrics request, StreamObserver<Downstream> responseObserver) {
      int serviceInstanceId = request.getApplicationInstanceId();

      if (logger.isDebugEnabled()) {
          logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);
      }

      request.getMetricsList().forEach(metrics -> {
          long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
          jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
      });

      responseObserver.onNext(Downstream.newBuilder().build());
      responseObserver.onCompleted();
  }

}</code>
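  • JVMMetricsServiceHandler extends JVMMetricsServiceGrpc.JVMMetricsServiceImplBase, and its constructor creates a JVMSourceDispatcher. The collect method iterates over request.getMetricsList(), converts each metric's timestamp to a minute-level time bucket, and calls jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics) for each entry; it then replies with an empty Downstream.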
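The article does not show TimeBucket.getMinuteTimeBucket itself. As a rough illustration, and assuming the bucket is the timestamp truncated to the minute and packed into a yyyyMMddHHmm number, the conversion could look like the sketch below. `MinuteBucketSketch` and `minuteBucket` are hypothetical names, and the explicit zone parameter is an addition made here so the output is reproducible; which time zone the real implementation uses is not covered by the code quoted in this article.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;

class MinuteBucketSketch {
    // Packs a millisecond timestamp into a yyyyMMddHHmm number,
    // dropping seconds and milliseconds (assumed bucket format).
    static long minuteBucket(long epochMillis, ZoneId zone) {
        ZonedDateTime t = Instant.ofEpochMilli(epochMillis).atZone(zone);
        return t.getYear() * 100_000_000L
            + t.getMonthValue() * 1_000_000L
            + t.getDayOfMonth() * 10_000L
            + t.getHour() * 100L
            + t.getMinute();
    }

    public static void main(String[] args) {
        long start = 1_577_836_800_000L; // 2020-01-01T00:00:00Z
        System.out.println(minuteBucket(start, ZoneOffset.UTC));           // 202001010000
        System.out.println(minuteBucket(start + 59_999L, ZoneOffset.UTC)); // same minute
        System.out.println(minuteBucket(start + 60_000L, ZoneOffset.UTC)); // 202001010001
    }
}
```

Under this assumption, every metric reported within the same minute lands in the same bucket, which is what lets downstream processing aggregate at minute granularity.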

JVMMetric.proto

skywalking-6.6.0/apm-protocol/apm-network/src/main/proto/language-agent-v2/JVMMetric.proto

<code>syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.apm.network.language.agent.v2";
option csharp_namespace = "SkyWalking.NetworkProtocol";

import "common/common.proto";
import "common/JVM.proto";

service JVMMetricReportService {
  rpc collect (JVMMetricCollection) returns (Commands) {
  }
}

message JVMMetricCollection {
  repeated JVMMetric metrics = 1;
  int32 serviceInstanceId = 2;
}</code>
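  • JVMMetric.proto defines the JVMMetricReportService service. Its single collect method takes a JVMMetricCollection message (a list of JVMMetric entries plus a serviceInstanceId) and returns Commands.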

JVMMetricReportServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMMetricReportServiceHandler.java

<code>public class JVMMetricReportServiceHandler extends JVMMetricReportServiceGrpc.JVMMetricReportServiceImplBase implements GRPCHandler {

  private static final Logger logger = LoggerFactory.getLogger(JVMMetricReportServiceHandler.class);

  private final JVMSourceDispatcher jvmSourceDispatcher;

  public JVMMetricReportServiceHandler(ModuleManager moduleManager) {
      this.jvmSourceDispatcher = new JVMSourceDispatcher(moduleManager);
  }

  @Override public void collect(JVMMetricCollection request, StreamObserver<Commands> responseObserver) {
      int serviceInstanceId = request.getServiceInstanceId();

      if (logger.isDebugEnabled()) {
          logger.debug("receive the jvm metrics from service instance, id: {}", serviceInstanceId);
      }

      request.getMetricsList().forEach(metrics -> {
          long minuteTimeBucket = TimeBucket.getMinuteTimeBucket(metrics.getTime());
          jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics);
      });

      responseObserver.onNext(Commands.newBuilder().build());
      responseObserver.onCompleted();
  }

}</code>
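  • JVMMetricReportServiceHandler extends JVMMetricReportServiceGrpc.JVMMetricReportServiceImplBase, and its constructor creates a JVMSourceDispatcher. Its collect method mirrors the one in JVMMetricsServiceHandler: it iterates over request.getMetricsList() and calls jvmSourceDispatcher.sendMetric(serviceInstanceId, minuteTimeBucket, metrics) for each entry. The differences are that the instance id comes from getServiceInstanceId() and the reply is an empty Commands.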

JVMSourceDispatcher

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-jvm-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/jvm/provider/handler/JVMSourceDispatcher.java

<code>public class JVMSourceDispatcher {
  private static final Logger logger = LoggerFactory.getLogger(JVMSourceDispatcher.class);
  private final SourceReceiver sourceReceiver;
  private final ServiceInstanceInventoryCache instanceInventoryCache;

  public JVMSourceDispatcher(ModuleManager moduleManager) {
      this.sourceReceiver = moduleManager.find(CoreModule.NAME).provider().getService(SourceReceiver.class);
      this.instanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
  }

  void sendMetric(int serviceInstanceId, long minuteTimeBucket, JVMMetric metrics) {
      ServiceInstanceInventory serviceInstanceInventory = instanceInventoryCache.get(serviceInstanceId);
      int serviceId;
      if (Objects.nonNull(serviceInstanceInventory)) {
          serviceId = serviceInstanceInventory.getServiceId();
      } else {
          logger.warn("Can't find service by service instance id from cache, service instance id is: {}", serviceInstanceId);
          return;
      }

      this.sendToCpuMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getCpu());
      this.sendToMemoryMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryList());
      this.sendToMemoryPoolMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getMemoryPoolList());
      this.sendToGCMetricProcess(serviceId, serviceInstanceId, minuteTimeBucket, metrics.getGcList());
  }

  private void sendToCpuMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, CPU cpu) {
      ServiceInstanceJVMCPU serviceInstanceJVMCPU = new ServiceInstanceJVMCPU();
      serviceInstanceJVMCPU.setId(serviceInstanceId);
      serviceInstanceJVMCPU.setName(Const.EMPTY_STRING);
      serviceInstanceJVMCPU.setServiceId(serviceId);
      serviceInstanceJVMCPU.setServiceName(Const.EMPTY_STRING);
      serviceInstanceJVMCPU.setUsePercent(cpu.getUsagePercent());
      serviceInstanceJVMCPU.setTimeBucket(timeBucket);
      sourceReceiver.receive(serviceInstanceJVMCPU);
  }

  private void sendToGCMetricProcess(int serviceId, int serviceInstanceId, long timeBucket, List<GC> gcs) {
      gcs.forEach(gc -> {
          ServiceInstanceJVMGC serviceInstanceJVMGC = new ServiceInstanceJVMGC();
          serviceInstanceJVMGC.setId(serviceInstanceId);
          serviceInstanceJVMGC.setName(Const.EMPTY_STRING);
          serviceInstanceJVMGC.setServiceId(serviceId);
          serviceInstanceJVMGC.setServiceName(Const.EMPTY_STRING);

          switch (gc.getPhrase()) {
              case NEW:
                  serviceInstanceJVMGC.setPhrase(GCPhrase.NEW);
                  break;
              case OLD:
                  serviceInstanceJVMGC.setPhrase(GCPhrase.OLD);
                  break;
          }

          serviceInstanceJVMGC.setTime(gc.getTime());
          serviceInstanceJVMGC.setCount(gc.getCount());
          serviceInstanceJVMGC.setTimeBucket(timeBucket);
          sourceReceiver.receive(serviceInstanceJVMGC);
      });
  }

  private void sendToMemoryMetricProcess(int serviceId, int serviceInstanceId, long timeBucket,
      List<Memory> memories) {
      memories.forEach(memory -> {
          ServiceInstanceJVMMemory serviceInstanceJVMMemory = new ServiceInstanceJVMMemory();
          serviceInstanceJVMMemory.setId(serviceInstanceId);
          serviceInstanceJVMMemory.setName(Const.EMPTY_STRING);
          serviceInstanceJVMMemory.setServiceId(serviceId);
          serviceInstanceJVMMemory.setServiceName(Const.EMPTY_STRING);
          serviceInstanceJVMMemory.setHeapStatus(memory.getIsHeap());
          serviceInstanceJVMMemory.setInit(memory.getInit());
          serviceInstanceJVMMemory.setMax(memory.getMax());
          serviceInstanceJVMMemory.setUsed(memory.getUsed());
          serviceInstanceJVMMemory.setCommitted(memory.getCommitted());
          serviceInstanceJVMMemory.setTimeBucket(timeBucket);
          sourceReceiver.receive(serviceInstanceJVMMemory);
      });
  }

  private void sendToMemoryPoolMetricProcess(int serviceId, int serviceInstanceId, long timeBucket,
      List<MemoryPool> memoryPools) {

      memoryPools.forEach(memoryPool -> {
          ServiceInstanceJVMMemoryPool serviceInstanceJVMMemoryPool = new ServiceInstanceJVMMemoryPool();
          serviceInstanceJVMMemoryPool.setId(serviceInstanceId);
          serviceInstanceJVMMemoryPool.setName(Const.EMPTY_STRING);
          serviceInstanceJVMMemoryPool.setServiceId(serviceId);
          serviceInstanceJVMMemoryPool.setServiceName(Const.EMPTY_STRING);

          switch (memoryPool.getType()) {
              case NEWGEN_USAGE:
                  serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.NEWGEN_USAGE);
                  break;
              case OLDGEN_USAGE:
                  serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.OLDGEN_USAGE);
                  break;
              case PERMGEN_USAGE:
                  serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.PERMGEN_USAGE);
                  break;
              case SURVIVOR_USAGE:
                  serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.SURVIVOR_USAGE);
                  break;
              case METASPACE_USAGE:
                  serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.METASPACE_USAGE);
                  break;
              case CODE_CACHE_USAGE:
                  serviceInstanceJVMMemoryPool.setPoolType(MemoryPoolType.CODE_CACHE_USAGE);
                  break;
          }

          serviceInstanceJVMMemoryPool.setInit(memoryPool.getInit());
          serviceInstanceJVMMemoryPool.setMax(memoryPool.getMax());
          serviceInstanceJVMMemoryPool.setUsed(memoryPool.getUsed());
          serviceInstanceJVMMemoryPool.setCommitted(memoryPool.getCommited());
          serviceInstanceJVMMemoryPool.setTimeBucket(timeBucket);
          sourceReceiver.receive(serviceInstanceJVMMemoryPool);
      });
  }
}</code>
  • JVMSourceDispatcher mainly provides the sendMetric method. It first resolves the serviceId from ServiceInstanceInventoryCache; if the instance is not in the cache, it logs a warning and drops the metric. Otherwise it calls sendToCpuMetricProcess, sendToMemoryMetricProcess, sendToMemoryPoolMetricProcess and sendToGCMetricProcess. Each of these builds the corresponding source object (ServiceInstanceJVMCPU, ServiceInstanceJVMMemory, ServiceInstanceJVMMemoryPool, ServiceInstanceJVMGC) and passes it to sourceReceiver.receive; the memory, memory pool and GC methods do so once per list element.
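The fan-out described above can be sketched in plain Java. Everything here is hypothetical and simplified for illustration: `DispatchSketch` and `Sample` are made-up names, a `Map` stands in for ServiceInstanceInventoryCache, a list of strings stands in for SourceReceiver, and sendMetric returns a boolean (the real method returns void) so the dropped case is easy to observe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DispatchSketch {
    // Hypothetical flattened view of one reported JVM metric sample.
    static class Sample {
        final double cpuPercent;
        final List<String> memoryAreas; // e.g. heap, non-heap
        final List<String> memoryPools; // e.g. NEWGEN_USAGE, OLDGEN_USAGE
        final List<String> gcPhases;    // e.g. NEW, OLD

        Sample(double cpuPercent, List<String> memoryAreas,
               List<String> memoryPools, List<String> gcPhases) {
            this.cpuPercent = cpuPercent;
            this.memoryAreas = memoryAreas;
            this.memoryPools = memoryPools;
            this.gcPhases = gcPhases;
        }
    }

    // Stand-in for the instance inventory cache: instance id -> service id.
    private final Map<Integer, Integer> instanceToService = new HashMap<>();
    // Stand-in for the source receiver: records every source it is handed.
    final List<String> received = new ArrayList<>();

    void registerInstance(int instanceId, int serviceId) {
        instanceToService.put(instanceId, serviceId);
    }

    // Returns false when the instance is unknown and the sample is dropped.
    boolean sendMetric(int instanceId, long timeBucket, Sample sample) {
        Integer serviceId = instanceToService.get(instanceId);
        if (serviceId == null) {
            return false; // the original logs a warning here and returns
        }
        String scope = " service=" + serviceId + " instance=" + instanceId + " bucket=" + timeBucket;
        // One CPU source per sample, then one source per list element.
        received.add("cpu:" + sample.cpuPercent + scope);
        for (String area : sample.memoryAreas) {
            received.add("memory:" + area + scope);
        }
        for (String pool : sample.memoryPools) {
            received.add("memoryPool:" + pool + scope);
        }
        for (String phase : sample.gcPhases) {
            received.add("gc:" + phase + scope);
        }
        return true;
    }
}
```

With two memory areas, one pool and two GC phases, a single sample yields six sources (1 + 2 + 1 + 2), and a sample from an unregistered instance yields none.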

Summary

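JVMModuleProvider extends ModuleProvider; its start method obtains the GRPCHandlerRegister and adds JVMMetricsServiceHandler and JVMMetricReportServiceHandler. The former serves the service defined in JVMMetricsService.proto (language-agent), the latter the one defined in JVMMetric.proto (language-agent-v2). Both delegate each metric to JVMSourceDispatcher, which turns it into CPU, memory, memory pool and GC sources for the SourceReceiver.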

doc

  • JVMModuleProvider
  • JVMMetricsServiceHandler
  • JVMMetricReportServiceHandler

