Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

在正式進入乾貨講解前,先簡單說說這以下兩個元件。

Protocol Handler 是在 Pulsar 2.5.0 版本後加入的新機制,希望開發者們能利用 Pulsar 已有的基礎架構,把 Pulsar 當作一個可靠、高效、穩定的流數據存儲,可以利用它去開發一些可插拔消息協議。

所以 Kafka-on-Pulsar 是基於 Protocol Handler 進行開發的,支持 Kafka 2.0 協議的插件。只需下載 KoP 插件並安裝到已有的 Pulsar broker 裡,就能在 Pulsar 裡支持 Kafka 協議。優點是簡化了從 Kafka 遷移到 Pulsar 的流程,不需要更改兩遍代碼,直接無縫遷移。

接下來就一起詳細看看,KoP 的開發歷程吧。

什麼是 Apache Pulsar

Apache Pulsar 是一個事件流平臺。最初,Apache Pulsar 就採用雲原生、分層分片的架構。該架構將服務和存儲分離開來,使系統實現更友好的容器化。

而現在,Pulsar 不僅僅是是一個消息中間件,更是一個消息+流數據結合的系統,即 Cloud-Native Event Streaming。

我們之前寫過很多關於 Pulsar 的具體詳情,感興趣的可以查看 :

Apache Pulsar 介紹

Why KoP?

Plusar 為隊列和流工作負載提供統一的消息模型。Pulsar 支持自己基於 protobuf 的二進制協議,以確保高性能和低延遲。protobuf 有利於實現 Pulsar 客戶端。

而且,該項目也支持 Java,Go,Python 和 C ++ 語言以及社區提供的第三方客戶端。但是,對於使用其他消息傳輸協議編寫的應用程序,用戶必須重寫這些應用程序,否則這些應用程序無法採用 Pulsar 新的統一消息傳輸協議。

為了解決這一問題,Pulsar 社區之前也開發了一些應用程序,以便將 Kafka 應用程序從其他消息系統遷移到 Pulsar。例如,Pulsar 在 Kafka Java API 上提供了 Kafka wrapper。

Kafka wrapper 允許用戶在不改變代碼的情況下將其使用的 Kafka Java 客戶端應用程序從 Kafka 切換到 Pulsar。Pulsar 還提供豐富的 connector 生態系統,用於連接 Pulsar 和其他數據系統。

但是,那些想要從其他 Kafka 應用程序切換到 Pulsar 的用戶仍然有強烈的需求。

KoP 的誕生背景

因此,就產生了“在 Pulsar 上支持 Kafka 協議”的想法。最初的猜想是添加一個 proxy,比如好多公司會在 Kafka 之前加一個類似 HTTP proxy,後續再轉換成 Pulsar 協議。

第二種猜想是能否直接將 Kafka 協議直接接入到 Pulsar broker 裡,也就是目前 KoP 的成型。

那麼關於第一種 proxy 做法,如果實現起來大概是什麼樣呢?OVHcloud 就有過一次嘗試。

之前 OVHcloud 一直採用 Apache Kafka。儘管他們有在 Kafka 上運行多個集群且每秒處理數百萬條消息的經驗,但仍面臨艱鉅的運營挑戰。所以,OVHcloud 放棄 Kafka,決定將其服務的產品轉移到 Pulsar,並在 Pulsar 上構建其產品。

但是為了照顧到依舊使用 Kafka 系統的用戶,所以他們想在 Pulsar 裡添加一個 proxy 去支持 Kafka 協議。他們最初的做法就是將 Kafka 協議的一幀轉換成 Pulsar 協議。

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

Proxy 收到來自 Kafka 客戶端的任何一幀,通過自由狀態機將其轉換為 Pulsar 相應的接口。

這個狀態機一種是用於接收 Kafka 請求,第二種是用於處理 Pulsar response。然後在其中間再添加一個狀態機進行同步。

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

因為在 TCP 層進行這些操作,所以它的表現還是不錯的。藉由 Rust 的特性,整體運行流暢。但是這個情況下,代碼仍需要一行行去寫,同時 Kafka 協議裡有一些是沒有辦法通過 proxy 方式實現。比如:group coordinator 和 offsets management。

還有一個比較關鍵的點是,因為用 Rust 去構寫,所以比較難開源。即便是開源出來也很難作為一個組件去插入到 Pulsar 系統中。

剛好去年 StreamNative 的一條推特引起了 OVHcloud 的注意。這是 StreamNative 第一次舉行線下 Pulsar meetup 時翟佳老師分享的 KoP demo。

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

經過幾次雙方經驗互談交流後,雙方合力推出了更完善的「KoP」。利用 Pulsar 和 BookKeeper 的事件流存儲架構和 Pulsar 的可插拔協議處理插件框架來提供一種精簡而全面的解決方案。

KoP 組件與 Broker 協作

所以當我們倒回去重看 Pulsar 架構,下方模塊圖中最核心的:Broker、BookKeeper、ZooKeeper。Pulsar 就是基於 Managed ledger 實現的一套分佈式流式存儲,包括如何存數據、如何防止數據丟失、流如何從本地機房複製到另一機房等。

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

Pulsar 協議本身是一個很輕量級的東西,即上圖中的 Pulsar protocol handler。它主要是處理 TCP 過來的請求格式,然後將請求轉化和讀取的操作。所以 Pulsar 協議最核心部分在存儲層面、分佈式均衡層面等。

將 Pulsar protocol handler 抽象出來,變成一個框架/接口。利用這個框架,可以直接訪問 Pulsar 已經構建好的存儲系統,剩下要做的只是協議的解析和轉換。

所以依據這個構想,將 Kafka 協議帶入去實踐。在 Pulsar 2.5 版本時新加了一個「Pluggable protocol handler」的概念(PIP-41),將接口單獨抽離了出來。

Pulsar protocol handler 的使用是類似 Pulsar function/connector,只需將其插入到 Pulsar broker 中,就可以讓 Pulsar 具有讀取和解析其他協議的能力。這個機制只需要調整兩個配置:

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

配置完成後,重啟集群即可支持「其他類型協議」的處理能力。當然這個特性只在 Pulsar 2.5 版本後才支持,所以如需嘗試,可以先將 Pulsar 系統升級到 2.5 版本。

Kafka-on-Pulsar 的前世今生,新秀 Pulsar 到底好在哪?

所以在此機制下過程就會變得更加明瞭簡單。只需在 Pulsar 裡實現 Kafka protocol handler 即可,剩下的上圖實線綠色部分是 Kafka 原生客戶端。只需將數據接入到 Pulsar 集群,就可以處理 Kafka 請求。

為什麼選取 Kafka 作為實踐對象?

應為 Pulsar 和 Kafka 在一些層面有很多相似之處。比如日誌層,Pulsar 和 Kafka 都採用非常相似的數據模型,用於發佈/訂閱消息和事件流,Pulsar 和 Kafka 都採用分佈式日誌。

通過對比 Pulsar 和 Kafka,我們發現這兩種系統有很多相似之處。這兩種系統都包括以下操作:

  • Topic 查找所有客戶端都連接到任一 broker 以查找 Topic 的元數據(即 owner broker)。獲取元數據之後,客戶端與 owner broker 建立持久的 TCP 連接。
  • 發佈客戶端與 Topic 區的 owner broker 進行對話,以將消息追加到分佈式日誌中。
  • 消費客戶端與 Topic 分區的 owner broker 進行對話,以便從分佈式日誌中讀取消息。
  • 偏移量為發佈給 Topic 分區的消息分配偏移量。在 Pulsar 中,偏移量被稱為 MessageId。consumer 可以使用偏移量來查找日誌中的給定位置,以便讀取消息。
  • 消費狀態這兩個系統都維護訂閱中的 consumer( Kafka 稱之為消費組)的消費狀態。Kafka 將消費狀態存儲在 `__offsets` Topic,而 Pulsar 將消費狀態存儲在 `cursors` 。

實現方式

1. Topic

Kafka 將所有 Topic 存儲在扁平的命名空間。但是,Pulsar 將 Topic 存儲在層次化、多租戶的命名空間。我們在 broker 配置中添加了 `kafkaNamespace` 配置,這樣管理員就可以將 Kafka Topic 映射到 Pulsar Topic。

為了方便 Kafka 用戶使用 Apache Pulsar 的多租戶特性,當 Kafka 用戶使用 SASL 驗證機制來驗證 Kafka 客戶端的時候,可以指定一個 Pulsar 租戶和命名空間作為其 SASL 用戶名。

2. 消息 ID 和偏移量

Kafka 為每條被成功發佈到 Topic 分區的消息都指定了一個偏移量。Pulsar 為每條消息指定了一個 `MessageID`。消息 ID 由 `ledger-id`、 `entry-id` 和 `batch-index` 組成。我們在 Pulsar-Kafka wrapper 中使用相同的方法將 Pulsar 的消息 ID 轉換為偏移量,反之亦然。

3. 消息

Kafka 和 Pulsar 的消息都包含鍵、值、時間戳和 header(在 Pulsar 中被稱作 ‘properties’)。我們自動在 Kafka 消息和 Pulsar 消息之間轉換這些字段。

4. Topic 查找

我們為 Kafka 和 Pulsar 的請求處理插件提供相同的 Topic 查找方法。請求處理插件發現 Topic,查找所請求的 Topic 分區的全部所有權,然後將包含所有權信息的 Kafka `TopicMetadata` 返回給 Kafka 客戶端。

5. 發佈消息

當收到 Kafka 客戶端發佈的消息後,Kafka 請求處理插件逐一將多個字段(例如鍵、值、時間戳和 headers)進行映射,從而將 Kafka 消息轉換為 Pulsar 消息。

同時,Kafka 請求處理插件利用 ManagedLedger append API 將這些已轉化的 Pulsar 消息存儲在 BookKeeper。Kafka 請求處理插件將 Kafka 消息轉換為 Pulsar 消息後,現有的 Pulsar 應用程序就可以接收 Kafka 客戶端發佈的消息。

6. 消費消息

當收到 Kafka 客戶端的 consumer 請求時,Kafka 請求處理插件打開一個非持久 cursor,然後從請求的偏移量開始讀取 entries。

Kafka 請求處理插件將 Pulsar 消息轉換回 Kafka 消息後,現有的 Kafka 應用程序就可以接收 Pulsar 客戶端發佈的消息。

7. Group coordinator & 偏移量管理

最大的挑戰是實現 group coordinator 和偏移量管理。Pulsar 不支持集中的 group coordinator,無法為消費組裡的 consumer 分配分區,也無法管理每個消費組的偏移量。

Pulsar broker 基於分區來管理分區分配,而分區的 owner broker 通過將確認信息存儲在 cursors 來管理偏移量。

我們很難讓 Pulsar 模型與 Kafka 模型保持一致。因此,為了完全兼容 Kafka 客戶端,我們將 coordinator group 的更改和偏移量存儲在 Pulsar 名為 `public/kafka/__offsets` 系統 Topic 中,從而實現 Kafka coordinator group。

這樣,我們能夠在 Pulsar 和 Kafka 之間建立橋樑,並允許用戶使用現有的 Pulsar 工具和策略來管理訂閱並監控 Kafka consumer。我們在已實現的 coordinator group 中添加一個後臺線程,定期將偏移量更新從系統 Topic 同步到 Pulsar cursor。

因此,實際上 Kafka 消費組被認為是 Pulsar 訂閱。所有現有的 Pulsar 工具也可以用於管理 Kafka 消費組。

KoP 生產化

如果將 KoP 應用到實際場景中,就需要考慮以下多個方面:

  • 多租戶
  • 安全性
  • 跨機房複製
  • 分層存儲
  • Schema
  • 與已有的數據環境(如 Flink、Spark、Presto)集成

Q & A

1. Pulsar 有多種擴展,這些擴展有統一的管理方式嗎?

目前在做一個項目:Pulsar Registry,類似於 DocHub。也可以看作一個應用商店,會集中一些組件/插件合集,可以期待一下。

2. Kafka 0.11 以下的版本是否能平滑升級到高版本?如果消息格式變了,是不是沒法平滑升級?

不能,0.10/0.11 版本以上才可以平滑升級。

總 結

KoP 最終的目的,是方便用戶將 Kafka 上已有的應用遷移到 Pulsar 上,同時通過 KoP 的方式讓用戶可以更方便地構建產品。未來 KoP 也會加大對 schema 和 Kafka 版本的支持與多兼容性。

使用 KoP

KoP 使用 Apache License V2 許可證,項目地址如下:

:link:https://github.com/streamnative/kop

StreamNative Platform 已經內置 KoP。你可以選擇下載 StreamNative Platform來試用 KoP 的所有功能。

:link:http://streamnative.io/download

如果已經運行 Pulsar 集群,並且希望其支持 Kafka 協議,可以將 KoP 協議處理插件安裝到現有的 Pulsar 集群。相關詳細信息,請參考如下鏈接說明:

:link:http://streamnative.io/docs/latest/connect-and-ingest/kop/get-started -kop /#configure

end


分享到:


相關文章: