Kafka 生产者和消费者学习笔记
最近搭建 kafka 集群环境以便于收集应用程序日志并进行个性化的处理,因此学习了 kafka 生产者和消费者 python 程序的实现。这篇文章当是 kafka 的学习笔记。
一、搭建 kafka 集群
为方便测试,我们在 MacOS 单机上搭建具有三个 kafka 节点的集群。如果在生产上部署 kafka ,请在不同的物理机上部署 kafka 集群。
1. 下载 kafka 镜像
我们使用 wurstmeister/kafka 镜像来部署 kafka,由于 kafka 依赖于 zookeeper ,因此,我们需要下载 zookeeper 和 kafka 两个镜像。
1 | docker pull wurstmeister/kafka |
2. 编写 docker-compopse.yml
我们搭建的 kafka 集群包括一个 zooker 节点和三个 kakfa 节点。
使用 ifconfig
命令获取本机的 ip 为 192.168.0.104
,完整的 docker-compose.yml 文件如下。有关 docker-compose 的使用,可以参考 《Docker Compose 入门教程》。
1 | version: '3' |
使用 docker-compose up -d
运行相关的容器,完成 kafka 集群的搭建。
二、Kafka 生产者
我们使用 kafka-python
包来编写 kafka 生产者。使用以下命令下载 kafka-python
包:
1 | pip install kafka-python |
Kafka 生产者的源代码如下:
1 | from kafka import KafkaProducer |
先创建一个 KafkaProducer
对象 producer
,KafkaProducer
指定了 kafka 集群各个节点的地址。然后通过 KafkaProducer
对象向 topic topic_test
发送消息。
三、Kafka 消费者
为接收生产者的消息,我们定义消费者 KafkaConsumer
对象 consumer
。定义 KafkaConsumer
对象时,指定 topic 为 topic_test
,这样当运行上面的生产者程序时,可以正常接收到消息。
1 | from kafka import KafkaConsumer |
先运行消费者程序,再运行生产者程序,可以看到消费者程序输出:
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
多个消费者
如果需要启动多个消费者来消费生产者发送的消息,并实现负载均衡,可以为 topic 设置多个 partition。为方便起见,我们设置 topic 的 partition 数量与消费者数量均为 2。
进入其中一个 kafka 容器:
1 | docker exec -it kafka1 /bin/bash |
在容器内,查看topic_test
的 partition 数量:
1 | cd /opt/kafka |
可以看到 topic_test
的 partition 数量为 1。修改 topic_test
partition 数量为 2:
1 | bin/kafka-topics.sh --zookeeper 192.168.0.104:2181 --alter --topic topic_test --partitions 2 |
启动两个消费者程序,然后启动一个生产者程序,可以看到,其中一个消费者输出:
b’hello kafka’
b’hello kafka’
b’hello kafka’
另一个消费者输出:
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
生产者发送的消息被两个消费者进行了消费,不同的消费者接收到的消息数量不一致,这是由于使用了默认的负载均衡策略所致。多次运行生产者程序,可以看到消费者接收到的总的消息数量基本上一致。