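A look at SkyWalking's register-receiver-plugin

This article walks through the source of SkyWalking's register-receiver-plugin (version 6.6.0), focusing on the module provider and the REST handlers it registers.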



RegisterModuleProvider

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/RegisterModuleProvider.java

<code>public class RegisterModuleProvider extends ModuleProvider {

  @Override public String name() {
      return "default";
  }

  @Override public Class<? extends ModuleDefine> module() {
      return RegisterModule.class;
  }

  @Override public ModuleConfig createConfigBeanIfAbsent() {
      return null;
  }

  @Override public void prepare() {
  }

  @Override public void start() {
      GRPCHandlerRegister grpcHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(GRPCHandlerRegister.class);
      grpcHandlerRegister.addHandler(new ApplicationRegisterHandler(getManager()));
      grpcHandlerRegister.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
      grpcHandlerRegister.addHandler(new ServiceNameDiscoveryHandler(getManager()));
      grpcHandlerRegister.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));

      // v2
      grpcHandlerRegister.addHandler(new RegisterServiceHandler(getManager()));
      grpcHandlerRegister.addHandler(new ServiceInstancePingServiceHandler(getManager()));

      JettyHandlerRegister jettyHandlerRegister = getManager().find(SharingServerModule.NAME).provider().getService(JettyHandlerRegister.class);
      jettyHandlerRegister.addHandler(new ApplicationRegisterServletHandler(getManager()));
      jettyHandlerRegister.addHandler(new InstanceDiscoveryServletHandler(getManager()));
      jettyHandlerRegister.addHandler(new InstanceHeartBeatServletHandler(getManager()));
      jettyHandlerRegister.addHandler(new NetworkAddressRegisterServletHandler(getManager()));
      jettyHandlerRegister.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
  }

  @Override public void notifyAfterCompleted() {

  }

  @Override public String[] requiredModules() {
      return new String[] {CoreModule.NAME, SharingServerModule.NAME};
  }
}</code>
  • RegisterModuleProvider extends ModuleProvider. Its start method obtains the GRPCHandlerRegister from SharingServerModule and registers ApplicationRegisterHandler, InstanceDiscoveryServiceHandler, ServiceNameDiscoveryHandler, NetworkAddressRegisterServiceHandler, RegisterServiceHandler, and ServiceInstancePingServiceHandler. It also obtains the JettyHandlerRegister and registers ApplicationRegisterServletHandler, InstanceDiscoveryServletHandler, InstanceHeartBeatServletHandler, NetworkAddressRegisterServletHandler, and ServiceNameDiscoveryServiceHandler.
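The registration pattern in start() can be illustrated with a minimal, self-contained sketch. The types here (PathHandler, SimpleHandlerRegister) are hypothetical stand-ins and not SkyWalking's real JettyHandlerRegister; only the five path strings are taken from the handlers covered in this article. Rejecting duplicate paths is a choice of this sketch, not something the source confirms.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are not SkyWalking's real classes.
public class HandlerRegistrySketch {

    /** A handler bound to exactly one URL path, similar in spirit to pathSpec(). */
    interface PathHandler {
        String pathSpec();
    }

    /** Collects handlers keyed by path, preserving registration order. */
    static class SimpleHandlerRegister {
        private final Map<String, PathHandler> handlers = new LinkedHashMap<>();

        void addHandler(PathHandler handler) {
            // Sketch-only choice: reject a second handler for the same path.
            if (handlers.putIfAbsent(handler.pathSpec(), handler) != null) {
                throw new IllegalStateException("Duplicate path: " + handler.pathSpec());
            }
        }

        List<String> paths() {
            return new ArrayList<>(handlers.keySet());
        }
    }

    /** Mirrors the shape of start(): obtain a register, then add each handler in turn. */
    public static List<String> registeredPaths() {
        SimpleHandlerRegister register = new SimpleHandlerRegister();
        register.addHandler(() -> "/application/register");
        register.addHandler(() -> "/instance/register");
        register.addHandler(() -> "/instance/heartbeat");
        register.addHandler(() -> "/networkAddress/register");
        register.addHandler(() -> "/servicename/discovery");
        return register.paths();
    }

    public static void main(String[] args) {
        registeredPaths().forEach(System.out::println);
    }
}
```

The point of the pattern is that the provider owns no server itself: it only contributes handlers to a register that another module exposes.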

ApplicationRegisterServletHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/ApplicationRegisterServletHandler.java

<code>public class ApplicationRegisterServletHandler extends JettyJsonHandler {

  private static final Logger logger = LoggerFactory.getLogger(ApplicationRegisterServletHandler.class);

  private final IServiceInventoryRegister serviceInventoryRegister;
  private Gson gson = new Gson();
  private static final String APPLICATION_CODE = "c";
  private static final String APPLICATION_ID = "i";

  public ApplicationRegisterServletHandler(ModuleManager moduleManager) {
      serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
  }

  @Override public String pathSpec() {
      return "/application/register";
  }

  @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
      throw new UnsupportedOperationException();
  }

  @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
      JsonArray responseArray = new JsonArray();
      try {
          JsonArray applicationCodes = gson.fromJson(req.getReader(), JsonArray.class);
          for (int i = 0; i < applicationCodes.size(); i++) {
              String applicationCode = applicationCodes.get(i).getAsString();
              int applicationId = serviceInventoryRegister.getOrCreate(applicationCode, null);
              JsonObject mapping = new JsonObject();
              mapping.addProperty(APPLICATION_CODE, applicationCode);
              mapping.addProperty(APPLICATION_ID, applicationId);
              responseArray.add(mapping);
          }
      } catch (IOException e) {
          logger.error(e.getMessage(), e);
      }
      return responseArray;
  }
}</code>
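  • ApplicationRegisterServletHandler extends JettyJsonHandler and serves /application/register. Its doPost method reads a JSON array of application codes from the request body, calls serviceInventoryRegister.getOrCreate(applicationCode, null) for each one, and returns an array of mappings with the code under "c" and the id under "i".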
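As a rough illustration of the getOrCreate contract and the response shape, here is a sketch that uses a plain in-memory map. It assumes ids are assigned synchronously starting from 1, which is a simplification made for this example; the class and its methods are hypothetical and do not reproduce SkyWalking's IServiceInventoryRegister.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory register, for illustration only.
public class ApplicationRegisterSketch {

    private final Map<String, Integer> idsByCode = new HashMap<>();
    private int nextId = 1; // assumption: ids start at 1 and grow by one

    /** Returns the existing id for a code, or assigns a new one on first sight. */
    public synchronized int getOrCreate(String applicationCode) {
        return idsByCode.computeIfAbsent(applicationCode, code -> nextId++);
    }

    /** Builds one {"c": code, "i": id} mapping per requested code, as doPost does. */
    public List<Map<String, Object>> register(List<String> applicationCodes) {
        List<Map<String, Object>> response = new ArrayList<>();
        for (String code : applicationCodes) {
            Map<String, Object> mapping = new LinkedHashMap<>();
            mapping.put("c", code);
            mapping.put("i", getOrCreate(code));
            response.add(mapping);
        }
        return response;
    }

    public static void main(String[] args) {
        ApplicationRegisterSketch sketch = new ApplicationRegisterSketch();
        // The repeated code "order-service" maps to the same id both times.
        System.out.println(sketch.register(List.of("order-service", "user-service", "order-service")));
    }
}
```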

InstanceDiscoveryServletHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceDiscoveryServletHandler.java

<code>public class InstanceDiscoveryServletHandler extends JettyJsonHandler {

  private static final Logger logger = LoggerFactory.getLogger(InstanceDiscoveryServletHandler.class);

  private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
  private final ServiceInventoryCache serviceInventoryCache;
  private final Gson gson = new Gson();

  private static final String APPLICATION_ID = "ai";
  private static final String AGENT_UUID = "au";
  private static final String REGISTER_TIME = "rt";
  private static final String INSTANCE_ID = "ii";
  private static final String OS_INFO = "oi";

  public InstanceDiscoveryServletHandler(ModuleManager moduleManager) {
      this.serviceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInventoryCache.class);
      this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class);
  }

  @Override public String pathSpec() {
      return "/instance/register";
  }

  @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
      throw new UnsupportedOperationException();
  }

  @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
      JsonObject responseJson = new JsonObject();
      try {
          JsonObject instance = gson.fromJson(req.getReader(), JsonObject.class);
          int applicationId = instance.get(APPLICATION_ID).getAsInt();
          String agentUUID = instance.get(AGENT_UUID).getAsString();
          long registerTime = instance.get(REGISTER_TIME).getAsLong();
          JsonObject osInfoJson = instance.get(OS_INFO).getAsJsonObject();

          List<String> ipv4sList = new ArrayList<>();
          JsonArray ipv4s = osInfoJson.get("ipv4s").getAsJsonArray();
          ipv4s.forEach(ipv4 -> ipv4sList.add(ipv4.getAsString()));

          ServiceInventory serviceInventory = serviceInventoryCache.get(applicationId);

          JsonObject instanceProperties = new JsonObject();
          instanceProperties.addProperty(ServiceInstanceInventory.PropertyUtil.HOST_NAME, osInfoJson.get("hostName").getAsString());
          instanceProperties.addProperty(ServiceInstanceInventory.PropertyUtil.OS_NAME, osInfoJson.get("osName").getAsString());
          instanceProperties.addProperty(ServiceInstanceInventory.PropertyUtil.PROCESS_NO, osInfoJson.get("processId").getAsInt() + "");
          instanceProperties.addProperty(ServiceInstanceInventory.PropertyUtil.IPV4S, ServiceInstanceInventory.PropertyUtil.ipv4sSerialize(ipv4sList));

          String instanceName = serviceInventory.getName();
          if (instanceProperties.has(PROCESS_NO)) {
              instanceName += "-pid:" + instanceProperties.get(PROCESS_NO).getAsString();
          }
          if (instanceProperties.has(HOST_NAME)) {
              instanceName += "@" + instanceProperties.get(HOST_NAME).getAsString();
          }

          int instanceId = serviceInstanceInventoryRegister.getOrCreate(applicationId, instanceName, agentUUID, registerTime, instanceProperties);
          responseJson.addProperty(APPLICATION_ID, applicationId);
          responseJson.addProperty(INSTANCE_ID, instanceId);
      } catch (IOException e) {
          logger.error(e.getMessage(), e);
      }
      return responseJson;
  }
}</code>
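  • InstanceDiscoveryServletHandler extends JettyJsonHandler and serves /instance/register. Its doPost method builds instanceProperties (host name, OS name, process number, IPv4 addresses) from the request's OS info, derives instanceName from the service name plus the process number and host name, and then calls serviceInstanceInventoryRegister.getOrCreate(applicationId, instanceName, agentUUID, registerTime, instanceProperties).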
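The instance name format used in doPost is easy to miss in the middle of the JSON handling, so here is a small sketch of just that step. The method name buildInstanceName and the use of null to model an absent property are assumptions of this example; the "-pid:" and "@" separators come from the code above.

```java
// Illustrative helper; not part of SkyWalking.
public class InstanceNameSketch {

    /**
     * Builds "<service>-pid:<processNo>@<hostName>", skipping any part that is absent.
     * A null argument models a property that is not present.
     */
    public static String buildInstanceName(String serviceName, String processNo, String hostName) {
        StringBuilder name = new StringBuilder(serviceName);
        if (processNo != null) {
            name.append("-pid:").append(processNo);
        }
        if (hostName != null) {
            name.append("@").append(hostName);
        }
        return name.toString();
    }

    public static void main(String[] args) {
        // prints "order-service-pid:1234@host-a"
        System.out.println(buildInstanceName("order-service", "1234", "host-a"));
    }
}
```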

InstanceHeartBeatServletHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/InstanceHeartBeatServletHandler.java

<code>public class InstanceHeartBeatServletHandler extends JettyJsonHandler {

  private static final Logger logger = LoggerFactory.getLogger(InstanceHeartBeatServletHandler.class);

  private final IServiceInstanceInventoryRegister serviceInstanceInventoryRegister;
  private final ServiceInstanceInventoryCache serviceInstanceInventoryCache;
  private final IServiceInventoryRegister serviceInventoryRegister;
  private final Gson gson = new Gson();

  private static final String INSTANCE_ID = "ii";
  private static final String HEARTBEAT_TIME = "ht";

  public InstanceHeartBeatServletHandler(ModuleManager moduleManager) {
      this.serviceInstanceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInstanceInventoryRegister.class);
      this.serviceInstanceInventoryCache = moduleManager.find(CoreModule.NAME).provider().getService(ServiceInstanceInventoryCache.class);
      this.serviceInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(IServiceInventoryRegister.class);
  }

  @Override public String pathSpec() {
      return "/instance/heartbeat";
  }

  @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
      throw new UnsupportedOperationException();
  }

  @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException, IOException {
      JsonObject responseJson = new JsonObject();
      try {
          JsonObject heartBeat = gson.fromJson(req.getReader(), JsonObject.class);
          int instanceId = heartBeat.get(INSTANCE_ID).getAsInt();
          long heartBeatTime = heartBeat.get(HEARTBEAT_TIME).getAsLong();

          serviceInstanceInventoryRegister.heartbeat(instanceId, heartBeatTime);
          ServiceInstanceInventory serviceInstanceInventory = serviceInstanceInventoryCache.get(instanceId);
          if (Objects.nonNull(serviceInstanceInventory)) {
              serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime);
          } else {
              logger.warn("Can't found service by service instance id from cache, service instance id is: {}", instanceId);
          }
      } catch (IOException e) {
          logger.error(e.getMessage(), e);
      }
      return responseJson;
  }
}</code>
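  • InstanceHeartBeatServletHandler extends JettyJsonHandler and serves /instance/heartbeat. Its doPost method calls serviceInstanceInventoryRegister.heartbeat(instanceId, heartBeatTime), then looks up the instance in serviceInstanceInventoryCache. If the cached serviceInstanceInventory is not null, it also calls serviceInventoryRegister.heartbeat(serviceInstanceInventory.getServiceId(), heartBeatTime); otherwise it logs a warning.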
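The two-level heartbeat (instance first, then the owning service when the instance is cached) can be sketched with plain maps. Everything here is hypothetical: the maps stand in for the register and cache services, and the boolean return value is added only so the behaviour can be observed.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the heartbeat flow, for illustration only.
public class HeartbeatSketch {

    private final Map<Integer, Integer> serviceIdByInstanceId = new HashMap<>(); // stands in for the instance cache
    private final Map<Integer, Long> instanceHeartbeats = new HashMap<>();
    private final Map<Integer, Long> serviceHeartbeats = new HashMap<>();

    public void cacheInstance(int instanceId, int serviceId) {
        serviceIdByInstanceId.put(instanceId, serviceId);
    }

    /** Records the instance heartbeat; returns true if the service heartbeat was refreshed too. */
    public boolean heartbeat(int instanceId, long heartBeatTime) {
        instanceHeartbeats.put(instanceId, heartBeatTime);
        Integer serviceId = serviceIdByInstanceId.get(instanceId);
        if (serviceId == null) {
            return false; // the handler above logs a warning in this case
        }
        serviceHeartbeats.put(serviceId, heartBeatTime);
        return true;
    }

    public Long instanceHeartbeat(int instanceId) {
        return instanceHeartbeats.get(instanceId);
    }

    public Long serviceHeartbeat(int serviceId) {
        return serviceHeartbeats.get(serviceId);
    }

    public static void main(String[] args) {
        HeartbeatSketch sketch = new HeartbeatSketch();
        sketch.cacheInstance(10, 2);
        System.out.println(sketch.heartbeat(10, 1000L)); // instance is cached
        System.out.println(sketch.heartbeat(99, 2000L)); // instance is not cached
    }
}
```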

NetworkAddressRegisterServletHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/NetworkAddressRegisterServletHandler.java

<code>public class NetworkAddressRegisterServletHandler extends JettyJsonHandler {

  private static final Logger logger = LoggerFactory.getLogger(NetworkAddressRegisterServletHandler.class);

  private final INetworkAddressInventoryRegister networkAddressInventoryRegister;
  private Gson gson = new Gson();
  private static final String NETWORK_ADDRESS = "n";
  private static final String ADDRESS_ID = "i";

  public NetworkAddressRegisterServletHandler(ModuleManager moduleManager) {
      this.networkAddressInventoryRegister = moduleManager.find(CoreModule.NAME).provider().getService(INetworkAddressInventoryRegister.class);
  }

  @Override public String pathSpec() {
      return "/networkAddress/register";
  }

  @Override protected JsonElement doGet(HttpServletRequest req) {
      throw new UnsupportedOperationException();
  }

  @Override protected JsonElement doPost(HttpServletRequest req) {
      JsonArray responseArray = new JsonArray();
      try {
          JsonArray networkAddresses = gson.fromJson(req.getReader(), JsonArray.class);
          for (int i = 0; i < networkAddresses.size(); i++) {
              String networkAddress = networkAddresses.get(i).getAsString();

              if (logger.isDebugEnabled()) {
                  logger.debug("network getAddress register, network getAddress: {}", networkAddress);
              }

              int addressId = networkAddressInventoryRegister.getOrCreate(networkAddress, null);
              JsonObject mapping = new JsonObject();
              mapping.addProperty(ADDRESS_ID, addressId);
              mapping.addProperty(NETWORK_ADDRESS, networkAddress);
              responseArray.add(mapping);
          }
      } catch (IOException e) {
          logger.error(e.getMessage(), e);
      }
      return responseArray;
  }
}</code>
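  • NetworkAddressRegisterServletHandler extends JettyJsonHandler and serves /networkAddress/register. Its doPost method reads a JSON array of network addresses, calls networkAddressInventoryRegister.getOrCreate(networkAddress, null) for each one, and returns an array of mappings with the id under "i" and the address under "n".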
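From the client side, a request to this endpoint might be built as in the sketch below. It uses the JDK's java.net.http API (Java 11 or later) and only constructs the request without sending it. The base URL, including host and port, is an assumption for illustration, and toJsonArray is a hypothetical helper with deliberately minimal escaping; a real client would more likely use a JSON library.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

// Illustrative client-side sketch; not taken from any SkyWalking agent.
public class NetworkAddressRequestSketch {

    /** Serializes addresses as a JSON array of strings, escaping backslashes and quotes only. */
    public static String toJsonArray(List<String> addresses) {
        return addresses.stream()
            .map(a -> "\"" + a.replace("\\", "\\\\").replace("\"", "\\\"") + "\"")
            .collect(Collectors.joining(",", "[", "]"));
    }

    /** Builds (but does not send) a POST to the path returned by pathSpec() above. */
    public static HttpRequest buildRequest(String baseUrl, List<String> addresses) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/networkAddress/register"))
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(toJsonArray(addresses)))
            .build();
    }

    public static void main(String[] args) {
        // Assumed base URL; adjust to wherever the REST server is actually exposed.
        HttpRequest request = buildRequest("http://localhost:12800", List.of("10.0.0.5:3306"));
        System.out.println(request.method() + " " + request.uri());
        System.out.println(toJsonArray(List.of("10.0.0.5:3306")));
    }
}
```

Per the handler code, the response to such a request is a JSON array with one object per address, each carrying "i" and "n".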

ServiceNameDiscoveryServiceHandler

skywalking-6.6.0/oap-server/server-receiver-plugin/skywalking-register-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/register/provider/handler/v5/rest/ServiceNameDiscoveryServiceHandler.java

<code>public class ServiceNameDiscoveryServiceHandler extends JettyJsonHandler {

  private static final Logger logger = LoggerFactory.getLogger(ServiceNameDiscoveryServiceHandler.class);

  private final IEndpointInventoryRegister inventoryService;
  private final Gson gson = new Gson();

  private static final String APPLICATION_ID = "ai";
  private static final String SERVICE_NAME = "sn";
  private static final String SRC_SPAN_TYPE = "st";
  private static final String SERVICE_ID = "si";
  private static final String ELEMENT = "el";

  public ServiceNameDiscoveryServiceHandler(ModuleManager moduleManager) {
      this.inventoryService = moduleManager.find(CoreModule.NAME).provider().getService(IEndpointInventoryRegister.class);
  }

  @Override public String pathSpec() {
      return "/servicename/discovery";
  }

  @Override protected JsonElement doGet(HttpServletRequest req) throws ArgumentsParseException {
      throw new UnsupportedOperationException();
  }

  @Override protected JsonElement doPost(HttpServletRequest req) throws ArgumentsParseException {
      JsonArray responseArray = new JsonArray();
      try {
          JsonArray services = gson.fromJson(req.getReader(), JsonArray.class);
          for (JsonElement service : services) {
              int applicationId = service.getAsJsonObject().get(APPLICATION_ID).getAsInt();
              String serviceName = service.getAsJsonObject().get(SERVICE_NAME).getAsString();
              int srcSpanType = service.getAsJsonObject().get(SRC_SPAN_TYPE).getAsInt();

              SpanType spanType = SpanType.forNumber(srcSpanType);
              if (Objects.nonNull(spanType)) {
                  int serviceId = inventoryService.getOrCreate(applicationId, serviceName, DetectPoint.fromSpanType(spanType));
                  if (serviceId != 0) {
                      JsonObject responseJson = new JsonObject();
                      responseJson.addProperty(SERVICE_ID, serviceId);
                      responseJson.add(ELEMENT, service);
                      responseArray.add(responseJson);
                  }
              }
          }
      } catch (IOException e) {
          logger.error(e.getMessage(), e);
      }
      return responseArray;
  }
}</code>
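  • ServiceNameDiscoveryServiceHandler extends JettyJsonHandler and serves /servicename/discovery. For each element of the posted array, its doPost method resolves the span type and, if it is recognized, calls inventoryService.getOrCreate(applicationId, serviceName, DetectPoint.fromSpanType(spanType)). An element is echoed back with its id under "si" only when the returned serviceId is not 0.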
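The two filters in this doPost (unknown span type, and an id of 0) decide which requests appear in the response. The sketch below isolates that logic. The SpanType enum and the knownIds map are hypothetical stand-ins; in particular, a missing entry in knownIds is used here to model a lookup that returns 0.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative model of the response filtering; not SkyWalking code.
public class EndpointDiscoverySketch {

    /** Hypothetical span type enum; forNumber returns null for numbers it does not know. */
    enum SpanType {
        ENTRY, EXIT, LOCAL;

        static SpanType forNumber(int number) {
            SpanType[] all = values();
            return (number >= 0 && number < all.length) ? all[number] : null;
        }
    }

    /**
     * Keeps only requests with a recognized span type and a non-zero id.
     * spanTypeByName maps each requested name to its span type number;
     * knownIds maps names to ids that are already available.
     */
    public static Map<String, Integer> discover(Map<String, Integer> spanTypeByName,
                                                Map<String, Integer> knownIds) {
        Map<String, Integer> accepted = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> request : spanTypeByName.entrySet()) {
            if (SpanType.forNumber(request.getValue()) == null) {
                continue; // unrecognized span type: no lookup and no response element
            }
            int id = knownIds.getOrDefault(request.getKey(), 0);
            if (id != 0) {
                accepted.put(request.getKey(), id);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Map<String, Integer> requests = new LinkedHashMap<>();
        requests.put("/orders", 0);
        requests.put("/pending", 1);
        Map<String, Integer> knownIds = new LinkedHashMap<>();
        knownIds.put("/orders", 7);
        System.out.println(discover(requests, knownIds));
    }
}
```

A caller therefore cannot assume the response array has the same length as the request array, and has to match results back through the echoed element.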

Summary

RegisterModuleProvider extends ModuleProvider. Its start method obtains the GRPCHandlerRegister and registers ApplicationRegisterHandler, InstanceDiscoveryServiceHandler, ServiceNameDiscoveryHandler, NetworkAddressRegisterServiceHandler, RegisterServiceHandler, and ServiceInstancePingServiceHandler. It also obtains the JettyHandlerRegister and registers ApplicationRegisterServletHandler, InstanceDiscoveryServletHandler, InstanceHeartBeatServletHandler, NetworkAddressRegisterServletHandler, and ServiceNameDiscoveryServiceHandler. Each REST handler parses a JSON request body and delegates to an inventory register or cache service obtained from CoreModule.

doc

  • RegisterModuleProvider
  • ApplicationRegisterServletHandler
  • InstanceDiscoveryServletHandler
  • InstanceHeartBeatServletHandler
  • NetworkAddressRegisterServletHandler
  • ServiceNameDiscoveryServiceHandler

