Apache Kafka 实战教程


什么是Apache Kafka?

在电商双11、直播弹幕这类每秒百万级事件的场景下,Apache Kafka 绝对是不可或缺的流量蓄水池事件管道。它是一个开源的分布式事件流平台,核心优势是高吞吐量(百万级/秒)、低延迟、持久化存储、水平可扩展、支持多语言。

常见应用场景包括:日志聚合、事件溯源、流处理、系统间数据同步、异步消息队列。


1. Kafka 快速安装(Docker Compose 单节点)

新手入门首选 Docker Compose,一键启动 Zookeeper(旧版 Kafka 依赖的元数据管理)、单节点 Kafka、可视化监控 Kafka Manager:

# 保存为 docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    ports:
      - "2181:2181"

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092 # 宿主机可访问地址
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 # 单节点副本数只能1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true' # 新手可以自动创建

  kafka-manager:
    image: hlebalbau/kafka-manager:latest
    container_name: kafka-manager
    depends_on:
      - zookeeper
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: zookeeper:2181

启动服务:

docker-compose up -d

验证:

  • 浏览器打开 http://localhost:9000 进入 Kafka Manager,添加集群(ZK 地址填 zookeeper:2181
  • 后续命令行操作可以在宿主机安装 Kafka 工具,或者进容器执行(docker exec -it kafka /bin/bash

2. 核心概念(1分钟看懂)

概念通俗说明
Producer发消息的人/系统(比如订单系统发「订单创建」事件)
Consumer收消息的人/系统(比如库存系统收「订单创建」减库存)
Topic消息的分类标签(比如 order-events
PartitionTopic 的物理分片(类似文件夹分卷,提高读写性能)
Broker单台 Kafka 服务器节点
Consumer Group多个 Consumer 组成的组,实现负载均衡(同一个消息只会给组里的一个人)和容错(挂了一个其他人补上)

3. Topic 管理与命令行测试

3.1 Topic 管理(常用命令)

# 创建一个有3个分区、单副本的Topic(auto_create可以关,手动创建更可控)
kafka-topics --create --topic order-events --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

# 查看所有Topic
kafka-topics --list --bootstrap-server localhost:9092

# 查看Topic详情(分区数、副本、Leader等)
kafka-topics --describe --topic order-events --bootstrap-server localhost:9092

# 增加分区(只能加不能减!)
kafka-topics --alter --topic order-events --partitions 6 --bootstrap-server localhost:9092

3.2 命令行生产消费测试

# 打开生产者终端(输入一行回车就是一条消息)
kafka-console-producer --bootstrap-server localhost:9092 --topic order-events

# 打开另一个消费者终端(--from-beginning 从最早的消息开始读)
kafka-console-consumer --bootstrap-server localhost:9092 --topic order-events --from-beginning

4. Python 与 Kafka 集成(最常用的 kafka-python

4.1 安装依赖

pip install kafka-python

4.2 封装好的生产者和消费者(开箱即用)

from kafka import KafkaProducer, KafkaConsumer
import json
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# ------------------- 生产者 -------------------
class SimpleKafkaProducer:
    def __init__(self, bootstrap_servers=["localhost:9092"]):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),  # 自动序列化JSON
            acks="1",  # Leader确认即可,平衡性能和可靠性
            retries=3,  # 失败重试3次
            linger_ms=5,  # 延迟5ms凑批次,提高吞吐量
        )

    def send(self, topic: str, value: dict, key: str = None):
        """发送单条消息,key可以控制消息发到固定分区(比如同一用户的事件发到同一区)"""
        try:
            future = self.producer.send(
                topic, 
                value=value, 
                key=key.encode("utf-8") if key else None
            )
            # 等待发送完成(可选,生产环境可以异步)
            record_metadata = future.get(timeout=10)
            logger.info(
                f"消息发送成功:Topic={record_metadata.topic},分区={record_metadata.partition},偏移量={record_metadata.offset}"
            )
        except Exception as e:
            logger.error(f"消息发送失败:{e}")
            raise

    def close(self):
        self.producer.flush()  # 刷新缓冲区
        self.producer.close()

# ------------------- 消费者 -------------------
class SimpleKafkaConsumer:
    def __init__(self, topics: list, group_id: str, bootstrap_servers=["localhost:9092"]):
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode("utf-8")),  # 自动反序列化JSON
            auto_offset_reset="earliest",  # 新组从最早的消息读
            enable_auto_commit=True,  # 自动提交偏移量(生产环境建议手动)
            auto_commit_interval_ms=1000,
        )

    def consume(self, callback):
        """循环消费,传入回调函数处理消息"""
        logger.info(f"开始消费:Topics={self.consumer.subscription()},Group={self.consumer.config['group_id']}")
        try:
            for msg in self.consumer:
                callback(msg.value)
        except KeyboardInterrupt:
            logger.info("消费已停止")
        finally:
            self.consumer.close()

# ------------------- 使用示例 -------------------
if __name__ == "__main__":
    # 1. 发送订单消息
    producer = SimpleKafkaProducer()
    producer.send(
        topic="order-events",
        value={"order_id": 1001, "user_id": 123, "product_id": 456, "amount": 99.9},
        key=str(123),  # 同一用户的订单发到同一分区
    )
    producer.close()

    # 2. 消费订单消息(单独开一个终端运行)
    # def process_order(msg):
    #     logger.info(f"收到订单:{msg}")
    # consumer = SimpleKafkaConsumer(topics=["order-events"], group_id="order-consumer-1")
    # consumer.consume(process_order)

5. 实战最佳实践(精简但核心)

5.1 分区策略

  • 分区数怎么定? 一般建议:分区数 ≈ 目标吞吐量 / 单分区吞吐量(单分区通常能到10-100万/秒,取决于消息大小和硬件)
  • 如何控制消息分区? 指定 key(相同 key 发到同一分区,保证顺序),或者不指定(轮询)

5.2 可靠性调优

场景acks值说明
日志聚合(允许丢少量)0不等待任何确认,最快
订单、支付(必须不丢)all(或-1)等待所有 ISR(同步副本)确认
一般业务(平衡)1(默认)等待 Leader 确认

5.3 性能调优

  • 生产者:开启 linger_ms(凑批次)、batch_size(批次大小,默认16KB)、compression_type(用 snappy/lz4 压缩,减少网络传输)
  • 消费者:开启 max_poll_records(批量拉取,默认500)、fetch_min_bytes(等待至少多少字节再返回,默认1B)

总结

Apache Kafka 是构建实时事件系统的首选工具,通过 Docker Compose 快速上手,掌握核心概念、Python 集成和最佳实践,就能应对大部分业务场景了。如果需要生产级的集群,建议扩展成 3 个节点(保证副本容错),并配置监控告警。