kafka輸出到logstash

<code>本文由“開發者手冊”原創,如需轉載,拿走不謝!/<code>

前提概要

本篇接之前的一篇: ,繼續往下走。

此時kafka的topic裡已經有了消息,但是kafka還沒有消費者。

本篇演示logstash的input配置為kafka源,使得logstash充當kafka消費者角色,然後把獲取到的消息輸出到控制檯。

系統環境介紹

<code>centos7.5參數如下:

命令:uname -a
輸出:Linux server160131 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux

命令:rpm -q centos-release
輸出:centos-release-7-5.1804.el7.centos.x86_64

需要centos7.5精簡版安裝包的同學可以評論或者私信我,私信關鍵字”centos“會自動回覆您下載地址。/<code>

logstash版本介紹

<code>logstash版本為:logstash-7.6.1/<code>

如果你還沒有安裝logstash,請參考我的另一篇文章:


kafka集群介紹

<code>kafka集群信息如下:
1、server160131:192.168.160.131:9092

2、server160132:192.168.160.132:9092
3、server160133:192.168.160.133:9092/<code>

如果你還沒有部署kafka集群,請參考我的另一篇文章:

演示步驟

1、創建logstash配置文件kafka2console.yml

<code>命令:vim /opt/logstash/logstash-7.6.1/config/kafka2console.conf

文件內容如下:
input {
kafka {
bootstrap_servers => "192.168.160.131:9092,192.168.160.132:9092,192.168.160.133:9092"
topics_pattern => "filebeat_.*"
codec => "json"
}
}

output {
stdout {
codec => "rubydebug"
}
}

參數說明:
bootstrap_servers:kafka集群地址
topics_pattern:支持正則匹配的topic名稱
codec:按json格式解碼,因為filebeat的輸出為json格式
/<code>

2、啟動logstash

<code>命令:/opt/logstash/logstash-7.6.1/bin/logstash -f /opt/logstash/logstash-7.6.1/config/kafka2console.conf/<code>

3、查看輸出

<code>輸出內容為:

{

"input" => {
"type" => "log"
},
"@version" => "1",
"app" => "testapp",
"@timestamp" => 2020-03-27T13:07:34.844Z,
"host" => {
"name" => "server160131"
},
"log" => {
"file" => {
"path" => "/opt/logs/testapp/c.log"
}
},
"message" => "tokafka"
}/<code>

結束語

希望我的文章能給您帶來幫助,如果您有什麼疑問,可以給我留言,我會在第一時間給你提供解答。如果您有好的建議或想法,也歡迎給我留言。

kafka輸出到logstash


分享到:


相關文章: