Elasticsearch 漫談


在ES中,索引構建和查詢因為沒有做分離,所以他們之間存在著非常激烈的競爭關係,而ES所暴露出來的那無數參數就是調整兩者之間關係的。

Merge ,Shard數,索引數

Merge的影響其實是非常大的。現在大部分存儲系統對於更新和刪除其實都是生成新的文件,並不會直接去更新原來的文件,查詢時對應的Reader會讀取這些文件,從而實現類似合併後的效果。在ES中,Merge由兩部分構成,MergeScheduler和MergePolicy。MergeScheduler控制合併的使用的工作線程以及一次合併多少文件等。MergePolicy則是控制如何進行文件的合併。默認的TireMergePolicy,會生成多個不大於5G的文件。

所以,對於Merge其實我們可以調整MergeScheduler和MergePolicy。對應的你可以在ElasticsearchConcurrentMergeScheduler和MergePolicyConfig兩個類裡看到詳細的可配置參數列表。

Merge有啥影響的?其實它和Shard數的控制也有很大關係。假設我們有100臺服務器,2400顆核,單機24顆核心,那麼默認每個分片會有四個線程用於Merge操作。假設我們有500個分片,那麼Merge可以使用的CPU核數達到了2000個,在一個數據寫入非常頻繁的系統,大部分CPU可能都會被Merge給消耗掉。所以並不是分片越多越好,這裡需要考慮Merge對系統的影響,並且分片越多,那麼用於Bulk的CPU就越多,對Search的性能其實也是有影響的。

在我的實際測試過程中,如果我將分片數設置為服務器數,並且將merge線程設置為1,也就是一個Shard一個merge線程,這種情況下,CPU會有效的降低,並且索引構建性能也能得到一定的提升。我猜測,如果調低 index.merge.policy.max_merged_segment,假設現在設置為1G,那麼將一個2M的新的Segment合併到1G的文件將比原來5G的快得多,消耗的CPU也更少,然而帶來的影響可能是索引查詢性能的下降以及可能導致系統文件句柄的耗盡。

如果一個Shard分片裡的數據過大,那麼譬如聚合查詢的響應時間基本就難以接受了,對於數據規模在五六億的一個分片而言,簡單的groupby 加sum的查詢可能耗時都能夠達到2分鐘,相對而言,Shard的文檔數量在百萬規模,能夠獲得一個較好的查詢響應時間,然而可能依然以秒計。

Shard一多,Merge以及Bulk構建索引消耗的CPU都會變得巨大,讓Search變得愈加困難。而隨著數據量規模的日益龐大,而單個Shard數據量又不宜太大,那麼只能加大Shard數量,這就導致我們陷入了一個困境。

解決上面的問題似乎有兩個簡單而有效的方案:

  1. 讓同一Node實例的Shard共用一個Merge線程池,而不是現在的每個Shard單獨戰友一個Merge線程池。
  2. 將Shard 字段的列式存儲,最好是能夠分成多個block,然後利用其有序性,對每個Block保留Min-Max值,從而在做equal或者range類的過濾時,跳過部分Block,避免時間消耗和Shard的數據量成線性關係。而且如果單個文件,則很難全部緩存起來,無法高效利用系統緩存。

聚合的精度問題

之前我曾經說過,在ES中,

  1. 有些查詢理論上是不精準的,有誤差的,然而大部分場景下卻都是準確的。
  2. 有些理論上是不精準的,有誤差的,實際場景也是有誤差的。

前者如簡單的group by 查詢,後者如distinct類的查詢。那為啥查詢會有誤差呢?比如簡單的groupby查詢,其實是做了一個假設,局部(各個分片的)的topN 放到一起,重新排序得到的topN會是全局的的topN,這種假設大部分場景是正確的。所以有了我上面的第一個結論。而對於比如distinct則使用了hyperloglog++之類的算法,這種算法本來就是一個估算算法,所以他肯定是有誤差的。一般而言百分之幾到千分之幾的誤差。

那為啥ES不能做精準的計算呢?那是因為ES是一個存儲,而不是一個正真意義上的分佈式計算引擎。分佈式計算引擎一定要有一個強大的Reduce能力,而ES目前還只能在單機做Reduce,這就導致它必定受限於單機的內存,所以他必須做一些假設或者採用某種估算算法才能避免內存被耗盡。

和Spark的整合問題

ES-Hadoop基本就是個半成品。為啥說是半成品呢?因為我們確實能夠利用ES-Hadoop項目很好的和Spark做結合,將數據導入到ES中。然而進行查詢的時候,因為ES-Hadoop採用了http協議,通過RestAPI 去獲取ES的數據導入到Spark中做計算,導致加載效率極低。加載效率低的原因其實不僅僅是採用了HTTP協議的緣故(如果換做RPC據說效率有50%以上的提升),還有如:

  1. Scroll API 需要每次重新獲得和過濾候選集,然後得到新批次的數據
  2. Scan後獲得DocId集合,然後fetch _source 是一個隨機讀過程而讓IO性能無法接受

其中影響最大的是fetch _source。 這也是Spark Data source API 帶來的問題,也不能全怪ES。為什麼這麼說呢? 因為Spark Data Source API 依然無法發揮底層存儲的計算能力,它只能下沉(PushDown)一些filter,而無法接受groupby後的結果進行計算,這就導致數據規模下不來。

Task Manager

能夠跑後臺任務對類似ES這種系統是很重要的。現在的ES無法實現把任務丟進去(或者查詢),然後可以異步監控獲取結果。一種比較直觀的場景是,我丟一個SQL進去,類似 insert to newtable from (select * from oldtalbe)這種,然後第二天就可以出結果,然後BI報表讀取newtable就能夠顯示了。這個只是功能的話是比較容易做的,最大的難點是資源的控制,不能說一個query任務就耗盡了所有的資源甚至跑掛了ES。 實際上涉及到兩個點:

  • 資源隔離
  • 任務調度

資源隔離是基礎,否則Task後臺運行就華而不實了。任務調度保證以最優的方式完成多任務的,不然只能一個接著一個跑,可能使用價值也就沒有那麼大了。

要實現資源隔離,只能自己去管理內存,可能需要JVM實現一個TaskMemoryManager的管理器,然後所有task都需要到這裡來申請資源,其實是很複雜的一件事情。

ES-SQL

我們知道 ES是有自己的DSL的,是一個用JSON來定義的查詢語言。寫起來還是比較繁瑣的,而相當一部分功能其實是可以映射到SQL上的。我覺得官方有必要提供對SQL的支持,Solr現在已經做了,但是ES目前還只有第三方在做。在我的視角里,沒有SQL支持的查詢系統,我基本是不考慮的。Spark 提供了那麼多易用的API,然而純SQL還是最好用的。

什麼時候生成Segment(磁盤文件)

在討論這個問題之前,我們先要理解一下文件的寫入過程。當我們打開一個文件描述符往裡面寫入數據的時候,一般而言會寫入文件系統的緩存裡,所以再最後需要fsync一下,強制將所有數據刷入磁盤。那麼對應的,Segment產生也分兩個階段,一個是產生了文件,一個是fsync到磁盤後不再變化了。

我們這裡指的產生Segment就是指已經被commit到磁盤的segment.

Segment這個名詞來自於Lucene,在前面Merge相關的內容裡已經反覆有所提及。Translog是觸發Segment生成一個比較重要的地方,因為他們本來就是起互補作用的。當我們要清空Translog然後打開新的Translog時,就會將現有的數據持久化到Segment裡。所以Translog的配置直接影響了Segment的生成頻率。另外,Translog做Recovery的時候,其實也是會觸發flush動作的,比如做SNAPSHOT。當然,ES也可以通過API手動觸發Flush從而產生Flush動作。

副本

ES副本對索引性能的影響幾乎是100%。 然而目前的機制而言,你是不能去掉副本的,因為一旦發生主片丟失,就不僅僅是已經存在的數據丟失,還包括新的數據部分也無法進入集群。至於為啥影響是100%呢?因為副本和主片都是通過HTTP協議完成的,而不是類似傳統的文件拷貝的方式。在5.0之後有一個優化,就是fsync可以實現異步化,可以有效提高吞吐。

索引速度

隨著ES在數據分析領域的大放異彩,索引速度越來越是個瓶頸。企業似乎也願意投資,使用百臺高性能服務器錄入千億規模數據的大有人在。然而和原生的Lucene的速度相比較,差距仍然是比較大的。那麼速度到底差在哪裡呢?

大體有幾個因子影響了索引的速度:

  1. Translog ,你可以類比MySQL的Binlog
  2. Version,版本檢查
  3. 一些特殊字段,譬如_all,_fieldNames等
  4. Schema Mapping相關的(譬如mapping Dynamic Update)
  5. JSON的解析(ES 交互基本是以JSON為主體的)
  6. Segments 的Merging
  7. Refresh Interval ,索引的刷新週期

Translog

在默認參數下,Translog 寫入的CPU消耗甚至比Lucene 的addDocument 還高兩倍。這點我還是蠻詫異的。Translog也要落磁盤,也需要commit,所以我們可以通過將index.translog.durability設置為async,這樣translog的寫入由默認的每次請求後就執行改成定時(5s)commit一次。這樣帶來的額外好處是減少 Translog寫磁盤的次數,也就了減少了構建索引的消耗。

Translog並不會無限存在,到了一定程度,就需要觸發索引的flush,具體動作是

  1. commit index segment
  2. clear translog
  3. open new translog

那麼這裡就有個需要權衡的地方:

  1. flush的越少,那麼索引性能越高
  2. flush的越少,translog就可能越大,那麼當發生故障時,恢復時間就可能越長。

這裡解釋下translog和故障恢復的關係。當數據進行recovery的時候,大致是如下一個流程:

INIT -> INDEX -> VERIFY_INDEX -> TRANSLOG -> FINALIZE -> DONE

大家如果使用marvel之類的,一般能夠看到INDEX 和 TRANSLOG階段,因為其他階段一般時間都比較短。INDEX其實就是先對索引進行SNAPSHOT,然後將文件拷貝過去,這個期間新增的數據都會寫入到Translog,並且translog會被Hold住而不被刪除。接著驗證沒有問題後就進入TRANSLOG階段進行日誌回放。回放的量取決於INDEX拷貝的時間長短。TRANSLOG回放其實是非常慢的。當然,這個你在做恢復的時候,把灌數據的程序停了就能有效縮短TRANSLOG階段時間。

第二種情況是重新Load某個Shard,比如某個Node被快速重啟了,這個時候因為數據還沒來得及commit成segment就掛了,再次啟動後,丟失的數據就可以從Trasnlog裡恢復了,如果Translog多了,就讓恢復變得很慢。所以在這種情況下,Translog保留多少條就變得很重要了,可以通過參數index.translog.flush_threshold_ops 控制。

當然,前面討論的一些設置讓translog也變得不可靠,一旦產生當機等問題,可能在內存中的translog沒有及時commit到磁盤而導致數據丟失。吞吐和可靠總是存在某種矛盾。

關於Translog的內容,大致就如上了。我覺得Translog的寫入和讀取等還是有優化空間的。這裡再說說5.0裡和Translog有關的一個優化,在ES裡實時Get的話,其實是通過內存中通過docId拿到translog offset ,然後再去拿的,5.0之後不需要這樣了,只要在內存維護最新文檔的docId而不是docId和translog offset的映射關係,然後有請求的話,將數據flush到segment裡然後直接去取。

Version

我們再說說Version機制,Version大致會有一個Map緩存,如果緩存沒有,就會走磁盤。索引Version檢查其實是一個昂貴的操作。如果是時序數據(不變數據),則讓系統auto generate id可以跳過Version檢查,這樣的話對性能也是巨大的提升。

特殊字段

在ES裡有一些特殊字段,比如_all,_fieldNames,_source等。_all性能影響還是比較大的。_source我們一般需要保留,否則會有很多不便,因為無法還原完整的記錄。_all一般而言可以關掉。之前我沒注意到_fieldNames這個字段,通過JProfiler我發現如下的代碼竟然佔了整個Bulk過程CPU的6%左右的消耗。

Elasticsearch 漫談

Snip20161024_35.png

後來一查,發現是為了生成_fieldNames字段的。如果你要追求索引灌入的性能,果斷關掉這個字段吧。

Mapping

ES的Mapping其實消耗也非常大,比如Dynamic update 特性。建議固定好的你Schema,然後在ETL過程中規範你的數據,然後關掉該特性。

JSON

JSON的解析其實是比較慢的,通過性能分析發現,比如StringFieldMapper裡的parseCreateFieldForString方法消耗CPU就特別厲害,仔細一看,

Elasticsearch 漫談

Snip20161024_36.png

裡面其實都是從JSON Parser裡拿出具體的數據呢。這塊似乎並沒有太多辦法。或許如果bulk使用RPC協議能夠更快的原因是因為Http協議使用的是JSON格式。


作者:祝威廉
鏈接:https://www.jianshu.com/p/746c85627448
來源:簡書
著作權歸作者所有。商業轉載請聯繫作者獲得授權,非商業轉載請註明出處。

Elasticsearch 漫談


分享到:


相關文章: