A Look at SkyWalking's metric-exporter

This article walks through the source code of SkyWalking's metric-exporter, using version 6.6.0.


metric-exporter.proto

skywalking-6.6.0/oap-server/exporter/src/main/proto/metric-exporter.proto

<code>syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.oap.server.exporter.grpc";


service MetricExportService {
  rpc export (stream ExportMetricValue) returns (ExportResponse) {
  }

  rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
  }
}

message ExportMetricValue {
  string metricName = 1;
  string entityName = 2;
  string entityId = 3;
  ValueType type = 4;
  int64 timeBucket = 5;
  int64 longValue = 6;
  double doubleValue = 7;
}

message SubscriptionsResp {
  repeated string metricNames = 1;
}

enum ValueType {
  LONG = 0;
  DOUBLE = 1;
}

message SubscriptionReq {

}

message ExportResponse {
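}</code>
  • metric-exporter.proto defines the MetricExportService service, which has two RPC methods: export, a client-streaming call that takes a stream of ExportMetricValue and returns a single ExportResponse, and subscription, a unary call that returns the subscribed metric names in a SubscriptionsResp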
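
To make the message layout concrete, here is a minimal plain-Java sketch of how a receiver might pick the right value field based on type. The class ExportMetricValueSketch and its numericValue method are hypothetical stand-ins written for illustration. They are not the classes that protoc generates from this file, and the sample metric and entity names are example values only.

```java
/**
 * Hypothetical plain-Java mirror of the ExportMetricValue message.
 * This is NOT the protoc-generated class; it only illustrates the field layout.
 */
final class ExportMetricValueSketch {
    /** Mirrors the ValueType enum in the proto file. */
    enum ValueType { LONG, DOUBLE }

    final String metricName;
    final String entityName;
    final String entityId;
    final ValueType type;
    final long timeBucket;
    final long longValue;
    final double doubleValue;

    ExportMetricValueSketch(String metricName, String entityName, String entityId,
                            ValueType type, long timeBucket, long longValue, double doubleValue) {
        this.metricName = metricName;
        this.entityName = entityName;
        this.entityId = entityId;
        this.type = type;
        this.timeBucket = timeBucket;
        this.longValue = longValue;
        this.doubleValue = doubleValue;
    }

    /** Returns the value from whichever field the type tag points at. */
    double numericValue() {
        return type == ValueType.LONG ? (double) longValue : doubleValue;
    }

    public static void main(String[] args) {
        // Example values only; the names below are made up for the demo.
        ExportMetricValueSketch value = new ExportMetricValueSketch(
            "service_resp_time", "demo-service", "1", ValueType.LONG, 202001011200L, 42L, 0.0);
        System.out.println(value.metricName + " = " + value.numericValue());
    }
}
```

The point of the type field is that only one of longValue and doubleValue is meaningful for a given message, so a receiver should branch on type rather than read both.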

GRPCExporterSetting

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java

<code>@Setter
@Getter
public class GRPCExporterSetting extends ModuleConfig {
  private String targetHost;
  private int targetPort;
  private int bufferChannelSize = 20000;
  private int bufferChannelNum = 2;
}</code>
  • GRPCExporterSetting defines four properties: targetHost, targetPort, bufferChannelSize (default 20000), and bufferChannelNum (default 2)

GRPCExporterProvider

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java

<code>public class GRPCExporterProvider extends ModuleProvider {
  private GRPCExporterSetting setting;
  private GRPCExporter exporter;

  @Override public String name() {
      return "grpc";
  }

  @Override public Class<? extends ModuleDefine> module() {
      return ExporterModule.class;
  }

  @Override public ModuleConfig createConfigBeanIfAbsent() {
      setting = new GRPCExporterSetting();
      return setting;
  }

  @Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
      exporter = new GRPCExporter(setting);
      this.registerServiceImplementation(MetricValuesExportService.class, exporter);
  }

  @Override public void start() throws ServiceNotProvidedException, ModuleStartException {

  }

  @Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
      ModuleServiceHolder serviceHolder = getManager().find(CoreModule.NAME).provider();
      exporter.setServiceInventoryCache(serviceHolder.getService(ServiceInventoryCache.class));
      exporter.setServiceInstanceInventoryCache(serviceHolder.getService(ServiceInstanceInventoryCache.class));
      exporter.setEndpointInventoryCache(serviceHolder.getService(EndpointInventoryCache.class));

      exporter.initSubscriptionList();
  }

  @Override public String[] requiredModules() {
      return new String[] {CoreModule.NAME};
  }
}</code>
  • GRPCExporterProvider extends ModuleProvider. Its prepare method creates a GRPCExporter and registers it through registerServiceImplementation as the MetricValuesExportService implementation. Its notifyAfterCompleted method sets serviceInventoryCache, serviceInstanceInventoryCache, and endpointInventoryCache on the exporter, then calls exporter.initSubscriptionList()
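
The split between prepare and notifyAfterCompleted is easier to see with a toy model of the boot sequence. The sketch below assumes that the framework runs each phase for every provider before moving on to the next phase. That ordering is an assumption of this sketch and is not shown in the provider code above. LifecycleSketch and its boot method are hypothetical names, not SkyWalking classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical, simplified model of a three-phase module boot; not SkyWalking's own bootstrap code. */
final class LifecycleSketch {

    /** Runs each phase across all providers before moving on to the next phase (assumed ordering). */
    static List<String> boot(List<String> providerNames) {
        List<String> log = new ArrayList<>();
        for (String phase : new String[] {"prepare", "start", "notifyAfterCompleted"}) {
            for (String name : providerNames) {
                log.add(name + "." + phase);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        // "core" and "exporter" are illustrative provider names.
        for (String step : boot(Arrays.asList("core", "exporter"))) {
            System.out.println(step);
        }
    }
}
```

Under that assumption, every provider has finished prepare and start by the time exporter.notifyAfterCompleted runs, which would explain why the lookups against CoreModule services are placed there rather than in prepare.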

MetricFormatter

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/MetricFormatter.java

<code>@Setter
public class MetricFormatter {
  private ServiceInventoryCache serviceInventoryCache;
  private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
  private EndpointInventoryCache endpointInventoryCache;

  protected String getEntityName(MetricsMetaInfo meta) {
      int scope = meta.getScope();
      if (DefaultScopeDefine.inServiceCatalog(scope)) {
          int entityId = Integer.valueOf(meta.getId());
          return serviceInventoryCache.get(entityId).getName();
      } else if (DefaultScopeDefine.inServiceInstanceCatalog(scope)) {
          int entityId = Integer.valueOf(meta.getId());
          return serviceInstanceInventoryCache.get(entityId).getName();
      } else if (DefaultScopeDefine.inEndpointCatalog(scope)) {
          int entityId = Integer.valueOf(meta.getId());
          return endpointInventoryCache.get(entityId).getName();
      } else if (scope == DefaultScopeDefine.ALL) {
          return "";
      } else {
          return null;
      }
  }
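}</code>
  • MetricFormatter provides the getEntityName method, which resolves the entityName from a MetricsMetaInfo: for service, service instance, and endpoint scopes it looks up the name in the corresponding inventory cache; for the ALL scope it returns an empty string; for any other scope it returns null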
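
The branching in getEntityName can be sketched with plain maps in place of the inventory caches. Everything here is hypothetical: EntityNameSketch, the Catalog enum, and the three maps are stand-ins for illustration, and unlike the original, a missing id simply yields null from the map instead of going through a cache object.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for MetricFormatter; plain maps replace the inventory caches. */
final class EntityNameSketch {
    /** Simplified replacement for the scope catalog checks in DefaultScopeDefine. */
    enum Catalog { SERVICE, SERVICE_INSTANCE, ENDPOINT, ALL, OTHER }

    final Map<Integer, String> serviceNames = new HashMap<>();
    final Map<Integer, String> instanceNames = new HashMap<>();
    final Map<Integer, String> endpointNames = new HashMap<>();

    /** Resolves a display name from the catalog and the string id carried in the metadata. */
    String getEntityName(Catalog catalog, String id) {
        switch (catalog) {
            case SERVICE:
                return serviceNames.get(Integer.parseInt(id));
            case SERVICE_INSTANCE:
                return instanceNames.get(Integer.parseInt(id));
            case ENDPOINT:
                return endpointNames.get(Integer.parseInt(id));
            case ALL:
                return ""; // the ALL scope has no single entity
            default:
                return null; // unknown scope: the caller is expected to skip the row
        }
    }

    public static void main(String[] args) {
        EntityNameSketch sketch = new EntityNameSketch();
        sketch.serviceNames.put(3, "order-service"); // example data
        System.out.println(sketch.getEntityName(Catalog.SERVICE, "3"));
    }
}
```

The distinction between an empty string and null matters to the caller: GRPCExporter.consume skips a row when the name is null but still exports rows whose name is empty.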

MetricValuesExportService

skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java

<code>public interface MetricValuesExportService extends Service {
  /**
    * This method is sync-mode export, the performance effects the persistence result. Queue mode is highly recommended.
    *
    * @param event value is only accurate when the method invokes. Don't cache it.
    */
  void export(ExportEvent event);
}</code>
  • MetricValuesExportService extends Service and defines the export method

GRPCExporter

skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java

<code>public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<GRPCExporter.ExportData> {
  private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class);

  private GRPCExporterSetting setting;
  private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
  private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
  private final DataCarrier exportBuffer;
  private final Set<String> subscriptionSet;

  public GRPCExporter(GRPCExporterSetting setting) {
      this.setting = setting;
      GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort());
      client.connect();
      ManagedChannel channel = client.getChannel();
      exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
      blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
      exportBuffer = new DataCarrier<ExportData>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
      exportBuffer.consume(this, 1, 200);
      subscriptionSet = new HashSet<>();
  }

  @Override public void export(ExportEvent event) {
      if (ExportEvent.EventType.TOTAL == event.getType()) {
          Metrics metrics = event.getMetrics();
          if (metrics instanceof WithMetadata) {
              MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta();
              if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) {
                  exportBuffer.produce(new ExportData(meta, metrics));
              }
          }
      }
  }

  public void initSubscriptionList() {
      SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).subscription(SubscriptionReq.newBuilder().build());
      subscription.getMetricNamesList().forEach(subscriptionSet::add);
      logger.debug("Get exporter subscription list, {}", subscriptionSet);
  }

  @Override public void init() {

  }

  @Override public void consume(List<ExportData> data) {
      if (data.size() == 0) {
          return;
      }

      ExportStatus status = new ExportStatus();
      StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS).export(
          new StreamObserver<ExportResponse>() {
              @Override public void onNext(ExportResponse response) {

              }

              @Override public void onError(Throwable throwable) {
                  status.done();
              }

              @Override public void onCompleted() {
                  status.done();
              }
          }
      );
      AtomicInteger exportNum = new AtomicInteger();
      data.forEach(row -> {
          ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();

          Metrics metrics = row.getMetrics();
          if (metrics instanceof LongValueHolder) {
              long value = ((LongValueHolder)metrics).getValue();
              builder.setLongValue(value);
              builder.setType(ValueType.LONG);
          } else if (metrics instanceof IntValueHolder) {
              long value = ((IntValueHolder)metrics).getValue();
              builder.setLongValue(value);
              builder.setType(ValueType.LONG);
          } else if (metrics instanceof DoubleValueHolder) {
              double value = ((DoubleValueHolder)metrics).getValue();
              builder.setDoubleValue(value);
              builder.setType(ValueType.DOUBLE);
          } else {
              return;
          }

          MetricsMetaInfo meta = row.getMeta();
          builder.setMetricName(meta.getMetricsName());
          String entityName = getEntityName(meta);
          if (entityName == null) {
              return;
          }
          builder.setEntityName(entityName);
          builder.setEntityId(meta.getId());

          builder.setTimeBucket(metrics.getTimeBucket());

          streamObserver.onNext(builder.build());
          exportNum.getAndIncrement();
      });

      streamObserver.onCompleted();

      long sleepTime = 0;
      long cycle = 100L;
      /**
        * For memory safe of oap, we must wait for the peer confirmation.
        */
      while (!status.isDone()) {
          try {
              sleepTime += cycle;
              Thread.sleep(cycle);
          } catch (InterruptedException e) {
          }

          if (sleepTime > 2000L) {
              logger.warn("Export {} metrics to {}:{}, wait {} milliseconds.",
                  exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
              cycle = 2000L;
          }
      }

      logger.debug("Exported {} metrics to {}:{} in {} milliseconds.",
          exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
  }

  @Override public void onError(List<ExportData> data, Throwable t) {
      logger.error(t.getMessage(), t);
  }

  @Override public void onExit() {

  }

  @Getter(AccessLevel.PRIVATE)
  public class ExportData {
      private MetricsMetaInfo meta;
      private Metrics metrics;

      public ExportData(MetricsMetaInfo meta, Metrics metrics) {
          this.meta = meta;
          this.metrics = metrics;
      }
  }

  private class ExportStatus {
      private boolean done = false;

      private void done() {
          done = true;
      }

      public boolean isDone() {
          return done;
      }
  }
}</code>
  • GRPCExporter extends MetricFormatter and implements the MetricValuesExportService and IConsumer interfaces. Its constructor uses GRPCExporterSetting to create a MetricExportServiceGrpc.MetricExportServiceStub and a MetricExportServiceGrpc.MetricExportServiceBlockingStub, creates a DataCarrier, and registers itself as the IConsumer of exportBuffer. Its export method handles only TOTAL events and, when the subscription set is empty or contains the metric name, calls exportBuffer.produce(new ExportData(meta, metrics)). Its consume method builds an ExportMetricValue for each row, sends it with streamObserver.onNext, and after calling onCompleted waits until the peer confirms or the call fails
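
The filter-then-buffer flow in export and consume can be approximated with JDK classes only. This is a minimal sketch under stated assumptions: ExportBufferSketch is a hypothetical class, an ArrayBlockingQueue stands in for DataCarrier, and dropping an item when the queue is full is a simplification of this sketch, not a statement about how DataCarrier behaves.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical model of the subscription filter and export buffer; not SkyWalking code. */
final class ExportBufferSketch {
    private final Set<String> subscriptionSet = new HashSet<>();
    private final BlockingQueue<String> buffer; // stand-in for DataCarrier

    ExportBufferSketch(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    /** Plays the role of initSubscriptionList, which fills the set from the remote peer. */
    void subscribe(String metricName) {
        subscriptionSet.add(metricName);
    }

    /** Same rule as in export(): an empty subscription set means every metric passes. */
    boolean accepts(String metricName) {
        return subscriptionSet.isEmpty() || subscriptionSet.contains(metricName);
    }

    /** Queues the metric if it passes the filter; returns false if filtered out or the queue is full. */
    boolean export(String metricName) {
        return accepts(metricName) && buffer.offer(metricName);
    }

    /** Plays the role of consume(): takes everything currently buffered as one batch. */
    List<String> drain() {
        List<String> batch = new ArrayList<>();
        buffer.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) {
        ExportBufferSketch sketch = new ExportBufferSketch(10);
        sketch.subscribe("service_resp_time"); // example metric name
        sketch.export("service_resp_time");
        sketch.export("endpoint_cpm");         // filtered out: not subscribed
        System.out.println(sketch.drain());
    }
}
```

One consequence of the empty-set rule is worth noting: if the remote peer returns no metric names from subscription, the exporter sends every metric rather than none.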

Summary

metric-exporter.proto defines the MetricExportService service with two RPC methods, export and subscription. GRPCExporterProvider extends ModuleProvider: its prepare method creates a GRPCExporter and then calls registerServiceImplementation, and its notifyAfterCompleted method sets serviceInventoryCache, serviceInstanceInventoryCache, and endpointInventoryCache on the exporter before calling exporter.initSubscriptionList().

doc

  • metric-exporter

