Apache Kafka Hands-On Tutorial


What is Apache Kafka?

Apache Kafka is an open-source distributed event-streaming platform for handling real-time data streams. It is designed to provide high-throughput, low-latency data pipelines and stream-processing applications. Kafka can handle millions of messages per second and is widely used for log aggregation, stream processing, data integration, and similar workloads.

Key features of Kafka:

  • High throughput: handles millions of messages per second
  • Durable storage: messages are persisted to disk for reliability
  • Scalability: scales horizontally to hundreds of nodes
  • Fault tolerance: partitioning and replication guard against data loss
  • Real-time processing: supports processing data streams in real time
  • Multi-language support: client libraries for many programming languages

1. Installing and Configuring Kafka

1.1 Installing with Docker

# Pull the Kafka image
docker pull confluentinc/cp-kafka:latest

# Create a shared Docker network for the containers
docker network create kafka-network

# Start the Zookeeper container first (Kafka depends on Zookeeper)
docker run -d \
  --name zookeeper \
  --network kafka-network \
  -p 2181:2181 \
  -e ZOOKEEPER_CLIENT_PORT=2181 \
  -e ZOOKEEPER_TICK_TIME=2000 \
  confluentinc/cp-zookeeper:latest

# Start the Kafka container (single node)
docker run -d \
  --name kafka-server \
  --network kafka-network \
  -p 9092:9092 \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  confluentinc/cp-kafka:latest

1.2 Installing with Docker Compose

# docker-compose.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"
    networks:
      - kafka-network

  kafka:
    image: confluentinc/cp-kafka:latest
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
    networks:
      - kafka-network

  kafka-manager:
    image: hlebalbau/kafka-manager:latest
    container_name: kafka-manager
    depends_on:
      - zookeeper
    ports:
      - "9000:9000"
    environment:
      ZK_HOSTS: zookeeper:2181
      APPLICATION_SECRET: letmein
      KAFKA_MANAGER_AUTH_ENABLED: 'false'
    networks:
      - kafka-network

networks:
  kafka-network:
    driver: bridge
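
Once the stack is up (docker-compose up -d), a quick way to confirm the broker is reachable from the host is a short kafka-python check. This is only a minimal sketch, assuming the compose file above and that kafka-python is installed:

from kafka.admin import KafkaAdminClient

# Connects to the broker exposed on localhost:9092 by the compose file above
admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
print('Topics currently on the broker:', admin.list_topics())
admin.close()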

1.3 Distributed Cluster Configuration

# docker-compose-cluster.yml
version: '3.8'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  kafka1:
    image: confluentinc/cp-kafka:latest
    container_name: kafka1
    depends_on:
      - zookeeper
    ports:
      - "9091:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  kafka2:
    image: confluentinc/cp-kafka:latest
    container_name: kafka2
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'

  kafka3:
    image: confluentinc/cp-kafka:latest
    container_name: kafka3
    depends_on:
      - zookeeper
    ports:
      - "9093:9092"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
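
Because each broker advertises itself as kafka1:9092, kafka2:9092, or kafka3:9092, clients must be able to resolve those hostnames (for example by running inside the same Docker network or adding host entries). As a quick exercise of the cluster, here is a hedged sketch that creates a triply-replicated topic; the topic name is just an example:

from kafka.admin import KafkaAdminClient, NewTopic

# Assumes kafka1/kafka2/kafka3 are resolvable (e.g. the script runs in the same Docker network)
admin = KafkaAdminClient(bootstrap_servers=['kafka1:9092', 'kafka2:9092', 'kafka3:9092'])
admin.create_topics([NewTopic(name='replicated-demo', num_partitions=6, replication_factor=3)])
print(admin.describe_topics(['replicated-demo']))  # shows leader and replicas per partition
admin.close()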

1.4 Configuration File Reference

# server.properties - Kafka broker configuration file
# Basic settings
broker.id=1
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# Zookeeper settings
zookeeper.connect=localhost:2181
zookeeper.connection.timeout.ms=6000

# Log settings
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1

# Replication settings
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# Socket and performance settings
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

replica.fetch.min.bytes=1
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000

# Message retention settings
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# Compression settings
compression.type=producer
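
These broker settings can also be inspected at runtime. A minimal sketch using kafka-python's admin client, assuming a broker with broker.id=1 listening on localhost:9092:

from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
# Broker configs are addressed by broker id ('1' matches broker.id=1 above)
resource = ConfigResource(ConfigResourceType.BROKER, '1')
for response in admin.describe_configs([resource]):
    print(response)  # raw response; its resources field contains (name, value) pairs
admin.close()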

2. Kafka Fundamentals

2.1 Core Concepts

Concept          Description
Producer         Message producer; sends messages to a Kafka topic
Consumer         Message consumer; reads messages from a Kafka topic
Topic            Message category; the logical container for messages
Partition        A partition of a topic; the physical unit of storage
Broker           A Kafka server node
Offset           The unique identifier of a message within a partition
Consumer Group   A group of consumers providing load balancing and fault tolerance
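
To make these concepts concrete, the following sketch assigns a consumer directly to every partition of a topic and prints its position and the log-end offset per partition (assuming a local broker on localhost:9092 and an existing topic named my-topic):

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', group_id='concept-demo')
partitions = consumer.partitions_for_topic('my-topic') or set()
topic_partitions = [TopicPartition('my-topic', p) for p in sorted(partitions)]
consumer.assign(topic_partitions)   # explicit assignment instead of group subscription
consumer.seek_to_beginning()        # rewind every assigned partition to its first offset
end_offsets = consumer.end_offsets(topic_partitions)
for tp in topic_partitions:
    print(f"{tp.topic}[{tp.partition}] next offset: {consumer.position(tp)}, log end: {end_offsets[tp]}")
consumer.close()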

2.2 Topics and Partitions

# Create a topic
kafka-topics --create --topic my-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1

# List all topics
kafka-topics --list --bootstrap-server localhost:9092

# Describe a topic
kafka-topics --describe --topic my-topic --bootstrap-server localhost:9092

# Delete a topic
kafka-topics --delete --topic my-topic --bootstrap-server localhost:9092

# Increase the number of partitions (the partition count can only be increased, never decreased)
kafka-topics --alter --topic my-topic --partitions 6 --bootstrap-server localhost:9092
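
The same administration tasks can be performed programmatically. A minimal kafka-python sketch mirroring the commands above, using the same example topic name and counts:

from kafka.admin import KafkaAdminClient, NewTopic, NewPartitions

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# Create a topic (kafka-topics --create)
admin.create_topics([NewTopic(name='my-topic', num_partitions=3, replication_factor=1)])

# List and describe topics (kafka-topics --list / --describe)
print(admin.list_topics())
print(admin.describe_topics(['my-topic']))

# Increase the partition count to 6 (kafka-topics --alter)
admin.create_partitions({'my-topic': NewPartitions(total_count=6)})

# Delete the topic (kafka-topics --delete)
admin.delete_topics(['my-topic'])
admin.close()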

3. Kafka Administration and Monitoring

3.1 Command-Line Administration

# Start the Kafka server
bin/kafka-server-start.sh config/server.properties

# Stop the Kafka server
bin/kafka-server-stop.sh

# Console producer
kafka-console-producer --bootstrap-server localhost:9092 --topic my-topic

# Console consumer
kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning

# List consumer groups
kafka-consumer-groups --bootstrap-server localhost:9092 --list

# Describe a consumer group
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-group

# Reset a consumer group's offsets
kafka-consumer-groups --bootstrap-server localhost:9092 --reset-offsets --group my-group --topic my-topic --to-earliest --execute
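
Consumer groups can also be listed and inspected from Python. A small sketch, assuming a group named my-group exists:

from kafka.admin import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

# kafka-consumer-groups --list
for group_id, protocol_type in admin.list_consumer_groups():
    print(group_id, protocol_type)

# Committed offsets per partition (part of kafka-consumer-groups --describe)
for tp, meta in admin.list_consumer_group_offsets('my-group').items():
    print(f"{tp.topic}[{tp.partition}] committed offset: {meta.offset}")
admin.close()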

3.2 Monitoring Metrics

# Check broker status (supported API versions)
kafka-broker-api-versions --bootstrap-server localhost:9092

# Show per-partition offsets for a topic
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic

# Check consumer lag
kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-group
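
Consumer lag is the difference between a partition's log-end offset and the group's committed offset. A hedged sketch that computes it for each partition of a group (the group name is an example):

from kafka import KafkaConsumer
from kafka.admin import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')
committed = admin.list_consumer_group_offsets('my-group')  # {TopicPartition: OffsetAndMetadata}

consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
end_offsets = consumer.end_offsets(list(committed.keys()))

for tp, meta in committed.items():
    lag = end_offsets[tp] - meta.offset
    print(f"{tp.topic}[{tp.partition}] committed={meta.offset} end={end_offsets[tp]} lag={lag}")

consumer.close()
admin.close()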

4. Integrating Kafka with Python

4.1 Installing the kafka-python Client

pip install kafka-python
pip install aiokafka  # async support
pip install confluent-kafka  # official Confluent client (better performance)
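
For comparison, a minimal producer using confluent-kafka looks roughly like this (a sketch only; the topic and message are placeholders, and delivery reports are served by poll() or flush()):

from confluent_kafka import Producer

producer = Producer({'bootstrap.servers': 'localhost:9092'})

def delivery_report(err, msg):
    # Called once per message from poll() or flush()
    if err is not None:
        print(f'Delivery failed: {err}')
    else:
        print(f'Delivered to {msg.topic()}[{msg.partition()}] at offset {msg.offset()}')

producer.produce('my-topic', key='user-1', value='{"hello": "kafka"}', callback=delivery_report)
producer.poll(0)   # serve delivery callbacks
producer.flush()   # block until all outstanding messages are delivered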

4.2 Basic Producer and Consumer

from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
import json
import logging
from typing import Dict, Any, Callable

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class KafkaProducerWrapper:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            acks='all',  # wait for acknowledgement from all in-sync replicas
            retries=3,
            linger_ms=5,  # delay to allow larger batches
            batch_size=16384  # batch size in bytes
        )
    
    def send_message(self, topic: str, message: Dict, key: str = None):
        """Send a message to the given topic."""
        try:
            future = self.producer.send(topic, value=message, key=key)
            record_metadata = future.get(timeout=10)
            logger.info(f'Message sent to topic {record_metadata.topic}, partition {record_metadata.partition}, offset {record_metadata.offset}')
            return record_metadata
        except KafkaError as e:
            logger.error(f'Error sending message: {e}')
            raise
    
    def send_messages_batch(self, topic: str, messages: list):
        """批量发送消息"""
        for msg in messages:
            self.send_message(topic, msg)
    
    def flush(self):
        """刷新缓冲区"""
        self.producer.flush()
    
    def close(self):
        """关闭生产者"""
        self.producer.close()

class KafkaConsumerWrapper:
    def __init__(self, bootstrap_servers=['localhost:9092'], group_id='my-group'):
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            auto_offset_reset='earliest',  # start from the earliest available messages
            enable_auto_commit=True,
            auto_commit_interval_ms=1000
        )
    
    def subscribe_topics(self, topics: list):
        """订阅Topic"""
        self.consumer.subscribe(topics)
        logger.info(f'Subscribed to topics: {topics}')
    
    def consume_messages(self, max_records=10):
        """消费消息"""
        messages = self.consumer.poll(timeout_ms=1000, max_records=max_records)
        for topic_partition, records in messages.items():
            for record in records:
                yield {
                    'topic': record.topic,
                    'partition': record.partition,
                    'offset': record.offset,
                    'key': record.key,
                    'value': record.value,
                    'timestamp': record.timestamp
                }
    
    def close(self):
        """关闭消费者"""
        self.consumer.close()

# Usage example
def example_usage():
    # Create a producer
    producer = KafkaProducerWrapper()
    
    # Send a message
    message = {
        'id': 1,
        'name': 'John Doe',
        'email': 'john@example.com',
        'timestamp': '2026-04-10T20:00:00Z'
    }
    
    producer.send_message('user-events', message, key='user-1')
    
    # Create a consumer
    consumer = KafkaConsumerWrapper(group_id='user-event-consumer')
    consumer.subscribe_topics(['user-events'])
    
    # Consume messages
    for msg in consumer.consume_messages():
        logger.info(f'Received message: {msg}')
    
    # Clean up resources
    producer.close()
    consumer.close()

4.3 Advanced Features

from kafka import KafkaProducer, KafkaConsumer, TopicPartition
from kafka.structs import OffsetAndTimestamp
import asyncio
from concurrent.futures import ThreadPoolExecutor

class AdvancedKafkaProducer:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            compression_type='snappy',  # enable compression
            linger_ms=5,
            batch_size=16384,
            max_block_ms=5000
        )
    
    def send_with_callback(self, topic: str, message: Dict, key: str = None):
        """发送消息并设置回调"""
        def on_send_success(record_metadata):
            logger.info(f'Message delivered to {record_metadata.topic}[{record_metadata.partition}] at offset {record_metadata.offset}')
        
        def on_send_error(exc):
            logger.error(f'Message delivery failed: {exc}')
        
        future = self.producer.send(topic, value=message, key=key)
        future.add_callback(on_send_success)
        future.add_errback(on_send_error)
    
    def send_with_partition(self, topic: str, message: Dict, partition: int = None, key: str = None):
        """发送到指定分区"""
        future = self.producer.send(topic, value=message, partition=partition, key=key)
        return future.get()
    
    def send_with_headers(self, topic: str, message: Dict, headers: list = None):
        """发送带头部信息的消息"""
        future = self.producer.send(topic, value=message, headers=headers)
        return future.get()

class AdvancedKafkaConsumer:
    def __init__(self, bootstrap_servers=['localhost:9092'], group_id='advanced-group'):
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            auto_offset_reset='latest',
            enable_auto_commit=False,  # commit offsets manually
            max_poll_records=100,  # maximum number of records per poll
            max_poll_interval_ms=300000  # maximum interval between polls
        )
    
    def manual_commit(self):
        """手动提交偏移量"""
        self.consumer.commit()
    
    def seek_to_timestamp(self, topic: str, timestamp: int):
        """根据时间戳定位到特定偏移量"""
        tp = TopicPartition(topic, 0)
        offsets = self.consumer.offsets_for_times({tp: timestamp})
        if offsets[tp]:
            self.consumer.seek(tp, offsets[tp].offset)
    
    def get_consumer_metrics(self):
        """获取消费者指标"""
        return self.consumer.metrics()
    
    def pause_partitions(self, partitions: list):
        """暂停特定分区"""
        tps = [TopicPartition(p['topic'], p['partition']) for p in partitions]
        self.consumer.pause(*tps)
    
    def resume_partitions(self, partitions: list):
        """恢复特定分区"""
        tps = [TopicPartition(p['topic'], p['partition']) for p in partitions]
        self.consumer.resume(*tps)

# Asynchronous producer and consumer
class AsyncKafkaHandler:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.bootstrap_servers = bootstrap_servers
        self.executor = ThreadPoolExecutor(max_workers=10)
    
    async def async_produce(self, topic: str, message: Dict):
        """异步生产消息"""
        loop = asyncio.get_event_loop()
        producer = KafkaProducerWrapper(self.bootstrap_servers)
        
        try:
            await loop.run_in_executor(
                self.executor,
                lambda: producer.send_message(topic, message)
            )
        finally:
            await loop.run_in_executor(self.executor, producer.close)
    
    async def async_consume(self, topic: str, group_id: str, callback: Callable):
        """异步消费消息"""
        loop = asyncio.get_event_loop()
        consumer = KafkaConsumerWrapper(self.bootstrap_servers, group_id)
        consumer.subscribe_topics([topic])
        
        try:
            while True:
                messages = await loop.run_in_executor(
                    self.executor,
                    lambda: list(consumer.consume_messages(max_records=10))
                )
                
                for msg in messages:
                    await callback(msg)
                
                if not messages:
                    await asyncio.sleep(0.1)  # brief sleep to avoid busy-waiting
        except KeyboardInterrupt:
            logger.info("Consumer stopped")
        finally:
            await loop.run_in_executor(self.executor, consumer.close)

4.4 Practical Application Scenarios

4.4.1 Log Collection System

import os
import time
from datetime import datetime
from pathlib import Path

class LogCollector:
    def __init__(self, kafka_bootstrap_servers=['localhost:9092']):
        self.producer = KafkaProducerWrapper(kafka_bootstrap_servers)
        self.log_topic = 'application-logs'
    
    def collect_log_file(self, log_file_path: str):
        """收集日志文件"""
        log_path = Path(log_file_path)
        
        if not log_path.exists():
            logger.error(f"Log file does not exist: {log_file_path}")
            return
        
        # Read the log file
        with open(log_path, 'r', encoding='utf-8') as file:
            # Restore the last read position, if one was saved
            position_file = f".{log_path.name}.position"
            start_pos = 0
            
            if os.path.exists(position_file):
                with open(position_file, 'r') as pos_file:
                    start_pos = int(pos_file.read().strip())
                file.seek(start_pos)
            
            # Read newly appended log lines
            for line in file:
                if line.strip():  # skip empty lines
                    log_entry = {
                        'timestamp': datetime.now().isoformat(),
                        'log_file': str(log_path),
                        'log_line': line.strip(),
                        'hostname': os.uname().nodename if hasattr(os, 'uname') else 'unknown'
                    }
                    
                    # Send to Kafka
                    self.producer.send_message(self.log_topic, log_entry, key=log_path.name)
            
            # Update the position file
            with open(position_file, 'w') as pos_file:
                pos_file.write(str(file.tell()))
    
    def tail_log_file(self, log_file_path: str, interval: int = 1):
        """实时监控日志文件(类似tail -f)"""
        log_path = Path(log_file_path)
        
        while True:
            try:
                self.collect_log_file(log_file_path)
                time.sleep(interval)
            except KeyboardInterrupt:
                logger.info("Log collection stopped")
                break

# Usage example
log_collector = LogCollector()
# log_collector.tail_log_file('/var/log/application.log')

4.4.2 Event-Driven Architecture

from enum import Enum
from dataclasses import dataclass
from typing import Dict, Any, Optional
import uuid

class EventType(Enum):
    USER_REGISTERED = "user.registered"
    ORDER_CREATED = "order.created"
    PAYMENT_PROCESSED = "payment.processed"
    INVENTORY_UPDATED = "inventory.updated"

@dataclass
class Event:
    event_type: EventType
    data: Dict[str, Any]
    event_id: Optional[str] = None
    timestamp: Optional[str] = None
    source: Optional[str] = None
    
    def __post_init__(self):
        if self.event_id is None:
            self.event_id = str(uuid.uuid4())
        if self.timestamp is None:
            self.timestamp = datetime.now().isoformat()

class EventDrivenArchitecture:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.producer = KafkaProducerWrapper(bootstrap_servers)
        self.consumer = KafkaConsumerWrapper(bootstrap_servers, 'eda-consumer-group')
        self.event_handlers = {}
    
    def register_handler(self, event_type: EventType, handler_func: Callable):
        """注册事件处理器"""
        if event_type not in self.event_handlers:
            self.event_handlers[event_type] = []
        self.event_handlers[event_type].append(handler_func)
        logger.info(f"Registered handler for event type: {event_type.value}")
    
    def publish_event(self, event: Event):
        """发布事件"""
        event_payload = {
            'event_type': event.event_type.value,
            'event_id': event.event_id,
            'data': event.data,
            'timestamp': event.timestamp,
            'source': event.source
        }
        
        # Route to a different topic depending on the event type
        topic = f"events.{event.event_type.value.replace('.', '-')}"
        self.producer.send_message(topic, event_payload, key=event.event_id)
        
        logger.info(f"Published event: {event.event_type.value} with ID: {event.event_id}")
    
    def start_event_processing(self, topics: list):
        """开始处理事件"""
        self.consumer.subscribe_topics(topics)
        
        logger.info(f"Starting event processing for topics: {topics}")
        
        try:
            while True:
                for message in self.consumer.consume_messages():
                    try:
                        event_type_str = message['value'].get('event_type')
                        event_type = EventType(event_type_str.replace('-', '.'))
                        
                        # Invoke the registered handlers
                        if event_type in self.event_handlers:
                            for handler in self.event_handlers[event_type]:
                                try:
                                    handler(message['value'])
                                except Exception as e:
                                    logger.error(f"Error in handler for {event_type}: {e}")
                        else:
                            logger.warning(f"No handler registered for event type: {event_type_str}")
                            
                    except Exception as e:
                        logger.error(f"Error processing message: {e}")
                        
        except KeyboardInterrupt:
            logger.info("Event processing stopped")
        finally:
            self.producer.close()
            self.consumer.close()

# Example event handlers
def user_registered_handler(event_data: Dict):
    """Handle a user-registered event."""
    user_info = event_data['data']
    logger.info(f"New user registered: {user_info.get('username', 'Unknown')}")
    
    # Emit a follow-up event to send a welcome email
    event_arch = EventDrivenArchitecture()
    welcome_event = Event(
        event_type=EventType.INVENTORY_UPDATED,
        data={
            'user_id': user_info.get('id'),
            'action': 'send_welcome_email',
            'template': 'welcome'
        },
        source='user-registration-handler'
    )
    # Note: a separate instance (or an improved design) should be used here to avoid circular dependencies

def order_created_handler(event_data: Dict):
    """处理订单创建事件"""
    order_info = event_data['data']
    logger.info(f"New order created: {order_info.get('order_id')}")
    
    # Update inventory
    inventory_event = Event(
        event_type=EventType.INVENTORY_UPDATED,
        data={
            'product_id': order_info.get('product_id'),
            'quantity_change': -order_info.get('quantity', 0),
            'order_id': order_info.get('order_id')
        },
        source='order-created-handler'
    )
    # Publish the inventory-update event

# Register the handlers
eda = EventDrivenArchitecture()
eda.register_handler(EventType.USER_REGISTERED, user_registered_handler)
eda.register_handler(EventType.ORDER_CREATED, order_created_handler)

4.4.3 Stream Processing System

from collections import defaultdict, deque
import threading
import time

class StreamProcessor:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.producer = KafkaProducerWrapper(bootstrap_servers)
        self.consumer = KafkaConsumerWrapper(bootstrap_servers, 'stream-processor-group')
        self.aggregations = defaultdict(lambda: defaultdict(int))
        self.window_size = 60  # 60-second window
        self.processors = {}
    
    def register_aggregation(self, topic: str, field: str, agg_func: str = 'count'):
        """Register an aggregation function."""
        key = f"{topic}.{field}.{agg_func}"
        self.processors[key] = {
            'topic': topic,
            'field': field,
            'agg_func': agg_func,
            'window': deque(),  # sliding window
            'current_value': 0
        }
    
    def process_stream(self, input_topic: str, output_topic: str):
        """处理流数据"""
        self.consumer.subscribe_topics([input_topic])
        
        logger.info(f"Starting stream processing from {input_topic} to {output_topic}")
        
        try:
            for message in self.consumer.consume_messages():
                data = message['value']
                
                # Apply the registered aggregations
                for key, processor in self.processors.items():
                    if processor['topic'] == input_topic:
                        field_value = data.get(processor['field'])
                        
                        if processor['agg_func'] == 'count':
                            processor['current_value'] += 1
                        elif processor['agg_func'] == 'sum' and isinstance(field_value, (int, float)):
                            processor['current_value'] += field_value
                        elif processor['agg_func'] == 'avg' and isinstance(field_value, (int, float)):
                            # simple running average
                            count = processor.get('count', 0) + 1
                            total = processor.get('total', 0) + field_value
                            processor['current_value'] = total / count
                            processor['count'] = count
                            processor['total'] = total
                        
                        # Add to the sliding window
                        timestamp = time.time()
                        processor['window'].append((timestamp, field_value))
                        
                        # Evict expired entries from the window
                        while (processor['window'] and 
                               timestamp - processor['window'][0][0] > self.window_size):
                            processor['window'].popleft()
                        
                        # Emit the aggregation result
                        agg_result = {
                            'timestamp': datetime.now().isoformat(),
                            'aggregation_type': processor['agg_func'],
                            'field': processor['field'],
                            'value': processor['current_value'],
                            'window_size_seconds': self.window_size
                        }
                        
                        self.producer.send_message(output_topic, agg_result)
                        
        except KeyboardInterrupt:
            logger.info("Stream processing stopped")
        finally:
            self.producer.close()
            self.consumer.close()

# Usage example: real-time statistics
stream_processor = StreamProcessor()

# Register several aggregations
stream_processor.register_aggregation('user-actions', 'user_id', 'count')  # count of user actions
stream_processor.register_aggregation('sales', 'amount', 'sum')  # total sales amount
stream_processor.register_aggregation('ratings', 'score', 'avg')  # average rating

# Start processing
# stream_processor.process_stream('user-actions', 'analytics-results')

5. Performance Tuning

5.1 Producer Performance Tuning

class OptimizedKafkaProducer:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            # Batching
            linger_ms=5,  # wait up to 5 ms so more messages can join a batch
            batch_size=16384,  # 16 KB batch size
            
            # Compression
            compression_type='snappy',  # use snappy compression
            
            # Acknowledgements
            acks=1,  # leader-only acknowledgement for higher throughput
            retries=3,
            
            # Memory
            buffer_memory=33554432,  # 32 MB buffer
            
            # Serialization
            value_serializer=lambda v: json.dumps(v, default=str).encode('utf-8'),
            key_serializer=lambda k: k.encode('utf-8') if k else None,
            
            # Connection
            max_block_ms=5000
        )
    
    def send_batch_optimized(self, topic: str, messages: list):
        """Optimized batch send."""
        futures = []
        for msg in messages:
            future = self.producer.send(topic, value=msg)
            futures.append(future)
        
        # Wait for all messages to be delivered
        for future in futures:
            try:
                record_metadata = future.get(timeout=10)
                logger.debug(f'Message sent to {record_metadata.topic}[{record_metadata.partition}] at offset {record_metadata.offset}')
            except Exception as e:
                logger.error(f'Failed to send message: {e}')
        
        # Flush the buffer
        self.producer.flush()

5.2 Consumer Performance Tuning

class OptimizedKafkaConsumer:
    def __init__(self, bootstrap_servers=['localhost:9092'], group_id='optimized-group'):
        self.consumer = KafkaConsumer(
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            
            # Performance settings
            max_poll_records=1000,  # maximum records per poll
            max_poll_interval_ms=300000,  # maximum poll interval: 5 minutes
            fetch_min_bytes=1024,  # minimum fetch size in bytes
            fetch_max_bytes=52428800,  # maximum fetch size: 50 MB
            fetch_max_wait_ms=500,  # maximum wait time: 500 ms
            
            # Deserialization
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda k: k.decode('utf-8') if k else None,
            
            # Offset management
            enable_auto_commit=False,  # commit manually to control when offsets advance
            auto_commit_interval_ms=5000,  # only relevant when auto-commit is enabled
            
            # Rebalance tuning
            session_timeout_ms=30000,  # 30-second session timeout
            heartbeat_interval_ms=10000  # 10-second heartbeat interval
        )
    
    def optimized_consume(self, topics: list, batch_processor: Callable):
        """优化的批量消费"""
        self.consumer.subscribe(topics)
        
        while True:
            try:
                # Pull a batch of messages
                messages = self.consumer.poll(
                    timeout_ms=1000,
                    max_records=1000
                )
                
                batch = []
                for tp, records in messages.items():
                    for record in records:
                        batch.append({
                            'topic': record.topic,
                            'partition': record.partition,
                            'offset': record.offset,
                            'key': record.key,
                            'value': record.value,
                            'timestamp': record.timestamp
                        })
                
                if batch:
                    # Process the batch
                    batch_processor(batch)
                    
                    # Commit offsets manually
                    self.consumer.commit()
                
            except Exception as e:
                logger.error(f"Error in optimized consume: {e}")
                time.sleep(1)  # back off briefly on error

5.3 Topic Partitioning Strategy

def create_optimized_topic(admin_client, topic_name: str, num_partitions: int, replication_factor: int = 1):
    """Create a topic with tuned settings."""
    from kafka.admin import NewTopic
    
    # Choose the partition count based on business requirements.
    # Rule of thumb: partitions = required throughput / throughput of a single partition
    topic = NewTopic(
        name=topic_name,
        num_partitions=num_partitions,
        replication_factor=replication_factor,
        topic_configs={
            'retention.ms': '604800000',  # retain for 7 days
            'retention.bytes': '-1',  # no size limit
            'segment.bytes': '1073741824',  # 1 GB segment size
            'segment.ms': '604800000',  # roll segments every 7 days
            'cleanup.policy': 'delete',  # deletion policy
            'compression.type': 'snappy'  # compression codec
        }
    )
    
    admin_client.create_topics([topic])
    logger.info(f"Created optimized topic: {topic_name} with {num_partitions} partitions")

6. Monitoring and Operations

6.1 Using Kafka Manager

import requests
import json
from typing import Dict, List

class KafkaManagerAPI:
    def __init__(self, manager_url: str = 'http://localhost:9000'):
        self.manager_url = manager_url
    
    def get_clusters(self) -> List[Dict]:
        """获取集群列表"""
        response = requests.get(f"{self.manager_url}/clusters")
        return response.json()
    
    def get_topics(self, cluster_name: str) -> List[Dict]:
        """获取Topic列表"""
        response = requests.get(f"{self.manager_url}/clusters/{cluster_name}/topics")
        return response.json()
    
    def get_consumers(self, cluster_name: str) -> List[Dict]:
        """获取消费者组列表"""
        response = requests.get(f"{self.manager_url}/clusters/{cluster_name}/consumers")
        return response.json()
    
    def get_brokers(self, cluster_name: str) -> List[Dict]:
        """获取Broker列表"""
        response = requests.get(f"{self.manager_url}/clusters/{cluster_name}/brokers")
        return response.json()
    
    def get_topic_partitions(self, cluster_name: str, topic_name: str) -> Dict:
        """获取Topic分区信息"""
        response = requests.get(f"{self.manager_url}/clusters/{cluster_name}/topics/{topic_name}/partitions")
        return response.json()
    
    def get_consumer_lag(self, cluster_name: str, consumer_group: str) -> Dict:
        """获取消费者滞后信息"""
        response = requests.get(f"{self.manager_url}/clusters/{cluster_name}/consumers/{consumer_group}/lag")
        return response.json()

# Usage example
kafka_manager = KafkaManagerAPI()
clusters = kafka_manager.get_clusters()
print(f"Available clusters: {[c['name'] for c in clusters]}")

6.2 Health Checks

def kafka_health_check(bootstrap_servers=['localhost:9092']):
    """Kafka health check."""
    try:
        # Create a temporary producer and consumer for the test
        producer = KafkaProducerWrapper(bootstrap_servers)
        consumer = KafkaConsumerWrapper(bootstrap_servers, 'health-check-group')
        
        # Test topic (created automatically when topic auto-creation is enabled)
        test_topic = 'health-check-topic'
        
        # Send a test message
        test_message = {
            'health_check': True,
            'timestamp': datetime.now().isoformat(),
            'test_id': str(uuid.uuid4())
        }
        
        producer.send_message(test_topic, test_message, key='health-check')
        producer.flush()
        
        # Subscribe and consume the test message
        consumer.subscribe_topics([test_topic])
        
        # Wait for the test message
        timeout = time.time() + 10  # 10-second timeout
        message_received = False
        
        while time.time() < timeout and not message_received:
            for msg in consumer.consume_messages():
                if msg['value'].get('health_check') and msg['value'].get('test_id') == test_message['test_id']:
                    message_received = True
                    break
            time.sleep(0.1)
        
        # Clean up resources
        producer.close()
        consumer.close()
        
        if message_received:
            logger.info("Kafka health check: PASSED")
            return True
        else:
            logger.error("Kafka health check: FAILED - Did not receive test message")
            return False
            
    except Exception as e:
        logger.error(f"Kafka health check: FAILED - {str(e)}")
        return False

# Run the health check
is_healthy = kafka_health_check()
print(f"Kafka Health Status: {'Healthy' if is_healthy else 'Unhealthy'}")

6.3 Performance Monitoring

class KafkaPerformanceMonitor:
    def __init__(self, bootstrap_servers=['localhost:9092']):
        self.bootstrap_servers = bootstrap_servers
        self.admin_client = None
    
    def setup_admin_client(self):
        """设置管理客户端"""
        from kafka.admin import KafkaAdminClient
        self.admin_client = KafkaAdminClient(bootstrap_servers=self.bootstrap_servers)
    
    def get_broker_metrics(self) -> Dict:
        """Get broker-level metrics."""
        # Detailed metrics require JMX or an external monitoring tool;
        # this is a simplified example
        return {
            'broker_count': len(self.bootstrap_servers),
            'active_controllers': 1,
            'offline_partitions': 0,
            'under_replicated_partitions': 0
        }
    
    def get_topic_metrics(self, topic_name: str) -> Dict:
        """获取Topic指标"""
        if not self.admin_client:
            self.setup_admin_client()
        
        # Fetch the topic's configuration
        from kafka.admin import ConfigResource, ConfigResourceType
        config_resources = [ConfigResource(ConfigResourceType.TOPIC, topic_name)]
        configs = self.admin_client.describe_configs(config_resources)
        
        return {
            'topic_name': topic_name,
            'configs': configs,
            'partition_count': self.get_partition_count(topic_name)
        }
    
    def get_partition_count(self, topic_name: str) -> int:
        """获取分区数量"""
        consumer = KafkaConsumerWrapper(self.bootstrap_servers)
        partitions = consumer.consumer.partitions_for_topic(topic_name)
        consumer.close()
        return len(partitions) if partitions else 0
    
    def get_throughput_metrics(self, topic_name: str, time_window: int = 60) -> Dict:
        """Get throughput metrics."""
        # Actual throughput measurement is not implemented here; it could be
        # derived from the observed produce and consume rates
        return {
            'topic': topic_name,
            'time_window_seconds': time_window,
            'messages_produced': 0,
            'messages_consumed': 0,
            'throughput_msgs_per_sec': 0.0
        }

# Usage example
monitor = KafkaPerformanceMonitor()
metrics = monitor.get_broker_metrics()
print(f"Broker Metrics: {json.dumps(metrics, indent=2)}")

7. Best Practices

7.1 Security Best Practices

"""
Kafka安全最佳实践:
1. 网络安全:使用VPN或专用网络
2. 认证:启用SASL认证
3. 授权:配置ACLs控制访问权限
4. 加密:启用SSL/TLS传输加密
5. 审计:启用访问日志记录
6. 更新:定期更新Kafka版本
"""

# Example security configuration
SECURE_KAFKA_CONFIG = {
    # SSL configuration
    'security.protocol': 'SSL',
    'ssl.ca.location': '/path/to/ca-cert',
    'ssl.certificate.location': '/path/to/client-cert.pem',
    'ssl.key.location': '/path/to/client-key.pem',
    'ssl.key.password': 'key-password',
    
    # SASL/SCRAM configuration
    # 'security.protocol': 'SASL_SSL',
    # 'sasl.mechanism': 'SCRAM-SHA-256',
    # 'sasl.username': 'kafka-client',
    # 'sasl.password': 'client-secret',
}
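
The dotted property names above follow the confluent-kafka/librdkafka style. With kafka-python, the equivalent SSL setup uses underscore-style constructor arguments; a sketch reusing the same placeholder paths (the host and port are examples):

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='broker.example.com:9093',
    security_protocol='SSL',
    ssl_cafile='/path/to/ca-cert',
    ssl_certfile='/path/to/client-cert.pem',
    ssl_keyfile='/path/to/client-key.pem',
    ssl_password='key-password',
)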

7.2 Performance Best Practices

"""
Kafka性能最佳实践:
1. 合理分区:根据吞吐量需求设置分区数
2. 批量处理:启用批量发送和消费
3. 压缩:启用消息压缩减少网络传输
4. 序列化:使用高效的序列化格式
5. 监控:持续监控性能指标
6. 调优:根据实际负载调整配置
"""

# Performance-oriented configuration
PERFORMANCE_OPTIMIZED_CONFIG = {
    # Producer settings
    'linger.ms': 5,
    'batch.size': 16384,
    'compression.type': 'snappy',
    'acks': '1',
    
    # Consumer settings
    'max.poll.records': 1000,
    'fetch.min.bytes': 1024,
    'fetch.max.bytes': 52428800,
    
    # Topic settings
    'replication.factor': 3,
    'min.insync.replicas': 2,
}
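
To illustrate the acks trade-off behind these settings, here is a sketch of two kafka-python producers, one tuned for throughput and one for durability, assuming topics with replication.factor=3 and min.insync.replicas=2:

from kafka import KafkaProducer

# Throughput-oriented: leader-only acknowledgement, batching and compression
fast_producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    acks=1,
    linger_ms=5,
    batch_size=16384,
    compression_type='snappy',
)

# Durability-oriented: with min.insync.replicas=2 on the topic, acks='all' only
# succeeds once at least two in-sync replicas have the message
safe_producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    acks='all',
    retries=5,
)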

7.3 Use-Case Summary

Scenario            Description                                Recommended configuration
Log collection      Aggregating application and system logs    High throughput, compression enabled
Event sourcing      Passing events between microservices       Durable storage, multiple replicas
Stream processing   Real-time data analytics                   Low latency, deliberate partitioning
Data integration    Synchronizing data between systems         Reliability, transaction support
Message queue       Asynchronous communication                 Balance of performance and reliability

Summary

Apache Kafka is a powerful distributed stream-processing platform suited to a wide range of real-time data processing scenarios. With sensible configuration, performance tuning, and security measures, you can build highly available, high-performance event-streaming systems. Mastering Kafka's core concepts and best practices helps developers build excellent real-time data applications.