A look at SkyWalking's grpc-configuration-sync

This article walks through SkyWalking's grpc-configuration-sync module.



configuration-service.proto

skywalking-6.6.0/oap-server/server-configuration/grpc-configuration-sync/src/main/proto/configuration-service.proto

<code>syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.apache.skywalking.oap.server.configuration.service";

service ConfigurationService {
  rpc call (ConfigurationRequest) returns (ConfigurationResponse) {
  }
}

message ConfigurationRequest {
  // Logic name of this cluster,
  // in case the remote configuration center implementation support
  // configuration management for multiple clusters.
  string clusterName = 1;
}

message ConfigurationResponse {
  // Include all config items.
  // All config name should be not empty,
  // the name is composed by "module name"."provider name"."item name".
  // Each watcher implementor provides this, and it will be notified when the value changed.
  //
  // If the config center wants to set the value to NULL or empty,
  // must set the name with empty value explicitly.
  repeated Config configTable = 1;
}

message Config {
  string name = 1;
  string value = 2;
}</code>
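  • configuration-service.proto defines ConfigurationService, which has a single call RPC that takes a ConfigurationRequest (carrying the clusterName) and returns a ConfigurationResponse (carrying the full list of Config name/value pairs)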
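The naming rule in the proto comments ("module name"."provider name"."item name", with an explicit empty value to clear an item) can be sketched in plain Java. This is only an illustration: ConfigNameSketch, configName, sampleConfigTable and the module, provider and item names used here are hypothetical and not part of SkyWalking, and a Map stands in for the repeated Config entries of a ConfigurationResponse.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not SkyWalking code: composes config names in the
// "module name"."provider name"."item name" form described in the proto comments.
final class ConfigNameSketch {

    // Joins the three parts with dots; no part may be empty.
    static String configName(String module, String provider, String item) {
        if (module.isEmpty() || provider.isEmpty() || item.isEmpty()) {
            throw new IllegalArgumentException("config name parts must not be empty");
        }
        return module + "." + provider + "." + item;
    }

    // A Map standing in for the repeated Config entries (name -> value).
    // All names and values below are made up for illustration.
    static Map<String, String> sampleConfigTable() {
        Map<String, String> table = new LinkedHashMap<>();
        table.put(configName("module-a", "provider-b", "threshold"), "200");
        // To clear an item, the name is still sent, with an explicit empty value.
        table.put(configName("module-a", "provider-b", "comment"), "");
        return table;
    }

    public static void main(String[] args) {
        sampleConfigTable().forEach((name, value) ->
            System.out.println(name + " = '" + value + "'"));
    }
}
```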

GRPCConfigurationProvider

skywalking-6.6.0/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigurationProvider.java

<code>public class GRPCConfigurationProvider extends AbstractConfigurationProvider {
  private RemoteEndpointSettings settings;

  public GRPCConfigurationProvider() {
      settings = new RemoteEndpointSettings();
  }

  @Override public String name() {
      return "grpc";
  }

  @Override public ModuleConfig createConfigBeanIfAbsent() {
      return settings;
  }

  @Override protected ConfigWatcherRegister initConfigReader() throws ModuleStartException {
      if (Strings.isNullOrEmpty(settings.getHost())) {
          throw new ModuleStartException("No host setting.");
      }
      if (settings.getPort() < 1) {
          throw new ModuleStartException("No port setting.");
      }

      return new GRPCConfigWatcherRegister(settings);
  }
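}</code>
  • GRPCConfigurationProvider extends AbstractConfigurationProvider and registers under the name grpc; its initConfigReader checks that host is set and port is at least 1, then creates a GRPCConfigWatcherRegister from the RemoteEndpointSettings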

RemoteEndpointSettings

skywalking-6.6.0/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/RemoteEndpointSettings.java

<code>@Setter
@Getter
public class RemoteEndpointSettings extends ModuleConfig {
  private String host;
  private int port;
  private String clusterName = "default";
  // Sync configuration per 60 seconds.
  private int period = 60;

  @Override public String toString() {
      return "RemoteEndpointSettings{" +
          "host='" + host + '\'' +
          ", port=" + port +
          ", clusterName='" + clusterName + '\'' +
          '}';
  }
}</code>
  • RemoteEndpointSettings defines the host, port, clusterName and period properties; clusterName defaults to "default" and period defaults to 60 (seconds)
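The period setting is passed to the ConfigWatcherRegister superclass, whose code is not quoted in this article. As a rough sketch of what "sync every N seconds" can look like, the following uses a plain ScheduledExecutorService. PeriodicSyncSketch, start and runsAtLeast are hypothetical names, and this is an assumption about the general technique, not SkyWalking's actual scheduling code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not SkyWalking code: runs a sync task at a fixed period.
final class PeriodicSyncSketch {

    // Runs syncTask once immediately, then again `period` units after each run finishes.
    static ScheduledExecutorService start(Runnable syncTask, long period, TimeUnit unit) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                syncTask.run();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all later runs, so it is caught here.
                System.err.println("Sync failed: " + e);
            }
        }, 0, period, unit);
        return scheduler;
    }

    // Demo helper: true if the task ran at least `times` times within the timeout.
    static boolean runsAtLeast(int times, long periodMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(times);
        ScheduledExecutorService scheduler =
            start(latch::countDown, periodMillis, TimeUnit.MILLISECONDS);
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Short period for the demo; the default in RemoteEndpointSettings is 60 seconds.
        System.out.println("ran three times: " + runsAtLeast(3, 50, 5000));
    }
}
```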

GRPCConfigWatcherRegister

skywalking-6.6.0/oap-server/server-configuration/grpc-configuration-sync/src/main/java/org/apache/skywalking/oap/server/configuration/grpc/GRPCConfigWatcherRegister.java

<code>public class GRPCConfigWatcherRegister extends ConfigWatcherRegister {
  private static final Logger logger = LoggerFactory.getLogger(GRPCConfigWatcherRegister.class);

  private RemoteEndpointSettings settings;
  private ConfigurationServiceGrpc.ConfigurationServiceBlockingStub stub;

  public GRPCConfigWatcherRegister(RemoteEndpointSettings settings) {
      super(settings.getPeriod());
      this.settings = settings;
      stub = ConfigurationServiceGrpc.newBlockingStub(NettyChannelBuilder.forAddress(settings.getHost(), settings.getPort()).usePlaintext().build());
  }

  @Override public ConfigTable readConfig(Set<String> keys) {
      ConfigTable table = new ConfigTable();
      try {
          ConfigurationResponse response = stub.call(ConfigurationRequest.newBuilder().setClusterName(settings.getClusterName()).build());
          response.getConfigTableList().forEach(config -> {
              final String name = config.getName();
              if (keys.contains(name)) {
                  table.add(new ConfigTable.ConfigItem(name, config.getValue()));
              }
          });
      } catch (Exception e) {
          logger.error("Remote config center [" + settings + "] is not available.", e);
      }
      return table;
  }
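}</code>
  • GRPCConfigWatcherRegister extends ConfigWatcherRegister. Its constructor passes the sync period to the superclass and builds a ConfigurationServiceGrpc.ConfigurationServiceBlockingStub over a plaintext Netty channel from the RemoteEndpointSettings. Its readConfig calls stub.call with the cluster name and adds to the table only those configTableList entries whose names are in keys; if the call throws, it logs the error and returns the table as built so far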
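The filtering and failure handling in readConfig can be tried out without a gRPC server. The sketch below replaces the blocking stub with a Supplier and ConfigTable with a Map; ReadConfigSketch and its parameter names are hypothetical stand-ins, so it shows the control flow only, not the real SkyWalking types.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch, not SkyWalking code: mirrors the control flow of readConfig.
final class ReadConfigSketch {

    // remoteCall stands in for stub.call(...); it returns name -> value pairs.
    static Map<String, String> readConfig(Set<String> keys,
                                          Supplier<Map<String, String>> remoteCall) {
        Map<String, String> table = new LinkedHashMap<>();
        try {
            remoteCall.get().forEach((name, value) -> {
                // Only items that some watcher registered for are kept.
                if (keys.contains(name)) {
                    table.put(name, value);
                }
            });
        } catch (Exception e) {
            // As in the original, a failed call is logged and not rethrown.
            System.err.println("Remote config center is not available: " + e);
        }
        return table;
    }

    public static void main(String[] args) {
        Map<String, String> remote = new LinkedHashMap<>();
        remote.put("module-a.provider-b.threshold", "200");
        remote.put("module-x.provider-y.unwatched", "1");
        Set<String> keys = new HashSet<>(Arrays.asList("module-a.provider-b.threshold"));

        System.out.println(readConfig(keys, () -> remote));
        System.out.println(readConfig(keys, () -> {
            throw new IllegalStateException("connection refused");
        }));
    }
}
```

One consequence of this design is that the caller cannot tell "the remote returned no watched items" from "the remote was unreachable": both produce an empty table.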

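Summary

GRPCConfigurationProvider extends AbstractConfigurationProvider; its initConfigReader validates the host and port and creates a GRPCConfigWatcherRegister from the RemoteEndpointSettings. GRPCConfigWatcherRegister extends ConfigWatcherRegister; its constructor builds a ConfigurationServiceGrpc.ConfigurationServiceBlockingStub from the RemoteEndpointSettings, and its readConfig calls stub.call, reads configTableList, and adds the entries whose names are in keys to the table.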
