Kafka 生产者和消费者学习笔记

最近搭建 kafka 集群环境以便于收集应用程序日志并进行个性化的处理,因此学习了 kafka 生产者和消费者 python 程序的实现。这篇文章当是 kafka 的学习笔记。

一、搭建 kafka 集群

为方便测试,我们在 MacOS 单机上搭建具有三个 kafka 节点的集群。如果在生产上部署 kafka ,请在不同的物理机上部署 kafka 集群。

1. 下载 kafka 镜像

我们使用 wurstmeister/kafka 镜像来部署 kafka,由于 kafka 依赖于 zookeeper ,因此,我们需要下载 zookeeper 和 kafka 两个镜像。

1
2
docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper

2. 编写 docker-compopse.yml

我们搭建的 kafka 集群包括一个 zooker 节点和三个 kakfa 节点。

使用 ifconfig 命令获取本机的 ip 为 192.168.0.104,完整的 docker-compose.yml 文件如下。有关 docker-compose 的使用,可以参考 《Docker Compose 入门教程》

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
version: '3'
services:
zookeeper:
image: docker.io/wurstmeister/zookeeper
container_name: zookeeper
restart: always
ports:
- "2181:2181"
kafka1:
image: docker.io/wurstmeister/kafka
container_name: kafka1
restart: always
ports:
- "9095:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 0
KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9095
kafka2:
image: docker.io/wurstmeister/kafka
container_name: kafka2
restart: always
ports:
- "9096:9093"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9096
kafka3:
image: docker.io/wurstmeister/kafka
container_name: kafka3
restart: always
ports:
- "9097:9094"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 2
KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9097

使用 docker-compose up -d 运行相关的容器,完成 kafka 集群的搭建。

二、Kafka 生产者

我们使用 kafka-python 包来编写 kafka 生产者。使用以下命令下载 kafka-python 包:

1
pip install kafka-python

Kafka 生产者的源代码如下:

1
2
3
4
5
6
7
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers=["192.168.0.104:9095", '192.168.0.104:9096', '192.168.0.104:9097'])
for _ in range(10):
producer.send('topic_test', b'hello kafka')
producer.flush()

先创建一个 KafkaProducer 对象 producerKafkaProducer 指定了 kafka 集群各个节点的地址。然后通过 KafkaProducer 对象向 topic topic_test 发送消息。

三、Kafka 消费者

为接收生产者的消息,我们定义消费者 KafkaConsumer 对象 consumer。定义 KafkaConsumer 对象时,指定 topic 为 topic_test,这样当运行上面的生产者程序时,可以正常接收到消息。

1
2
3
4
5
6
from kafka import KafkaConsumer
consumer = KafkaConsumer('topic_test', group_id="my_group", bootstrap_servers=["192.168.0.104:9095", '192.168.0.104:9096', '192.168.0.104:9097'])
for msg in consumer:
print(msg.value)

先运行消费者程序,再运行生产者程序,可以看到消费者程序输出:

b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’

多个消费者

如果需要启动多个消费者来消费生产者发送的消息,并实现负载均衡,可以为 topic 设置多个 partition。为方便起见,我们设置 topic 的 partition 数量与消费者数量均为 2。

进入其中一个 kafka 容器:

1
docker exec -it kafka1 /bin/bash

在容器内,查看topic_test 的 partition 数量:

1
2
cd /opt/kafka
bin/kafka-topics.sh --describe --zookeeper 192.168.0.104:2181 --topic topic_test

可以看到 topic_test 的 partition 数量为 1。修改 topic_test partition 数量为 2:

1
bin/kafka-topics.sh --zookeeper 192.168.0.104:2181 --alter --topic topic_test --partitions 2

启动两个消费者程序,然后启动一个生产者程序,可以看到,其中一个消费者输出:

b’hello kafka’
b’hello kafka’
b’hello kafka’

另一个消费者输出:

b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’
b’hello kafka’

生产者发送的消息被两个消费者进行了消费,不同的消费者接收到的消息数量不一致,这是由于使用了默认的负载均衡策略所致。多次运行生产者程序,可以看到消费者接收到的总的消息数量基本上一致。

参考资料

  1. http://kafka.apache.org/
  2. http://wurstmeister.github.io/kafka-docker/
  3. https://www.cnblogs.com/answerThe/p/11267129.html
  4. https://www.jianshu.com/p/fe73765ef74d
  5. https://pypi.org/project/kafka-python/
  6. https://www.cnblogs.com/small-office/p/9399907.html