螞蟻金服分佈式鏈路跟蹤組件SOFATracer中Disruptor 實踐

OFAStack(S

calable Open Financial Architecture Stack)是螞蟻金服自主研發的金融級雲原生架構,包含了構建金融級雲原生架構所需的各個組件,是在金融場景裡錘鍊出來的最佳實踐。

SOFATracer 是一個用於分佈式系統調用跟蹤的組件,通過統一的 TraceId 將調用鏈路中的各種網絡調用情況以日誌的方式記錄下來,以達到透視化網絡調用的目的,這些鏈路數據可用於故障的快速發現,服務治理等。

Disruptor 簡介

Disruptor 旨在在異步事件處理體系結構中提供低延遲,高吞吐量的工作隊列。它確保任何數據僅由一個線程擁有以進行寫訪問,因此與其他結構相比,減少了寫爭用。目前,包括 Apache Storm、Camel、Log4j 2 在內的很多知名項目都應用了 Disruptor 以獲取高性能。

SOFATracer 也是基於 Disruptor 高性能無鎖循環隊列來提供異步打印日誌到本地磁盤能力的,SOFATracer 提供兩種類似的日誌打印類型即摘要日誌和統計日誌,摘要日誌:每一次調用均會落地磁盤的日誌;統計日誌:每隔一定時間間隔進行統計輸出的日誌;無論是哪種日誌的輸出,對於 SOFATracer 來說都需要保證較高的性能,以降低對於業務整體流程耗時的影響。

關於 Disruptor 的 一些原理分析可以參考:Disruptor 。

A High Performance Inter-Thread Messaging Library 高性能的線程間消息傳遞庫

案例

先通過 Disruptor 的一個小例子來有個直觀的認識;先看下它的構造函數:

<code>public Disruptor(
final EventFactory eventFactory,
final int ringBufferSize,
final ThreadFactory threadFactory,
final ProducerType producerType,
final WaitStrategy waitStrategy)
{
this(
RingBuffer.create(producerType, eventFactory, ringBufferSize, waitStrategy),
new BasicExecutor(threadFactory));
}
複製代碼
/<code>
  • eventFactory : 在環形緩衝區中創建事件的 factory;
  • ringBufferSize:環形緩衝區的大小,必須是2的冪;
  • threadFactory:用於為處理器創建線程;
  • producerType:生成器類型以支持使用正確的sequencer和publisher創建RingBuffer;枚舉類型,SINGLE、MULTI兩個項。對應於 SingleProducerSequencer和MultiProducerSequencer兩種Sequencer;
  • waitStrategy : 等待策略;

如果我們想構造一個 disruptor,那麼我們就需要上面的這些組件。從 eventFactory 來看,還需要一個具體的 Event 來作為消息事件的載體。【下面按照官方給的案例進行簡單的修改作為示例】

消息事件 LongEvent ,能夠被消費的數據載體

<code>public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
public long getValue() {
return value;
}
}
複製代碼/<code>

創建消息事件的 factory

<code>public class LongEventFactory implements EventFactory<longevent> {
@Override
public LongEvent newInstance() {
return new LongEvent();
}
}
複製代碼/<longevent>/<code>

ConsumerThreadFactory

<code>public class ConsumerThreadFactory implements ThreadFactory {
private final AtomicInteger index = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "disruptor-thread-" + index.getAndIncrement());
}
}
複製代碼/<code>

OK ,上面的這些可以滿足創建一個 disruptor 了:

<code>private int ringBufferCapacity = 8; 

//消息事件生產Factory
LongEventFactory longEventFactory = new LongEventFactory();
//執行事件處理器線程Factory
ConsumerThreadFactory consumerThreadFactory = new ConsumerThreadFactory();
//用於環形緩衝區的等待策略。
WaitStrategy waitStrategy = new BlockingWaitStrategy();

//構建disruptor
Disruptor<longevent> disruptor = new Disruptor<>(
longEventFactory,
ringBufferCapacity,
longEventThreadFactory,
ProducerType.SINGLE,
waitStrategy);
複製代碼/<longevent>/<code>

現在是已經有了 disruptor 了,然後通過:start 來啟動:

<code>//啟動 disruptor
disruptor.start();
複製代碼/<code>

到這裡,已經構建了一個disruptor;但是目前怎麼使用它來發布消息和消費消息呢?

發佈消息

下面在 for 循環中 發佈 5 條數據:

<code>RingBuffer<longevent> ringBuffer = disruptor.getRingBuffer();
for (long l = 0; l < 5; l++)
{
long sequence = ringBuffer.next();
LongEvent event = ringBuffer.get(sequence);
event.set(100+l);
System.out.println("publish event :" + l);
ringBuffer.publish(sequence);
Thread.sleep(1000);
}
複製代碼/<longevent>/<code>

消息已經發布,下面需要設定當前 disruptor 的消費處理器。前面已經有個 LongEvent 和 EventFactory ; 在 disruptor 中是通過 EventHandler 來進行消息消費的。

編寫消費者代碼

<code>public class LongEventHandler implements EventHandler<longevent> {
@Override
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
System.out.println("Event: " + event.getValue()+" -> " + Thread.currentThread().getName());
Thread.sleep(2000);
}
}
複製代碼/<longevent>/<code>

將 eventHandler 設置到 disruptor 的處理鏈上:

<code>//將處理事件的事件處理程序 -> 消費事件的處理程序
LongEventHandler longEventHandler = new LongEventHandler();
disruptor.handleEventsWith(longEventHandler);
複製代碼/<code>

運行結果(這裡)

<code>publish event :0
Event: 0 -> disruptor-thread-1
-------------------------------->
publish event :1
Event: 1 -> disruptor-thread-1
-------------------------------->
publish event :2
Event: 2 -> disruptor-thread-1
-------------------------------->
publish event :3
Event: 3 -> disruptor-thread-1
-------------------------------->
publish event :4
Event: 4 -> disruptor-thread-1
-------------------------------->
複製代碼/<code>

基本概念和原理

Disruptor

整個基於 ringBuffer 實現的生產者消費者模式的容器。主要屬性:

<code>private final RingBuffer ringBuffer;
private final Executor executor;
private final ConsumerRepository consumerRepository = new ConsumerRepository<>();
private final AtomicBoolean started = new AtomicBoolean(false);
private ExceptionHandler super T> exceptionHandler = new ExceptionHandlerWrapper<>();
複製代碼
/<code>
  • ringBuffer:內部持有一個 RingBuffer 對象,Disruptor 內部的事件發佈都是依賴這個 RingBuffer 對象完成的;
  • executor:消費事件的線程池;
  • consumerRepository:提供存儲庫機制,用於將 EventHandler 與 EventProcessor 關聯起來;
  • started : 用於標誌當前 Disruptor 是否已經啟動;
  • exceptionHandler : 異常處理器,用於處理 BatchEventProcessor 事件週期中 uncaught exceptions;

RingBuffer

環形隊列【實現上是一個數組】,可以類比為 BlockingQueue 之類的隊列,ringBuffer 的使用,使得內存被循環使用,減少了某些場景的內存分配回收擴容等耗時操作。

<code>public final class RingBuffer extends RingBufferFields 
implements Cursored, EventSequencer, EventSink
複製代碼
/<code>
  • E:在事件的交換或並行協調期間存儲用於共享的數據的實現 -> 消息事件;

Sequencer

RingBuffer 中生產者的頂級父接口,其直接實現有 SingleProducerSequencer 和 MultiProducerSequencer;對應 SINGLE、MULTI 兩個枚舉值。


螞蟻金服分佈式鏈路跟蹤組件SOFATracer中Disruptor 實踐


EventHandler

事件處置器,改接口用於對外擴展來實現具體的消費邏輯。如上面 Demo 中的 LongEventHandler ;

<code>//回調接口,用於處理{@link RingBuffer}中可用的事件
public interface EventHandler {
void onEvent(T event, long sequence, boolean endOfBatch) throws Exception;
}
複製代碼
/<code>
  • event : RingBuffer 已經發布的事件;
  • sequence : 正在處理的事件的序列號;
  • endOfBatch : 用來標識否是來自 RingBuffer 的批次中的最後一個事件;

SequenceBarrier

消費者路障,規定了消費者如何向下走。事實上,該路障算是變向的鎖。

<code>final class ProcessingSequenceBarrier implements SequenceBarrier {
//當等待(探測)的需要不可用時,等待的策略
private final WaitStrategy waitStrategy;
//依賴的其它Consumer的序號,這個用於依賴的消費的情況,
//比如A、B兩個消費者,只有A消費完,B才能消費。

private final Sequence dependentSequence;
private volatile boolean alerted = false;
//Ringbuffer的寫入指針
private final Sequence cursorSequence;
//RingBuffer對應的Sequencer
private final Sequencer sequencer;
//exclude method
}
複製代碼/<code>

waitStrategy 決定了消費者採用何種等待策略。

WaitStrategy

Strategy employed for making {@link EventProcessor}s wait on a cursor {@link Sequence}.

EventProcessor 的等待策略;具體實現在 disruptor 中有 8 種:


螞蟻金服分佈式鏈路跟蹤組件SOFATracer中Disruptor 實踐


這些等待策略不同的核心體現是在如何實現 waitFor 這個方法上。

EventProcessor

事件處理器,實際上可以理解為消費者模型的框架,實現了線程 Runnable 的 run 方法,將循環判斷等操作封在了裡面。該接口有三個實現類:

1、BatchEventProcessor

<code>public final class BatchEventProcessor implements EventProcessor {
private final AtomicBoolean running = new AtomicBoolean(false);
private ExceptionHandler super T> exceptionHandler = new FatalExceptionHandler();
private final DataProvider dataProvider;
private final SequenceBarrier sequenceBarrier;
private final EventHandler super T> eventHandler;
private final Sequence sequence = new Sequence( Sequencer.INITIAL_CURSOR_VALUE);
private final TimeoutHandler timeoutHandler;
//exclude method
}
複製代碼
/<code>
  • ExceptionHandler:異常處理器;
  • DataProvider:數據來源,對應 RingBuffer;
  • EventHandler:處理 Event 的回調對象;
  • SequenceBarrier:對應的序號屏障;
  • TimeoutHandler:超時處理器,默認情況為空,如果要設置,只需要要將關聯的 EventHandler 實現 TimeOutHandler 即可;

如果我們選擇使用 EventHandler 的時候,默認使用的就是 BatchEventProcessor,它與 EventHandler 是一一對應,並且是單線程執行。

如果某個 RingBuffer 有多個 BatchEventProcessor,那麼就會每個 BatchEventProcessor 對應一個線程。

2、WorkProcessor

<code>public final class WorkProcessor implements EventProcessor {
private final AtomicBoolean running = new AtomicBoolean(false);
private final Sequence sequence = new Sequence(Sequencer.INITIAL_CURSOR_VALUE);
private final RingBuffer ringBuffer;
private final SequenceBarrier sequenceBarrier;
private final WorkHandler super T> workHandler;
private final ExceptionHandler super T> exceptionHandler;
private final Sequence workSequence;

private final EventReleaser eventReleaser = new EventReleaser() {
@Override
public void release() {
sequence.set(Long.MAX_VALUE);
}
};
private final TimeoutHandler timeoutHandler;
}
複製代碼
/<code>

基本和 BatchEventProcessor 類似,不同在於用於處理 Event 的回調對象是 WorkHandler。

原理圖


螞蟻金服分佈式鏈路跟蹤組件SOFATracer中Disruptor 實踐


無消費者情況下,生產者保持生產,但是 remainingCapacity 保持不變。

在寫 Demo 的過程中,本來想通過不設定消費者來觀察 RingBuffer 可用容量變化的。但是驗證過程中,一直得不到預期的結果,(注:沒有設置消費者,只有生產者),先看結果:

<code>publish event :0
bufferSie:8
remainingCapacity:8
cursor:0
-------------------------------->
publish event :1
bufferSie:8
remainingCapacity:8
cursor:1
-------------------------------->
publish event :2
bufferSie:8
remainingCapacity:8
cursor:2
-------------------------------->
publish event :3
bufferSie:8
remainingCapacity:8
cursor:3
-------------------------------->
publish event :4
bufferSie:8
remainingCapacity:8
cursor:4
-------------------------------->
publish event :5
bufferSie:8
remainingCapacity:8
cursor:5
-------------------------------->
publish event :6
bufferSie:8
remainingCapacity:8
cursor:6
-------------------------------->
publish event :7
bufferSie:8
remainingCapacity:8
cursor:7
-------------------------------->
publish event :8
bufferSie:8
remainingCapacity:8
cursor:8
-------------------------------->
publish event :9
bufferSie:8
remainingCapacity:8
cursor:9
-------------------------------->

複製代碼/<code>

從結果來看,remainingCapacity 的值應該隨著 發佈的數量 遞減的;但是實際上它並沒有發生任何變化。

來看下 ringBuffer.remainingCapacity() 這個方法:

<code>/**
* Get the remaining capacity for this ringBuffer.
*
* @return The number of slots remaining.
*/
public long remainingCapacity()
{
return sequencer.remainingCapacity();
}
複製代碼/<code>

這裡面又使用 sequencer.remainingCapacity() 這個方法來計算的。上面的例子中使用的是 ProducerType.SINGLE,那來看 SingleProducerSequencer 這個裡面 remainingCapacity 的實現。

<code>@Override
public long remainingCapacity()
{
//上次申請完畢的序列值
long nextValue = this.nextValue;
//計算當前已經消費到的序列值
long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
//當前生產到的序列值
long produced = nextValue;
return getBufferSize() - (produced - consumed);
}
複製代碼/<code>

來解釋下這段代碼的含義:

假設當前 ringBuffer 的 bufferSize 是 8 ;上次申請到的序列號是 5,其實也就是說已經生產過佔用的序列號是5;假設當前已經消費到的序列號是 3,那麼剩餘的容量為: 8-(5-2) = 5。

螞蟻金服分佈式鏈路跟蹤組件SOFATracer中Disruptor 實踐

因為這裡我們可以確定 bufferSize 和 produced 的值了,那麼 remainingCapacity 的結果就取決於 getMinimumSequence 的計算結果了。

<code>public static long getMinimumSequence(final Sequence[] sequences, long minimum)
{
for (int i = 0, n = sequences.length; i < n; i++)
{
long value = sequences[i].get();
minimum = Math.min(minimum, value);
}
return minimum;
}
複製代碼/<code>

這個方法是從 Sequence 數組中獲取最小序列 。如果 sequences 為空,則返回 minimum。回到上一步,看下 sequences 這個數組是從哪裡過來的,它的值在哪裡設置的。

<code>long consumed = Util.getMinimumSequence(gatingSequences, nextValue);
複製代碼/<code>

gatingSequences是 SingleProducerSequencer 父類 AbstractSequencer 中的成員變量:

<code>protected volatile Sequence[] gatingSequences = new Sequence[0];
複製代碼/<code>

gatingSequences 是在下面這個方法裡面來管理的。

<code>/**
* @see Sequencer#addGatingSequences(Sequence...)
*/
@Override
public final void addGatingSequences(Sequence... gatingSequences)
{
SequenceGroups.addSequences(this, SEQUENCE_UPDATER, this, gatingSequences);
}
複製代碼/<code>

這個方法的調用棧向前追溯有這幾個地方調用了:


螞蟻金服分佈式鏈路跟蹤組件SOFATracer中Disruptor 實踐


WorkerPool 來管理多個消費者;hangdlerEventsWith 這個方法也是用來設置消費者的。但是在上面的測試案例中我們是想通過不設定消費者只設定生成者來觀察環形隊列的佔用情況,所以 gatingSequences 會一直是空的,因此在計算時會把 produced 的值作為 minimum 返回。這樣每次計算就相當於:

<code>return getBufferSize() - (produced - produced) === getBufferSize();
複製代碼/<code>

也就驗證了為何在不設定消費者的情況下,remainingCapacity 的值會一直保持不變。

SOFATracer 中 Disruptor 實踐

SOFATracer 中,AsyncCommonDigestAppenderManager 對 Disruptor 進行了封裝,用於處理外部組件的Tracer摘要日誌。該部分藉助 AsyncCommonDigestAppenderManager 的源碼來分析下 SOFATracer 如何使用Disruptor 的。

SOFATracer 中使用了兩種不同的事件模型,一種是 SOFATracer 內部使用的 StringEvent , 一種是外部擴展使用的 SofaTacerSpanEvent。這裡以 SofaTacerSpanEvent 這種事件模型來分析。StringEvent 消息事件模型對應的是 AsyncCommonAppenderManager 類封裝的disruptor。

SofaTracerSpanEvent ( -> LongEvent)

定義消息事件模型,SofaTacerSpanEvent 和前面 Demo 中的 LongEvent 基本結構是一樣的,主要是內部持有的消息數據不同,LongEvent 中是一個 long 類型的數據,SofaTacerSpanEvent 中持有的是 SofaTracerSpan 。

<code>public class SofaTracerSpanEvent {
private volatile SofaTracerSpan sofaTracerSpan;
public SofaTracerSpan getSofaTracerSpan() {
return sofaTracerSpan;
}
public void setSofaTracerSpan(SofaTracerSpan sofaTracerSpan) {
this.sofaTracerSpan = sofaTracerSpan;
}
}
複製代碼/<code>

Consumer ( -> LongEventHandler)

Consumer 是 AsyncCommonDigestAppenderManager 的內部類;實現了 EventHandler 接口,這個 consumer 就是作為消費者存在的。

在 AsyncCommonAppenderManager 中也有一個,這個地方個人覺得可以抽出去,這樣可以使得AsyncCommonDigestAppenderManager/AsyncCommonAppenderManager 的代碼看起來更乾淨。

<code>private class Consumer implements EventHandler<sofatracerspanevent> {
//日誌類型集合,非該集合內的日誌類型將不會被處理
protected Set<string> logTypes = Collections.synchronizedSet(new HashSet<string>());
@Override
public void onEvent(SofaTracerSpanEvent event, long sequence, boolean endOfBatch)
throws Exception {
// 拿到具體的消息數據 sofaTracerSpan
SofaTracerSpan sofaTracerSpan = event.getSofaTracerSpan();
// 如果沒有數據,則不做任何處理
if (sofaTracerSpan != null) {
try {
String logType = sofaTracerSpan.getLogType();
// 驗證當前日誌類型是否可以被當前consumer消費
if (logTypes.contains(logType)) {
// 獲取編碼類型
SpanEncoder encoder = contextEncoders.get(logType);
//獲取 appender
TraceAppender appender = appenders.get(logType);
// 對數據進行編碼處理

String encodedStr = encoder.encode(sofaTracerSpan);
if (appender instanceof LoadTestAwareAppender) {
((LoadTestAwareAppender) appender).append(encodedStr,
TracerUtils.isLoadTest(sofaTracerSpan));
} else {
appender.append(encodedStr);
}
// 刷新緩衝區,日誌輸出
appender.flush();
}
} catch (Exception e) {
// 異常省略
}
}
}

public void addLogType(String logType) {
logTypes.add(logType);
}
}
複製代碼/<string>/<string>/<sofatracerspanevent>/<code>

SofaTracerSpanEventFactory (-> LongEventFactory)

用於產生消息事件的 Factory。

<code>public class SofaTracerSpanEventFactory implements EventFactory<sofatracerspanevent> {
@Override
public SofaTracerSpanEvent newInstance() {
return new SofaTracerSpanEvent();
}
}
複製代碼/<sofatracerspanevent>/<code>

ConsumerThreadFactory (-> LongEventThreadFactory )

用來產生消費線程的 Factory。

<code>public class ConsumerThreadFactory implements ThreadFactory {
private String workName;
public String getWorkName() {
return workName;
}
public void setWorkName(String workName) {
this.workName = workName;
}

@Override
public Thread newThread(Runnable runnable) {
Thread worker = new Thread(runnable, "Tracer-AsyncConsumer-Thread-" + workName);
worker.setDaemon(true);
return worker;
}
}
複製代碼/<code>

構建 Disruptor

Disruptor 的構建是在 AsyncCommonDigestAppenderManager 的構造函數中完成的。

<code>public AsyncCommonDigestAppenderManager(int queueSize, int consumerNumber) {
// 使用這個計算來保證realQueueSize是2的次冪(返回當前 大於等於queueSize的最小的2的次冪數 )
int realQueueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
//構建disruptor,使用的是 ProducerType.MULTI
//等待策略是 BlockingWaitStrategy
disruptor = new Disruptor<sofatracerspanevent>(new SofaTracerSpanEventFactory(),
realQueueSize, threadFactory, ProducerType.MULTI, new BlockingWaitStrategy());
//消費者列表
this.consumers = new ArrayList<consumer>(consumerNumber);

for (int i = 0; i < consumerNumber; i++) {
Consumer consumer = new Consumer();
consumers.add(consumer);
//設置異常處理程序
disruptor.setDefaultExceptionHandler(new ConsumerExceptionHandler());
//綁定消費者
disruptor.handleEventsWith(consumer);
}

//是否允許丟棄,從配置文件獲取
this.allowDiscard = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_ALLOW_DISCARD, DEFAULT_ALLOW_DISCARD));

if (allowDiscard) {
   //是否記錄丟失日誌的數量
this.isOutDiscardNumber = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_NUMBER,

DEFAULT_IS_OUT_DISCARD_NUMBER));
//是否記錄丟失日誌的TraceId和RpcId
this.isOutDiscardId = Boolean.parseBoolean(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_IS_OUT_DISCARD_ID,
DEFAULT_IS_OUT_DISCARD_ID));
//丟失日誌的數量達到該閾值進行一次日誌輸出
this.discardOutThreshold = Long.parseLong(SofaTracerConfiguration.getProperty(
SofaTracerConfiguration.TRACER_ASYNC_APPENDER_DISCARD_OUT_THRESHOLD,
DEFAULT_DISCARD_OUT_THRESHOLD));
if (isOutDiscardNumber) {
this.discardCount = new PaddedAtomicLong(0L);
}
}
}
複製代碼/<consumer>/<sofatracerspanevent>/<code>

啟動 Disruptor

Disruptor 的啟動委託給了 AsyncCommonDigestAppenderManager 的 start 方法來執行。

<code>public void start(final String workerName) {
this.threadFactory.setWorkName(workerName);
this.ringBuffer = this.disruptor.start();
}
複製代碼/<code>

來看下,SOFATracer 中具體是在哪裡調用這個 start 的:


螞蟻金服分佈式鏈路跟蹤組件SOFATracer中Disruptor 實踐


  • CommonTracerManager : 這個裡面持有了 AsyncCommonDigestAppenderManager 類的一個單例對象,並且是 static 靜態代碼塊中調用了 start 方法;這個用來輸出普通日誌;
  • SofaTracerDigestReporterAsyncManager:這裡類裡面也是持有了AsyncCommonDigestAppenderManager 類的一個單例對像,並且提供了 getSofaTracerDigestReporterAsyncManager 方法來獲取該單例,在這個方法中調用了 start 方法;該對象用來輸出摘要日誌;

發佈事件

前面的 Demo 中是通過一個 for 循環來發布事件的,在 SOFATracer 中的事件發佈無非就是當有 Tracer 日誌需要輸出時會觸發發佈,那麼對應的就是日誌的 append 操作,將日誌 append 到環形緩衝區。

<code>public boolean append(SofaTracerSpan sofaTracerSpan) {
long sequence = 0L;
//是否允許丟棄
if (allowDiscard) {
try {
//允許丟棄就使用tryNext嘗試申請序列,申請不到拋出異常
sequence = ringBuffer.tryNext();
} catch (InsufficientCapacityException e) {
//是否輸出丟失日誌的TraceId和RpcId
if (isOutDiscardId) {
SofaTracerSpanContext sofaTracerSpanContext = sofaTracerSpan
.getSofaTracerSpanContext();
if (sofaTracerSpanContext != null) {
SynchronizingSelfLog.warn("discarded tracer: traceId["
+ sofaTracerSpanContext.getTraceId()

+ "];spanId[" + sofaTracerSpanContext.getSpanId()
+ "]");
}
}
//是否輸出丟失日誌的數量
if ((isOutDiscardNumber) && discardCount.incrementAndGet() == discardOutThreshold) {
discardCount.set(0);
if (isOutDiscardNumber) {
SynchronizingSelfLog.warn("discarded " + discardOutThreshold + " logs");
}
}

return false;
}
} else {
// 不允許丟棄則使用next方法
sequence = ringBuffer.next();
}

try {
SofaTracerSpanEvent event = ringBuffer.get(sequence);
event.setSofaTracerSpan(sofaTracerSpan);
} catch (Exception e) {
SynchronizingSelfLog.error("fail to add event");
return false;
}
//發佈
ringBuffer.publish(sequence);
return true;
}
複製代碼/<code>

SOFATracer 事件發佈的調用邏輯:


螞蟻金服分佈式鏈路跟蹤組件SOFATracer中Disruptor 實踐


追溯調用的流程,可以知道當前 span 調用 finish 時或者 SOFATracer 中調用 reportSpan 時就相當於發佈了一個消息事件。

小結

本文對 SOFATracer 中使用 Disruptor 來進行日誌輸出的代碼進行了簡單的分析,更多內部細節原理可以自行看下SOFATracer 的代碼。SOFATracer 作為一種比較底層的中間件組件,在實際的業務開發中基本是無法感知的。但是作為技術來學習,還是有很多點可以挖一挖。


分享到:


相關文章: