Apache Kafka在大型應用中的20項最佳實踐

Apache Kafka是一款流行的分佈式數據流平臺,它已經廣泛地被諸如New Relic(數據智能平臺)、Uber、Square(移動支付公司)等大型公司用來構建可擴展的、高吞吐量的、且高可靠的實時數據流系統。

Apache Kafka在大型應用中的20項最佳實踐

Apache Kafka是一款流行的分佈式數據流平臺,它已經廣泛地被諸如New Relic(數據智能平臺)、Uber、Square(移動支付公司)等大型公司用來構建可擴展的、高吞吐量的、且高可靠的實時數據流系統。例如,在New Relic的生產環境中,Kafka群集每秒能夠處理超過1500萬條消息,而且其數據聚合率接近1 Tbps。

可見,Kafka大幅簡化了對於數據流的處理,因此它也獲得了眾多應用開發人員和數據管理專家的青睞。然而,在大型系統中Kafka的應用會比較複雜。如果您的consumers無法跟上數據流的話,各種消息往往在未被查看之前就已經消失掉了。同時,它在自動化數據保留方面的限制,高流量的發佈+訂閱(publish-subscribe,pub/sub)模式等,可能都會影響到您系統的性能。可以毫不誇張地說,如果那些存放著數據流的系統無法按需擴容、或穩定性不可靠的話,估計您經常會寢食難安了。

為了減少上述複雜性,我在此分享New Relic公司為Kafka集群在應對高吞吐量方面的20項最佳實踐。我將從如下四個方面進行展開:

• Partitions(分區)

• Consumers(消費者)

• Producers(生產者)

• Brokers(代理)

快速瞭解Kafka的概念與架構

Kafka是一種高效的分佈式消息系統。在性能上,它具有內置的數據冗餘度與彈性,也具有高吞吐能力和可擴展性。在功能是,它支持自動化的數據保存限制,能夠以“流”的方式為應用提供數據轉換,以及按照“鍵-值(key-value)”的建模關係“壓縮”數據流。

要了解各種最佳實踐,您需要首先熟悉如下關鍵術語:

• Message(消息):Kafka中的一條記錄或數據單位。每條消息都有一個鍵和對應的一個值,有時還會有可選的消息頭。

• Producer(生產者):producer將消息發佈到Kafka的topics上。producer決定向topic分區的發佈方式,如:輪詢的隨機方法、或基於消息鍵(key)的分區算法。

• Broker(代理):Kafka以分佈式系統或集群的方式運行。那麼群集中的每個節點稱為一個broker。

• Topic(主題):topic是那些被髮布的數據記錄或消息的一種類別。消費者通過訂閱topic,來讀取寫給它們的數據。

• Topic partition(主題分區):不同的topic被分為不同的分區,而每一條消息都會被分配一個offset,通常每個分區都會被複制至少一到兩次。每個分區都有一個leader和存放在各個follower上的一到多個副本(即:數據的副本),此法可防止某個broker的失效。群集中的所有broker都可以作為leader和follower,但是一個broker最多隻能有一個topic partition的副本。Leader可被用來進行所有的讀寫操作。

• Offset(偏移量):單個分區中的每一條消息都被分配一個offset,它是一個單調遞增的整型數,可用來作為分區中消息的唯一標識符。

• Consumer(消費者):consumer通過訂閱topic partition,來讀取Kafka的各種topic消息。然後,消費類應用處理會收到消息,以完成指定的工作。

• Consumer group(消費組):consumer可以按照consumer group進行邏輯劃分。topic partition被均衡地分配給組中的所有consumers。因此,在同一個consumer group中,所有的consumer都以負載均衡的方式運作。換言之,同一組中的每一個consumer都能看到每一條消息。如果某個consumer處於“離線”狀態的話,那麼該分區將會被分配給同組中的另一個consumer。這就是所謂的“再均衡(rebalance)”。當然,如果組中的consumer多於分區數,則某些consumer將會處於閒置的狀態。相反,如果組中的consumer少於分區數,則某些consumer會獲得來自一個以上分區的消息。

• Lag(延遲):當consumer的速度跟不上消息的產生速度時,consumer就會因為無法從分區中讀取消息,而產生延遲。延遲表示為分區頭後面的offset數量。從延遲狀態(到“追趕上來”)恢復正常所需要的時間,取決於consumer每秒能夠應對的消息速度。其公式如下:

time = messages / (consume rate per second - produce rate per second)

針對Partitions的最佳實踐

• 瞭解分區的數據速率,以確保提供合適的數據保存空間。此處所謂“分區的數據速率”是指數據的生成速率。換言之,它是由“平均消息大小”乘以“每秒消息數”得出的。數據速率決定了在給定時間內,所能保證的數據保存空間的大小(以字節為單位)。如果您不知道數據速率的話,則無法正確地計算出滿足基於給定時間跨度的數據,所需要保存的空間大小。同時,數據速率也能夠標識出單個consumer在不產生延時的情況下,所需要支持的最低性能值。

• 除非您有其他架構上的需要,否則在寫topic時請使用隨機分區。在您進行大型操作時,各個分區在數據速率上的參差不齊是非常難以管理的。其原因來自於如下三個方面:

• 首先,“熱”(有較高吞吐量)分區上的consumer勢必會比同組中的其他consumer處理更多的消息,因此很可能會導致出現在處理上和網絡上的瓶頸。

• 其次,那些為具有最高數據速率的分區,所配置的最大保留空間,會導致topic中其他分區的磁盤使用量也做相應地增長。

• 第三,根據分區的leader關係所實施的最佳均衡方案,比簡單地將leader關係分散到所有broker上,要更為複雜。在同一topic中,“熱”分區會“承載”10倍於其他分區的權重。

有關topic partition的使用,您可以參閱《Kafka Topic Partition的各種有效策略》(https://blog.newrelic.com/engineering/effective-strategies-kafka-topic-partitioning/),以瞭解更多。

針對Consumers的最佳實踐

• 如果consumers運行的是比Kafka 0.10還要舊的版本,那麼請馬上升級。在0.8.x 版中,consumer使用Apache ZooKeeper來協調consumer group,而許多已知的bug會導致其長期處於再均衡狀態,或是直接導致再均衡算法的失敗(我們稱之為“再均衡風暴”)。因此在再均衡期間,一個或多個分區會被分配給同一組中的每個consumer。而在再均衡風暴中,分區的所有權會持續在各個consumers之間流轉,這反而阻礙了任何一個consumer去真正獲取分區的所有權。

• 調優consumer的套接字緩衝區(socket buffers),以應對數據的高速流入。在Kafka的0.10.x版本中,參數receive.buffer.bytes的默認值為64 kB。而在Kafka的0.8.x版本中,參數socket.receive.buffer.bytes的默認值為100 kB。這兩個默認值對於高吞吐量的環境而言都太小了,特別是如果broker和consumer之間的網絡帶寬延遲積(bandwidth-delay product)大於局域網(local area network,LAN)時。對於延遲為1毫秒或更多的高帶寬的網絡(如10 Gbps或更高),請考慮將套接字緩衝區設置為8或16 MB。如果您的內存不足,也至少考慮設置為1 MB。當然,您也可以設置為-1,它會讓底層操作系統根據網絡的實際情況,去調整緩衝區的大小。但是,對於需要啟動“熱”分區的consumers來說,自動調整可能不會那麼快。

• 設計具有高吞吐量的consumers,以便按需實施背壓(back-pressure)。通常,我們應該保證系統只去處理其能力範圍內的數據,而不要超負荷“消費”,進而導致進程中斷“掛起”,或出現consume group的溢出。如果是在Java虛擬機(JVM)中運行,consumers應當使用固定大小的緩衝區(請參見Disruptor模式:http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf),而且最好是使用堆外內存(off-heap)。固定大小的緩衝區能夠阻止consumer將過多的數據拉到堆棧上,以至於JVM花費掉其所有的時間去執行垃圾回收,進而無法履行其處理消息的本質工作。

• 在JVM上運行各種consumers時,請警惕垃圾回收對它們可能產生的影響。例如,長時間垃圾回收的停滯,可能導致ZooKeeper的會話被丟棄、或consumer group處於再均衡狀態。對於broker來說也如此,如果垃圾回收停滯的時間太長,則會產生集群掉線的風險。

針對Producers的最佳實踐

• 配置producer,以等待各種確認。籍此producer能夠獲知消息是否真正被髮送到了broker的分區上。在Kafka的0.10.x版本上,其設置是acks;而在0.8.x版本上,則為request.required.acks。Kafka通過複製,來提供容錯功能,因此單個節點的故障、或分區leader關係的更改不會影響到系統的可用性。如果您沒有用acks來配置producer(或稱“fire and forget”)的話,則消息可能會悄然丟失。

• 為各個producer配置retries。其默認值為3,當然是非常低的。不過,正確的設定值取決於您的應用程序,即:就那些對於數據丟失零容忍的應用而言,請考慮設置為Integer.MAX_VALUE(有效且最大)。這樣將能夠應對broker的leader分區出現無法立刻響應produce請求的情況。

• 為高吞吐量的producer,調優緩衝區的大小,特別是buffer.memory和batch.size(以字節為單位)。由於batch.size是按照分區設定的,而producer的性能和內存的使用量,都可以與topic中的分區數量相關聯。因此,此處的設定值將取決於如下幾個因素:producer數據速率(消息的大小和數量)、要生成的分區數、以及可用的內存量。請記住,將緩衝區調大並不總是好事,如果producer由於某種原因而失效了(例如,某個leader的響應速度比確認還要慢),那麼在堆內內存(on-heap)中的緩衝的數據量越多,其需要回收的垃圾也就越多。

• 檢測應用程序,以跟蹤諸如生成的消息數、平均消息大小、以及已使用的消息數等指標。

針對Brokers的最佳實踐

• 在各個brokers上,請壓縮topics所需的內存和CPU資源。日誌壓縮(請參見https://kafka.apache.org/documentation/#compaction)需要各個broker上的堆棧(內存)和CPU週期都能成功地配合實現。而如果讓那些失敗的日誌壓縮數據持續增長的話,則會給brokers分區帶來風險。您可以在broker上調整log.cleaner.dedupe.buffer.size和log.cleaner.threads這兩個參數,但是請記住,這兩個值都會影響到各個brokers上的堆棧使用。如果某個broker拋出OutOfMemoryError異常,那麼它將會被關閉、並可能造成數據的丟失。而緩衝區的大小和線程的計數,則取決於需要被清除的topic partition數量、以及這些分區中消息的數據速率與密鑰的大小。對於Kafka的0.10.2.1版本而言,通過ERROR條目來監控日誌清理程序的日誌文件,是檢測其線程可能出現問題的最可靠方法。

• 通過網絡吞吐量來監控brokers。請監控發向(transmit,TX)和收向(receive,RX)的流量,以及磁盤的I/O、磁盤的空間、以及CPU的使用率,而且容量規劃是維護群集整體性能的關鍵步驟。

• 在群集的各個brokers之間分配分區的leader關係。Leader通常會需要大量的網絡I/O資源。例如,當我們將複製因子(replication factor)配置為3、並運行起來時,leader必須首先獲取分區的數據,然後將兩套副本發送給另兩個followers,進而再傳輸到多個需要該數據的consumers上。因此在該例子中,單個leader所使用的網絡I/O,至少是follower的四倍。而且,leader還可能需要對磁盤進行讀操作,而follower只需進行寫操作。

• 不要忽略監控brokers的in-sync replica(ISR)shrinks、under-replicated partitions和unpreferred leaders。這些都是集群中潛在問題的跡象。例如,單個分區頻繁出現ISR收縮,則暗示著該分區的數據速率超過了leader的能力,已無法為consumer和其他副本線程提供服務了。

• 按需修改Apache Log4j(https://github.com/apache/kafka/blob/trunk/config/log4j.properties)的各種屬性。Kafka的broker日誌記錄會耗費大量的磁盤空間,但是我們卻不能完全關閉它。因為有時在發生事故之後,需要重建事件序列,那麼broker日誌就會是我們最好的、甚至是唯一的方法。

• 禁用topic的自動創建,或針對那些未被使用的topics建立清除策略。例如,在設定的x天內,如果未出現新的消息,您應該考慮該topic是否已經失效,並將其從群集中予以刪除。此舉可避免您花時間去管理群集中被額外創建的元數據。

• 對於那些具有持續高吞吐量的brokers,請提供足夠的內存,以避免它們從磁盤子系統中進行讀操作。我們應儘可能地直接從操作系統的緩存中直接獲取分區的數據。然而,這就意味著您必須確保自己的consumers能夠跟得上“節奏”,而對於那些延遲的consumer就只能強制broker從磁盤中讀取了。

• 對於具有高吞吐量服務級別目標(service level objectives,SLOs)的大型群集,請考慮為brokers的子集隔離出不同的topic。至於如何確定需要隔離的topics,則完全取決於您自己的業務需要。例如,您有一些使用相同群集的聯機事務處理(multiple online transaction processing,OLTP)系統,那麼將每個系統的topics隔離到不同brokers子集中,則能夠有助於限制潛在事件的影響半徑。

• 在舊的客戶端上使用新的topic消息格式。應當代替客戶端,在各個brokers上加載額外的格式轉換服務。當然,最好還是要儘量避免這種情況的發生。

• 不要錯誤地認為在本地主機上測試好broker,就能代表生產環境中的真實性能了。要知道,如果使用複製因子為1,並在環回接口上對分區所做的測試,是與大多數生產環境截然不同的。在環回接口上網絡延遲幾乎可以被忽略的,而在不涉及到複製的情況下,接收leader確認所需的時間則同樣會出現巨大的差異。

其他資源

希望上述各項建議能夠有助於您更有效地去使用Kafka。如果您想提高自己在Kafka方面的專業知識,請進一步查閱Kafka配套文檔中的“操作”部分,其中包含了有關操作群集等實用信息。


分享到:


相關文章: