序
本文主要研究一下skywalking的metric-exporter
metric-exporter.proto
skywalking-6.6.0/oap-server/exporter/src/main/proto/metric-exporter.proto
<code>syntax = "proto3";
option java_multiple_files = true;
option java_package = "org.apache.skywalking.oap.server.exporter.grpc";
service MetricExportService {
rpc export (stream ExportMetricValue) returns (ExportResponse) {
}
rpc subscription (SubscriptionReq) returns (SubscriptionsResp) {
}
}
message ExportMetricValue {
string metricName = 1;
string entityName = 2;
string entityId = 3;
ValueType type = 4;
int64 timeBucket = 5;
int64 longValue = 6;
double doubleValue = 7;
}
message SubscriptionsResp {
repeated string metricNames = 1;
}
enum ValueType {
LONG = 0;
DOUBLE = 1;
}
message SubscriptionReq {
}
message ExportResponse {
}/<code>
- metric-exporter.proto定義了MetricExportService服務,它有export、subscription兩個rpc方法
GRPCExporterSetting
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterSetting.java
<code>@Setter
@Getter
public class GRPCExporterSetting extends ModuleConfig {
private String targetHost;
private int targetPort;
private int bufferChannelSize = 20000;
private int bufferChannelNum = 2;
}/<code>
- GRPCExporterSetting定義了targetHost、targetPort、bufferChannelSize、bufferChannelNum屬性
GRPCExporterProvider
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporterProvider.java
<code>public class GRPCExporterProvider extends ModuleProvider {
private GRPCExporterSetting setting;
private GRPCExporter exporter;
@Override public String name() {
return "grpc";
}
@Override public Class extends ModuleDefine> module() {
return ExporterModule.class;
}
@Override public ModuleConfig createConfigBeanIfAbsent() {
setting = new GRPCExporterSetting();
return setting;
}
@Override public void prepare() throws ServiceNotProvidedException, ModuleStartException {
exporter = new GRPCExporter(setting);
this.registerServiceImplementation(MetricValuesExportService.class, exporter);
}
@Override public void start() throws ServiceNotProvidedException, ModuleStartException {
}
@Override public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
ModuleServiceHolder serviceHolder = getManager().find(CoreModule.NAME).provider();
exporter.setServiceInventoryCache(serviceHolder.getService(ServiceInventoryCache.class));
exporter.setServiceInstanceInventoryCache(serviceHolder.getService(ServiceInstanceInventoryCache.class));
exporter.setEndpointInventoryCache(serviceHolder.getService(EndpointInventoryCache.class));
exporter.initSubscriptionList();
}
@Override public String[] requiredModules() {
return new String[] {CoreModule.NAME};
}
}/<code>
- GRPCExporterProvider繼承了ModuleProvider,其prepare方法創建GRPCExporter,然後執行registerServiceImplementation;其notifyAfterCompleted方法主要是給exporter設置serviceInventoryCache、serviceInstanceInventoryCache、endpointInventoryCache,然後執行exporter.initSubscriptionList()
MetricFormatter
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/MetricFormatter.java
<code>@Setter
public class MetricFormatter {
private ServiceInventoryCache serviceInventoryCache;
private ServiceInstanceInventoryCache serviceInstanceInventoryCache;
private EndpointInventoryCache endpointInventoryCache;
protected String getEntityName(MetricsMetaInfo meta) {
int scope = meta.getScope();
if (DefaultScopeDefine.inServiceCatalog(scope)) {
int entityId = Integer.valueOf(meta.getId());
return serviceInventoryCache.get(entityId).getName();
} else if (DefaultScopeDefine.inServiceInstanceCatalog(scope)) {
int entityId = Integer.valueOf(meta.getId());
return serviceInstanceInventoryCache.get(entityId).getName();
} else if (DefaultScopeDefine.inEndpointCatalog(scope)) {
int entityId = Integer.valueOf(meta.getId());
return endpointInventoryCache.get(entityId).getName();
} else if (scope == DefaultScopeDefine.ALL) {
return "";
} else {
return null;
}
}
}/<code>
- MetricFormatter提供了getEntityName方法,用於從MetricsMetaInfo提取entityName
MetricValuesExportService
skywalking-6.6.0/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/exporter/MetricValuesExportService.java
<code>public interface MetricValuesExportService extends Service {
/**
* This method is sync-mode export, the performance effects the persistence result. Queue mode is highly recommended.
*
* @param event value is only accurate when the method invokes. Don't cache it.
*/
void export(ExportEvent event);
}/<code>
- MetricValuesExportService繼承了Service,它定義了export方法
GRPCExporter
skywalking-6.6.0/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java
<code>public class GRPCExporter extends MetricFormatter implements MetricValuesExportService, IConsumer<grpcexporter.exportdata> {
private static final Logger logger = LoggerFactory.getLogger(GRPCExporter.class);
private GRPCExporterSetting setting;
private final MetricExportServiceGrpc.MetricExportServiceStub exportServiceFutureStub;
private final MetricExportServiceGrpc.MetricExportServiceBlockingStub blockingStub;
private final DataCarrier exportBuffer;
private final Set<string> subscriptionSet;
public GRPCExporter(GRPCExporterSetting setting) {
this.setting = setting;
GRPCClient client = new GRPCClient(setting.getTargetHost(), setting.getTargetPort());
client.connect();
ManagedChannel channel = client.getChannel();
exportServiceFutureStub = MetricExportServiceGrpc.newStub(channel);
blockingStub = MetricExportServiceGrpc.newBlockingStub(channel);
exportBuffer = new DataCarrier<exportdata>(setting.getBufferChannelNum(), setting.getBufferChannelSize());
exportBuffer.consume(this, 1, 200);
subscriptionSet = new HashSet<>();
}
@Override public void export(ExportEvent event) {
if (ExportEvent.EventType.TOTAL == event.getType()) {
Metrics metrics = event.getMetrics();
if (metrics instanceof WithMetadata) {
MetricsMetaInfo meta = ((WithMetadata)metrics).getMeta();
if (subscriptionSet.size() == 0 || subscriptionSet.contains(meta.getMetricsName())) {
exportBuffer.produce(new ExportData(meta, metrics));
}
}
}
}
public void initSubscriptionList() {
SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).subscription(SubscriptionReq.newBuilder().build());
subscription.getMetricNamesList().forEach(subscriptionSet::add);
logger.debug("Get exporter subscription list, {}", subscriptionSet);
}
@Override public void init() {
}
@Override public void consume(List<exportdata> data) {
if (data.size() == 0) {
return;
}
ExportStatus status = new ExportStatus();
StreamObserver<exportmetricvalue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS).export(
new StreamObserver<exportresponse>() {
@Override public void onNext(ExportResponse response) {
}
@Override public void onError(Throwable throwable) {
status.done();
}
@Override public void onCompleted() {
status.done();
}
}
);
AtomicInteger exportNum = new AtomicInteger();
data.forEach(row -> {
ExportMetricValue.Builder builder = ExportMetricValue.newBuilder();
Metrics metrics = row.getMetrics();
if (metrics instanceof LongValueHolder) {
long value = ((LongValueHolder)metrics).getValue();
builder.setLongValue(value);
builder.setType(ValueType.LONG);
} else if (metrics instanceof IntValueHolder) {
long value = ((IntValueHolder)metrics).getValue();
builder.setLongValue(value);
builder.setType(ValueType.LONG);
} else if (metrics instanceof DoubleValueHolder) {
double value = ((DoubleValueHolder)metrics).getValue();
builder.setDoubleValue(value);
builder.setType(ValueType.DOUBLE);
} else {
return;
}
MetricsMetaInfo meta = row.getMeta();
builder.setMetricName(meta.getMetricsName());
String entityName = getEntityName(meta);
if (entityName == null) {
return;
}
builder.setEntityName(entityName);
builder.setEntityId(meta.getId());
builder.setTimeBucket(metrics.getTimeBucket());
streamObserver.onNext(builder.build());
exportNum.getAndIncrement();
});
streamObserver.onCompleted();
long sleepTime = 0;
long cycle = 100L;
/**
* For memory safe of oap, we must wait for the peer confirmation.
*/
while (!status.isDone()) {
try {
sleepTime += cycle;
Thread.sleep(cycle);
} catch (InterruptedException e) {
}
if (sleepTime > 2000L) {
logger.warn("Export {} metrics to {}:{}, wait {} milliseconds.",
exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
cycle = 2000L;
}
}
logger.debug("Exported {} metrics to {}:{} in {} milliseconds.",
exportNum.get(), setting.getTargetHost(), setting.getTargetPort(), sleepTime);
}
@Override public void onError(List<exportdata> data, Throwable t) {
logger.error(t.getMessage(), t);
}
@Override public void onExit() {
}
@Getter(AccessLevel.PRIVATE)
public class ExportData {
private MetricsMetaInfo meta;
private Metrics metrics;
public ExportData(MetricsMetaInfo meta, Metrics metrics) {
this.meta = meta;
this.metrics = metrics;
}
}
private class ExportStatus {
private boolean done = false;
private void done() {
done = true;
}
public boolean isDone() {
return done;
}
}
}/<exportdata>/<exportresponse>/<exportmetricvalue>/<exportdata>/<exportdata>/<string>/<grpcexporter.exportdata>/<code>
- GRPCExporter繼承了MetricFormatter,實現了MetricValuesExportService、IConsumer接口;其構造器根據GRPCExporterSetting實例化MetricExportServiceGrpc.MetricExportServiceStub以及MetricExportServiceGrpc.MetricExportServiceBlockingStub,並創建DataCarrier,然後註冊自身的IConsumer到exportBuffer;其export方法主要是執行exportBuffer.produce(new ExportData(meta, metrics));其consume方法主要是構造ExportMetricValue,然後執行streamObserver.onNext
小結
metric-exporter.proto定義了MetricExportService服務,它有export、subscription兩個rpc方法;GRPCExporterProvider繼承了ModuleProvider,其prepare方法創建GRPCExporter,然後執行registerServiceImplementation;其notifyAfterCompleted方法主要是給exporter設置serviceInventoryCache、serviceInstanceInventoryCache、endpointInventoryCache,然後執行exporter.initSubscriptionList()
doc
- metric-exporter
閱讀更多 碼匠亂燉 的文章