<code>本文由“開發者手冊”原創,如需轉載,拿走不謝!/<code>
前提概要
本篇接之前的一篇: ,繼續往下走。
此時kafka的topic裡已經有了消息,但是kafka還沒有消費者。
本篇演示logstash的input配置為kafka源,使得logstash充當kafka消費者角色,然後把獲取到的消息輸出到控制檯。
系統環境介紹
<code>centos7.5參數如下:
命令:uname -a
輸出:Linux server160131 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
命令:rpm -q centos-release
輸出:centos-release-7-5.1804.el7.centos.x86_64
需要centos7.5精簡版安裝包的同學可以評論或者私信我,私信關鍵字”centos“會自動回覆您下載地址。/<code>
logstash版本介紹
<code>logstash版本為:logstash-7.6.1/<code>
如果你還沒有安裝logstash,請參考我的另一篇文章:
kafka集群介紹
<code>kafka集群信息如下:
1、server160131:192.168.160.131:9092
2、server160132:192.168.160.132:9092
3、server160133:192.168.160.133:9092/<code>
如果你還沒有部署kafka集群,請參考我的另一篇文章:
演示步驟
1、創建logstash配置文件kafka2console.yml
<code>命令:vim /opt/logstash/logstash-7.6.1/config/kafka2console.conf
文件內容如下:
input {
kafka {
bootstrap_servers => "192.168.160.131:9092,192.168.160.132:9092,192.168.160.133:9092"
topics_pattern => "filebeat_.*"
codec => "json"
}
}
output {
stdout {
codec => "rubydebug"
}
}
參數說明:
bootstrap_servers:kafka集群地址
topics_pattern:支持正則匹配的topic名稱
codec:按json格式解碼,因為filebeat的輸出為json格式
/<code>
2、啟動logstash
<code>命令:/opt/logstash/logstash-7.6.1/bin/logstash -f /opt/logstash/logstash-7.6.1/config/kafka2console.conf/<code>
3、查看輸出
<code>輸出內容為:
{
"input" => {
"type" => "log"
},
"@version" => "1",
"app" => "testapp",
"@timestamp" => 2020-03-27T13:07:34.844Z,
"host" => {
"name" => "server160131"
},
"log" => {
"file" => {
"path" => "/opt/logs/testapp/c.log"
}
},
"message" => "tokafka"
}/<code>
結束語
希望我的文章能給您帶來幫助,如果您有什麼疑問,可以給我留言,我會在第一時間給你提供解答。如果您有好的建議或想法,也歡迎給我留言。
閱讀更多 開發者手冊 的文章