filebeat輸出到logstash
filebeat輸出到elasticsearch
上節課我們講述了關於filebeat採集日誌資料的一些基本使用,我們可以自定義輸出一些我們想要的資訊和格式。本節課我們將繼續**filebeat如何配置輸出。
filebeat的輸出主要支援一下幾個元件:
console(控制台)輸出上一節課我們測試功能的時候已經介紹了,主要是用來開發測試使用。還有一些其他的資料出口,像logstash和elasticsearch也是我們經常會用到的。本節課我們將重點介紹輸出訊息佇列——kafka,也會簡單介紹一下輸出logstash和elasticsearch。
其實在日誌系統中,與其說是filebeat輸出kafka,不如說是將每個filebeat例項作為乙個kafka的producer,實際上有多少個filebeat就有多少個生產者,這樣通過kafka可以實現高併發的日誌集中處理。
output.kafka:
enabled: true
hosts: ["kafka1:9092","kafka2:9092","kafka3:9092"]
topic: logtest
codec.format:
string: '","hostname":"%","log_type":"%","message":"%"}'
說明:
測試:首先,我們進入kafka的bin目錄,啟動乙個消費者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic logtest --from-beginning
啟動之後,logtest這個topic中的訊息就會消費到控制台上了。
然後,我們啟動filebeat,並且往監控日誌中增加一條資訊。如果可以在kafka消費者終端看到我們剛剛寫入並且按照自定義格式輸出到控制台的訊息,證明輸出kafka已經走通。
經過上面的測試,我們已經知道我們已經把日誌成功傳送到kafka集群中了。不過上節課我們提到了,我們有很多種類的日誌,有些時候我們並不想把所有日誌都傳送到同乙個topic上,不便於分析處理也不便於日誌的維護。所以我們一般會選擇不同類別的日誌寫到不同的topic中。
首先,要在input中加入filed欄位,上一節課字段分類已經講述了。
fields:
log_type: system
然後,在配置topic輸出:
topic: '%'
這樣就會根據新增加的字段log_type的值來決定傳送到哪個topic中了。
到此,我們實現了filebeat傳送資料到指定kafka-topic。在實際應用中,我們還需要加一些配置來優化這一過程的效能。
required_acks:1
compression:gzip
compression_level
max_message_bytes
timeout
broker_timeout
client_id
loadbalance:true
worker
metadata
refresh_frequency
retry.max
retry.backoff
max_retries
bulk_max_sizeedit
對於日誌系統的其中一種架構(如下圖),我們會用到filebeat輸出到logstash這種方式。
配置輸出:
output.logstash:
hosts:[「127.0.0.1:5044」]
說明:
logstash配置輸入:
input
}
filebeat輸出到elasticsearch是配置檔案預設開啟的輸出。
配置輸出:
說明:
Filebeat採集日誌講解(一)
總結在elkf中,filebeat作為日誌採集端,採集日誌併發送到kafka。input就是以檔案形式儲存的log檔案,output就是kafka集群。在採集日誌中一般情況有以下幾點需要注意的 輸出內容確定,一般包括時間戳,主機名,日誌型別,日誌內容,其他的根據業務的實際需求,無用資訊可以直接過濾掉...
filebeat 資料採集流程
filebeat啟動流程 講解了filebeat的啟動流程,filebeat在構建完crawler物件,開始採集流程。crawler的start方法內,會啟動inputs func c crawler start pipeline beat.pipeline,r registrar.registra...
二 elk採集nginx日誌
為了記錄 的訪問詳情,方便記錄和統計ip的訪問次數和請求的url位址,我們採用輕量級的filebeat工具採集nginx日誌,然後把日誌的資料報傳送給logstash,最後kibana用於日誌的展示。1.配置nginx 1.1 修改nginx日誌輸出為json格式。完整的nginx.conf 1.2...