給 wireshark 寫一個 RocketMQ 協議解析的 lua 插件




學習 RocketMQ,需要搞懂兩個東西:通信和存儲。這裡花了一點時間寫了一個 RocketMQ 的 wireshark lua 插件,過程挺有意思,寫出來記錄一下。

通過閱讀這篇文章,你會了解到下面這些知識。

  • wireshark lua 插件的骨架代碼如何編寫
  • 插件版 Hello World 如何實現
  • RocketMQ 的基本通信協議格式
  • RocketMQ 在 PULL 有消息時的 Body 格式是什麼樣的

初探 Hello World 插件

從 wireshark 的 about 頁面可以看到現在它支持的 Lua 版本,下面是我 v3.0.6 版本的 wireshark 對應的頁面。

給 wireshark 寫一個 RocketMQ 協議解析的 lua 插件

可以看到,目前支持的 Lua 版本是 5.2.4。下面我們來看一段骨架代碼。

<code>-- 聲明協議
local NAME = "RocketMQ"
local PORTS = { 9876, 10911 }
local proto = Proto.new(NAME, "RocketMQ Protocol")

-- 聲明 dissector 函數,處理包
function proto.dissector(tvb, pinfo, tree)
print("load plugin...demo")
pinfo.cols.protocol = proto.name;
pinfo.cols.info = "Hello, World"
end

-- 註冊 dissector 到 wireshark
for _, port in ipairs(PORTS) do
DissectorTable.get("tcp.port"):add(port, proto)
end/<code>

找到 wireshark 插件目錄,在我的電腦上這個路徑是 /Applications/Wireshark.app/Contents/Resources/share/wireshark/,修改其中的 init.lua 文件

<code>vim /Applications/Wireshark.app/Contents/Resources/share/wireshark/init.lua/<code>

增加一行加載上面 lua 文件的 dofile 調用。

<code>...
dofile("/path/to/demo.lua")/<code>

執行前後效果如下。


給 wireshark 寫一個 RocketMQ 協議解析的 lua 插件

解析 RocketMQ 協議

RocketMQ 的通信協議是比較簡單的,整體的協議格式如下所示。

給 wireshark 寫一個 RocketMQ 協議解析的 lua 插件

RocketMQ 的通信協議由四部分組成:

  • 第一部分:頭 4 個字節表示剩下三部分的總長度(不包括自己這 4 個字節)
  • 第二部分:接下來的 4 個字節表示 Header 部分的長度
  • 第三部分:接下來的 Header Length 長度的內容為協議頭,是用 json 序列化後存儲,主要用來表示不同的請求響應類型
  • 第四部分:body 內容

以一個實際的包為例:

給 wireshark 寫一個 RocketMQ 協議解析的 lua 插件

頭四個字節 00 00 01 9b 表示整個包的長度 411(0x019b),接下來的四個字節 00 00 00 d4 表示 Header Length,這裡為 212(0xD4),接下來的 212 個字節表示Header 的內容,可以看到這是一段 json 的字符串,最後的 195(411-4-212) 個字節表示 Body 的真正內容,具體的消息格式下面會再講到。

接下來我們來寫解析的程序。

解析的邏輯在 proto.dissector 方法中進行,它的簽名如下所示。

<code>function proto.dissector(tvb, pinfo, tree)
end/<code>

這些參數的釋義如下:

  • tvb 是 "Testy Virtual Buffer" 的縮寫,是包含數據包的 buffer 內容
  • pinfo 是 Packet Information 的縮寫,表示 Packet 包相關的信息,可以獲取包的源端口、目標端口等信息。
  • tree 表示 wireshark UI 界面的展示樹,解析包得到的信息都會添加到這個有層級關係的樹中。

接下來我們把 RocketMQ 通信的四個部分展示到 wireshark 中。修改 proto.dissector 函數的代碼如下所示。

<code>function proto.dissector(tvb, pinfo, tree)
print("load plugin...demo")

local subtree = tree:add(proto, tvb())
pinfo.cols.protocol = proto.name;
pinfo.cols.info = ""

local length = tvb(0, 4):uint()
subtree:add("Total Length", length)
local headerLength = tvb(4, 4):uint()
subtree:add("Header Length", headerLength)
local headerData = tvb(8, headerLength):string()
subtree:add("Header", headerData)
local bodyDataLen = length - 4 - headerLength
local bodyData = tvb(8 + headerLength, bodyDataLen):string()

subtree:add("Body", bodyData)
end/<code>

重新加載 lua 腳本,可以看到 Wireshark 中 RocketMQ 協議的幾個部分已經顯示出來了。

給 wireshark 寫一個 RocketMQ 協議解析的 lua 插件

為了能區分是通信 Request 還是 Response,我們可以通過目標端口號來區分,新增一個方法。

<code>function isRequest(pinfo)
local dstPort = pinfo.dst_port;
for _, port in ipairs(PORTS) do
if (dstPort == port) then
return true
end
end

return false
end/<code>

在 proto.dissector 中新增對請求和響應的區分,增加更可讀的描述。

<code>if (isRequest(pinfo)) then
pinfo.cols.info:append("[REQUEST]" .. "↑↑↑")
else
pinfo.cols.info:append("[RESPONSE]" .. "↓↓↓")
end/<code>

效果如下所示。

給 wireshark 寫一個 RocketMQ 協議解析的 lua 插件

接下來我們要做的就是把 json 做解析,展示的更好看一點,先來看 header 和 body 為 json 格式時請求和響應。增加一個遞歸的方法,統一處理 json 格式的數據。

<code>-- k,v 分別表示 json 的 key 和 value,tree 表示 UI 樹
function parseAndAddTree(k, v, tree)
if (type(v) == 'table') then
local sizeStr = ""
if (#v > 0) then
sizeStr = "size: " .. #v
end;
local childTree = tree:add(k, sizeStr, tree)
for key, value in pairs(v) do
parseAndAddTree(key, value, childTree)
end
else
tree:add(k .. ":", json.stringify(v))
end
end/<code>

在 proto.dissector 方法中增加 Header 的解析,如下所示。

<code>local subtree = tree:add(protoMQ, tvb())
local headerTree = subtree:add("Header", "")

-- 解析 json
local header = json.parse(headerData, 1, "}")

for k, v in pairs(header) do
parseAndAddTree(k, v, headerTree)
end/<code>

重新加載運行上面的代碼,效果如下所示。

給 wireshark 寫一個 RocketMQ 協議解析的 lua 插件

同時我們也可以在 RocketMQ 的源碼中找到請求和響應 code 對應的更可讀的字符串表示,

<code>local requestCodeMap = {
[10] = "SEND_MESSAGE",
[11] = "PULL_MESSAGE",
[12] = "QUERY_MESSAGE",
...
}

local responseCode = {
[0] = "SUCCESS",
[1] = "SYSTEM_ERROR",
[2] = "SYSTEM_BUSY",
}/<code>

如果 Body 是 json 字符串的話也可以用這種方式來處理,如下所示。

給 wireshark 寫一個 RocketMQ 協議解析的 lua 插件

但是在一些情況下,Body 並不是用 json 字符串來表示的,比如在 PULL 消息的時候,如果服務器有返回可消費的消息,這時 Body 中存儲的並不是字符串,而是 RocketMQ 自定義的消息格式,如下所示。


給 wireshark 寫一個 RocketMQ 協議解析的 lua 插件

寫這段解析是個體力活,我參照 RocketMQ 的 Java 源碼實現了一個 lua 版本,完整的代碼如下所示,

<code>function decodeMessageExt(bodyTree, pinfo, bodyData)
local bodyTree = bodyTree:add("Body", "")

pinfo.cols.info:append(">>>>#FOUND#")

local offset = 0;

bodyTree:add("totalSize", bodyData(offset, 4):int())
offset = offset + 4;

local magicCode = string.format("0X%8.8X", bodyData(offset, 4):uint())
bodyTree:add("magicCode", magicCode)
offset = offset + 4;

bodyTree:add("bodyCRC", bodyData(offset, 4):int())
offset = offset + 4;

bodyTree:add("queueId", bodyData(offset, 4):int())
offset = offset + 4;

bodyTree:add("flag", bodyData(offset, 4):int())
offset = offset + 4;

bodyTree:add("queueOffset", bodyData(offset, 8):int64():tonumber())
offset = offset + 8;

bodyTree:add("physicOffset", bodyData(offset, 8):int64():tonumber())
offset = offset + 8;

bodyTree:add("sysFlag", bodyData(offset, 4):int())
offset = offset + 4;


bodyTree:add("bornTimeStamp", bodyData(offset, 8):int64():tonumber())
offset = offset + 8;

local bornHost = bodyData(offset, 1):uint()
.. "." .. bodyData(offset + 1, 1):uint()
.. "." .. bodyData(offset + 2, 1):uint()
.. "." .. bodyData(offset + 3, 1):uint()

bodyTree:add("bornHost", bornHost)
offset = offset + 4;

bodyTree:add("port", bodyData(offset, 4):int())
offset = offset + 4;
bodyTree:add("storeTimestamp", bodyData(offset, 8):int64():tonumber())
offset = offset + 8;

local storeHost = bodyData(offset, 1):uint()

.. "." .. bodyData(offset + 1, 1):uint()
.. "." .. bodyData(offset + 2, 1):uint()
.. "." .. bodyData(offset + 3, 1):uint()
bodyTree:add("storeHost", storeHost)
offset = offset + 4;

bodyTree:add("storePort", bodyData(offset, 4):int())
offset = offset + 4;

--13 RECONSUMETIMES
bodyTree:add("reconsumeTimes", bodyData(offset, 4):int())
offset = offset + 4;
--14 Prepared Transaction Offset
bodyTree:add("preparedTransactionOffset", bodyData(offset, 8):int64():tonumber())
offset = offset + 8;
--15 BODY
local bodyLen = bodyData(offset, 4):int()
-- bodyTree:add("bodyLen", bodyLen)
offset = offset + 4;

bodyTree:add("body:", bodyData(offset, bodyLen):string())
offset = offset + bodyLen;

--16 TOPIC
local topicLen = bodyData(offset, 1):int()
offset = offset + 1;
-- bodyTree:add("topicLen", topicLen)
local topic = bodyData(offset, topicLen):string()
bodyTree:add("topic:", topic)
pinfo.cols.info:append(" topic:" .. topic)

offset = offset + topicLen;

--17 properties
local propertiesLength = bodyData(offset, 2):int()
offset = offset + 2;
bodyTree:add("propertiesLength", propertiesLength)

if (propertiesLength > 0) then
local propertiesStr = bodyData(offset, propertiesLength):string()
offset = offset + propertiesLength
local propertiesTree = bodyTree:add("propertiesStr", "size: " .. propertiesLength)
for k, v in string.gmatch(propertiesStr, "(%w+)\\1(%w+)") do
propertiesTree:add(k, v)
end
end
end/<code>

運行的效果如下所示。

給 wireshark 寫一個 RocketMQ 協議解析的 lua 插件

作者:挖坑的張師傅
原文鏈接:https://juejin.im/post/5e65c655e51d452728647f3a


分享到:


相關文章: