ElasticSearch Aggregations GroupBy 實現源碼分析

準備工作

  • 為了方便調試,我對索引做了如下配置
{
"mappings": {
"my_type": {
"properties": {
"newtype": {
"type": "string",
"index": "not_analyzed"
},
"num": {
"type": "integer"
}
}
}
},
"settings" : {
"index" : {
"number_of_shards" : 1,
"number_of_replicas" : 0
}
}
}

這樣只有一個分片,方便IDE的跟蹤,也算是個看源碼的技巧

  • 數據
{
"user" : "kimchy",
"post_date" : "2009-11-15T14:12:12",
"newtype": "abc",
"message" : "trying out Elasticsearch",
"num" : 10
}

查詢語句

假定的查詢如下:

{
"from": 0,
"size": 0,

"_source": {
"includes": [
"AVG"
],
"excludes": []
},
"aggregations": {
"newtype": {
"terms": {
"field": "newtype",
"size": 200
},
"aggregations": {
"AVG(num)": {
"avg": {
"field": "num"
}
}
}
}
}
}

其語義類似這個sql 語句:

SELECT avg(num) FROM twitter group by newtype

也就是按newtype 字段進行group by,然後對num求平均值。在我們實際的業務系統中,這種統計需求也是最多的。

Phase概念

在查詢過程中,ES是將整個查詢分成幾個階段的,大體如下:

  • QueryPhase
  • rescorePhase
  • suggestPhase
  • aggregationPhase
  • FetchPhase

對於全文檢索,可能還有DFSPhase。

順帶提一點,Spark SQL + ES 的組合,最影響響應時間的地方其實是Fetch original source 。

而對於這些Phase,並不是一個鏈路的模式,而是在某個Phase調用另外一個Phase。這個在源碼中也很明顯,我們看如下一段代碼:

 //創建聚合需要的AggregationContext,
//裡面包含了各個Aggregator
aggregationPhase.preProcess(searchContext);

//實際query,還有聚合操作其實是在這部完成的
boolean rescore = execute(searchContext, searchContext.searcher());

//如果是全文檢索,並且需要打分
if (rescore) { // only if we do a regular search
rescorePhase.execute(searchContext);
}
suggestPhase.execute(searchContext);
//獲取聚合結果
aggregationPhase.execute(searchContext);
}

Aggregation的相關概念

要了解具體是如何實現聚合功能的,則需要了解ES 的aggregator相關的概念。大體有五個:

  • AggregatorFactory (典型的工廠模式)負責創建Aggregator實例
  • Aggregator (負責提供collector,並且提供具體聚合邏輯的類)
  • Aggregations (聚合結果)
  • PipelineAggregator (對聚合結果進一步處理)
  • Aggregator 的嵌套,比如 示例中的AvgAggregator 就是根據GlobalOrdinalsStringTermsAggregator 的以bucket為維度,對相關數據進行操作.這種嵌套結構也是
  • Bucket 其實就是被groupBy 字段的數字表示形式。用數字表示,可以節省對應字段列式存儲的空間,並且提高性能。

Aggregations 實現的機制

我們知道,無論檢索亦或是聚合查詢,本質上都需要轉化到Lucene裡的Collector,以上面的案例為例,其實由兩個Collector 完成最後的計算:

  • TotalHitCountCollecotr
  • GlobalOrdinalsStringTermsAggregator(裡面還有個Aggregator)

因為我們沒有定義過濾條件,所以最後的Query 是個MatchAllQuery,之後基於這個基礎上,這兩個collector 完成對應的計算。通常,這兩個Collector 會被wrap成一個新的MultiCollector ,最終傳入IndexSearcher的Collector 就是MultiCollector。

根據上面的分析,我們知道示例中的聚合計算完全由GlobalOrdinalsStringTermsAggregator負責。

基於DocValues實現groupBy概覽

對於每一個segment,我們都會為每個列單獨存儲成一個文件,為了壓縮,我們可能會將裡面具體的值轉換成數字,然後再形成一個字典和數字對應關係的文件。我們進行所謂的groupBy操作,以最後進行Avg為例子,其實就是維護了兩個大數組,

LongArray counts;//Long數組
DoubleArray sums; //Double 數組

counts是newtype(我們例子中被groupby的字段)次數統計,對應的數組下標是newtype(我們已經將newtype轉化為數字表示了)。我們遍歷文檔的時候(MatchAllQuery),可以獲取doc,然後根據doc到列存文件獲取對應的newtype,然後給counts 對應的newtype +1。 這樣我們就知道每個newtype 出現的次數了。

這裡我們也可以看到,消耗內存的地方取決於newtype的數量(distinct後),我們稱之為基數。基數過高的話,是比較消耗內存的。

sums 也是一樣的,下標是newtype的值,而對應的值則是不斷累加num(我們例子中需要被avg的字段)。

之後就可以遍歷兩個數組得到結果了,代碼大體如下:

//這裡的owningBucketOrd 就是newtype 的數字化表示
public double metric(long owningBucketOrd) {

if (valuesSource == null || owningBucketOrd >= sums.size()) {
return Double.NaN;
}
return sums.get(owningBucketOrd) / counts.get(owningBucketOrd);
}

GlobalOrdinalsStringTermsAggregator/AvgAggregator組合實現

GlobalOrdinalsStringTermsAggregator 首先要提供一個Collector 給主流程,所以其提供了一個newCollector方法:

protected LeafBucketCollector newCollector(
//DocValue 列式存儲的一個API表現
final RandomAccessOrds ords,
//AvgAggregator提供的Collector
final LeafBucketCollector sub)

接著判定是不是隻有一個列文件(DocValues):

final SortedDocValues singleValues = DocValues.unwrapSingleton(words);
//如果singleValues!=null 則是一個,否則有多個列文件

如果是一個的話:

public void collect(int doc, long bucket) throws IOException {
assert bucket == 0;
final int ord = singleValues.getOrd(doc);
if (ord >= 0) {
collectExistingBucket(sub, doc, ord);
}
}
//collectExistingBucket
public final void collectExistingBucket(LeafBucketCollector subCollector, int doc, long bucketOrd) throws IOException {
docCounts.increment(bucketOrd, 1);
subCollector.collect(doc, bucketOrd);
}

通過doc 拿到ord(newtype),然後交給Avg的collector 接著處理,進入AvgAggregator 裡的Collector的collect邏輯:

public void collect(int doc, long bucket) throws IOException {
counts = bigArrays.grow(counts, bucket + 1);
sums = bigArrays.grow(sums, bucket + 1);

values.setDocument(doc);
final int valueCount = values.count();
counts.increment(bucket, valueCount);
double sum = 0;
for (int i = 0; i < valueCount; i++) {
sum += values.valueAt(i);
}
sums.increment(bucket, sum);
}

這個和我上面的概述中描述是一致的。

如果是多個DocValues(此時索引還沒有對那些Segment做合併),這個時候會走下面的流程:

public void collect(int doc, long bucket) throws IOException {
assert bucket == 0;
ords.setDocument(doc);
final int numOrds = ords.cardinality();
for (int i = 0; i < numOrds; i++) {
final long globalOrd = ords.ordAt(i);
collectExistingBucket(sub, doc, globalOrd);
}
}

這裡的ords 包括了多個DocValues文件,然後做了全局映射,因為要把文件的下標做映射。為啥要有下標映射呢?因為多個列文件(DocValues)的全集才具有完整的newtype,但是每個列文件都是從0開始遞增的。現在要擴張到一個global的空間上。 ords.cardinality()
拿到了列文件(DocValues)的數目,然後對每個文件都處理一遍,通過ords.ordAt(i) 拿到newtype的全局下標,這個時候就可以繼續交給Avg完成了。

到這個階段,我們其實已經算好了每個newtype 出現的次數,以及num的累計值,也就是我們前面提到的兩個數組。

BuildAggregation

最終我們是要把這個數據輸出輸出的,不論是輸出給別的ES節點,還是直接輸出給調用方。所以有個BuildAggregation的過程,可以根據名字進行直觀的瞭解。

考慮到內存問題,ES允許你設置一些Threshhold,然後通過BucketPriorityQueue(優先隊列)來完成實際的數據收集以及排序(默認按文檔出現次數排序)。 裡面的元素是OrdBucket,OrdBucket包含了幾個值:

globalOrd: 全局下標
bucketOrd: 在所屬文件裡的下標
docCount : 文檔出現的次數

接著取出 topN 的對象,放到InternalTerms.Bucket[] 數組中。然後遍歷該數組,調用子Aggregator的buildAggregation方法,這裡的子Aggregator是AvgAggregator ,每個Bucket(newtype)就獲取到一個avg aggregations了,該aggregations通過InternalAggregations 包裹,InternalAggregations 包含了一個reduce 方法,該方法會調用具體InternalAggregation的doReduce 方法,比如AvgAggregator就有自己的reduce方法。說這個主要給下一小結做鋪墊。

最後會被包裝成StringTerms ,然後就可以序列化成JSON格式,基本就是你在接口上看到的樣子了。

多分片聚合結果合併

前面我們討論的,都是基於一個分片,但是最終是要把結果數據進行Merge的。 這個功能是由SearchPhaseController 對象來完成,大體如下:

sortedShardList = searchPhaseController.sortDocs(useScroll, firstResults);

final InternalSearchResponse internalResponse = searchPhaseController.merge(sortedShardList, firstResults,
firstResults, request);

其中merge 動作是按分類進行merge的,比如:

  • counter(計數,譬如total_hits)
  • hits
  • aggregations
  • suggest
  • profile (性能相關的數據)

這裡我們只關注aggregations的merge

 // merge addAggregation
InternalAggregations aggregations = null;
if (!queryResults.isEmpty()) {
if (firstResult.aggregations() != null && firstResult.aggregations().asList() != null) {
List<internalaggregations> aggregationsList = new ArrayList<>(queryResults.size());
for (AtomicArray.Entry extends QuerySearchResultProvider> entry : queryResults) {
aggregationsList.add((InternalAggregations) entry.value.queryResult().aggregations());
}
aggregations = InternalAggregations.reduce(aggregationsList, new ReduceContext(bigArrays,/> }
}

/<internalaggregations>

代碼有點長,核心是

InternalAggregations.reduce(.....)

裡面實際的邏輯也是比較簡單直觀的。會調用InternalTerms的reduce方法做merge,但是不同的類型的Aggregator產生Aggregations 合併邏輯是不一樣的,所以會委託給對應實現。比如GlobalOrdinalsStringTermsAggregator則會委託給InternalTerms的doReduce方法,而如AvgAggregator
會委託給InternalAvg的doReduce。 這裡就不展開。未來會單獨出一片文章講解。

附錄

這裡我們再額外講講ValueSource (ES 對FieldData/DocValues的抽象)。

前文我們提到,大部分Aggregator 都是依賴於FieldData/DocValues 來實現的,而ValueSource 則是他們在ES裡的表示。所以瞭解他們是很有必要的。ValuesSource 全類名是:

 org.elasticsearch.search.aggregations.support.ValuesSource

該類就是ES 為了管理 DocValues 而封裝的。它是一個抽象類,內部還有很多實現類,Bytes,WithOrdinals,FieldData,Numeric,LongValues 等等。這些都是對特定類型DocValues類型的 ES 表示。

按上面我們的查詢示例來看,newtype 字段對應的是

 org.elasticsearch.search.aggregations.support.ValuesSource.Bytes.WithOrdinals.FieldData

對象。這個對象是ES對Lucene String 類型的DocValues的一個表示。
你會發現在ValueSource類裡,有不同的FieldData。不同的FieldData 可能繼承自不同基類從而表示不同類型的數據。在現在這個FieldData 裡面有一個對象:

protected final IndexOrdinalsFieldData indexFieldData;

該對象在newtype(我們示例中的字段)是String類型的時候,對應的是實現類是

org.elasticsearch.index.fielddata.plain.SortedSetDVOrdinalsIndexFieldData

該對象的大體作用是,構建出DocValue的ES的Wraper。

具體代碼如下:

@Overridepublic AtomicOrdinalsFieldData load(LeafReaderContext context) { 
return new SortedSetDVBytesAtomicFieldData(
context.reader(),
fieldNames.indexName());
}
//或者通過loadGlobal方法得到
//org.elasticsearch.index.fielddata.ordinals.InternalGlobalOrdinalsIndexFieldData

以第一種情況為例,上面的代碼new 了一個新的org.elasticsearch.index.fielddata.AtomicOrdinalsFieldData對象,該對象的一個實現類是SortedSetDVBytesAtomicFieldData。 這個對象和Lucene的DocValues 完成最後的對接:

 @Override
public RandomAccessOrds getOrdinalsValues() {
try {
return FieldData.maybeSlowRandomAccessOrds(DocValues.getSortedSet(reader, field));
} catch (IOException e) {
throw new IllegalStateException("cannot load docvalues", e);
}
}

我們看到,通過Reader獲取到最後的列就是在該類裡的getOrdinalsValues 方法裡實現的。

該方法最後返回的RandomAccessOrds 就是Lucene的DocValues實現了。

分析了這麼多,所有的邏輯就濃縮在getLeafCollector的第一行代碼上。globalOrds 的類型是RandomAccessOrds,並且是直接和Lucene對應上了。

globalOrds = valuesSource.globalOrdinalsValues(cox);

getLeafCollector 最後newCollector的規則如下:

 protected LeafBucketCollector newCollector(final RandomAccessOrds ords, final LeafBucketCollector sub) {
grow(ords.getValueCount());
final SortedDocValues singleValues = DocValues.unwrapSingleton(ords);
if (singleValues != null) {
return new LeafBucketCollectorBase(sub, ords) {
@Override
public void collect(int doc, long bucket) throws IOException {
assert bucket == 0;
final int ord = singleValues.getOrd(doc);
if (ord >= 0) {
collectExistingBucket(sub, doc, ord);
}
}
};
}

我們知道,在Lucene裡,大部分文件都是不可更新的。一個段一旦生成後就是不可變的,新的數據或者刪除數據都需要生成新的段。DocValues的存儲文件也是類似的。所以DocValues.unwrapSingleton其實就是做這個判定的,是不是有多個文件 。無論是否則不是都直接創建了一個匿名的Collector。

當個文件的很好理解,包含了索引中newtype字段所有的值,其下標獲取也很自然。

//singleValues其實就是前面的RandomAccessOrds。
final int ord = singleValues.getOrd(doc);

根據文檔號獲取值對應的位置,如果ord >=0 則代表有值,否則代表沒有值。

如果有多個文件,則會返回如下的Collecor:

else {
return new LeafBucketCollectorBase(sub, ords) {
@Override
public void collect(int doc, long bucket) throws IOException {
assert bucket == 0;
ords.setDocument(doc);
final int numOrds = ords.cardinality();
for (int i = 0; i < numOrds; i++) {
final long globalOrd = ords.ordAt(i);
collectExistingBucket(sub, doc, globalOrd);
}
}
};

上面的代碼可以保證多個文件最終合起來保持一個文件的序號。什麼意思呢?比如A文件有一個文檔,B文件有一個,那麼最終獲取的globalOrd 就是0,1 而不會都是0。此時的 ords 實現類 不是SingletonSortedSetDocValues 而是

org.elasticsearch.index.fielddata.ordinals.GlobalOrdinalMapping

對象了。

計數的方式兩個都大體類似。

docCounts.increment(bucketOrd, 1);

這裡的bucketOrd 其實就是前面的ord/globalOrd。所以整個計算就是填充docCounts

總結

ES的 Aggregation機制還是挺複雜的。本文試圖通過一個簡單的group by 的例子來完成對其機制的解釋。其中ValueSource 那層我目前也沒沒完全吃透,如有表述不合適的地方,歡迎大家指出。

ElasticSearch Aggregations GroupBy 實現源碼分析



分享到:


相關文章: