filebeat 日志输出至 kafka
Filebeat 是一款轻量级的日志采集器,可以用来收集日志,并将日志汇总起来处理。
Filebeat 的工具原理如下图所示:
图片来源:
https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html
通过 filebeat 配置文件 filebeat.yml
指定需要收集的日志,并可以指定日志输出至 elasticsearch,logstash,kafka,redis 等下游接收系统。
本文讲述如何配置 filebeat,将日志输出至 kafka 集群。
安装 filebeat
为方便使用,我们采用 docker 方式安装 filebeat。
采用 filebea 官方镜像 docker.elastic.co/beats/filebeat:7.7.1
,docker-compose.yml
配置如下:
1 | version: '3' |
其中 volumes
的配置,第一行是指定需要收集日志的目录,第二行是指定 filebeat 的配置文件,第三、第四行是参考官方文档添加的配置,详细可参考 https://www.elastic.co/guide/en/beats/filebeat/current/running-on-docker.html。
配置 filebeat
配置 filebeat 的输出,以便收集日志:
1 | - type: log |
上述配置中,指定收集的日志文件为 /home/lihao/code/server/logs
目录下的 .log
文件。
配置日志输出至 kafka 集群:
1 | output.kafka: |
kafka 集群地址为 10.88.115.137:9095, 10.88.115.137:9096, 10.88.115.137:9097
,kafka 和 topic 名称为 kafka_log
。
Filebeat 提供多种输出至 kafka 分区的策略,包括 random
,round_robin
,hash
,默认是 hash
。上面的配置中,我们指定 round_robin
的策略,并指定 reachable_only
为 true
,这表示仅将日志发布到可用分区。
设置好 kafka 分区策略后,后面我们可以通过设置 kafka topic 和 consumer 的数量,以实现负载均衡。
完成 filebeat.yml
配置后,我们使用命令启动 filebeat 容器。
1 | docker-compose up -d |
测试
启动 kafka consumer 测试程序,以测试从 kafka 接收日志。
1 | from kafka import KafkaConsumer |
然后往目录 /home/lihao/code/server/logs
写入日志,命令:
1 | cd /home/lihao/code/server/logs |
可以看到 consumer 程序输出:
… “message”:”hello” …
使用 kafka 命令可以查看到 topic 的 partition 数量(有关命令可以参考文章《
Kafka 生产者和消费者学习笔记》):
1 | bin/kafka-topics.sh --describe --zookeeper 10.88.115.137:2181 --topic kafka_log |
Topic kafka_log
的 partition 数量为 1。为实现负载均衡,设置 kafka_log
的 partition 数量为 2:
1 | bin/kafka-topics.sh --zookeeper 10.88.115.137:2181 --alter --topic kafka_log --partitions 2 |
打开终端,同时启动两个 kafka consumer 测试程序。然后,修改 partition 数量后,再次执行:
1 | cd /home/lihao/code/server/logs |
可以看到其中一个 kafka consumer 测试程序输出:
… “message”:”hello1” …
另一个 kafka consumer 测试程序输出:
… “message”:”hello2” …
不断执行上面的 echo
命令,可以看到 kafka consumer 测试程程序交替地输出接收到的日志。
在我们的生产环境,通过 prometheus 收集 kafka consumer 的 cpu 占用情况,可以看到生产环境上 3 个 kafka consumer 的负载基本一致,这也验证了 filebeat 的 partition.round_robin:
配置在实现负载均衡的作用。