from kafka import KafkaProducer, KafkaConsumer
import json
import logging
# Configure module-wide logging: INFO level, one logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# ------------------- Producer -------------------
class SimpleKafkaProducer:
    """Thin convenience wrapper around kafka-python's KafkaProducer.

    Values are dicts serialized to UTF-8 JSON automatically; an optional
    string key routes messages with the same key to the same partition.
    """

    def __init__(self, bootstrap_servers=None):
        """Create the underlying producer.

        Args:
            bootstrap_servers: list of "host:port" broker addresses;
                defaults to ["localhost:9092"].
        """
        # None-sentinel instead of a mutable list default argument.
        if bootstrap_servers is None:
            bootstrap_servers = ["localhost:9092"]
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode("utf-8"),  # auto-serialize dicts to JSON
            acks=1,       # must be int 0/1 or 'all' — the string "1" is rejected by kafka-python
            retries=3,    # retry failed sends up to 3 times
            linger_ms=5,  # wait up to 5 ms to batch records for better throughput
        )

    def send(self, topic: str, value: dict, key: str = None):
        """Send one message and block for the broker acknowledgement.

        Args:
            topic: destination topic name.
            value: JSON-serializable payload.
            key: optional partitioning key — messages with the same key land
                on the same partition (e.g. all events of one user).

        Raises:
            Exception: re-raises whatever the underlying send/ack failed with.
        """
        try:
            future = self.producer.send(
                topic,
                value=value,
                # `is not None` so an explicit empty-string key is still sent as a key
                key=key.encode("utf-8") if key is not None else None,
            )
            # Wait for the ack (optional; production code may stay async).
            record_metadata = future.get(timeout=10)
            logger.info(
                f"消息发送成功:Topic={record_metadata.topic},分区={record_metadata.partition},偏移量={record_metadata.offset}"
            )
        except Exception as e:
            logger.error(f"消息发送失败:{e}")
            raise

    def close(self):
        """Flush buffered records, then close the producer."""
        self.producer.flush()
        self.producer.close()
# ------------------- Consumer -------------------
class SimpleKafkaConsumer:
    """Thin convenience wrapper around kafka-python's KafkaConsumer.

    Messages are JSON-deserialized automatically; consumption runs in a
    blocking loop that hands each decoded value to a caller-supplied callback.
    """

    def __init__(self, topics: list, group_id: str, bootstrap_servers=None):
        """Subscribe to *topics* as part of consumer group *group_id*.

        Args:
            topics: list of topic names to subscribe to.
            group_id: consumer-group id (controls offset tracking/rebalancing).
            bootstrap_servers: list of "host:port" broker addresses;
                defaults to ["localhost:9092"].
        """
        # None-sentinel instead of a mutable list default argument.
        if bootstrap_servers is None:
            bootstrap_servers = ["localhost:9092"]
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode("utf-8")),  # auto-deserialize JSON
            auto_offset_reset="earliest",    # a new group starts from the earliest message
            enable_auto_commit=True,         # auto-commit offsets (manual commit recommended in production)
            auto_commit_interval_ms=1000,
        )

    def consume(self, callback):
        """Consume messages in a loop, invoking ``callback(value)`` per message.

        Blocks until interrupted; Ctrl-C stops cleanly, and the consumer is
        always closed on the way out.
        """
        logger.info(f"开始消费:Topics={self.consumer.subscription()},Group={self.consumer.config['group_id']}")
        try:
            for msg in self.consumer:
                callback(msg.value)
        except KeyboardInterrupt:
            logger.info("消费已停止")
        finally:
            self.consumer.close()
# ------------------- Usage example -------------------
if __name__ == "__main__":
    # 1. Send an order message.
    producer = SimpleKafkaProducer()
    try:
        producer.send(
            topic="order-events",
            value={"order_id": 1001, "user_id": 123, "product_id": 456, "amount": 99.9},
            key=str(123),  # orders of the same user go to the same partition
        )
    finally:
        # close() flushes the buffer — run it even if send() raised,
        # so no buffered records are silently dropped.
        producer.close()
    # 2. Consume order messages (run in a separate terminal):
    # def process_order(msg):
    #     logger.info(f"收到订单:{msg}")
    # consumer = SimpleKafkaConsumer(topics=["order-events"], group_id="order-consumer-1")
    # consumer.consume(process_order)