Dubbo + Zipkin + Brave實現全鏈路追蹤

* @modifytime:
*/
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class TracingFilter implements Filter {
private static final Logger log = LoggerFactory.getLogger(TracingFilter.class);


private static Tracing tracing;
private static Tracer tracer;
private static TraceContext.Extractor> extractor;
private static TraceContext.Injector> injector;
static final Propagation.Getter, String> GETTER =
new Propagation.Getter, String>() {
@Override
public String get(Map carrier, String key) {
return carrier.get(key);
}
@Override
public String toString() {
return "Map::get";
}
};
static final Propagation.Setter, String> SETTER =
new Propagation.Setter, String>() {
@Override
public void put(Map carrier, String key, String value) {
carrier.put(key, value);
}
@Override
public String toString() {
return "Map::set";
}
};
static {
// 1
Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
// 2
AsyncReporter asyncReporter = AsyncReporter.builder(sender)
.closeTimeout(500, TimeUnit.MILLISECONDS)
.build(SpanBytesEncoder.JSON_V2);
// 3
tracing = Tracing.newBuilder()
.localServiceName("tracer-client")
.spanReporter(asyncReporter)
.sampler(Sampler.ALWAYS_SAMPLE)
.propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name"))
.build();
tracer = tracing.tracer();
// 4

// 4.1
extractor = tracing.propagation().extractor(GETTER);
// 4.2
injector = tracing.propagation().injector(SETTER);
}
public TracingFilter() {
}
@Override
public Result invoke(Invoker> invoker, Invocation invocation) throws RpcException {
RpcContext rpcContext = RpcContext.getContext();
// 5
Span.Kind kind = rpcContext.isProviderSide() ? Span.Kind.SERVER : Span.Kind.CLIENT;
final Span span;
if (kind.equals(Span.Kind.CLIENT)) {
//6
span = tracer.nextSpan();
//7
injector.inject(span.context(), invocation.getAttachments());
} else {
//8
TraceContextOrSamplingFlags extracted = extractor.extract(invocation.getAttachments());
//9
span = extracted.context() != null ? tracer.joinSpan(extracted.context()) : tracer.nextSpan(extracted);
}
if (!span.isNoop()) {
span.kind(kind).start();
//10
String service = invoker.getInterface().getSimpleName();
String method = RpcUtils.getMethodName(invocation);
span.kind(kind);
span.name(service + "/" + method);
InetSocketAddress remoteAddress = rpcContext.getRemoteAddress();
span.remoteIpAndPort(
remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : remoteAddress.getHostName(),remoteAddress.getPort());
}
boolean isOneway = false, deferFinish = false;
try (Tracer.SpanInScope scope = tracer.withSpanInScope(span)){
//11
collectArguments(invocation, span, kind);
Result result = invoker.invoke(invocation);
if (result.hasException()) {
onError(result.getException(), span);
}
// 12
isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);
// 13
Future future = rpcContext.getFuture();
if (future instanceof FutureAdapter) {
deferFinish = true;

((FutureAdapter) future).getFuture().setCallback(new FinishSpanCallback(span));// 14
}
return result;
} catch (Error | RuntimeException e) {
onError(e, span);
throw e;
} finally {
if (isOneway) { // 15
span.flush();
} else if (!deferFinish) { // 16
span.finish();
}
}
}
static void onError(Throwable error, Span span) {
span.error(error);
if (error instanceof RpcException) {
span.tag("dubbo.error_msg", RpcExceptionEnum.getMsgByCode(((RpcException) error).getCode()));
}
}
static void collectArguments(Invocation invocation, Span span, Span.Kind kind) {
if (kind == Span.Kind.CLIENT) {
StringBuilder fqcn = new StringBuilder();
Object[] args = invocation.getArguments();
if (args != null && args.length > 0) {
try {
fqcn.append(JSON.json(args));
} catch (IOException e) {
log.warn(e.getMessage(), e);
}
}
span.tag("args", fqcn.toString());
}
}
static final class FinishSpanCallback implements ResponseCallback {
final Span span;
FinishSpanCallback(Span span) {
this.span = span;
}
@Override
public void done(Object response) {
span.finish();
}
@Override
public void caught(Throwable exception) {
onError(exception, span);
span.finish();
}
}
// 17

private enum RpcExceptionEnum {
UNKNOWN_EXCEPTION(0, "unknown exception"),
NETWORK_EXCEPTION(1, "network exception"),
TIMEOUT_EXCEPTION(2, "timeout exception"),
BIZ_EXCEPTION(3, "biz exception"),
FORBIDDEN_EXCEPTION(4, "forbidden exception"),
SERIALIZATION_EXCEPTION(5, "serialization exception"),;
private int code;
private String msg;
RpcExceptionEnum(int code, String msg) {
this.code = code;
this.msg = msg;
}
public static String getMsgByCode(int code) {
for (RpcExceptionEnum error : RpcExceptionEnum.values()) {
if (code == error.code) {
return error.msg;
}
}
return null;
}
}
}
  1. 構建客戶端發送工具
  2. 構建異步reporter
  3. 構建tracing上下文
  4. 初始化injector 和 Extractor [tab]4.1 extractor 指數據提取對象,用於在carrier中提取TraceContext相關信息或者採樣標記信息到TraceContextOrSamplingFlags 中 -4.2 injector 用於將TraceContext中的各種數據注入到carrier中,其中carrier一半是指數據傳輸中的載體,類似於Dubbo中Invocation中的attachment(附件集合)
  5. 判斷此次調用是作為服務端還是客戶端
  6. rpc客戶端調用會從ThreadLocal中獲取parent的 TraceContext ,為新生成的Span指定traceId及 parentId如果沒有parent traceContext 則生成的Span為 root span
  7. 將Span綁定的TraceContext中 屬性信息 Copy 到 Invocation中達到遠程參數傳遞的作用
  8. rpc服務提供端 , 從invocation中提取TraceContext相關信息及採樣數據信息
  9. 生成span , 兼容初次服務端調用
  10. 記錄接口信息及遠程IP Port
  11. 將創建的Span 作為當前Span (可以通過Tracer.currentSpan 訪問到它) 並設置查詢範圍
  12. oneway調用即只請求不接受結果
  13. 如果future不為空則為 async 調用 在回調中finish span
  14. 設置異步回調,回調代碼執行span finish() .
  15. oneway調用 因為不需等待返回值 即沒有 cr (Client Receive) 需手動flush()
  16. 同步調用 業務代碼執行完畢後需手動finish()
  17. 設置枚舉類 與 Dubbo中RpcException保持對應

測試項

  • Dubbo sync async oneway 測試
  • RPC異常測試
  • 普通業務異常測試
  • 併發測試

配置方式

POM依賴添加


com.github.baker
Tracing
1.0-SNAPSHOT

資源目錄根路徑下添加tracing.properties文件

Dubbo + Zipkin + Brave實現全鏈路追蹤

一次調用信息

Dubbo + Zipkin + Brave實現全鏈路追蹤

調用鏈

Dubbo + Zipkin + Brave實現全鏈路追蹤

調用成功失敗彙總

Dubbo + Zipkin + Brave實現全鏈路追蹤

zipkinHost 指定zipkin服務器IP:PORT 默認為localhost:9411 serviceName 指定應用名稱 默認為trace-default

調用鏈:

Dubbo + Zipkin + Brave實現全鏈路追蹤

待擴展項

  • 抽象數據傳輸(擴展Kafka數據傳輸)
  • 調用返回值數據打印
  • 更靈活的配置方式


分享到:


相關文章: