Skip to content
Leo的技术分享
Go back

Kafka 生产者和消费者学习笔记

最近搭建 kafka 集群环境以便于收集应用程序日志并进行个性化的处理,因此学习了 kafka 生产者和消费者 python 程序的实现。这篇文章当是 kafka 的学习笔记。

一、搭建 kafka 集群

为方便测试,我们在 MacOS 单机上搭建具有三个 kafka 节点的集群。如果在生产上部署 kafka ,请在不同的物理机上部署 kafka 集群。

1. 下载 kafka 镜像

我们使用 wurstmeister/kafka 镜像来部署 kafka,由于 kafka 依赖于 zookeeper ,因此,我们需要下载 zookeeper 和 kafka 两个镜像。

docker pull wurstmeister/kafka
docker pull wurstmeister/zookeeper

2. 编写 docker-compopse.yml

我们搭建的 kafka 集群包括一个 zooker 节点和三个 kakfa 节点。

使用 ifconfig 命令获取本机的 ip 为 192.168.0.104,完整的 docker-compose.yml 文件如下。有关 docker-compose 的使用,可以参考 《Docker Compose 入门教程》

version: '3'
services:
  zookeeper:
    image: docker.io/wurstmeister/zookeeper
    container_name: zookeeper
    restart: always
    ports:
      - "2181:2181"

  kafka1:
    image: docker.io/wurstmeister/kafka
    container_name: kafka1
    restart: always
    ports:
      - "9095:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9095

  kafka2:
    image: docker.io/wurstmeister/kafka
    container_name: kafka2
    restart: always
    ports:
      - "9096:9093"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9096

  kafka3:
    image: docker.io/wurstmeister/kafka
    container_name: kafka3
    restart: always
    ports:
      - "9097:9094"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: 192.168.0.104:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9094
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.104:9097

使用 docker-compose up -d 运行相关的容器,完成 kafka 集群的搭建。

二、Kafka 生产者

我们使用 kafka-python 包来编写 kafka 生产者。使用以下命令下载 kafka-python 包:

pip install kafka-python

Kafka 生产者的源代码如下:

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=["192.168.0.104:9095", '192.168.0.104:9096', '192.168.0.104:9097']) 

for _ in range(10):
    producer.send('topic_test', b'hello kafka')
producer.flush()

先创建一个 KafkaProducer 对象 producerKafkaProducer 指定了 kafka 集群各个节点的地址。然后通过 KafkaProducer 对象向 topic topic_test 发送消息。

三、Kafka 消费者

为接收生产者的消息,我们定义消费者 KafkaConsumer 对象 consumer。定义 KafkaConsumer 对象时,指定 topic 为 topic_test,这样当运行上面的生产者程序时,可以正常接收到消息。

from kafka import KafkaConsumer

consumer = KafkaConsumer('topic_test', group_id="my_group", bootstrap_servers=["192.168.0.104:9095", '192.168.0.104:9096', '192.168.0.104:9097'])

for msg in consumer:
    print(msg.value)

先运行消费者程序,再运行生产者程序,可以看到消费者程序输出:

b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’

多个消费者

如果需要启动多个消费者来消费生产者发送的消息,并实现负载均衡,可以为 topic 设置多个 partition。为方便起见,我们设置 topic 的 partition 数量与消费者数量均为 2。

进入其中一个 kafka 容器:

docker exec -it kafka1 /bin/bash

在容器内,查看topic_test 的 partition 数量:

cd /opt/kafka
bin/kafka-topics.sh --describe --zookeeper 192.168.0.104:2181 --topic topic_test

可以看到 topic_test 的 partition 数量为 1。修改 topic_test partition 数量为 2:

bin/kafka-topics.sh --zookeeper 192.168.0.104:2181 --alter --topic topic_test --partitions 2

启动两个消费者程序,然后启动一个生产者程序,可以看到,其中一个消费者输出:

b’hello kafka’ b’hello kafka’ b’hello kafka’

另一个消费者输出:

b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’ b’hello kafka’

生产者发送的消息被两个消费者进行了消费,不同的消费者接收到的消息数量不一致,这是由于使用了默认的负载均衡策略所致。多次运行生产者程序,可以看到消费者接收到的总的消息数量基本上一致。

参考资料

  1. http://kafka.apache.org/
  2. http://wurstmeister.github.io/kafka-docker/
  3. https://www.cnblogs.com/answerThe/p/11267129.html
  4. https://www.jianshu.com/p/fe73765ef74d
  5. https://pypi.org/project/kafka-python/
  6. https://www.cnblogs.com/small-office/p/9399907.html

Share this post on:

Previous Post
filebeat 日志输出至 kafka
Next Post
使用 prometheus python 库编写自定义指标