RabbitMQ Hands-On Tutorial


What is RabbitMQ?

RabbitMQ is an open-source message broker and queue server based on AMQP (Advanced Message Queuing Protocol). It offers reliable message delivery, flexible routing, scalability, and multiple messaging patterns, and is widely used in microservice architectures, asynchronous processing, and for decoupling system components.

Key features of RabbitMQ:

  • Reliability: message persistence, publisher confirms, and transactions
  • Flexible routing: multiple exchange types and routing rules
  • Scalability: clustering and federation deployments
  • Multi-protocol support: AMQP, STOMP, MQTT, and more
  • Management tooling: a web management UI and command-line tools
  • High availability: mirrored queues and failover

1. Installing and Configuring RabbitMQ

1.1 Installation with Docker

# Pull the RabbitMQ image (includes the management UI)
docker pull rabbitmq:3-management

# Run the RabbitMQ container
docker run -d \
  --name rabbitmq-server \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=password \
  rabbitmq:3-management

# Check the RabbitMQ container logs
docker logs rabbitmq-server

1.2 Installation with Docker Compose

# docker-compose.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq-server
    restart: unless-stopped
    ports:
      - "5672:5672"    # AMQP port
      - "15672:15672"  # management UI port
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    healthcheck:
      test: ["CMD", "rabbitmqctl", "status"]
      interval: 30s
      timeout: 10s
      retries: 3

volumes:
  rabbitmq_data:

1.3 Configuration File Reference

# rabbitmq.conf - RabbitMQ configuration file
# Basic settings
listeners.tcp.default = 5672
management.tcp.port = 15672

# Memory settings
vm_memory_high_watermark.relative = 0.6
vm_memory_calculation_strategy = allocated

# Disk settings (relative and absolute limits are alternatives; set only one)
disk_free_limit.absolute = 2GB

# Default vhost and user
default_vhost = /
default_user = admin
default_pass = password
default_permissions.configure = .*
default_permissions.read = .*
default_permissions.write = .*

# Cluster settings
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2

# High availability
queue_master_locator = min-masters

# Logging
log.file.level = info
log.file.rotation.date = $D0
log.file.rotation.size = 10485760

1.4 High-Availability Cluster Configuration

# docker-compose-cluster.yml
version: '3.8'

services:
  rabbitmq1:
    image: rabbitmq:3-management
    hostname: rabbit1
    container_name: rabbitmq1
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
    volumes:
      - rabbitmq1_data:/var/lib/rabbitmq

  rabbitmq2:
    image: rabbitmq:3-management
    hostname: rabbit2
    container_name: rabbitmq2
    ports:
      - "5673:5672"
      - "15673:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
    volumes:
      - rabbitmq2_data:/var/lib/rabbitmq

  rabbitmq3:
    image: rabbitmq:3-management
    hostname: rabbit3
    container_name: rabbitmq3
    ports:
      - "5674:5672"
      - "15674:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
    volumes:
      - rabbitmq3_data:/var/lib/rabbitmq

volumes:
  rabbitmq1_data:
  rabbitmq2_data:
  rabbitmq3_data:

Note that this compose file only starts three independent nodes. After they are up, join rabbitmq2 and rabbitmq3 to the cluster by running `rabbitmqctl stop_app`, `rabbitmqctl join_cluster rabbit@rabbit1`, and `rabbitmqctl start_app` inside each of them; the shared RABBITMQ_ERLANG_COOKIE is what allows the nodes to authenticate to one another.

2. RabbitMQ Core Concepts

2.1 Key Concepts

Concept       | Description
Producer      | Sends messages to an exchange
Consumer      | Receives messages from a queue
Exchange      | Receives messages from producers and routes them to queues
Queue         | Buffers messages until they are consumed
Binding       | The association rule between an exchange and a queue
Routing Key   | The key an exchange uses to route a message
Virtual Host  | A logically isolated workspace within the broker
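To make these relationships concrete, here is a minimal in-memory sketch of how a direct exchange routes a message through bindings to queues. This is a toy model for illustration only, not RabbitMQ's implementation; all names are invented for this sketch.

```python
from collections import defaultdict, deque

class ToyDirectExchange:
    """Toy model: a direct exchange keeps bindings from routing key to queues."""

    def __init__(self):
        self.bindings = defaultdict(list)  # routing_key -> list of bound queues

    def bind(self, queue: deque, routing_key: str):
        self.bindings[routing_key].append(queue)

    def publish(self, routing_key: str, message: str):
        # A direct exchange delivers only on an exact routing-key match;
        # a message with no matching binding is simply dropped
        for queue in self.bindings[routing_key]:
            queue.append(message)

orders = deque()                      # a "queue" buffers messages until consumed
exchange = ToyDirectExchange()
exchange.bind(orders, "order.created")

exchange.publish("order.created", "order #1")  # routed into `orders`
exchange.publish("user.signup", "nowhere")     # no binding: dropped
print(list(orders))                            # ['order #1']
```

A producer never writes to a queue directly; it always publishes to an exchange, and the bindings decide where the message lands.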

2.2 Exchange Types

An exchange's type is fixed when it is declared and cannot be changed afterwards (policies can adjust optional arguments, but not the type). Each type can be declared with rabbitmqadmin:

# Direct exchange: the message's routing key must exactly match the binding key
rabbitmqadmin declare exchange name=direct_logs type=direct durable=true

# Topic exchange: wildcard matching ("*" matches one word, "#" matches zero or more)
rabbitmqadmin declare exchange name=topic_logs type=topic durable=true

# Fanout exchange: broadcasts to every bound queue, ignoring the routing key
rabbitmqadmin declare exchange name=fanout_logs type=fanout durable=true

# Headers exchange: routes on message headers instead of the routing key
rabbitmqadmin declare exchange name=headers_logs type=headers durable=true
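To illustrate the topic-exchange wildcard semantics, here is a simplified matcher. It demonstrates the matching rules only and is not RabbitMQ's actual algorithm; `topic_matches` is a name chosen for this sketch.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Simplified topic matching: '*' = exactly one word, '#' = zero or more words."""
    def match(p, k):
        if not p:
            return not k              # pattern exhausted: match only if key is too
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' may consume zero words, or one word and stay on '#'
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False              # key exhausted but pattern still has words
        return (head == "*" or head == k[0]) and match(rest, k[1:])
    return match(pattern.split("."), routing_key.split("."))

print(topic_matches("event.*", "event.user_registered"))  # True
print(topic_matches("event.#", "event.order.created"))    # True
print(topic_matches("event.*", "event.order.created"))    # False: '*' is one word only
```

The difference between `*` and `#` is the most common source of topic-routing surprises: `event.*` will not match a three-word key, while `event.#` will.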

3. Management and Monitoring

3.1 Command-Line Management

# Open a shell inside the RabbitMQ container
docker exec -it rabbitmq-server bash

# Check server status
rabbitmqctl status

# Check cluster status
rabbitmqctl cluster_status

# List users
rabbitmqctl list_users

# List virtual hosts
rabbitmqctl list_vhosts

# List queues
rabbitmqctl list_queues

# List exchanges
rabbitmqctl list_exchanges

# List bindings
rabbitmqctl list_bindings

# Create a user
rabbitmqctl add_user newuser newpassword

# Grant the user permissions
rabbitmqctl set_permissions -p / newuser ".*" ".*" ".*"

# Create a virtual host
rabbitmqctl add_vhost /myhost

# Grant permissions on the virtual host
rabbitmqctl set_permissions -p /myhost newuser ".*" ".*" ".*"

3.2 Web Management UI

# Open the management UI in a browser
# http://localhost:15672
# The built-in guest/guest account can only log in from localhost;
# use the admin/password account configured above for remote access

3.3 Queue Management

# Declare a queue
rabbitmqadmin declare queue name=my_queue durable=true

# Declare an exchange
rabbitmqadmin declare exchange name=my_exchange type=direct durable=true

# Bind the queue to the exchange
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key

# Publish a message
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello World"

# Fetch a message (acknowledge it, do not requeue)
rabbitmqadmin get queue=my_queue ackmode=ack_requeue_false

4. Using RabbitMQ from Python

4.1 Installing the pika Client

pip install pika
pip install aio-pika  # asyncio client (a separate library, not part of pika)

4.2 Basic Connection and Operations

import pika
import json
import logging
import time
import uuid
from typing import Callable, Any

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class RabbitMQConnection:
    def __init__(self, host='localhost', port=5672, username='admin', password='password', virtual_host='/'):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.virtual_host = virtual_host
        self.connection = None
        self.channel = None
    
    def connect(self):
        """Establish the connection."""
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.virtual_host,
            credentials=credentials,
            heartbeat=600,
            blocked_connection_timeout=300
        )
        
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        logger.info(f"Connected to RabbitMQ at {self.host}:{self.port}")
    
    def disconnect(self):
        """Close the connection."""
        if self.connection and not self.connection.is_closed:
            self.connection.close()
            logger.info("Disconnected from RabbitMQ")
    
    def declare_queue(self, queue_name: str, durable=True, auto_delete=False, exclusive=False):
        """Declare a queue."""
        self.channel.queue_declare(
            queue=queue_name,
            durable=durable,
            auto_delete=auto_delete,
            exclusive=exclusive
        )
        logger.info(f"Declared queue: {queue_name}")
    
    def declare_exchange(self, exchange_name: str, exchange_type: str = 'direct', durable=True):
        """Declare an exchange."""
        self.channel.exchange_declare(
            exchange=exchange_name,
            exchange_type=exchange_type,
            durable=durable
        )
        logger.info(f"Declared exchange: {exchange_name}, type: {exchange_type}")
    
    def bind_queue(self, queue_name: str, exchange_name: str, routing_key: str):
        """Bind a queue to an exchange."""
        self.channel.queue_bind(
            queue=queue_name,
            exchange=exchange_name,
            routing_key=routing_key
        )
        logger.info(f"Bound queue {queue_name} to exchange {exchange_name} with routing key {routing_key}")

# Sending messages
def send_message(queue_name: str, message: dict, routing_key: str = ''):
    """Send a message directly to a queue (via the default exchange)."""
    rmq = RabbitMQConnection()
    try:
        rmq.connect()
        
        # Declare the queue (idempotent; creates it if missing)
        rmq.declare_queue(queue_name)
        
        # Publish the message
        message_json = json.dumps(message)
        rmq.channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=message_json,
            properties=pika.BasicProperties(
                delivery_mode=2,  # persist the message
            )
        )
        
        logger.info(f"Message sent to queue {queue_name}: {message_json}")
        
    finally:
        rmq.disconnect()

# Consuming messages
def consume_messages(queue_name: str, callback_func: Callable[[Any], None]):
    """Consume messages from a queue."""
    rmq = RabbitMQConnection()
    try:
        rmq.connect()
        
        # Declare the queue
        rmq.declare_queue(queue_name)
        
        # Set QoS: at most one unacknowledged message per consumer
        rmq.channel.basic_qos(prefetch_count=1)
        
        def callback(ch, method, properties, body):
            try:
                message = json.loads(body.decode('utf-8'))
                logger.info(f"Received message: {message}")
                
                # Hand the message to the user callback
                callback_func(message)
                
                # Acknowledge the message manually
                ch.basic_ack(delivery_tag=method.delivery_tag)
                logger.info("Message acknowledged")
                
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                # Reject the message and requeue it
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        
        # Register the consumer
        rmq.channel.basic_consume(queue=queue_name, on_message_callback=callback)
        logger.info(f"Started consuming messages from queue {queue_name}")
        
        # Enter the consume loop (blocks)
        rmq.channel.start_consuming()
        
    except KeyboardInterrupt:
        rmq.channel.stop_consuming()
        logger.info("Stopped consuming messages")
    finally:
        rmq.disconnect()

4.3 Advanced Features

# Publish-subscribe pattern
def publish_message(exchange_name: str, message: dict, routing_key: str, exchange_type: str = 'direct'):
    """Publish a message to an exchange."""
    rmq = RabbitMQConnection()
    try:
        rmq.connect()
        
        # Declare the exchange
        rmq.declare_exchange(exchange_name, exchange_type)
        
        # Publish the message
        message_json = json.dumps(message)
        rmq.channel.basic_publish(
            exchange=exchange_name,
            routing_key=routing_key,
            body=message_json,
            properties=pika.BasicProperties(
                delivery_mode=2,  # persistent
                content_type='application/json',
                timestamp=int(time.time())
            )
        )
        
        logger.info(f"Published message to exchange {exchange_name} with routing key {routing_key}")
        
    finally:
        rmq.disconnect()

# RPC pattern
class RPCServer:
    def __init__(self, queue_name: str):
        self.queue_name = queue_name
        self.connection = None
        self.channel = None
        self.process_function = None
    
    def connect(self):
        credentials = pika.PlainCredentials('admin', 'password')
        parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue_name, durable=True)
        self.channel.basic_qos(prefetch_count=1)
    
    def register_process_function(self, func: Callable):
        """Register the request handler."""
        self.process_function = func
    
    def start_server(self):
        """Start the RPC server."""
        self.connect()
        
        def on_request(ch, method, props, body):
            response = None
            try:
                # Parse the request
                request = json.loads(body.decode('utf-8'))
                
                # Invoke the handler
                if self.process_function:
                    response = self.process_function(request)
                else:
                    response = {"error": "No process function registered"}
                
            except Exception as e:
                response = {"error": str(e)}
                logger.error(f"Error in RPC server: {e}")
            
            # Send the response back to the reply queue
            ch.basic_publish(
                exchange='',
                routing_key=props.reply_to,
                properties=pika.BasicProperties(correlation_id=props.correlation_id),
                body=json.dumps(response)
            )
            
            # Acknowledge the original request
            ch.basic_ack(delivery_tag=method.delivery_tag)
        
        self.channel.basic_consume(queue=self.queue_name, on_message_callback=on_request)
        logger.info(f"RPC Server started, waiting for requests on queue {self.queue_name}")
        self.channel.start_consuming()

class RPCClient:
    def __init__(self):
        self.connection = None
        self.channel = None
        self.response = None
        self.correlation_id = None
        self.properties = None
    
    def call(self, queue_name: str, message: dict) -> dict:
        """Call an RPC service and block until the reply arrives."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host='localhost')
        )
        self.channel = self.connection.channel()
        
        # Declare an exclusive callback queue for replies
        result = self.channel.queue_declare(queue='', exclusive=True)
        callback_queue = result.method.queue
        
        self.correlation_id = str(uuid.uuid4())  # unique id to match the reply to this call
        
        self.channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            properties=pika.BasicProperties(
                reply_to=callback_queue,
                correlation_id=self.correlation_id
            ),
            body=json.dumps(message)
        )
        
        # Wait for the reply
        self.channel.basic_consume(
            queue=callback_queue,
            on_message_callback=self.on_response,
            auto_ack=True
        )
        
        while self.response is None:
            self.connection.process_data_events(time_limit=1)
        
        self.connection.close()
        return self.response
    
    def on_response(self, ch, method, props, body):
        """Store the reply if it matches our correlation id."""
        if self.correlation_id == props.correlation_id:
            self.response = json.loads(body.decode('utf-8'))
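The reply queue is shared by all of a client's calls, so the correlation id is what lets the client match each reply to the request that produced it. Here is a broker-free sketch of that bookkeeping; the class and method names are invented for illustration.

```python
import uuid

class PendingCalls:
    """Tracks in-flight RPC requests by correlation id."""

    def __init__(self):
        self.pending = {}

    def register(self) -> str:
        corr_id = str(uuid.uuid4())
        self.pending[corr_id] = None          # reply not yet received
        return corr_id

    def on_reply(self, corr_id: str, body):
        if corr_id in self.pending:           # ignore replies we never asked for
            self.pending[corr_id] = body

    def result(self, corr_id: str):
        return self.pending.get(corr_id)

calls = PendingCalls()
a = calls.register()
b = calls.register()
calls.on_reply(b, {"answer": 42})
calls.on_reply("stale-id", {"answer": -1})    # dropped: unknown correlation id
print(calls.result(a), calls.result(b))       # None {'answer': 42}
```

Without this check a late or duplicate reply from an earlier call could be mistaken for the answer to the current one.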

# Message acknowledgement and persistence
def send_persistent_message(queue_name: str, message: dict):
    """Send a persistent message."""
    rmq = RabbitMQConnection()
    try:
        rmq.connect()
        
        # Declare a durable queue
        rmq.channel.queue_declare(queue=queue_name, durable=True)
        
        # Publish with persistent delivery mode
        rmq.channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # persist the message
                headers={'type': 'persistent_message'}
            )
        )
        
        logger.info(f"Persistent message sent to {queue_name}")
        
    finally:
        rmq.disconnect()

4.4 Practical Application Scenarios

4.4.1 Task Queue System

import time
import threading
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
from typing import Optional

class TaskStatus(Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    id: str
    name: str
    payload: dict
    priority: int = 1
    created_at: str = None
    status: TaskStatus = TaskStatus.PENDING
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now().isoformat()

class TaskQueueSystem:
    def __init__(self, connection_params: dict = None):
        self.connection_params = connection_params or {
            'host': 'localhost',
            'port': 5672,
            'username': 'admin',
            'password': 'password',
            'virtual_host': '/'
        }
        self.connection = None
        self.channel = None
        self.task_queue = 'task_queue'
        self.result_queue = 'result_queue'
        self.setup_queues()
    
    def setup_queues(self):
        """Set up the task and result queues."""
        credentials = pika.PlainCredentials(
            self.connection_params['username'], 
            self.connection_params['password']
        )
        parameters = pika.ConnectionParameters(
            host=self.connection_params['host'],
            port=self.connection_params['port'],
            virtual_host=self.connection_params['virtual_host'],
            credentials=credentials
        )
        
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        
        # Declare durable queues; x-max-priority is required for the per-message
        # priority property used in submit_task to take effect
        self.channel.queue_declare(
            queue=self.task_queue,
            durable=True,
            arguments={'x-max-priority': 10}
        )
        self.channel.queue_declare(queue=self.result_queue, durable=True)
        
        # Set QoS
        self.channel.basic_qos(prefetch_count=1)
        
        logger.info("Task queues initialized")
    
    def submit_task(self, task: Task) -> str:
        """Submit a task to the queue."""
        task_dict = {
            'id': task.id,
            'name': task.name,
            'payload': task.payload,
            'priority': task.priority,
            'created_at': task.created_at,
            'status': task.status.value
        }
        
        self.channel.basic_publish(
            exchange='',
            routing_key=self.task_queue,
            body=json.dumps(task_dict),
            properties=pika.BasicProperties(
                delivery_mode=2,  # persistent
                priority=task.priority  # only effective on a priority queue (x-max-priority)
            )
        )
        
        logger.info(f"Task submitted: {task.id} - {task.name}")
        return task.id
    
    def process_tasks(self, processor_func: Callable[[Task], Any]):
        """Process tasks from the queue."""
        def callback(ch, method, properties, body):
            try:
                task_data = json.loads(body.decode('utf-8'))
                task = Task(
                    id=task_data['id'],
                    name=task_data['name'],
                    payload=task_data['payload'],
                    priority=task_data['priority'],
                    created_at=task_data['created_at']
                )
                
                logger.info(f"Processing task: {task.id}")
                
                # Mark the task as processing
                task.status = TaskStatus.PROCESSING
                
                try:
                    # Run the task handler
                    result = processor_func(task)
                    
                    # Task completed
                    task.status = TaskStatus.COMPLETED
                    
                    # Publish the result
                    result_data = {
                        'task_id': task.id,
                        'status': task.status.value,
                        'result': result,
                        'processed_at': datetime.now().isoformat()
                    }
                    
                    self.channel.basic_publish(
                        exchange='',
                        routing_key=self.result_queue,
                        body=json.dumps(result_data),
                        properties=pika.BasicProperties(delivery_mode=2)
                    )
                    
                    logger.info(f"Task completed: {task.id}")
                    
                except Exception as e:
                    # Task failed
                    task.status = TaskStatus.FAILED
                    error_data = {
                        'task_id': task.id,
                        'status': task.status.value,
                        'error': str(e),
                        'processed_at': datetime.now().isoformat()
                    }
                    
                    self.channel.basic_publish(
                        exchange='',
                        routing_key=self.result_queue,
                        body=json.dumps(error_data),
                        properties=pika.BasicProperties(delivery_mode=2)
                    )
                    
                    logger.error(f"Task failed: {task.id}, error: {e}")
                
                # Acknowledge the message
                ch.basic_ack(delivery_tag=method.delivery_tag)
                
            except Exception as e:
                logger.error(f"Error processing task message: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        
        self.channel.basic_consume(queue=self.task_queue, on_message_callback=callback)
        logger.info("Starting to process tasks...")
        self.channel.start_consuming()
    
    def get_results(self, timeout: int = 30) -> list:
        """Collect results from the result queue, waiting up to `timeout` seconds."""
        results = []
        
        def result_callback(ch, method, properties, body):
            result_data = json.loads(body.decode('utf-8'))
            results.append(result_data)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        
        # Consume the result queue temporarily
        consumer_tag = self.channel.basic_consume(
            queue=self.result_queue, on_message_callback=result_callback
        )
        
        # Poll for events until a result arrives or the timeout expires
        start_time = time.time()
        while time.time() - start_time < timeout and len(results) == 0:
            self.connection.process_data_events(time_limit=1)
        
        # Stop consuming so later calls do not receive duplicate deliveries
        self.channel.basic_cancel(consumer_tag)
        return results

# Usage example
def example_task_processor(task: Task) -> dict:
    """An example task handler."""
    logger.info(f"Processing task {task.id}: {task.name}")
    
    # Simulate work
    time.sleep(2)  # simulate a slow operation
    
    # Dispatch on the task name
    if task.name == "calculate_sum":
        numbers = task.payload.get('numbers', [])
        result = sum(numbers)
    elif task.name == "process_data":
        data = task.payload.get('data', {})
        result = {"processed": True, "count": len(data)}
    else:
        result = {"default": "processed", "task_id": task.id}
    
    return result

# Create the task queue system
task_system = TaskQueueSystem()

# Submit example tasks
import uuid
task1 = Task(
    id=str(uuid.uuid4()),
    name="calculate_sum",
    payload={"numbers": [1, 2, 3, 4, 5]},
    priority=2
)

task2 = Task(
    id=str(uuid.uuid4()),
    name="process_data",
    payload={"data": {"key1": "value1", "key2": "value2"}},
    priority=1
)

task_system.submit_task(task1)
task_system.submit_task(task2)

print("Tasks submitted successfully")
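Because tasks cross the wire as JSON, the Enum status must be mapped to and from a plain string at the boundary. Here is a self-contained round-trip sketch; it uses a simplified Task mirroring the dataclass above (redefined so the snippet runs standalone), and `to_wire`/`from_wire` are names chosen for this sketch.

```python
import json
from dataclasses import dataclass, asdict
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"

@dataclass
class Task:
    id: str
    name: str
    payload: dict
    priority: int = 1
    status: TaskStatus = TaskStatus.PENDING

def to_wire(task: Task) -> str:
    d = asdict(task)
    d["status"] = task.status.value        # Enum -> plain string for JSON
    return json.dumps(d)

def from_wire(body: str) -> Task:
    d = json.loads(body)
    d["status"] = TaskStatus(d["status"])  # string -> Enum on the way back in
    return Task(**d)

t = Task(id="t1", name="calculate_sum", payload={"numbers": [1, 2, 3]})
assert from_wire(to_wire(t)) == t          # lossless round trip
```

The same pattern applies to timestamps and any other non-JSON-native field: serialize explicitly on publish, parse explicitly on consume.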

4.4.2 Event-Driven System

from enum import Enum
import uuid
from datetime import datetime

class EventType(Enum):
    USER_REGISTERED = "user_registered"
    ORDER_CREATED = "order_created"
    PAYMENT_PROCESSED = "payment_processed"
    NOTIFICATION_SENT = "notification_sent"

@dataclass
class Event:
    event_type: EventType
    data: dict
    timestamp: str = None
    event_id: str = None
    
    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now().isoformat()
        if self.event_id is None:
            self.event_id = str(uuid.uuid4())

class EventBus:
    def __init__(self):
        self.connection = None
        self.channel = None
        self.exchange_name = 'event_bus'
        self.setup_event_bus()
    
    def setup_event_bus(self):
        """Set up the event bus (a topic exchange)."""
        credentials = pika.PlainCredentials('admin', 'password')
        parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
        
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        
        # Declare a durable topic exchange
        self.channel.exchange_declare(
            exchange=self.exchange_name,
            exchange_type='topic',
            durable=True
        )
        
        logger.info("Event bus initialized")
    
    def publish_event(self, event: Event):
        """Publish an event."""
        event_data = {
            'event_type': event.event_type.value,
            'data': event.data,
            'timestamp': event.timestamp,
            'event_id': event.event_id
        }
        
        routing_key = f"event.{event.event_type.value}"
        
        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key=routing_key,
            body=json.dumps(event_data),
            properties=pika.BasicProperties(
                delivery_mode=2,
                timestamp=int(datetime.now().timestamp())
            )
        )
        
        logger.info(f"Event published: {event.event_type.value} - {event.event_id}")
    
    def subscribe_to_events(self, pattern: str, handler_func: Callable[[Event], None]):
        """Subscribe to events whose routing key matches `pattern`."""
        # Create a temporary, exclusive queue for this subscriber
        result = self.channel.queue_declare(queue='', exclusive=True)
        queue_name = result.method.queue
        
        # Bind the queue to the topic exchange
        self.channel.queue_bind(
            exchange=self.exchange_name,
            queue=queue_name,
            routing_key=pattern  # wildcards supported, e.g. "event.*" or "event.#"
        )
        
        def callback(ch, method, properties, body):
            try:
                event_data = json.loads(body.decode('utf-8'))
                event = Event(
                    event_type=EventType(event_data['event_type']),
                    data=event_data['data'],
                    timestamp=event_data['timestamp'],
                    event_id=event_data['event_id']
                )
                
                # Dispatch the event to the handler
                handler_func(event)
                
                # Acknowledge the message
                ch.basic_ack(delivery_tag=method.delivery_tag)
                
            except Exception as e:
                logger.error(f"Error handling event: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        
        self.channel.basic_consume(queue=queue_name, on_message_callback=callback)
        logger.info(f"Subscribed to events with pattern: {pattern}")
        
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.channel.stop_consuming()

# Example event handlers
def user_event_handler(event: Event):
    """Handle user-related events."""
    if event.event_type == EventType.USER_REGISTERED:
        user_data = event.data
        logger.info(f"New user registered: {user_data.get('username')}")
        
        # Publish a welcome-email notification event
        welcome_event = Event(
            event_type=EventType.NOTIFICATION_SENT,
            data={
                'user_id': user_data.get('user_id'),
                'type': 'welcome_email',
                'recipient': user_data.get('email')
            }
        )
        
        event_bus = EventBus()
        event_bus.publish_event(welcome_event)

def order_event_handler(event: Event):
    """Handle order-related events."""
    if event.event_type == EventType.ORDER_CREATED:
        order_data = event.data
        logger.info(f"New order created: {order_data.get('order_id')}")
        
        # Publish a payment event
        payment_event = Event(
            event_type=EventType.PAYMENT_PROCESSED,
            data={
                'order_id': order_data.get('order_id'),
                'amount': order_data.get('total'),
                'status': 'pending'
            }
        )
        
        event_bus = EventBus()
        event_bus.publish_event(payment_event)

# Usage example
# Publish a user-registration event
event_bus = EventBus()
user_registration_event = Event(
    event_type=EventType.USER_REGISTERED,
    data={
        'user_id': 'user_123',
        'username': 'john_doe',
        'email': 'john@example.com',
        'registration_date': datetime.now().isoformat()
    }
)

event_bus.publish_event(user_registration_event)
print("User registration event published")

4.4.3 Message Retry Mechanism

import time
from typing import Optional

class RetryableMessageQueue:
    def __init__(self, max_retries: int = 3, retry_delay: float = 1.0):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.main_queue = 'main_queue'
        self.retry_queue = 'retry_queue'
        self.dead_letter_queue = 'dead_letter_queue'
        self.setup_queues()
    
    def setup_queues(self):
        """Set up the main, retry, and dead-letter queues."""
        credentials = pika.PlainCredentials('admin', 'password')
        parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
        
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        
        # Declare the dead-letter queue
        self.channel.queue_declare(queue=self.dead_letter_queue, durable=True)
        
        # Declare the retry queue with a message TTL and a dead-letter target
        self.channel.queue_declare(
            queue=self.retry_queue,
            durable=True,
            arguments={
                'x-message-ttl': int(self.retry_delay * 1000),  # TTL in milliseconds
                'x-dead-letter-exchange': '',  # dead-letter to the default exchange
                'x-dead-letter-routing-key': self.main_queue  # expired messages return to the main queue
            }
        )
        
        # Declare the main queue
        self.channel.queue_declare(queue=self.main_queue, durable=True)
        
        # Set QoS
        self.channel.basic_qos(prefetch_count=1)
        
        logger.info("Retryable message queues initialized")
    
    def send_message(self, message: dict, delay_seconds: Optional[float] = None):
        """Send a message, optionally after a delay."""
        target_queue = self.main_queue
        if delay_seconds:
            # To delay delivery, publish to the retry queue: its TTL and
            # dead-letter settings move the message back to the main queue
            target_queue = self.retry_queue
            message['retry_info'] = {
                'original_queue': self.main_queue,
                'delay': delay_seconds,
                'retries': 0
            }
        
        self.channel.basic_publish(
            exchange='',
            routing_key=target_queue,
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)
        )
        
        logger.info(f"Message sent to {target_queue}")
    
    def process_message_with_retry(self, processor_func: Callable[[dict], bool]):
        """Process messages with a retry mechanism."""
        def callback(ch, method, properties, body):
            message = json.loads(body.decode('utf-8'))
            
            # Check whether this message has been retried before
            retry_info = message.get('retry_info', {})
            current_retry = retry_info.get('retries', 0)
            
            try:
                # Run the handler
                success = processor_func(message)
                
                if success:
                    # Success: acknowledge the message
                    ch.basic_ack(delivery_tag=method.delivery_tag)
                    logger.info("Message processed successfully")
                else:
                    # Failure: retry or dead-letter depending on the attempt count
                    if current_retry < self.max_retries:
                        # Increment the retry counter
                        retry_info['retries'] = current_retry + 1
                        message['retry_info'] = retry_info
                        
                        # Publish to the retry queue
                        self.channel.basic_publish(
                            exchange='',
                            routing_key=self.retry_queue,
                            body=json.dumps(message),
                            properties=pika.BasicProperties(delivery_mode=2)
                        )
                        
                        # Acknowledge the original message
                        ch.basic_ack(delivery_tag=method.delivery_tag)
                        logger.warning(f"Message failed, retrying... (attempt {current_retry + 1})")
                    else:
                        # Max retries reached: move to the dead-letter queue
                        message['error'] = 'Max retries exceeded'
                        self.channel.basic_publish(
                            exchange='',
                            routing_key=self.dead_letter_queue,
                            body=json.dumps(message),
                            properties=pika.BasicProperties(delivery_mode=2)
                        )
                        
                        # Acknowledge the original message
                        ch.basic_ack(delivery_tag=method.delivery_tag)
                        logger.error(f"Message failed after {self.max_retries} retries, moved to dead letter queue")
                        
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                # Apply the same retry logic to unexpected exceptions
                if current_retry < self.max_retries:
                    retry_info['retries'] = current_retry + 1
                    message['retry_info'] = retry_info
                    
                    self.channel.basic_publish(
                        exchange='',
                        routing_key=self.retry_queue,
                        body=json.dumps(message),
                        properties=pika.BasicProperties(delivery_mode=2)
                    )
                    ch.basic_ack(delivery_tag=method.delivery_tag)
                else:
                    message['error'] = str(e)
                    self.channel.basic_publish(
                        exchange='',
                        routing_key=self.dead_letter_queue,
                        body=json.dumps(message),
                        properties=pika.BasicProperties(delivery_mode=2)
                    )
                    ch.basic_ack(delivery_tag=method.delivery_tag)
        
        self.channel.basic_consume(queue=self.main_queue, on_message_callback=callback)
        logger.info("Starting to process messages with retry mechanism...")
        self.channel.start_consuming()

# Usage example: processing messages that may fail
def unreliable_processor(message: dict) -> bool:
    """Simulated processor that fails randomly."""
    import random
    
    # Simulate a processing failure
    if random.random() < 0.7:  # fails 70% of the time
        logger.error("Processor failed randomly")
        return False
    
    logger.info(f"Successfully processed message: {message}")
    return True

# Create a message queue with retry support
retry_queue = RetryableMessageQueue(max_retries=3, retry_delay=5.0)

# Send a test message
test_message = {
    'id': str(uuid.uuid4()),
    'content': 'Test message that might fail',
    'timestamp': datetime.now().isoformat()
}

retry_queue.send_message(test_message)
print("Test message sent with retry capability")
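The retry flow above assumes a retry queue that holds failed messages for a delay and then routes them back to the work queue. RabbitMQ can do this without any extra consumer, via a per-queue message TTL plus dead-lettering. A minimal sketch of that topology; the queue names and the 5-second delay are illustrative, not taken from the original code:

```python
def retry_queue_args(retry_delay_ms: int, work_queue: str) -> dict:
    """Arguments for a retry queue: messages sit until the TTL expires,
    then are dead-lettered back to the work queue via the default exchange."""
    return {
        "x-message-ttl": retry_delay_ms,
        "x-dead-letter-exchange": "",          # "" is the default exchange
        "x-dead-letter-routing-key": work_queue,
    }

def declare_retry_topology(channel, work_queue="task_queue",
                           retry_queue="task_queue.retry",
                           dead_letter_queue="task_queue.dlq",
                           retry_delay_ms=5000):
    """Declare the work, retry, and dead-letter queues (hypothetical names)."""
    channel.queue_declare(queue=work_queue, durable=True)
    channel.queue_declare(queue=dead_letter_queue, durable=True)
    channel.queue_declare(
        queue=retry_queue,
        durable=True,
        arguments=retry_queue_args(retry_delay_ms, work_queue),
    )
```

With this topology the consumer only needs to publish failed messages to `task_queue.retry`; the broker itself returns them to `task_queue` once the TTL elapses.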

5. Performance Optimization

5.1 Connection Pool Management

import threading
from queue import Queue, Empty, Full
import time

class RabbitMQConnectionPool:
    def __init__(self, max_connections=10, host='localhost', port=5672, **credentials):
        self.max_connections = max_connections
        self.host = host
        self.port = port
        self.credentials = credentials
        self.pool = Queue(maxsize=max_connections)
        self.active_connections = 0
        self.lock = threading.Lock()
        
        # Pre-create connections
        for _ in range(max_connections):
            conn = self._create_connection()
            if conn:
                self.pool.put(conn)
    
    def _create_connection(self):
        """Create a new RabbitMQ connection."""
        try:
            credentials = pika.PlainCredentials(
                self.credentials.get('username', 'guest'),
                self.credentials.get('password', 'guest')
            )
            parameters = pika.ConnectionParameters(
                host=self.host,
                port=self.port,
                credentials=credentials
            )
            
            connection = pika.BlockingConnection(parameters)
            return connection
        except Exception as e:
            logger.error(f"Failed to create connection: {e}")
            return None
    
    def get_connection(self):
        """Borrow a connection from the pool."""
        try:
            return self.pool.get(timeout=5)
        except Empty:
            # Pool is empty; create a new connection (without exceeding the limit)
            with self.lock:
                if self.active_connections < self.max_connections:
                    conn = self._create_connection()
                    if conn:
                        self.active_connections += 1
                        return conn
            raise Exception("No available connections")
    
    def return_connection(self, connection):
        """Return a connection to the pool."""
        try:
            if not connection.is_closed:
                self.pool.put(connection, timeout=1)
            else:
                # Connection was closed; create a replacement for the pool
                new_conn = self._create_connection()
                if new_conn:
                    self.pool.put(new_conn)
        except Full:
            # Pool is full; close the surplus connection
            if not connection.is_closed:
                connection.close()
            with self.lock:
                self.active_connections -= 1
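To guarantee that borrowed connections always go back to the pool, even when the caller raises, the borrow/return pair can be wrapped in a context manager. A minimal sketch built on the class above:

```python
from contextlib import contextmanager

@contextmanager
def pooled_connection(pool):
    """Borrow a connection from the pool and guarantee its return."""
    connection = pool.get_connection()
    try:
        yield connection
    finally:
        pool.return_connection(connection)

# Usage (assumes a running broker):
# pool = RabbitMQConnectionPool(max_connections=5, username='admin', password='password')
# with pooled_connection(pool) as conn:
#     channel = conn.channel()
#     channel.basic_publish(exchange='', routing_key='task_queue', body=b'hello')
```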

5.2 Batch Message Processing

class BatchMessageProcessor:
    def __init__(self, batch_size=100, timeout=5.0):
        self.batch_size = batch_size
        self.timeout = timeout
        self.message_batch = []
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
    
    def add_message(self, message):
        """Add a message to the pending batch."""
        with self.condition:
            self.message_batch.append(message)
            if len(self.message_batch) >= self.batch_size:
                self.condition.notify()
    
    def process_batch(self, processor_func: Callable[[list], None]):
        """Continuously drain and process message batches."""
        while True:
            batch = []
            with self.condition:
                # Wait until the batch is full or the timeout elapses
                while len(self.message_batch) < self.batch_size:
                    self.condition.wait(timeout=self.timeout)
                    if len(self.message_batch) > 0:
                        break
                
                if self.message_batch:
                    batch = self.message_batch.copy()
                    self.message_batch.clear()
                
            if batch:
                try:
                    processor_func(batch)
                    logger.info(f"Processed batch of {len(batch)} messages")
                except Exception as e:
                    logger.error(f"Error processing batch: {e}")
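To wire the batch processor into a pika consumer, the message callback can hand each message to `add_message` while a worker thread runs `process_batch`. A sketch under the assumption that messages are acknowledged as soon as they are buffered, which favors throughput over reliability (buffered messages are lost if the process crashes):

```python
import json
import threading

def start_batch_worker(batch_processor, handle_batch):
    """Run process_batch on a daemon thread so it doesn't block the consumer."""
    worker = threading.Thread(
        target=batch_processor.process_batch, args=(handle_batch,), daemon=True
    )
    worker.start()
    return worker

def make_batching_callback(batch_processor):
    """Build a pika on_message callback that buffers into the batch processor."""
    def callback(ch, method, properties, body):
        batch_processor.add_message(json.loads(body.decode('utf-8')))
        # Ack immediately after buffering (trade-off: fast, but not crash-safe)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback
```

If losing buffered messages is unacceptable, the acks would instead have to be deferred until `handle_batch` completes, e.g. by tracking delivery tags per batch.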

6. Monitoring and Operations

6.1 Using the RabbitMQ Management API

import requests
import json

class RabbitMQAdminAPI:
    def __init__(self, host='localhost', port=15672, username='admin', password='password'):
        self.base_url = f"http://{host}:{port}/api"
        self.auth = (username, password)
    
    def get_overview(self):
        """Get the RabbitMQ overview."""
        response = requests.get(f"{self.base_url}/overview", auth=self.auth)
        return response.json()
    
    def get_nodes(self):
        """Get node information."""
        response = requests.get(f"{self.base_url}/nodes", auth=self.auth)
        return response.json()
    
    def get_queues(self, vhost='%2F'):  # %2F is the URL-encoded '/'
        """Get queue information."""
        response = requests.get(f"{self.base_url}/queues/{vhost}", auth=self.auth)
        return response.json()
    
    def get_exchanges(self, vhost='%2F'):
        """Get exchange information."""
        response = requests.get(f"{self.base_url}/exchanges/{vhost}", auth=self.auth)
        return response.json()
    
    def get_connections(self):
        """Get connection information."""
        response = requests.get(f"{self.base_url}/connections", auth=self.auth)
        return response.json()
    
    def get_channels(self):
        """Get channel information."""
        response = requests.get(f"{self.base_url}/channels", auth=self.auth)
        return response.json()
    
    def publish_message_via_api(self, exchange_name, routing_key, message, vhost='%2F'):
        """Publish a message through the management API."""
        url = f"{self.base_url}/exchanges/{vhost}/{exchange_name}/publish"
        payload = {
            "properties": {},
            "routing_key": routing_key,
            "payload": json.dumps(message),
            "payload_encoding": "string"
        }
        response = requests.post(url, json=payload, auth=self.auth)
        return response.json()

# Usage example
admin_api = RabbitMQAdminAPI()
overview = admin_api.get_overview()
print(f"RabbitMQ Version: {overview.get('rabbitmq_version')}")
print(f"Management Version: {overview.get('management_version')}")
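The queue listing from `get_queues()` is the natural input for a backlog alert: each entry in the `/api/queues` response carries a `messages` field with the current queue depth. A small helper over that response shape; the 1000-message threshold is an arbitrary example value:

```python
def find_backlogged_queues(queues: list, threshold: int = 1000) -> list:
    """Return (queue_name, depth) pairs for queues deeper than the threshold."""
    alerts = []
    for q in queues:
        depth = q.get("messages", 0)
        if depth > threshold:
            alerts.append((q.get("name"), depth))
    return alerts

# Usage with the class above (requires a running broker):
# for name, depth in find_backlogged_queues(admin_api.get_queues(), threshold=500):
#     print(f"ALERT: queue {name} has {depth} pending messages")
```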

6.2 Health Check

def rabbitmq_health_check():
    """RabbitMQ health check."""
    try:
        # Test the connection
        credentials = pika.PlainCredentials('admin', 'password')
        parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        
        # Declare a temporary queue for the test
        result = channel.queue_declare(queue='', exclusive=True)
        test_queue = result.method.queue
        
        # Publish a test message
        test_message = {"health_check": True, "timestamp": time.time()}
        channel.basic_publish(
            exchange='',
            routing_key=test_queue,
            body=json.dumps(test_message)
        )
        
        # Try to consume the message back
        method, properties, body = channel.basic_get(queue=test_queue, auto_ack=True)
        
        if method:
            logger.info("RabbitMQ health check: PASSED")
            success = True
        else:
            logger.error("RabbitMQ health check: FAILED - Could not retrieve test message")
            success = False
        
        # Clean up
        channel.queue_delete(queue=test_queue)
        connection.close()
        
        return success
        
    except Exception as e:
        logger.error(f"RabbitMQ health check: FAILED - {str(e)}")
        return False

# Run the health check
is_healthy = rabbitmq_health_check()
print(f"RabbitMQ Health Status: {'Healthy' if is_healthy else 'Unhealthy'}")
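An alternative to the hand-rolled check above is the management plugin's built-in aliveness test, which declares a test queue, publishes to it, and consumes from it entirely server-side. A minimal sketch; the host, port, and credentials are assumptions matching the Docker setup earlier:

```python
def aliveness_url(host="localhost", port=15672, vhost="%2F") -> str:
    """URL of the management plugin's aliveness-test endpoint."""
    return f"http://{host}:{port}/api/aliveness-test/{vhost}"

def aliveness_check(host="localhost", port=15672,
                    auth=("admin", "password"), vhost="%2F") -> bool:
    """Return True if the aliveness test reports status 'ok'."""
    import requests  # imported here so the module loads even without requests installed
    try:
        resp = requests.get(aliveness_url(host, port, vhost), auth=auth, timeout=5)
        return resp.ok and resp.json().get("status") == "ok"
    except requests.RequestException:
        return False
```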

7. Best Practices

7.1 Security Best Practices

"""
RabbitMQ security best practices:
1. Strong passwords: use complex usernames and passwords
2. Network isolation: restrict access with firewalls
3. TLS encryption: enable TLS in production
4. Least privilege: create dedicated users for each application
5. Access control: use virtual hosts and permissions
6. Monitoring and auditing: enable access logging
7. Regular updates: keep the RabbitMQ version up to date
"""
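Point 4 (least privilege) can be automated through the management API: `PUT /api/users/{name}` creates a user, and `PUT /api/permissions/{vhost}/{user}` scopes it with regular expressions over resource names. A hedged sketch; the `app.` naming prefix and all identifiers are illustrative, not from the original text:

```python
import re

def permission_payload(prefix: str) -> dict:
    """Regex permissions limiting a user to resources named <prefix>*."""
    pattern = f"^{re.escape(prefix)}.*"
    return {"configure": pattern, "write": pattern, "read": pattern}

def create_app_user(base_url, admin_auth, username, password,
                    vhost="%2F", prefix="app."):
    """Create a user that can only touch queues/exchanges under the prefix."""
    import requests
    # Create (or update) the user; empty tags = no management privileges
    requests.put(f"{base_url}/users/{username}",
                 json={"password": password, "tags": ""},
                 auth=admin_auth).raise_for_status()
    # Grant scoped permissions on the vhost
    requests.put(f"{base_url}/permissions/{vhost}/{username}",
                 json=permission_payload(prefix),
                 auth=admin_auth).raise_for_status()
```

The same result can be achieved with `rabbitmqctl add_user` and `rabbitmqctl set_permissions`; the API variant is convenient when provisioning from deployment scripts.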

# Secure connection configuration
def create_secure_connection():
    import ssl
    
    # Store credentials in environment variables
    import os
    
    credentials = pika.PlainCredentials(
        os.getenv('RABBITMQ_USER', 'admin'),
        os.getenv('RABBITMQ_PASS', 'password')
    )
    
    # Configure TLS (for production)
    context = ssl.create_default_context()
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE  # production should use proper certificate verification
    
    parameters = pika.ConnectionParameters(
        host=os.getenv('RABBITMQ_HOST', 'localhost'),
        port=int(os.getenv('RABBITMQ_PORT', '5672')),
        virtual_host='/',
        credentials=credentials,
        ssl_options=pika.SSLOptions(context) if os.getenv('RABBITMQ_USE_SSL') == 'true' else None,
        heartbeat=600,
        blocked_connection_timeout=300
    )
    
    return pika.BlockingConnection(parameters)

7.2 Performance Best Practices

"""
RabbitMQ performance best practices:
1. Message persistence trade-off: choose based on reliability requirements
2. Batch operations: reduce network round trips
3. QoS settings: tune prefetch_count appropriately
4. Connection reuse: avoid frequently creating/destroying connections
5. Queue design: avoid a single hot queue
6. Monitoring and tuning: continuously track performance metrics
"""

# Performance-optimized consumer example
def optimized_consumer(queue_name: str, processor_func: Callable):
    """Performance-optimized consumer."""
    rmq = RabbitMQConnection()
    try:
        rmq.connect()
        
        # Raise prefetch_count to improve throughput
        rmq.channel.basic_qos(prefetch_count=10)  # tune to the consumer's processing capacity
        
        def callback(ch, method, properties, body):
            try:
                message = json.loads(body.decode('utf-8'))
                processor_func(message)
                
                # Acknowledge promptly
                ch.basic_ack(delivery_tag=method.delivery_tag)
                
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                # Note: requeue=True can redeliver a poison message forever;
                # consider requeue=False plus a dead-letter queue in production
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        
        rmq.channel.basic_consume(queue=queue_name, on_message_callback=callback)
        rmq.channel.start_consuming()
        
    except KeyboardInterrupt:
        rmq.channel.stop_consuming()
    finally:
        rmq.disconnect()

7.3 Application Scenario Summary

| Scenario | Description | Recommended configuration |
| --- | --- | --- |
| Task queue | Asynchronous task processing, background jobs | Direct exchange, durable queue |
| Event-driven | Inter-microservice communication, event propagation | Topic exchange, fanout for broadcast |
| RPC | Remote procedure calls | Direct exchange with reply queue |
| Publish/subscribe | Broadcast messages, notification systems | Fanout exchange |
| Data stream processing | ETL, real-time analytics | Headers exchange for complex routing |

Summary

RabbitMQ is a powerful, reliable message queue system suited to a wide range of asynchronous communication scenarios. With sensible configuration, security measures, and performance tuning, you can build an efficient and dependable messaging system. Mastering RabbitMQ's core concepts and best practices helps developers design excellent distributed application architectures.