# RabbitMQ 实战教程
#什么是RabbitMQ?
RabbitMQ是一个开源的消息代理和队列服务器,基于AMQP(Advanced Message Queuing Protocol)协议。它提供了可靠的消息传递、灵活的路由、可扩展性和多种消息模式,广泛应用于微服务架构、异步处理和解耦系统组件。
#RabbitMQ的主要特点:
- 可靠性:支持消息持久化、发布确认和事务
- 灵活路由:支持多种交换机类型和路由规则
- 可扩展性:支持集群和联邦部署
- 多协议支持:支持AMQP、STOMP、MQTT等多种协议
- 管理界面:提供Web管理界面和命令行工具
- 高可用性:支持仲裁队列(Quorum Queue)和故障转移;经典镜像队列自 RabbitMQ 4.0 起已被移除
#1. RabbitMQ安装与配置
#1.1 Docker方式安装
# Pull the RabbitMQ image (the -management tag bundles the management UI)
docker pull rabbitmq:3-management
# Run the RabbitMQ container, exposing AMQP (5672) and the management UI (15672)
docker run -d \
  --name rabbitmq-server \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=password \
  rabbitmq:3-management
# Check RabbitMQ status through the container logs
docker logs rabbitmq-server
#1.2 Docker Compose方式安装
# docker-compose.yml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    container_name: rabbitmq-server
    restart: unless-stopped
    ports:
      - "5672:5672"    # AMQP port
      - "15672:15672"  # management UI port
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
    volumes:
      # Named volume keeps broker data across container restarts
      - rabbitmq_data:/var/lib/rabbitmq
      # Mount the local configuration file into the container
      - ./rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf
    healthcheck:
      test: ["CMD", "rabbitmqctl", "status"]
      interval: 30s
      timeout: 10s
      retries: 3
volumes:
  rabbitmq_data:
#1.3 配置文件详解
# rabbitmq.conf - RabbitMQ configuration file
# Basic settings
listeners.tcp.default = 5672
management.listener.port = 15672
management.listener.ssl = false
# Memory settings
vm_memory_high_watermark.relative = 0.6
vm_memory_calculation_strategy = allocated
# Disk settings
# NOTE(review): the relative and absolute keys are two ways of setting the
# same free-disk limit — confirm which one is intended and keep only that one.
disk_free_limit.relative = 2.0
disk_free_limit.absolute = 2GB
# Default vhost, user and permissions
default_vhost = /
default_user = admin
default_pass = password
default_permissions.configure = .*
default_permissions.read = .*
default_permissions.write = .*
# Cluster settings
cluster_formation.peer_discovery_backend = classic_config
cluster_formation.classic_config.nodes.1 = rabbit@node1
cluster_formation.classic_config.nodes.2 = rabbit@node2
# High-availability settings
queue_master_locator = min-masters
# Logging settings
log.file.level = info
log.file.rotation.date = $D0
log.file.rotation.size = 10485760
#1.4 高可用集群配置
# docker-compose-cluster.yml
# Three broker containers sharing one Erlang cookie; each publishes its AMQP
# and management ports on a different host port.
version: '3.8'
services:
  rabbitmq1:
    image: rabbitmq:3-management
    hostname: rabbit1
    container_name: rabbitmq1
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
    volumes:
      - rabbitmq1_data:/var/lib/rabbitmq
  rabbitmq2:
    image: rabbitmq:3-management
    hostname: rabbit2
    container_name: rabbitmq2
    ports:
      - "5673:5672"
      - "15673:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
    volumes:
      - rabbitmq2_data:/var/lib/rabbitmq
  rabbitmq3:
    image: rabbitmq:3-management
    hostname: rabbit3
    container_name: rabbitmq3
    ports:
      - "5674:5672"
      - "15674:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_ERLANG_COOKIE: "SWQOKODSQALRPCLNMEQG"
    volumes:
      - rabbitmq3_data:/var/lib/rabbitmq
volumes:
  rabbitmq1_data:
  rabbitmq2_data:
  rabbitmq3_data:
#2. RabbitMQ基础概念
#2.1 核心概念
| 概念 | 说明 |
|---|---|
| Producer | 消息生产者,发送消息到Exchange |
| Consumer | 消息消费者,从Queue接收消息 |
| Exchange | 交换机,接收生产者消息并路由到队列 |
| Queue | 队列,存储消息等待消费 |
| Binding | 绑定,Exchange和Queue之间的关联规则 |
| Routing Key | 路由键,用于Exchange路由消息 |
| Virtual Host | 虚拟主机,逻辑隔离的工作空间 |
#2.2 交换机类型
# An exchange's type is fixed when the exchange is declared; a policy cannot
# set it, so each exchange is declared with its type here.
# Direct exchange: the message routing key must exactly match the binding key
rabbitmqadmin declare exchange name=direct_exchange type=direct durable=true
# Topic exchange: binding keys may use wildcards
rabbitmqadmin declare exchange name=topic_exchange type=topic durable=true
# Fanout exchange: broadcast mode, the routing key is ignored
rabbitmqadmin declare exchange name=fanout_exchange type=fanout durable=true
# Headers exchange: routes by matching message headers
rabbitmqadmin declare exchange name=headers_exchange type=headers durable=true
#3. 管理与监控
#3.1 命令行管理
# Open a shell inside the RabbitMQ container
docker exec -it rabbitmq-server bash
# Show server status
rabbitmqctl status
# Show cluster/node information
rabbitmqctl cluster_status
# List users
rabbitmqctl list_users
# List virtual hosts
rabbitmqctl list_vhosts
# List queues
rabbitmqctl list_queues
# List exchanges
rabbitmqctl list_exchanges
# List bindings
rabbitmqctl list_bindings
# Create a user
rabbitmqctl add_user newuser newpassword
# Grant the user configure/write/read permissions on the default vhost
rabbitmqctl set_permissions -p / newuser ".*" ".*" ".*"
# Create a virtual host
rabbitmqctl add_vhost /myhost
# Grant the user permissions on the new virtual host
rabbitmqctl set_permissions -p /myhost newuser ".*" ".*" ".*"
#3.2 Web管理界面
# 访问管理界面
# http://localhost:15672
# 默认用户名/密码: guest/guest (需要在配置中启用远程访问)
# 或使用上面配置的 admin/password
#3.3 队列管理
# Create a durable queue
rabbitmqadmin declare queue name=my_queue durable=true
# Create a durable direct exchange
rabbitmqadmin declare exchange name=my_exchange type=direct durable=true
# Bind the queue to the exchange
rabbitmqadmin declare binding source=my_exchange destination=my_queue routing_key=my_routing_key
# Publish a message
rabbitmqadmin publish exchange=my_exchange routing_key=my_routing_key payload="Hello World"
# Fetch a message and acknowledge it (it is not requeued)
rabbitmqadmin get queue=my_queue ackmode=ack_requeue_false
#4. Python与RabbitMQ集成
#4.1 安装pika客户端
pip install pika
pip install aio-pika  # asyncio support
#4.2 基本连接与操作
import pika
import json
import logging
from typing import Callable, Any
import time
# Configure logging at INFO level and create the module-level logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RabbitMQConnection:
    """Thin wrapper around one blocking pika connection and one channel.

    Call ``connect()`` before any ``declare_*``/``bind_queue`` method, and
    ``disconnect()`` when done. The object can also be used as a context
    manager, which connects on entry and disconnects on exit.
    """

    def __init__(self, host='localhost', port=5672, username='admin', password='password', virtual_host='/'):
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.virtual_host = virtual_host
        # Both stay None until connect() succeeds.
        self.connection = None
        self.channel = None

    def __enter__(self):
        self.connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.disconnect()
        # Never suppress the exception raised inside the with-block.
        return False

    def _require_channel(self):
        """Return the open channel, or raise RuntimeError if connect() was not called."""
        if self.channel is None:
            raise RuntimeError("Not connected to RabbitMQ: call connect() first")
        return self.channel

    def connect(self):
        """Open the connection and a channel."""
        credentials = pika.PlainCredentials(self.username, self.password)
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.virtual_host,
            credentials=credentials,
            heartbeat=600,
            blocked_connection_timeout=300,
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        logger.info(f"Connected to RabbitMQ at {self.host}:{self.port}")

    def disconnect(self):
        """Close the connection if it is open; a no-op otherwise."""
        if self.connection and not self.connection.is_closed:
            self.connection.close()
            logger.info("Disconnected from RabbitMQ")

    def declare_queue(self, queue_name: str, durable=True, auto_delete=False, exclusive=False):
        """Declare a queue with the given flags.

        Raises:
            RuntimeError: if ``connect()`` has not been called.
        """
        self._require_channel().queue_declare(
            queue=queue_name,
            durable=durable,
            auto_delete=auto_delete,
            exclusive=exclusive,
        )
        logger.info(f"Declared queue: {queue_name}")

    def declare_exchange(self, exchange_name: str, exchange_type: str = 'direct', durable=True):
        """Declare an exchange of the given type.

        Raises:
            RuntimeError: if ``connect()`` has not been called.
        """
        self._require_channel().exchange_declare(
            exchange=exchange_name,
            exchange_type=exchange_type,
            durable=durable,
        )
        logger.info(f"Declared exchange: {exchange_name}, type: {exchange_type}")

    def bind_queue(self, queue_name: str, exchange_name: str, routing_key: str):
        """Bind a queue to an exchange under ``routing_key``.

        Raises:
            RuntimeError: if ``connect()`` has not been called.
        """
        self._require_channel().queue_bind(
            queue=queue_name,
            exchange=exchange_name,
            routing_key=routing_key,
        )
        logger.info(f"Bound queue {queue_name} to exchange {exchange_name} with routing key {routing_key}")
# 发送消息
def send_message(queue_name: str, message: dict, routing_key: str = ''):
    """Publish ``message`` as persistent JSON to ``queue_name``.

    A new connection is opened for this single call and is always closed
    afterwards. The queue is declared first so that it exists, then the
    message is published through the default exchange.

    Note: ``routing_key`` is accepted but not used; the message is always
    routed by ``queue_name``.
    """
    broker = RabbitMQConnection()
    try:
        broker.connect()
        # Make sure the target queue exists before publishing
        broker.declare_queue(queue_name)

        message_json = json.dumps(message)
        # delivery_mode=2 marks the message as persistent
        persistent = pika.BasicProperties(delivery_mode=2)
        broker.channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=message_json,
            properties=persistent,
        )
        logger.info(f"Message sent to queue {queue_name}: {message_json}")
    finally:
        broker.disconnect()
# 消费消息
def consume_messages(queue_name: str, callback_func: Callable[[Any], None]):
    """Consume JSON messages from ``queue_name`` until interrupted.

    Each message is decoded and passed to ``callback_func``; it is acked when
    the callback returns. If the callback raises, the message is nacked and
    requeued. A message whose body is not valid UTF-8 JSON can never succeed,
    so it is nacked without requeue instead of being redelivered forever.

    Blocks in the consume loop; Ctrl-C stops it. The connection is always
    closed on exit.
    """
    rmq = RabbitMQConnection()
    try:
        rmq.connect()
        # Make sure the queue exists
        rmq.declare_queue(queue_name)
        # Deliver one unacknowledged message at a time
        rmq.channel.basic_qos(prefetch_count=1)

        def callback(ch, method, properties, body):
            try:
                message = json.loads(body.decode('utf-8'))
            except ValueError as e:
                # UnicodeDecodeError and json.JSONDecodeError are both ValueError
                logger.error(f"Discarding undecodable message: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                return
            try:
                logger.info(f"Received message: {message}")
                # Hand the decoded message to the caller's handler
                callback_func(message)
                # Manual acknowledgement
                ch.basic_ack(delivery_tag=method.delivery_tag)
                logger.info("Message acknowledged")
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                # Reject and put the message back on the queue
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

        rmq.channel.basic_consume(queue=queue_name, on_message_callback=callback)
        logger.info(f"Started consuming messages from queue {queue_name}")
        # Blocks until stop_consuming() or an error
        rmq.channel.start_consuming()
    except KeyboardInterrupt:
        # The channel is still None if the interrupt arrived during connect()
        if rmq.channel is not None:
            rmq.channel.stop_consuming()
        logger.info("Stopped consuming messages")
    finally:
        rmq.disconnect()
#4.3 高级功能
# 发布-订阅模式
def publish_message(exchange_name: str, message: dict, routing_key: str, exchange_type: str = 'direct'):
    """Publish ``message`` as persistent JSON to an exchange.

    Opens a new connection, declares the exchange with ``exchange_type``,
    publishes with ``routing_key``, and always closes the connection.
    """
    broker = RabbitMQConnection()
    try:
        broker.connect()
        # Make sure the exchange exists
        broker.declare_exchange(exchange_name, exchange_type)

        message_json = json.dumps(message)
        props = pika.BasicProperties(
            delivery_mode=2,  # persistent
            content_type='application/json',
            timestamp=int(time.time()),
        )
        broker.channel.basic_publish(
            exchange=exchange_name,
            routing_key=routing_key,
            body=message_json,
            properties=props,
        )
        logger.info(f"Published message to exchange {exchange_name} with routing key {routing_key}")
    finally:
        broker.disconnect()
# RPC模式
class RPCServer:
    """RPC server that answers JSON requests arriving on one queue.

    Register a handler with ``register_process_function`` and then call
    ``start_server``, which blocks in the consume loop.
    """

    def __init__(self, queue_name: str):
        self.queue_name = queue_name
        self.connection = None
        self.channel = None
        self.process_function = None

    def connect(self):
        """Connect, declare the durable request queue and set prefetch to 1."""
        credentials = pika.PlainCredentials('admin', 'password')
        parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.queue_name, durable=True)
        self.channel.basic_qos(prefetch_count=1)

    def register_process_function(self, func: Callable):
        """Register the function called with each decoded request."""
        self.process_function = func

    def _build_response(self, body) -> str:
        """Run the handler on a raw request body and return the JSON reply text.

        Any failure (bad request encoding, handler error, or a handler result
        that cannot be JSON-encoded) is turned into an ``{"error": ...}`` reply.
        """
        try:
            request = json.loads(body.decode('utf-8'))
            if self.process_function:
                response = self.process_function(request)
            else:
                response = {"error": "No process function registered"}
            # Serialize inside the try so a non-serializable result is reported
            # to the client instead of crashing the consumer.
            return json.dumps(response)
        except Exception as e:
            logger.error(f"Error in RPC server: {e}")
            return json.dumps({"error": str(e)})

    def start_server(self):
        """Connect and serve requests until the consume loop stops."""
        self.connect()

        def on_request(ch, method, props, body):
            reply = self._build_response(body)
            if props.reply_to:
                ch.basic_publish(
                    exchange='',
                    routing_key=props.reply_to,
                    properties=pika.BasicProperties(correlation_id=props.correlation_id),
                    body=reply,
                )
            else:
                # Without reply_to there is nowhere to send the response
                logger.warning("RPC request has no reply_to; response dropped")
            # Acknowledge the request in every case so it is not redelivered
            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(queue=self.queue_name, on_message_callback=on_request)
        logger.info(f"RPC Server started, waiting for requests on queue {self.queue_name}")
        self.channel.start_consuming()
class RPCClient:
    """Blocking RPC client: publishes one request and waits for its reply."""

    def __init__(self):
        self.connection = None
        self.channel = None
        self.response = None
        self.correlation_id = None
        self.properties = None

    def call(self, queue_name: str, message: dict, timeout=None) -> dict:
        """Send ``message`` to ``queue_name`` and return the decoded reply.

        Args:
            queue_name: request queue the RPC server consumes from.
            message: JSON-serializable request.
            timeout: maximum number of seconds to wait for the reply;
                ``None`` (the default) waits indefinitely.

        Raises:
            TimeoutError: if ``timeout`` elapses before a reply arrives.
        """
        import uuid

        # Reset per-call state so a previous reply is never returned again
        self.response = None
        # A random UUID keeps concurrent or rapid calls distinguishable
        self.correlation_id = str(uuid.uuid4())

        # Same credentials and endpoint as RPCServer.connect
        credentials = pika.PlainCredentials('admin', 'password')
        parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
        self.connection = pika.BlockingConnection(parameters)
        try:
            self.channel = self.connection.channel()
            # Exclusive, server-named queue that receives the reply
            result = self.channel.queue_declare(queue='', exclusive=True)
            callback_queue = result.method.queue
            # Start listening before publishing so the reply cannot be missed
            self.channel.basic_consume(
                queue=callback_queue,
                on_message_callback=self.on_response,
                auto_ack=True,
            )
            self.channel.basic_publish(
                exchange='',
                routing_key=queue_name,
                properties=pika.BasicProperties(
                    reply_to=callback_queue,
                    correlation_id=self.correlation_id,
                ),
                body=json.dumps(message),
            )
            deadline = None if timeout is None else time.monotonic() + timeout
            while self.response is None:
                if deadline is not None and time.monotonic() >= deadline:
                    raise TimeoutError(f"No RPC response from {queue_name} within {timeout} seconds")
                self.connection.process_data_events(time_limit=1)
            return self.response
        finally:
            # Close on success and on error alike
            if not self.connection.is_closed:
                self.connection.close()

    def on_response(self, ch, method, props, body):
        """Store the reply if its correlation id matches the pending call."""
        if self.correlation_id == props.correlation_id:
            self.response = json.loads(body.decode('utf-8'))
# 消息确认和持久化
def send_persistent_message(queue_name: str, message: dict):
    """Publish ``message`` as a persistent JSON message to a durable queue.

    Opens a new connection, declares ``queue_name`` as durable, publishes
    through the default exchange, and always closes the connection.
    """
    broker = RabbitMQConnection()
    try:
        broker.connect()
        channel = broker.channel
        # The queue itself must be durable for persistence to matter
        channel.queue_declare(queue=queue_name, durable=True)
        props = pika.BasicProperties(
            delivery_mode=2,  # persistent
            headers={'type': 'persistent_message'},
        )
        channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message),
            properties=props,
        )
        logger.info(f"Persistent message sent to {queue_name}")
    finally:
        broker.disconnect()
#4.4 实际应用场景
#4.4.1 任务队列系统
import time
import threading
from datetime import datetime
from dataclasses import dataclass
from enum import Enum
from typing import Optional
class TaskStatus(Enum):
    """Lifecycle states of a task; each value is the string sent in messages."""

    PENDING = "pending"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
@dataclass
class Task:
    """A unit of work submitted to the task queue."""

    id: str
    name: str
    payload: dict
    priority: int = 1
    # ISO-8601 creation time; filled in by __post_init__ when left as None
    created_at: Optional[str] = None
    status: TaskStatus = TaskStatus.PENDING

    def __post_init__(self):
        # Stamp the creation time only when the caller did not supply one
        if self.created_at is None:
            self.created_at = datetime.now().isoformat()
class TaskQueueSystem:
    """Task queue backed by two durable queues: one for tasks, one for results.

    The constructor connects to RabbitMQ and declares both queues. Call
    ``close()`` when the object is no longer needed.
    """

    def __init__(self, connection_params: dict = None):
        self.connection_params = connection_params or {
            'host': 'localhost',
            'port': 5672,
            'username': 'admin',
            'password': 'password',
            'virtual_host': '/'
        }
        self.connection = None
        self.channel = None
        self.task_queue = 'task_queue'
        self.result_queue = 'result_queue'
        self.setup_queues()

    def setup_queues(self):
        """Connect and declare the task and result queues (both durable)."""
        credentials = pika.PlainCredentials(
            self.connection_params['username'],
            self.connection_params['password']
        )
        parameters = pika.ConnectionParameters(
            host=self.connection_params['host'],
            port=self.connection_params['port'],
            virtual_host=self.connection_params['virtual_host'],
            credentials=credentials
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        # Durable queues
        self.channel.queue_declare(queue=self.task_queue, durable=True)
        self.channel.queue_declare(queue=self.result_queue, durable=True)
        # One unacknowledged message at a time per consumer
        self.channel.basic_qos(prefetch_count=1)
        logger.info("Task queues initialized")

    def close(self):
        """Close the connection if it is open."""
        if self.connection is not None and not self.connection.is_closed:
            self.connection.close()

    def submit_task(self, task: Task) -> str:
        """Publish ``task`` to the task queue and return its id."""
        task_dict = {
            'id': task.id,
            'name': task.name,
            'payload': task.payload,
            'priority': task.priority,
            'created_at': task.created_at,
            'status': task.status.value
        }
        # NOTE: RabbitMQ only honours message priority on queues declared with
        # x-max-priority; task_queue is declared without it in setup_queues.
        self.channel.basic_publish(
            exchange='',
            routing_key=self.task_queue,
            body=json.dumps(task_dict),
            properties=pika.BasicProperties(
                delivery_mode=2,  # persistent
                priority=task.priority
            )
        )
        logger.info(f"Task submitted: {task.id} - {task.name}")
        return task.id

    def _publish_result(self, payload: dict):
        """Publish one persistent result or error record to the result queue."""
        self.channel.basic_publish(
            exchange='',
            routing_key=self.result_queue,
            body=json.dumps(payload),
            properties=pika.BasicProperties(delivery_mode=2)
        )

    def process_tasks(self, processor_func: Callable[[Task], Any]):
        """Consume tasks and run ``processor_func`` on each; blocks.

        The outcome (result or error) is published to the result queue and the
        task message is acked. A message that cannot be parsed into a Task is
        nacked without requeue, because redelivering it can never succeed.
        """
        def callback(ch, method, properties, body):
            try:
                task_data = json.loads(body.decode('utf-8'))
                task = Task(
                    id=task_data['id'],
                    name=task_data['name'],
                    payload=task_data['payload'],
                    priority=task_data['priority'],
                    created_at=task_data['created_at']
                )
            except (ValueError, KeyError, TypeError) as e:
                logger.error(f"Discarding malformed task message: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                return
            try:
                logger.info(f"Processing task: {task.id}")
                task.status = TaskStatus.PROCESSING
                try:
                    result = processor_func(task)
                    task.status = TaskStatus.COMPLETED
                    self._publish_result({
                        'task_id': task.id,
                        'status': task.status.value,
                        'result': result,
                        'processed_at': datetime.now().isoformat()
                    })
                    logger.info(f"Task completed: {task.id}")
                except Exception as e:
                    task.status = TaskStatus.FAILED
                    self._publish_result({
                        'task_id': task.id,
                        'status': task.status.value,
                        'error': str(e),
                        'processed_at': datetime.now().isoformat()
                    })
                    logger.error(f"Task failed: {task.id}, error: {e}")
                # The outcome has been recorded, so the task message is done
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                # Reached when recording the outcome itself failed
                logger.error(f"Error processing task message: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

        self.channel.basic_consume(queue=self.task_queue, on_message_callback=callback)
        logger.info("Starting to process tasks...")
        self.channel.start_consuming()

    def get_results(self, timeout: int = 30) -> list:
        """Wait up to ``timeout`` seconds and return the results received.

        Returns as soon as at least one result has arrived, or an empty list
        when the timeout elapses first.
        """
        results = []

        def result_callback(ch, method, properties, body):
            result_data = json.loads(body.decode('utf-8'))
            results.append(result_data)
            ch.basic_ack(delivery_tag=method.delivery_tag)

        consumer_tag = self.channel.basic_consume(
            queue=self.result_queue,
            on_message_callback=result_callback
        )
        try:
            start_time = time.time()
            while time.time() - start_time < timeout and not results:
                self.connection.process_data_events(time_limit=1)
        finally:
            # Cancel the temporary consumer; otherwise every call would leave
            # another consumer behind that swallows later results.
            self.channel.basic_cancel(consumer_tag)
        return results
# 使用示例
def example_task_processor(task: Task) -> Any:
    """Example task handler.

    Sleeps two seconds to simulate work, then dispatches on ``task.name``:

    * ``"calculate_sum"`` returns the sum of ``payload['numbers']`` (a number).
    * ``"process_data"`` returns a dict with the item count of ``payload['data']``.
    * any other name returns a default dict containing the task id.
    """
    logger.info(f"Processing task {task.id}: {task.name}")
    # Simulate a slow operation
    time.sleep(2)
    if task.name == "calculate_sum":
        return sum(task.payload.get('numbers', []))
    if task.name == "process_data":
        data = task.payload.get('data', {})
        return {"processed": True, "count": len(data)}
    return {"default": "processed", "task_id": task.id}
# Create the task queue system
task_system = TaskQueueSystem()
# Submit two example tasks with random UUIDs as ids
import uuid
task1 = Task(
    id=str(uuid.uuid4()),
    name="calculate_sum",
    payload={"numbers": [1, 2, 3, 4, 5]},
    priority=2
)
task2 = Task(
    id=str(uuid.uuid4()),
    name="process_data",
    payload={"data": {"key1": "value1", "key2": "value2"}},
    priority=1
)
task_system.submit_task(task1)
task_system.submit_task(task2)
print("Tasks submitted successfully")
#4.4.2 事件驱动系统
from enum import Enum
import uuid
from datetime import datetime
class EventType(Enum):
    """Kinds of events carried by the event bus; values are the wire names."""

    USER_REGISTERED = "user_registered"
    ORDER_CREATED = "order_created"
    PAYMENT_PROCESSED = "payment_processed"
    NOTIFICATION_SENT = "notification_sent"
@dataclass
class Event:
    """An event published on the event bus."""

    event_type: EventType
    data: dict
    # ISO-8601 creation time; filled in by __post_init__ when left as None
    timestamp: Optional[str] = None
    # Unique id; a random UUID is generated by __post_init__ when left as None
    event_id: Optional[str] = None

    def __post_init__(self):
        if self.timestamp is None:
            self.timestamp = datetime.now().isoformat()
        if self.event_id is None:
            self.event_id = str(uuid.uuid4())
class EventBus:
    """Event bus built on a durable topic exchange named ``event_bus``.

    The constructor connects to RabbitMQ and declares the exchange. Call
    ``close()`` when the bus is no longer needed.
    """

    def __init__(self):
        self.connection = None
        self.channel = None
        self.exchange_name = 'event_bus'
        self.setup_event_bus()

    def setup_event_bus(self):
        """Connect and declare the durable topic exchange."""
        credentials = pika.PlainCredentials('admin', 'password')
        parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.exchange_declare(
            exchange=self.exchange_name,
            exchange_type='topic',
            durable=True
        )
        logger.info("Event bus initialized")

    def close(self):
        """Close the connection if it is open."""
        if self.connection is not None and not self.connection.is_closed:
            self.connection.close()

    def publish_event(self, event: Event):
        """Publish ``event`` with the routing key ``event.<event type value>``."""
        event_data = {
            'event_type': event.event_type.value,
            'data': event.data,
            'timestamp': event.timestamp,
            'event_id': event.event_id
        }
        routing_key = f"event.{event.event_type.value}"
        self.channel.basic_publish(
            exchange=self.exchange_name,
            routing_key=routing_key,
            body=json.dumps(event_data),
            properties=pika.BasicProperties(
                delivery_mode=2,
                timestamp=int(datetime.now().timestamp())
            )
        )
        logger.info(f"Event published: {event.event_type.value} - {event.event_id}")

    def subscribe_to_events(self, pattern: str, handler_func: Callable[[Event], None]):
        """Bind a temporary queue to ``pattern`` and dispatch events; blocks.

        ``handler_func`` is called with each decoded Event. If it raises, the
        message is nacked and requeued. A message that cannot be decoded into
        an Event (bad JSON, missing field, unknown event type) is nacked
        without requeue, since redelivering it can never succeed.
        """
        # Exclusive, server-named queue for this subscriber
        result = self.channel.queue_declare(queue='', exclusive=True)
        queue_name = result.method.queue
        self.channel.queue_bind(
            exchange=self.exchange_name,
            queue=queue_name,
            # Published keys look like "event.user_registered", so useful
            # patterns are e.g. "event.*" or "event.user_registered".
            routing_key=pattern
        )

        def callback(ch, method, properties, body):
            try:
                event_data = json.loads(body.decode('utf-8'))
                event = Event(
                    event_type=EventType(event_data['event_type']),
                    data=event_data['data'],
                    timestamp=event_data['timestamp'],
                    event_id=event_data['event_id']
                )
            except (ValueError, KeyError, TypeError) as e:
                logger.error(f"Discarding malformed event: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                return
            try:
                handler_func(event)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                logger.error(f"Error handling event: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

        self.channel.basic_consume(queue=queue_name, on_message_callback=callback)
        logger.info(f"Subscribed to events with pattern: {pattern}")
        try:
            self.channel.start_consuming()
        except KeyboardInterrupt:
            self.channel.stop_consuming()
# 事件处理器示例
def user_event_handler(event: Event):
    """Handle user events: on USER_REGISTERED, publish a welcome-email event.

    Events of any other type are ignored.
    """
    if event.event_type != EventType.USER_REGISTERED:
        return
    user_data = event.data
    logger.info(f"New user registered: {user_data.get('username')}")
    welcome_event = Event(
        event_type=EventType.NOTIFICATION_SENT,
        data={
            'user_id': user_data.get('user_id'),
            'type': 'welcome_email',
            'recipient': user_data.get('email')
        }
    )
    event_bus = EventBus()
    try:
        event_bus.publish_event(welcome_event)
    finally:
        # EventBus() opens a new connection; close it so that handling many
        # events does not accumulate open connections.
        event_bus.connection.close()
def order_event_handler(event: Event):
    """Handle order events: on ORDER_CREATED, publish a pending payment event.

    Events of any other type are ignored.
    """
    if event.event_type != EventType.ORDER_CREATED:
        return
    order_data = event.data
    logger.info(f"New order created: {order_data.get('order_id')}")
    payment_event = Event(
        event_type=EventType.PAYMENT_PROCESSED,
        data={
            'order_id': order_data.get('order_id'),
            'amount': order_data.get('total'),
            'status': 'pending'
        }
    )
    event_bus = EventBus()
    try:
        event_bus.publish_event(payment_event)
    finally:
        # EventBus() opens a new connection; close it so that handling many
        # events does not accumulate open connections.
        event_bus.connection.close()
# Usage example
# Publish a user-registration event
event_bus = EventBus()
user_registration_event = Event(
    event_type=EventType.USER_REGISTERED,
    data={
        'user_id': 'user_123',
        'username': 'john_doe',
        'email': 'john@example.com',
        'registration_date': datetime.now().isoformat()
    }
)
event_bus.publish_event(user_registration_event)
print("User registration event published")
#4.4.3 消息重试机制
import time
from typing import Optional
class RetryableMessageQueue:
    """Main queue with delayed retries and a dead-letter queue.

    Failed messages are republished to ``retry_queue``. That queue has a TTL
    and dead-letters expired messages back to ``main_queue``, which produces
    the retry delay. After ``max_retries`` failed attempts a message is moved
    to ``dead_letter_queue``.
    """

    def __init__(self, max_retries: int = 3, retry_delay: float = 1.0):
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.main_queue = 'main_queue'
        self.retry_queue = 'retry_queue'
        self.dead_letter_queue = 'dead_letter_queue'
        self.setup_queues()

    def setup_queues(self):
        """Connect and declare the dead-letter, retry and main queues."""
        credentials = pika.PlainCredentials('admin', 'password')
        parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=self.dead_letter_queue, durable=True)
        # Messages expire after retry_delay and are dead-lettered through the
        # default exchange back to the main queue.
        self.channel.queue_declare(
            queue=self.retry_queue,
            durable=True,
            arguments={
                'x-message-ttl': int(self.retry_delay * 1000),  # milliseconds
                'x-dead-letter-exchange': '',
                'x-dead-letter-routing-key': self.main_queue
            }
        )
        self.channel.queue_declare(queue=self.main_queue, durable=True)
        self.channel.basic_qos(prefetch_count=1)
        logger.info("Retryable message queues initialized")

    def send_message(self, message: dict, delay_seconds: Optional[float] = None):
        """Publish ``message``, optionally delayed.

        Without ``delay_seconds`` the message goes straight to the main queue.
        With it, a copy carrying ``retry_info`` goes to the retry queue with a
        per-message TTL of ``delay_seconds``. The broker applies the lower of
        that TTL and the queue TTL, so the delay cannot exceed ``retry_delay``.
        The caller's dict is not modified.
        """
        target_queue = self.main_queue
        body = message
        properties = pika.BasicProperties(delivery_mode=2)
        if delay_seconds:
            target_queue = self.retry_queue
            # Copy so the caller's dict does not gain a 'retry_info' key
            body = dict(message)
            body['retry_info'] = {
                'original_queue': self.main_queue,
                'delay': delay_seconds,
                'retries': 0
            }
            properties = pika.BasicProperties(
                delivery_mode=2,
                # Per-message TTL in milliseconds, passed as a string
                expiration=str(int(delay_seconds * 1000))
            )
        self.channel.basic_publish(
            exchange='',
            routing_key=target_queue,
            body=json.dumps(body),
            properties=properties
        )
        logger.info(f"Message sent to {target_queue}")

    def _retry_or_dead_letter(self, ch, method, message: dict, current_retry: int, error: str):
        """Republish a failed message for retry or dead-letter it, then ack the original."""
        if current_retry < self.max_retries:
            retry_info = message.get('retry_info', {})
            retry_info['retries'] = current_retry + 1
            message['retry_info'] = retry_info
            target_queue = self.retry_queue
            logger.warning(f"Message failed, retrying... (attempt {current_retry + 1})")
        else:
            message['error'] = error
            target_queue = self.dead_letter_queue
            logger.error(f"Message failed after {self.max_retries} retries, moved to dead letter queue")
        self.channel.basic_publish(
            exchange='',
            routing_key=target_queue,
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)
        )
        # The republished copy replaces the original delivery
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def process_message_with_retry(self, processor_func: Callable[[dict], bool]):
        """Consume the main queue and apply the retry policy; blocks.

        ``processor_func`` returns True on success. Returning False or raising
        counts as a failed attempt. A body that is not a JSON object is moved
        unchanged to the dead-letter queue.
        """
        def callback(ch, method, properties, body):
            try:
                message = json.loads(body.decode('utf-8'))
                if not isinstance(message, dict):
                    raise ValueError("message body is not a JSON object")
            except ValueError as e:
                # Retrying cannot fix an undecodable body; keep it for inspection
                logger.error(f"Undecodable message moved to dead letter queue: {e}")
                self.channel.basic_publish(
                    exchange='',
                    routing_key=self.dead_letter_queue,
                    body=body,
                    properties=pika.BasicProperties(delivery_mode=2)
                )
                ch.basic_ack(delivery_tag=method.delivery_tag)
                return

            # Number of retries already made, read before the processor runs
            current_retry = message.get('retry_info', {}).get('retries', 0)
            try:
                success = processor_func(message)
                error = 'Max retries exceeded'
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                success = False
                error = str(e)

            if success:
                ch.basic_ack(delivery_tag=method.delivery_tag)
                logger.info("Message processed successfully")
            else:
                self._retry_or_dead_letter(ch, method, message, current_retry, error)

        self.channel.basic_consume(queue=self.main_queue, on_message_callback=callback)
        logger.info("Starting to process messages with retry mechanism...")
        self.channel.start_consuming()
# 使用示例:处理可能失败的消息
def unreliable_processor(message: dict) -> bool:
    """Simulated flaky processor: fails on about 70% of calls.

    Returns True when the message was "processed", False on simulated failure.
    """
    import random

    # random() is uniform on [0, 1); values below 0.7 count as a failure
    succeeded = random.random() >= 0.7
    if succeeded:
        logger.info(f"Successfully processed message: {message}")
        return True
    logger.error("Processor failed randomly")
    return False
# Create a message queue with retry support (3 retries, 5 second delay)
retry_queue = RetryableMessageQueue(max_retries=3, retry_delay=5.0)
# Send a test message
test_message = {
    'id': str(uuid.uuid4()),
    'content': 'Test message that might fail',
    'timestamp': datetime.now().isoformat()
}
retry_queue.send_message(test_message)
print("Test message sent with retry capability")
#5. 性能优化
#5.1 连接池管理
import threading
from queue import Queue
import time
class RabbitMQConnectionPool:
    """Fixed-size pool of blocking RabbitMQ connections.

    ``active_connections`` counts every live connection the pool has created,
    whether idle in the pool or handed out, and never exceeds
    ``max_connections``.
    """

    def __init__(self, max_connections=10, host='localhost', port=5672, **credentials):
        self.max_connections = max_connections
        self.host = host
        self.port = port
        # Expected keys: 'username' and 'password' (both default to 'guest')
        self.credentials = credentials
        self.pool = Queue(maxsize=max_connections)
        self.active_connections = 0
        self.lock = threading.Lock()
        # Pre-create connections; failures are logged and skipped
        for _ in range(max_connections):
            conn = self._create_connection()
            if conn is not None:
                self.pool.put(conn)
                self.active_connections += 1

    def _create_connection(self):
        """Open a new connection; return None (after logging) on failure."""
        try:
            credentials = pika.PlainCredentials(
                self.credentials.get('username', 'guest'),
                self.credentials.get('password', 'guest')
            )
            parameters = pika.ConnectionParameters(
                host=self.host,
                port=self.port,
                credentials=credentials
            )
            return pika.BlockingConnection(parameters)
        except Exception as e:
            logger.error(f"Failed to create connection: {e}")
            return None

    def get_connection(self):
        """Return a connection, waiting up to 5 seconds for an idle one.

        If none becomes idle and the pool is below ``max_connections`` (for
        example because pre-creation failed), a new connection is opened.

        Raises:
            Exception: if no connection is available.
        """
        from queue import Empty

        try:
            return self.pool.get(timeout=5)
        except Empty:
            pass
        with self.lock:
            if self.active_connections < self.max_connections:
                conn = self._create_connection()
                if conn is not None:
                    self.active_connections += 1
                    return conn
        raise Exception("No available connections")

    def return_connection(self, connection):
        """Give a connection back to the pool.

        A closed connection is replaced by a fresh one when possible. If the
        pool is already full, the surplus connection is closed.
        """
        from queue import Full

        candidate = connection
        if connection.is_closed:
            # Swap the dead connection for a new one; the count is unchanged
            candidate = self._create_connection()
            if candidate is None:
                with self.lock:
                    self.active_connections -= 1
                return
        try:
            self.pool.put_nowait(candidate)
        except Full:
            # No room: drop the surplus connection instead of blocking
            if not candidate.is_closed:
                candidate.close()
            with self.lock:
                self.active_connections -= 1
#5.2 批量消息处理
class BatchMessageProcessor:
    """Collects messages and hands them to a processor in batches.

    A batch is flushed when it reaches ``batch_size``, or when ``timeout``
    seconds pass with at least one message pending.
    """

    def __init__(self, batch_size=100, timeout=5.0):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self.batch_size = batch_size
        self.timeout = timeout
        self.message_batch = []
        self.lock = threading.Lock()
        self.condition = threading.Condition(self.lock)
        # Set by stop(); makes process_batch return after a final flush
        self._stopped = False

    def add_message(self, message):
        """Append a message and wake the processor once a full batch is ready."""
        with self.condition:
            self.message_batch.append(message)
            if len(self.message_batch) >= self.batch_size:
                self.condition.notify()

    def stop(self):
        """Ask process_batch to flush pending messages and return."""
        with self.condition:
            self._stopped = True
            self.condition.notify_all()

    def process_batch(self, processor_func: Callable[[list], None]):
        """Run the batching loop until ``stop()`` is called.

        ``processor_func`` is called outside the lock with a list of messages.
        An exception from it is logged and the loop continues; that batch is
        not retried.
        """
        while True:
            with self.condition:
                # Wait for a full batch; after each timeout, flush what exists
                while len(self.message_batch) < self.batch_size and not self._stopped:
                    self.condition.wait(timeout=self.timeout)
                    if self.message_batch:
                        break
                batch = list(self.message_batch)
                self.message_batch.clear()
                stopped = self._stopped
            if batch:
                try:
                    processor_func(batch)
                    logger.info(f"Processed batch of {len(batch)} messages")
                except Exception as e:
                    logger.error(f"Error processing batch: {e}")
            if stopped:
                return
#6. 监控与运维
#6.1 使用RabbitMQ管理API
import requests
import json
class RabbitMQAdminAPI:
    """Small client for the RabbitMQ management HTTP API.

    Every method returns the decoded JSON body of the response. ``vhost``
    arguments must already be URL-encoded (the default ``%2F`` is ``/``).
    """

    def __init__(self, host='localhost', port=15672, username='admin', password='password', timeout=10):
        self.base_url = f"http://{host}:{port}/api"
        self.auth = (username, password)
        # Seconds to wait per HTTP request; without it a stalled server
        # would block the caller indefinitely.
        self.timeout = timeout

    def _get(self, path):
        """GET ``<base_url>/<path>`` and return the decoded JSON body."""
        response = requests.get(f"{self.base_url}/{path}", auth=self.auth, timeout=self.timeout)
        return response.json()

    def get_overview(self):
        """Return the broker overview."""
        return self._get("overview")

    def get_nodes(self):
        """Return node information."""
        return self._get("nodes")

    def get_queues(self, vhost='%2F'):  # %2F is URL encoded '/'
        """Return the queues of ``vhost``."""
        return self._get(f"queues/{vhost}")

    def get_exchanges(self, vhost='%2F'):
        """Return the exchanges of ``vhost``."""
        return self._get(f"exchanges/{vhost}")

    def get_connections(self):
        """Return the open connections."""
        return self._get("connections")

    def get_channels(self):
        """Return the open channels."""
        return self._get("channels")

    def publish_message_via_api(self, exchange_name, routing_key, message, vhost='%2F'):
        """Publish ``message`` (JSON-encoded) to an exchange over HTTP."""
        url = f"{self.base_url}/exchanges/{vhost}/{exchange_name}/publish"
        payload = {
            "properties": {},
            "routing_key": routing_key,
            "payload": json.dumps(message),
            "payload_encoding": "string"
        }
        response = requests.post(url, json=payload, auth=self.auth, timeout=self.timeout)
        return response.json()
# Usage example: fetch the overview and print the version fields
admin_api = RabbitMQAdminAPI()
overview = admin_api.get_overview()
print(f"RabbitMQ Version: {overview.get('rabbitmq_version')}")
print(f"Management Version: {overview.get('management_version')}")
#6.2 健康检查
def rabbitmq_health_check():
    """Check that the broker can accept, store and return a message.

    Connects, declares a temporary exclusive queue, publishes one message to
    it and reads it back.

    Returns:
        True if the message was read back, False on any failure.
    """
    connection = None
    try:
        credentials = pika.PlainCredentials('admin', 'password')
        parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        # Temporary server-named queue used only for this check
        result = channel.queue_declare(queue='', exclusive=True)
        test_queue = result.method.queue
        test_message = {"health_check": True, "timestamp": time.time()}
        channel.basic_publish(
            exchange='',
            routing_key=test_queue,
            body=json.dumps(test_message)
        )
        # basic_get returns (None, None, None) when the queue is empty
        method, properties, body = channel.basic_get(queue=test_queue, auto_ack=True)
        if method:
            logger.info("RabbitMQ health check: PASSED")
            success = True
        else:
            logger.error("RabbitMQ health check: FAILED - Could not retrieve test message")
            success = False
        channel.queue_delete(queue=test_queue)
        return success
    except Exception as e:
        logger.error(f"RabbitMQ health check: FAILED - {str(e)}")
        return False
    finally:
        # Close on every path so a failed check does not leak a connection
        if connection is not None and not connection.is_closed:
            try:
                connection.close()
            except Exception as e:
                logger.error(f"RabbitMQ health check: error closing connection - {e}")
# Run the health check and print the outcome
is_healthy = rabbitmq_health_check()
print(f"RabbitMQ Health Status: {'Healthy' if is_healthy else 'Unhealthy'}")
#7. 最佳实践
#7.1 安全最佳实践
"""
RabbitMQ安全最佳实践:
1. 使用强密码:设置复杂的用户名和密码
2. 网络隔离:使用防火墙限制访问
3. TLS加密:在生产环境中启用TLS
4. 最小权限原则:为不同应用创建专用用户
5. 访问控制:使用虚拟主机和权限控制
6. 监控审计:启用访问日志记录
7. 定期更新:及时更新RabbitMQ版本
"""
# 安全的连接配置
def create_secure_connection():
    """Open a connection configured from environment variables.

    Environment variables:
        RABBITMQ_USER / RABBITMQ_PASS: credentials (default admin/password).
        RABBITMQ_HOST / RABBITMQ_PORT: endpoint (default localhost:5672).
        RABBITMQ_USE_SSL: set to ``true`` to connect over TLS.
        RABBITMQ_CA_FILE: optional CA bundle path used to verify the server.
        RABBITMQ_TLS_VERIFY: set to ``false`` to disable certificate and
            hostname verification (development only).

    With TLS enabled the server certificate and hostname are verified unless
    verification is explicitly disabled.
    """
    import os
    import ssl

    credentials = pika.PlainCredentials(
        os.getenv('RABBITMQ_USER', 'admin'),
        os.getenv('RABBITMQ_PASS', 'password')
    )
    host = os.getenv('RABBITMQ_HOST', 'localhost')

    ssl_options = None
    if os.getenv('RABBITMQ_USE_SSL') == 'true':
        # The default context verifies the certificate chain and the hostname
        context = ssl.create_default_context(cafile=os.getenv('RABBITMQ_CA_FILE'))
        if os.getenv('RABBITMQ_TLS_VERIFY', 'true') == 'false':
            # Explicit opt-out; this removes protection against impersonation
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        # server_hostname is needed for SNI and for the hostname check
        ssl_options = pika.SSLOptions(context, server_hostname=host)

    parameters = pika.ConnectionParameters(
        host=host,
        port=int(os.getenv('RABBITMQ_PORT', '5672')),
        virtual_host='/',
        credentials=credentials,
        ssl_options=ssl_options,
        heartbeat=600,
        blocked_connection_timeout=300
    )
    return pika.BlockingConnection(parameters)
#7.2 性能最佳实践
"""
RabbitMQ性能最佳实践:
1. 消息持久化权衡:根据可靠性需求选择
2. 批量操作:减少网络往返次数
3. QoS设置:合理设置prefetch_count
4. 连接复用:避免频繁创建/销毁连接
5. 队列设计:避免单一热点队列
6. 监控调优:持续监控性能指标
"""
# 性能优化的消费者示例
def optimized_consumer(queue_name: str, processor_func: Callable):
    """Consume ``queue_name`` with a prefetch of 10 for higher throughput.

    Each message is JSON-decoded and passed to ``processor_func``, then acked.
    If the processor raises, the message is nacked and requeued. A body that
    is not valid UTF-8 JSON is nacked without requeue, since redelivery cannot
    succeed. Blocks until interrupted; the connection is always closed.
    """
    rmq = RabbitMQConnection()
    try:
        rmq.connect()
        # Allow up to 10 unacknowledged messages; tune to the consumer's capacity
        rmq.channel.basic_qos(prefetch_count=10)

        def callback(ch, method, properties, body):
            try:
                message = json.loads(body.decode('utf-8'))
            except ValueError as e:
                # UnicodeDecodeError and json.JSONDecodeError are both ValueError
                logger.error(f"Discarding undecodable message: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
                return
            try:
                processor_func(message)
                # Acknowledge as soon as processing is done
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                logger.error(f"Error processing message: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

        rmq.channel.basic_consume(queue=queue_name, on_message_callback=callback)
        rmq.channel.start_consuming()
    except KeyboardInterrupt:
        # The channel is still None if the interrupt arrived during connect()
        if rmq.channel is not None:
            rmq.channel.stop_consuming()
    finally:
        rmq.disconnect()
#7.3 应用场景总结
| 场景 | 说明 | 推荐配置 |
|---|---|---|
| 任务队列 | 异步任务处理、后台作业 | Direct exchange, durable queue |
| 事件驱动 | 微服务间通信、事件传播 | Topic exchange, fanout for broadcast |
| RPC调用 | 远程过程调用 | Direct exchange with reply queue |
| 发布订阅 | 广播消息、通知系统 | Fanout exchange |
| 数据流处理 | ETL、实时分析 | Headers exchange for complex routing |
#总结
RabbitMQ是一个功能强大、可靠的消息队列系统,适用于各种异步通信场景。通过合理的配置、安全措施和性能优化,可以构建高效、可靠的消息传递系统。掌握RabbitMQ的核心概念和最佳实践,能够帮助开发者构建出色的分布式应用程序架构。

