#FastAPI异步任务队列Celery完全指南
📂 所属阶段:第六阶段 — 2026 特色专题(AI 集成篇)
🔗 相关章节:流式响应 StreamingResponse · Redis 集成
#异步任务队列基础概念
#为什么需要任务队列?
在现代Web应用中,许多操作都是耗时的,比如 AI 模型推理、批量邮件发送、大文件处理、报表生成等。如果在请求中同步执行这些操作,用户需要长时间等待,甚至请求超时:
传统同步处理:
┌─────────────────────────────────────────────────────┐
│ 用户请求 → 服务器 → 执行耗时任务 → 返回响应 │
│ (用户需等待任务完成,可能超时) │
└─────────────────────────────────────────────────────┘
异步任务队列处理:
┌─────────────────────────────────────────────────────┐
│ 用户请求 → 服务器 → 添加到任务队列 → 立即响应 │
│ 后台Worker → 执行任务 → 存储结果 → 通知用户 │
└─────────────────────────────────────────────────────┘
#任务队列的核心优势
| 优势 | 说明 | 应用场景 |
|---|---|---|
| 响应性 | 立即响应用户请求 | AI模型推理、图像生成 |
| 可扩展性 | 水平扩展Worker处理能力 | 大规模数据处理 |
| 可靠性 | 任务持久化,失败重试 | 邮件发送、文件处理 |
| 解耦性 | 服务间松耦合 | 微服务架构 |
#Celery架构详解
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Producer │───→│ Message │───→│ Celery │
│ (FastAPI) │ │ Broker │ │ Workers │
│ (提交任务) │ │ (Redis/RabbitMQ)│ │ (执行任务) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
↑ │
└──────────────────────────────────────────────┘
┌─────────────────┐
│ Results │
│ Backend │
│ (Redis/DB) │
└─────────────────┘
#典型应用场景
- AI模型推理:批量处理AI模型预测任务
- 图像/视频处理:图像生成、视频转码
- 邮件发送:批量邮件、通知推送
- 数据处理:ETL作业、报表生成
- 文件操作:大文件上传、文档转换
- 第三方API调用:外部服务集成
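在进入具体安装与配置之前,先看一个最小可运行的示例,直观感受"生产者提交任务 → Broker 排队 → Worker 执行 → 结果回写"的流程(以下假设本地已运行 Redis,文件名 demo_tasks.py 为演示用的假设名称):
# demo_tasks.py - 最小示例(假设 Redis 运行在 localhost:6379)
from celery import Celery
app = Celery("demo", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")
@app.task
def add(x: int, y: int) -> int:
    """最简单的异步任务:求和"""
    return x + y
# 先在另一个终端启动 Worker:celery -A demo_tasks worker --loglevel=info
if __name__ == "__main__":
    result = add.delay(2, 3)       # 生产者:把任务投递给 Broker,立即返回 AsyncResult
    print(result.get(timeout=10))  # 等待 Worker 执行完成并取回结果:输出 5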
#Celery安装与配置
#安装依赖
# 基础安装
pip install celery[redis] redis
# 包含更多功能的安装
pip install celery[redis,auth,msgpack,librabbitmq,pyro,yaml,sqs,gevent,eventlet,couchbase,cassandra,mongodb]
# 如果使用RabbitMQ作为Broker(Celery 内置 py-amqp 支持,直接安装 celery 即可,无需额外 extra)
pip install celery
#Celery配置文件
# celery_app.py - Celery应用配置
from celery import Celery
from kombu import Queue
import os
def make_celery(app_name=__name__):
"""创建Celery应用实例"""
redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
celery = Celery(
app_name,
broker=redis_url,
backend=redis_url.replace('/0', '/1'), # 使用不同的数据库
include=[
'tasks.ai_tasks',
'tasks.email_tasks',
'tasks.file_processing',
'tasks.data_analysis'
]
)
# 详细配置
celery.conf.update(
# 序列化配置
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Asia/Shanghai',
enable_utc=True,
# 任务跟踪
task_track_started=True,
# 任务超时设置
task_time_limit=300, # 硬超时:5分钟
task_soft_time_limit=240, # 软超时:4分钟
# 工作进程配置
worker_prefetch_multiplier=1, # 防止任务积压
worker_max_tasks_per_child=1000, # 每个工作进程处理1000个任务后重启
# 确认机制
task_acks_late=True, # 任务完成后才确认
task_reject_on_worker_lost=True,
# 结果存储
result_expires=3600, # 结果1小时后自动清除
result_compression='gzip', # 结果压缩
# 重试相关默认值(注:retry_backoff 等选项通常作为 @task 装饰器参数使用,见下文"错误处理与重试机制"一节)
task_retry_kwargs={'max_retries': 3},
task_retry_backoff=True,
task_retry_backoff_max=700,
task_retry_jitter=True,
# 队列配置
task_default_queue='default',
task_routes={
'tasks.ai_tasks.*': {'queue': 'ai_tasks'},
'tasks.email_tasks.*': {'queue': 'email_tasks'},
'tasks.file_processing.*': {'queue': 'file_processing'},
},
task_create_missing_queues=True,
# 监控配置
worker_send_task_events=True,
task_send_sent_event=True,
# 安全配置
security_key=os.getenv('CELERY_SECURITY_KEY'),
security_certificate=os.getenv('CELERY_CERT_PATH'),
security_authority=os.getenv('CELERY_AUTHORITY_PATH'),
)
return celery
# 创建全局Celery实例
celery_app = make_celery('daoman_celery')
# 队列定义
celery_app.conf.task_queues = (
Queue('default', routing_key='default'),
Queue('ai_tasks', routing_key='ai.#'),
Queue('email_tasks', routing_key='email.#'),
Queue('file_processing', routing_key='file.#'),
)
#环境配置
# config/celery_config.py - 配置管理
import os
from kombu import Queue
class CeleryConfig:
"""Celery配置类"""
# Broker配置
BROKER_URL = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
# 序列化配置
TASK_SERIALIZER = 'json'
ACCEPT_CONTENT = ['json']
RESULT_SERIALIZER = 'json'
# 时区配置
TIMEZONE = 'Asia/Shanghai'
ENABLE_UTC = True
# 任务配置
TASK_TRACK_STARTED = True
TASK_TIME_LIMIT = 300
TASK_SOFT_TIME_LIMIT = 240
# 重试配置
TASK_RETRY_KWARGS = {'max_retries': 3}
TASK_RETRY_BACKOFF = True
TASK_RETRY_BACKOFF_MAX = 700
TASK_RETRY_JITTER = True
# 结果配置
RESULT_EXPIRES = 3600
RESULT_COMPRESSION = 'gzip'
# 工作进程配置
WORKER_PREFETCH_MULTIPLIER = 1
WORKER_MAX_TASKS_PER_CHILD = 1000
WORKER_SEND_TASK_EVENTS = True
TASK_SEND_SENT_EVENT = True
# 队列路由
TASK_ROUTES = {
'tasks.ai_tasks.*': {'queue': 'ai_tasks'},
'tasks.email_tasks.*': {'queue': 'email_tasks'},
'tasks.file_processing.*': {'queue': 'file_processing'},
}
# 自定义队列
TASK_QUEUES = (
Queue('default', routing_key='default'),
Queue('ai_tasks', routing_key='ai.#'),
Queue('email_tasks', routing_key='email.#'),
Queue('file_processing', routing_key='file.#'),
)
# 安全配置
SECURITY_KEY = os.getenv('CELERY_SECURITY_KEY')
SECURITY_CERTIFICATE = os.getenv('CELERY_CERT_PATH')
SECURITY_AUTHORITY = os.getenv('CELERY_AUTHORITY_PATH')
# 应用配置
celery_config = CeleryConfig()
#任务定义与实现
#基础任务定义
# tasks/basic_tasks.py - 基础任务示例
from celery_app import celery_app
import time
import logging
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, name="tasks.send_email")
def send_email(self, to_email: str, subject: str, body: str, attachments: list = None):
"""
发送邮件任务
Args:
to_email: 收件人邮箱
subject: 邮件主题
body: 邮件正文
attachments: 附件列表
"""
try:
# 模拟邮件发送过程
logger.info(f"开始发送邮件到 {to_email}")
# 更新任务状态
self.update_state(state='PROGRESS', meta={
'current': 10,
'total': 100,
'status': '正在准备邮件内容...'
})
# 模拟邮件准备时间
time.sleep(2)
# 更新状态
self.update_state(state='PROGRESS', meta={
'current': 50,
'total': 100,
'status': '正在发送邮件...'
})
# 模拟邮件发送时间
time.sleep(3)
# 模拟邮件发送成功
logger.info(f"邮件已成功发送到 {to_email}")
return {
'status': 'sent',
'to_email': to_email,
'subject': subject,
'sent_at': time.time()
}
except Exception as exc:
logger.error(f"邮件发送失败: {str(exc)}")
# 标记任务失败,触发重试
raise self.retry(exc=exc, countdown=60, max_retries=3)
@celery_app.task(bind=True, name="tasks.process_data")
def process_data(self, data_list: list, operation: str = 'transform'):
"""
数据处理任务
Args:
data_list: 待处理数据列表
operation: 操作类型
"""
total_items = len(data_list)
for i, item in enumerate(data_list):
# 更新进度
self.update_state(state='PROGRESS', meta={
'current': i + 1,
'total': total_items,
'progress': int(((i + 1) / total_items) * 100),
'status': f'正在处理第 {i + 1} 个项目'
})
# 模拟数据处理
time.sleep(0.1)
# 根据操作类型处理数据
if operation == 'transform':
processed_item = f"transformed_{item}"
elif operation == 'validate':
processed_item = f"validated_{item}"
else:
processed_item = item
return {
'status': 'completed',
'processed_count': total_items,
'operation': operation
}
#AI模型推理任务
# tasks/ai_tasks.py - AI模型推理任务
import numpy as np
import pickle
from celery_app import celery_app
from typing import Dict, Any, List
import torch
import time
import random
import logging
logger = logging.getLogger(__name__)
@celery_app.task(bind=True, name="tasks.ai_image_generation")
def generate_image(self, prompt: str, style: str = "realistic",
width: int = 512, height: int = 512, user_id: int = None):
"""
AI图像生成任务
Args:
prompt: 图像生成提示词
style: 图像风格
width: 图像宽度
height: 图像高度
user_id: 用户ID
"""
try:
logger.info(f"开始生成图像: {prompt}")
# 更新初始化状态
self.update_state(state='PROGRESS', meta={
'progress': 5,
'status': '正在加载AI模型...',
'step': 'model_loading'
})
# 模拟模型加载(实际应用中会加载预训练模型)
time.sleep(2)
# 更新生成状态
self.update_state(state='PROGRESS', meta={
'progress': 20,
'status': '正在处理提示词...',
'step': 'prompt_processing'
})
# 模拟提示词处理
time.sleep(1)
self.update_state(state='PROGRESS', meta={
'progress': 40,
'status': '正在生成图像...',
'step': 'image_generation'
})
# 模拟图像生成过程(实际应用中会调用AI模型)
for i in range(10): # 模拟生成步骤
time.sleep(0.5) # 模拟每个步骤的耗时
progress = 40 + int((i + 1) * 6) # 从40%到100%
self.update_state(state='PROGRESS', meta={
'progress': progress,
'status': f'正在生成图像... ({i + 1}/10)',
'step': 'image_generation'
})
# 模拟生成图像文件路径
import uuid
image_filename = f"generated_{uuid.uuid4().hex[:8]}_{int(time.time())}.png"
image_path = f"/uploads/images/{image_filename}"
# 模拟保存图像
self.update_state(state='PROGRESS', meta={
'progress': 95,
'status': '正在保存图像...',
'step': 'saving'
})
time.sleep(1)
# 保存记录到数据库(如果提供用户ID)
if user_id:
save_image_record(user_id, image_path, prompt, style)
logger.info(f"图像生成完成: {image_path}")
return {
'status': 'success',
'image_url': image_path,
'prompt': prompt,
'style': style,
'dimensions': f"{width}x{height}",
'generated_at': time.time()
}
except Exception as exc:
logger.error(f"图像生成失败: {str(exc)}")
raise self.retry(exc=exc, countdown=120, max_retries=2)
@celery_app.task(bind=True, name="tasks.ai_text_summarization")
def summarize_text(self, text: str, max_length: int = 100,
model_name: str = "default_summary_model"):
"""
文本摘要生成任务
Args:
text: 待摘要的文本
max_length: 摘要最大长度
model_name: 使用的模型名称
"""
try:
logger.info(f"开始文本摘要生成,原文长度: {len(text)}")
self.update_state(state='PROGRESS', meta={
'progress': 10,
'status': '正在分析文本结构...',
'original_length': len(text)
})
# 模拟文本分析
time.sleep(1)
self.update_state(state='PROGRESS', meta={
'progress': 30,
'status': '正在生成摘要...',
'original_length': len(text)
})
# 模拟摘要生成过程
# 在实际应用中,这里会调用预训练的摘要模型
import random
sentences = text.split('.')
summary_sentences = random.sample(sentences, min(3, len(sentences)))
summary = '.'.join(summary_sentences[:max_length//20]) # 简化摘要逻辑
self.update_state(state='PROGRESS', meta={
'progress': 80,
'status': '正在优化摘要质量...',
'original_length': len(text),
'summary_length': len(summary)
})
time.sleep(1)
return {
'status': 'success',
'original_text_length': len(text),
'summary': summary,
'summary_length': len(summary),
'compression_ratio': round(len(summary) / len(text), 2),
'model_used': model_name
}
except Exception as exc:
logger.error(f"文本摘要生成失败: {str(exc)}")
raise self.retry(exc=exc, countdown=60, max_retries=3)
@celery_app.task(bind=True, name="tasks.ai_batch_prediction")
def batch_predict(self, data_list: List[Dict], model_name: str = "default_model"):
"""
批量AI预测任务
Args:
data_list: 待预测数据列表
model_name: 使用的模型名称
"""
total_items = len(data_list)
predictions = []
for i, data_item in enumerate(data_list):
# 更新进度
self.update_state(state='PROGRESS', meta={
'current': i + 1,
'total': total_items,
'progress': int(((i + 1) / total_items) * 100),
'status': f'正在处理第 {i + 1}/{total_items} 个项目'
})
try:
# 模拟AI预测过程
# 在实际应用中,这里会调用具体的AI模型
prediction = {
'input': data_item,
'prediction': f"predicted_value_{i}",
'confidence': round(random.uniform(0.7, 0.99), 2),
'model': model_name,
'timestamp': time.time()
}
predictions.append(prediction)
time.sleep(0.1) # 模拟预测耗时
except Exception as item_exc:
logger.warning(f"单项预测失败: {str(item_exc)}, 继续处理其他项目")
predictions.append({
'input': data_item,
'error': str(item_exc),
'status': 'failed'
})
success_count = len([p for p in predictions if 'error' not in p])
return {
'status': 'completed',
'total_items': total_items,
'successful_predictions': success_count,
'failed_predictions': total_items - success_count,
'predictions': predictions,
'model_used': model_name
}
#文件处理任务
# tasks/file_processing.py - 文件处理任务
import os
import shutil
from pathlib import Path
from celery_app import celery_app
import zipfile
import tempfile
from typing import List, Dict, Any
@celery_app.task(bind=True, name="tasks.process_uploaded_files")
def process_uploaded_files(self, file_paths: List[str], operations: List[str]):
"""
批量文件处理任务
Args:
file_paths: 文件路径列表
operations: 要执行的操作列表
"""
total_files = len(file_paths)
results = []
for i, file_path in enumerate(file_paths):
# 更新进度
self.update_state(state='PROGRESS', meta={
'current': i + 1,
'total': total_files,
'progress': int(((i + 1) / total_files) * 100),
'current_file': os.path.basename(file_path),
'status': f'正在处理文件 {i + 1}/{total_files}'
})
file_result = {
'file_path': file_path,
'original_size': os.path.getsize(file_path) if os.path.exists(file_path) else 0,
'operations': [],
'errors': []
}
try:
for operation in operations:
if operation == 'compress':
result = compress_file(file_path)
elif operation == 'resize_image':
result = resize_image(file_path)
elif operation == 'convert_format':
result = convert_file_format(file_path)
elif operation == 'extract_metadata':
result = extract_file_metadata(file_path)
else:
result = {'error': f'Unknown operation: {operation}'}
file_result['operations'].append({
'operation': operation,
'result': result
})
# 检查操作是否出错
if 'error' in result:
file_result['errors'].append(result['error'])
except Exception as exc:
file_result['errors'].append(str(exc))
results.append(file_result)
return {
'status': 'completed',
'total_files': total_files,
'processed_files': len([r for r in results if not r['errors']]),
'results': results
}
def compress_file(file_path: str) -> Dict[str, Any]:
"""压缩文件"""
try:
original_size = os.path.getsize(file_path)
compressed_path = f"{file_path}.zip"
with zipfile.ZipFile(compressed_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
zipf.write(file_path, os.path.basename(file_path))
compressed_size = os.path.getsize(compressed_path)
compression_ratio = round((original_size - compressed_size) / original_size * 100, 2)
return {
'compressed_path': compressed_path,
'original_size': original_size,
'compressed_size': compressed_size,
'compression_ratio': compression_ratio
}
except Exception as e:
return {'error': str(e)}
def resize_image(file_path: str) -> Dict[str, Any]:
"""调整图像大小(需要Pillow库)"""
try:
from PIL import Image
if not file_path.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp')):
return {'error': 'Not an image file'}
with Image.open(file_path) as img:
original_size = img.size
# 调整到512x512(保持宽高比)
img.thumbnail((512, 512), Image.Resampling.LANCZOS)
resized_path = f"{file_path}_resized.jpg"
img.save(resized_path, 'JPEG', quality=85)
return {
'resized_path': resized_path,
'original_size': original_size,
'resized_size': img.size
}
except ImportError:
return {'error': 'Pillow library not installed'}
except Exception as e:
return {'error': str(e)}
def convert_file_format(file_path: str) -> Dict[str, Any]:
"""转换文件格式(示例)"""
try:
# 这里只是一个示例,实际转换需要具体实现
return {
'converted': True,
'original_format': os.path.splitext(file_path)[1],
'new_path': file_path # 实际应用中会创建新文件
}
except Exception as e:
return {'error': str(e)}
def extract_file_metadata(file_path: str) -> Dict[str, Any]:
"""提取文件元数据"""
try:
stat_info = os.stat(file_path)
return {
'size': stat_info.st_size,
'created': stat_info.st_ctime,
'modified': stat_info.st_mtime,
'permissions': oct(stat_info.st_mode)[-3:]
}
except Exception as e:
return {'error': str(e)}
#FastAPI集成方案
#FastAPI应用集成
# main.py - FastAPI应用与Celery集成
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.responses import JSONResponse
from celery.result import AsyncResult
from pydantic import BaseModel
from typing import List, Optional
import asyncio
from tasks.ai_tasks import generate_image, summarize_text, batch_predict
from tasks.email_tasks import send_email  # 注意:前文示例将 send_email 写在 basic_tasks.py 中,实际项目需保持模块路径与 include 配置一致
from tasks.file_processing import process_uploaded_files
import logging
from celery_app import celery_app  # 查询任务状态、取消任务等接口需要用到 Celery 实例
app = FastAPI(title="Daoman Celery Integration API")
logger = logging.getLogger(__name__)
class ImageGenerationRequest(BaseModel):
"""图像生成请求模型"""
prompt: str
style: str = "realistic"
width: int = 512
height: int = 512
user_id: Optional[int] = None
class EmailRequest(BaseModel):
"""邮件发送请求模型"""
to_email: str
subject: str
body: str
attachments: Optional[List[str]] = []
class BatchPredictionRequest(BaseModel):
"""批量预测请求模型"""
data_list: List[dict]
model_name: str = "default_model"
class FileProcessingRequest(BaseModel):
"""文件处理请求模型"""
file_paths: List[str]
operations: List[str]
@app.post("/tasks/image-generation")
async def create_image_generation_task(request: ImageGenerationRequest):
"""创建图像生成任务"""
try:
task = generate_image.delay(
prompt=request.prompt,
style=request.style,
width=request.width,
height=request.height,
user_id=request.user_id
)
logger.info(f"图像生成任务已创建: {task.id}")
return {
"task_id": task.id,
"status": "submitted",
"message": "图像生成任务已提交,正在后台处理",
"estimated_completion": "30-60秒"
}
except Exception as e:
logger.error(f"创建图像生成任务失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"任务创建失败: {str(e)}")
@app.post("/tasks/email-send")
async def create_email_task(request: EmailRequest):
"""创建邮件发送任务"""
try:
task = send_email.delay(
to_email=request.to_email,
subject=request.subject,
body=request.body,
attachments=request.attachments
)
logger.info(f"邮件发送任务已创建: {task.id}")
return {
"task_id": task.id,
"status": "submitted",
"message": "邮件发送任务已提交,正在后台处理"
}
except Exception as e:
logger.error(f"创建邮件发送任务失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"任务创建失败: {str(e)}")
@app.post("/tasks/text-summarization")
async def create_text_summarization_task(text: str, max_length: int = 100):
"""创建文本摘要任务"""
try:
task = summarize_text.delay(text=text, max_length=max_length)
logger.info(f"文本摘要任务已创建: {task.id}")
return {
"task_id": task.id,
"status": "submitted",
"message": "文本摘要任务已提交,正在后台处理"
}
except Exception as e:
logger.error(f"创建文本摘要任务失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"任务创建失败: {str(e)}")
@app.post("/tasks/batch-predict")
async def create_batch_prediction_task(request: BatchPredictionRequest):
"""创建批量预测任务"""
try:
task = batch_predict.delay(
data_list=request.data_list,
model_name=request.model_name
)
logger.info(f"批量预测任务已创建: {task.id}, 数据量: {len(request.data_list)}")
return {
"task_id": task.id,
"status": "submitted",
"message": f"批量预测任务已提交,共{len(request.data_list)}条数据",
"estimated_completion": f"{len(request.data_list) * 2}秒左右"
}
except Exception as e:
logger.error(f"创建批量预测任务失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"任务创建失败: {str(e)}")
@app.post("/tasks/file-process")
async def create_file_processing_task(request: FileProcessingRequest):
"""创建文件处理任务"""
try:
task = process_uploaded_files.delay(
file_paths=request.file_paths,
operations=request.operations
)
logger.info(f"文件处理任务已创建: {task.id}, 文件数: {len(request.file_paths)}")
return {
"task_id": task.id,
"status": "submitted",
"message": f"文件处理任务已提交,共{len(request.file_paths)}个文件",
"operations": request.operations
}
except Exception as e:
logger.error(f"创建文件处理任务失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"任务创建失败: {str(e)}")
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
"""查询任务状态"""
try:
task_result = AsyncResult(task_id, app=celery_app)
response = {
"task_id": task_id,
"status": task_result.state,
"result": None,
"traceback": None
}
if task_result.state == 'PENDING':
response["status"] = "pending"
response["message"] = "任务尚未开始执行"
elif task_result.state == 'PROGRESS':
response["status"] = "in_progress"
response["info"] = task_result.info
response["message"] = "任务正在执行中"
elif task_result.state == 'SUCCESS':
response["status"] = "completed"
response["result"] = task_result.result
response["message"] = "任务执行成功"
elif task_result.state == 'FAILURE':
response["status"] = "failed"
response["error"] = str(task_result.info)
response["traceback"] = task_result.traceback
response["message"] = "任务执行失败"
elif task_result.state == 'RETRY':
response["status"] = "retrying"
response["info"] = task_result.info
response["message"] = "任务正在重试"
elif task_result.state == 'REVOKED':
response["status"] = "cancelled"
response["message"] = "任务已被取消"
return response
except Exception as e:
logger.error(f"查询任务状态失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"查询任务状态失败: {str(e)}")
@app.delete("/tasks/{task_id}/cancel")
async def cancel_task(task_id: str):
"""取消任务"""
try:
# revoke方法可以取消待执行的任务
celery_app.control.revoke(task_id, terminate=True)
logger.info(f"任务已取消: {task_id}")
return {
"task_id": task_id,
"status": "cancelled",
"message": "任务已成功取消"
}
except Exception as e:
logger.error(f"取消任务失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"取消任务失败: {str(e)}")
@app.get("/tasks/{task_id}/result")
async def get_task_result(task_id: str):
"""获取任务结果(仅在完成时返回)"""
try:
task_result = AsyncResult(task_id, app=celery_app)
if task_result.ready():
if task_result.successful():
return {
"task_id": task_id,
"status": "completed",
"result": task_result.result
}
else:
return {
"task_id": task_id,
"status": "failed",
"error": str(task_result.info)
}
else:
raise HTTPException(
status_code=400,
detail="任务尚未完成,请先检查任务状态"
)
except Exception as e:
logger.error(f"获取任务结果失败: {str(e)}")
raise HTTPException(status_code=500, detail=f"获取任务结果失败: {str(e)}")
@app.post("/tasks/batch-submit")
async def batch_submit_tasks(tasks_data: List[dict]):
"""批量提交任务"""
submitted_tasks = []
for task_data in tasks_data:
try:
task_type = task_data.get("type")
task_params = task_data.get("params", {})
if task_type == "image_generation":
task = generate_image.delay(**task_params)
elif task_type == "email_send":
task = send_email.delay(**task_params)
elif task_type == "text_summarization":
task = summarize_text.delay(**task_params)
elif task_type == "batch_predict":
task = batch_predict.delay(**task_params)
elif task_type == "file_process":
task = process_uploaded_files.delay(**task_params)
else:
continue
submitted_tasks.append({
"task_id": task.id,
"type": task_type,
"status": "submitted"
})
except Exception as e:
logger.error(f"批量提交任务失败: {str(e)}")
submitted_tasks.append({
"type": task_data.get("type"),
"status": "failed",
"error": str(e)
})
return {
"status": "partial_success",
"submitted_count": len(submitted_tasks),
"tasks": submitted_tasks
}
@app.get("/tasks/overview")
async def get_tasks_overview():
"""获取任务概览"""
# 获取活跃任务
inspect = celery_app.control.inspect()
active_tasks = inspect.active()
scheduled_tasks = inspect.scheduled()
reserved_tasks = inspect.reserved()
return {
"overview": {
"active_workers": len(active_tasks) if active_tasks else 0,
"scheduled_tasks": len(list(scheduled_tasks.values())) if scheduled_tasks else 0,
"reserved_tasks": len(list(reserved_tasks.values())) if reserved_tasks else 0,
},
"active_tasks": active_tasks,
"scheduled_tasks": scheduled_tasks,
"reserved_tasks": reserved_tasks
}
#任务状态轮询优化
# utils/task_polling.py - 任务状态轮询优化
import asyncio
import time
from typing import List, Dict, Any
from fastapi import WebSocket, WebSocketDisconnect
from celery.result import AsyncResult
from celery_app import celery_app
class TaskPollingManager:
"""任务轮询管理器"""
def __init__(self):
self.active_connections: Dict[str, WebSocket] = {}
self.polling_tasks: Dict[str, asyncio.Task] = {}
async def add_connection(self, task_id: str, websocket: WebSocket):
"""添加WebSocket连接"""
await websocket.accept()
self.active_connections[task_id] = websocket
# 启动轮询任务
polling_task = asyncio.create_task(self.poll_task_status(task_id))
self.polling_tasks[task_id] = polling_task
async def remove_connection(self, task_id: str):
"""移除连接"""
if task_id in self.active_connections:
await self.active_connections[task_id].close()
del self.active_connections[task_id]
if task_id in self.polling_tasks:
self.polling_tasks[task_id].cancel()
del self.polling_tasks[task_id]
async def poll_task_status(self, task_id: str):
"""轮询任务状态并通过WebSocket推送"""
try:
while task_id in self.active_connections:
task_result = AsyncResult(task_id, app=celery_app)
status_data = {
"task_id": task_id,
"status": task_result.state,
"timestamp": time.time()
}
if task_result.state == 'PROGRESS':
status_data["info"] = task_result.info
elif task_result.state == 'SUCCESS':
status_data["result"] = task_result.result
# 先把最终结果推送给客户端,再移除连接
await self.active_connections[task_id].send_json(status_data)
await self.remove_connection(task_id)
break
elif task_result.state == 'FAILURE':
status_data["error"] = str(task_result.info)
# 先推送失败信息,再移除连接
await self.active_connections[task_id].send_json(status_data)
await self.remove_connection(task_id)
break
# 发送状态更新
try:
await self.active_connections[task_id].send_json(status_data)
except WebSocketDisconnect:
await self.remove_connection(task_id)
break
# 等待下次轮询
await asyncio.sleep(1)
except asyncio.CancelledError:
# 任务被取消
pass
except Exception as e:
print(f"轮询任务状态出错: {str(e)}")
await self.remove_connection(task_id)
# 全局实例
task_polling_manager = TaskPollingManager()
# 注:以下路由装饰器依赖 FastAPI 应用实例 app(定义在 main.py 中);
# 实际项目中可在 main.py 导入 task_polling_manager 后再注册此路由,或改用 APIRouter
@app.websocket("/ws/task-status/{task_id}")
async def websocket_task_status(websocket: WebSocket, task_id: str):
"""WebSocket任务状态推送"""
await task_polling_manager.add_connection(task_id, websocket)
try:
while True:
# 保持连接
data = await websocket.receive_text()
# 可以处理来自客户端的消息
except WebSocketDisconnect:
await task_polling_manager.remove_connection(task_id)
#任务调度与定时任务
#Celery Beat定时任务配置
# celery_beat_config.py - Celery Beat定时任务配置
from celery.schedules import crontab
from celery_app import celery_app
# 定时任务配置
celery_app.conf.beat_schedule = {
# 每日凌晨3点清理临时文件
"cleanup-temporary-files": {
"task": "tasks.maintenance.cleanup_temporary_files",
"schedule": crontab(hour=3, minute=0),
"options": {
"expires": 300, # 任务过期时间5分钟
"queue": "maintenance"
}
},
# 每周一上午9点发送周报
"send-weekly-report": {
"task": "tasks.reports.send_weekly_report",
"schedule": crontab(hour=9, minute=0, day_of_week=1),
"options": {
"queue": "reports"
}
},
# 每小时检查任务队列状态
"check-task-queue-status": {
"task": "tasks.monitoring.check_queue_status",
"schedule": crontab(minute=0), # 每小时执行
"options": {
"queue": "monitoring"
}
},
# 每天下午2点备份数据
"backup-daily-data": {
"task": "tasks.backup.perform_daily_backup",
"schedule": crontab(hour=14, minute=0),
"options": {
"queue": "backup",
"expires": 3600 # 1小时过期
}
},
# 每15分钟清理过期任务结果
"cleanup-expired-results": {
"task": "tasks.cleanup.cleanup_expired_results",
"schedule": crontab(minute="*/15"), # 每15分钟执行
"options": {
"queue": "cleanup"
}
},
# 工作日早上8点发送提醒邮件
"morning-reminders": {
"task": "tasks.notifications.send_morning_reminders",
"schedule": crontab(hour=8, minute=0, day_of_week="1-5"), # 周一到周五
"options": {
"queue": "notifications"
}
},
# 每月第一天生成月度统计
"monthly-statistics": {
"task": "tasks.analytics.generate_monthly_statistics",
"schedule": crontab(hour=0, minute=0, day_of_month=1), # 每月1号
"options": {
"queue": "analytics"
}
}
}
# 时区配置
celery_app.conf.timezone = 'Asia/Shanghai'
celery_app.conf.enable_utc = False
#动态任务调度
# tasks/dynamic_scheduler.py - 动态任务调度
from celery_app import celery_app
from celery.schedules import crontab
from datetime import datetime, timedelta
import json
@celery_app.task(bind=True, name="tasks.dynamic_schedule.create_scheduled_task")
def create_scheduled_task(self, task_config: dict):
"""
创建动态调度任务
Args:
task_config: 任务配置,包含任务类型、参数、调度时间等
"""
try:
task_type = task_config.get("task_type")
task_params = task_config.get("params", {})
schedule_time = task_config.get("schedule_time") # ISO格式时间字符串
recurring = task_config.get("recurring", False)
cron_expression = task_config.get("cron_expression")
if recurring and cron_expression:
# 解析cron表达式并添加到beat_schedule
schedule = parse_cron_expression(cron_expression)
task_name = f"scheduled_{task_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
celery_app.conf.beat_schedule[task_name] = {
"task": f"tasks.{task_type}",
"schedule": schedule,
"args": [task_params],
"options": {
"queue": "scheduled_tasks"
}
}
return {
"status": "success",
"message": "周期性任务已创建",
"task_name": task_name,
"cron_expression": cron_expression
}
else:
# 单次执行任务,使用eta参数
if schedule_time:
eta_time = datetime.fromisoformat(schedule_time.replace('Z', '+00:00'))
task = celery_app.send_task(
f"tasks.{task_type}",
args=[task_params],
eta=eta_time,
queue="scheduled_tasks"
)
return {
"status": "success",
"message": "单次调度任务已创建",
"task_id": task.id,
"scheduled_time": schedule_time
}
else:
# 立即执行
task = celery_app.send_task(
f"tasks.{task_type}",
args=[task_params],
queue="scheduled_tasks"
)
return {
"status": "success",
"message": "即时任务已创建",
"task_id": task.id
}
except Exception as exc:
raise self.retry(exc=exc, countdown=60, max_retries=3)
def parse_cron_expression(cron_expr: str):
"""解析cron表达式"""
parts = cron_expr.split()
if len(parts) != 5:
raise ValueError("Invalid cron expression format")
minute, hour, day, month, weekday = parts
return crontab(
minute=minute,
hour=hour,
day_of_week=weekday,
day_of_month=day,
month_of_year=month
)
@celery_app.task(bind=True, name="tasks.dynamic_schedule.list_scheduled_tasks")
def list_scheduled_tasks(self):
"""列出所有调度任务"""
scheduled_tasks = []
for task_name, task_config in celery_app.conf.beat_schedule.items():
scheduled_tasks.append({
"name": task_name,
"task": task_config.get("task", ""),
"schedule": str(task_config.get("schedule", "")),
"args": task_config.get("args", []),
"kwargs": task_config.get("kwargs", {}),
"options": task_config.get("options", {})
})
return {
"status": "success",
"scheduled_tasks": scheduled_tasks,
"total_count": len(scheduled_tasks)
}
@celery_app.task(bind=True, name="tasks.dynamic_schedule.remove_scheduled_task")
def remove_scheduled_task(self, task_name: str):
"""移除调度任务"""
if task_name in celery_app.conf.beat_schedule:
del celery_app.conf.beat_schedule[task_name]
# 这里可能需要重启beat服务才能生效
# 或者使用信号机制通知beat重新加载配置
return {
"status": "success",
"message": f"调度任务 {task_name} 已移除",
"removed_task": task_name
}
else:
return {
"status": "error",
"message": f"调度任务 {task_name} 不存在",
"available_tasks": list(celery_app.conf.beat_schedule.keys())
}
#错误处理与重试机制
#全面的错误处理策略
# tasks/error_handling.py - 错误处理策略
from celery import Task
from celery.exceptions import Retry
import logging
from typing import Any, Dict
import traceback
import time
from celery_app import celery_app
logger = logging.getLogger(__name__)
class RobustTask(Task):
"""健壮的任务基类"""
def on_success(self, retval, task_id, args, kwargs):
"""任务成功时的回调"""
logger.info(f"Task {task_id} completed successfully")
logger.info(f"Return value: {retval}")
# 可以在这里添加成功后的处理逻辑
# 例如:发送通知、更新数据库状态、清理临时资源等
def on_failure(self, exc, task_id, args, kwargs, einfo):
"""任务失败时的回调"""
logger.error(f"Task {task_id} failed: {str(exc)}")
logger.error(f"Traceback: {einfo}")
# 根据异常类型进行不同处理
if isinstance(exc, (ConnectionError, TimeoutError)):
logger.info(f"Network-related error, may retry: {task_id}")
elif isinstance(exc, FileNotFoundError):
logger.error(f"File not found error, not retrying: {task_id}")
# 对于文件不存在等永久性错误,可以选择不重试
return
# 记录失败任务到监控系统
self.record_failure(task_id, exc, args, kwargs)
def record_failure(self, task_id: str, exc: Exception, args: tuple, kwargs: dict):
"""记录失败任务"""
try:
# 这里可以将失败的任务信息存储到数据库或监控系统
failure_record = {
"task_id": task_id,
"exception": str(exc),
"args": str(args),
"kwargs": str(kwargs),
"timestamp": time.time(),
"traceback": traceback.format_exc()
}
# 存储到数据库或消息队列
# save_failure_record(failure_record)
except Exception as e:
logger.error(f"Failed to record failure: {str(e)}")
@celery_app.task(base=RobustTask, bind=True, name="tasks.robust_example_task")
def robust_example_task(self, data: Any):
"""示例健壮任务"""
try:
# 模拟可能出错的操作
if not data:
raise ValueError("Data cannot be empty")
# 模拟处理过程
result = process_data_safely(data)
return {
"status": "success",
"processed_data": result,
"original_input": data
}
except (ValueError, TypeError) as e:
# 对于参数错误等,不重试
logger.error(f"Parameter error in task {self.request.id}: {str(e)}")
raise
except ConnectionError as e:
# 对于网络错误,重试
logger.warning(f"Network error in task {self.request.id}, retrying: {str(e)}")
raise self.retry(exc=e, countdown=60, max_retries=3)
except Exception as e:
# 其他错误,记录并重试
logger.error(f"Unexpected error in task {self.request.id}: {str(e)}")
raise self.retry(exc=e, countdown=120, max_retries=2)
def process_data_safely(data: Any) -> Any:
"""安全的数据处理函数"""
# 实现安全的数据处理逻辑
# 包括输入验证、边界检查、资源管理等
return data
# 重试策略配置
@celery_app.task(
bind=True,
autoretry_for=(Exception,),
retry_kwargs={'max_retries': 5},
retry_backoff=True,
retry_backoff_max=700,
retry_jitter=True,
name="tasks.auto_retry_task"
)
def auto_retry_task(self, data: Any):
"""
自动重试任务
- 对所有异常自动重试
- 指数退避策略
- 随机抖动避免雪崩
"""
# 模拟可能失败的操作
import random
if random.random() < 0.3: # 30%概率失败
raise ConnectionError("Random connection error for testing retry")
return {"status": "success", "data": data, "attempts": self.request.retries + 1}
# 自定义重试逻辑
@celery_app.task(bind=True, name="tasks.custom_retry_task")
def custom_retry_task(self, data: Any, max_retries: int = 3):
"""自定义重试逻辑的任务"""
try:
# 模拟业务逻辑
result = perform_business_operation(data)
return {
"status": "success",
"result": result,
"attempts": self.request.retries + 1
}
except TemporaryError as e:
# 临时错误,重试
if self.request.retries < max_retries:
countdown = min(2 ** self.request.retries * 60, 300) # 最大5分钟
raise self.retry(exc=e, countdown=countdown, max_retries=max_retries)
else:
logger.error(f"Max retries exceeded for task {self.request.id}")
raise
except PermanentError as e:
# 永久错误,不重试
logger.error(f"Permanent error in task {self.request.id}: {str(e)}")
raise
except Exception as e:
# 其他错误,根据类型决定是否重试
if should_retry_error(e):
if self.request.retries < max_retries:
raise self.retry(exc=e, countdown=60, max_retries=max_retries)
else:
raise
else:
raise
class TemporaryError(Exception):
"""临时错误,可以重试"""
pass
class PermanentError(Exception):
"""永久错误,不应重试"""
pass
def perform_business_operation(data: Any) -> Any:
"""执行业务操作"""
# 模拟业务逻辑
return f"processed_{data}"
def should_retry_error(exception: Exception) -> bool:
"""判断是否应该重试错误"""
retryable_errors = [
ConnectionError,
TimeoutError,
ConnectionResetError,
BrokenPipeError,
]
return isinstance(exception, tuple(retryable_errors))
#死信队列处理
# tasks/dead_letter_queue.py - 死信队列处理
from celery import group
from celery.signals import task_failure
import logging
import time
from celery_app import celery_app
logger = logging.getLogger(__name__)
@task_failure.connect
def task_failed_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwargs):
"""任务失败信号处理器"""
logger.error(f"Task {task_id} failed: {str(exception)}")
# 将失败任务信息发送到死信队列进行后续处理
dead_letter_task.delay(
task_id=task_id,
exception=str(exception),
traceback=str(traceback),
original_task=sender.name if sender else None
)
@celery_app.task(name="tasks.dead_letter_queue.handle_dead_letter")
def dead_letter_task(task_id: str, exception: str, traceback: str, original_task: str = None):
"""死信队列处理任务"""
try:
logger.warning(f"Processing dead letter task: {task_id}")
# 记录到死信队列数据库
record_dead_letter({
"task_id": task_id,
"exception": exception,
"traceback": traceback,
"original_task": original_task,
"timestamp": time.time(),
"attempts": get_task_attempt_count(task_id)
})
# 根据错误类型决定后续处理
if "permanent" in exception.lower():
# 永久错误,标记为已处理
mark_as_handled(task_id, "permanent_error")
else:
# 临时错误,可以人工介入处理
notify_admin_for_intervention(task_id, exception)
return {
"status": "dead_letter_processed",
"task_id": task_id,
"handled": True
}
except Exception as e:
logger.error(f"Error processing dead letter task {task_id}: {str(e)}")
raise
def record_dead_letter(dead_letter_info: dict):
"""记录死信队列信息"""
# 实现记录到数据库或监控系统的逻辑
pass
def get_task_attempt_count(task_id: str) -> int:
"""获取任务尝试次数"""
# 实现获取尝试次数的逻辑
return 0
def mark_as_handled(task_id: str, reason: str):
"""标记任务为已处理"""
# 实现标记逻辑
pass
def notify_admin_for_intervention(task_id: str, error: str):
"""通知管理员介入处理"""
# 实现通知逻辑,如发送邮件、短信等
pass
#性能优化策略
#Worker性能优化
# config/worker_optimization.py - Worker性能优化配置
from celery import Celery
import multiprocessing
from celery_app import celery_app  # 供下方的任务装饰器使用
def configure_optimized_celery():
"""配置优化的Celery实例"""
celery = Celery('optimized_daoman')
# 基础配置
celery.conf.update(
broker_url='redis://localhost:6379/0',
result_backend='redis://localhost:6379/1',
# 序列化优化
task_serializer='msgpack', # 比JSON更快
result_serializer='msgpack',
accept_content=['msgpack', 'json'],
# 性能相关配置
worker_prefetch_multiplier=1, # 防止任务积压
worker_max_tasks_per_child=1000, # 防止内存泄漏
worker_disable_rate_limits=True, # 禁用速率限制以提高性能
# 任务确认机制
task_acks_late=True,
task_reject_on_worker_lost=True,
# 结果存储优化
result_expires=3600,
result_compression='zlib', # 使用zlib压缩
# 事件监控(生产环境可关闭)
worker_send_task_events=False, # 生产环境建议关闭
task_send_sent_event=False, # 生产环境建议关闭
# 连接池优化
broker_pool_limit=100,
broker_connection_retry_on_startup=True,
# 任务路由优化
task_routes={
'tasks.ai_tasks.*': {
'queue': 'ai_tasks',
'routing_key': 'ai.priority'
},
'tasks.email_tasks.*': {
'queue': 'email_tasks',
'routing_key': 'email.normal'
},
},
# 队列配置
task_default_queue='default',
task_default_exchange='default',
task_default_routing_key='default',
)
return celery
# 高性能任务装饰器
def high_performance_task(func):
"""高性能任务装饰器"""
def wrapper(*args, **kwargs):
# 设置任务的高性能选项
kwargs.setdefault('serializer', 'msgpack')
kwargs.setdefault('compression', 'zlib')
kwargs.setdefault('delivery_mode', 2) # 持久化
return func(*args, **kwargs)
return wrapper
@celery_app.task(
bind=True,
serializer='msgpack',
compression='zlib',
delivery_mode=2,
name="tasks.high_performance_ai_processing"
)
def high_performance_ai_processing(self, data_batch: list):
"""高性能AI批处理任务"""
import numpy as np
try:
# 使用numpy进行批量处理以提高性能
processed_batch = []
for i, item in enumerate(data_batch):
# 更新进度(批量更新以减少开销)
if i % 10 == 0: # 每处理10个项目更新一次
self.update_state(state='PROGRESS', meta={
'current': i,
'total': len(data_batch),
'progress': int((i / len(data_batch)) * 100),
'status': f'Processing batch... ({i}/{len(data_batch)})'
})
# 高效的数据处理
processed_item = efficient_process_item(item)
processed_batch.append(processed_item)
return {
'status': 'completed',
'processed_count': len(processed_batch),
'batch_size': len(data_batch),
'results': processed_batch
}
except Exception as exc:
raise self.retry(exc=exc, countdown=60, max_retries=2)
def efficient_process_item(item):
"""高效处理单个项目"""
# 实现高效的处理逻辑
# 使用向量化操作、缓存等技术
return item
# 内存优化任务
@celery_app.task(bind=True, name="tasks.memory_efficient_processing")
def memory_efficient_processing(self, large_dataset: list):
"""内存优化处理任务"""
processed_results = []
batch_size = 100 # 批处理大小
for i in range(0, len(large_dataset), batch_size):
batch = large_dataset[i:i + batch_size]
# 处理批次
batch_results = process_batch(batch)
processed_results.extend(batch_results)
# 显式释放内存
del batch, batch_results
# 更新进度
progress = min(i + batch_size, len(large_dataset))
self.update_state(state='PROGRESS', meta={
'current': progress,
'total': len(large_dataset),
'progress': int((progress / len(large_dataset)) * 100),
'status': f'Memory-efficient processing... ({progress}/{len(large_dataset)})'
})
return {
'status': 'completed',
'processed_count': len(processed_results),
'memory_efficiency': 'optimized'
}
def process_batch(batch: list):
"""处理数据批次"""
# 实现批次处理逻辑
return [item for item in batch]  # 示例处理
#任务分片处理
# tasks/sharding.py - 任务分片处理
from celery import group, chord
from typing import List, Any
from celery_app import celery_app
@celery_app.task(bind=True, name="tasks.sharded_processing.master")
def sharded_processing_master(self, large_dataset: List[Any], shard_count: int = 4):
"""分片处理主任务"""
try:
# 将大数据集分片
shards = split_into_shards(large_dataset, shard_count)
# 创建分片处理任务组
processing_tasks = [
sharded_processing_worker.s(shard_data, shard_idx)
for shard_idx, shard_data in enumerate(shards)
]
# 使用group并行执行所有分片任务
job = group(processing_tasks)
result = job.apply_async()
# 等待所有分片完成并合并结果
shard_results = result.get(propagate=True)
# 合并结果
final_result = merge_shard_results(shard_results)
return {
'status': 'completed',
'shard_count': shard_count,
'total_items': len(large_dataset),
'shard_results': shard_results,
'final_result': final_result
}
except Exception as exc:
raise self.retry(exc=exc, countdown=120, max_retries=2)
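# 补充示意:在任务内部调用 result.get() 会同步阻塞 Worker,Celery 默认也不允许在任务中等待子任务结果;
# 更推荐的方式是使用上面已导入的 chord:并行执行所有分片,全部完成后自动触发回调任务合并结果。
# 以下 submit_sharded_processing / merge_shards_callback 为演示用的假设名称。
@celery_app.task(name="tasks.sharded_processing.merge_callback")
def merge_shards_callback(shard_results):
    """chord 回调任务:收到全部分片结果后进行合并"""
    return merge_shard_results(shard_results)
def submit_sharded_processing(large_dataset, shard_count: int = 4):
    """使用 chord 提交分片处理(非阻塞,立即返回回调任务的 AsyncResult)"""
    shards = split_into_shards(large_dataset, shard_count)
    header = [
        sharded_processing_worker.s(shard_data, shard_idx)
        for shard_idx, shard_data in enumerate(shards)
    ]
    return chord(header)(merge_shards_callback.s())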
@celery_app.task(bind=True, name="tasks.sharded_processing.worker")
def sharded_processing_worker(self, shard_data: List[Any], shard_idx: int):
"""分片处理工作者任务"""
try:
processed_data = []
for i, item in enumerate(shard_data):
# 更新分片进度
self.update_state(state='PROGRESS', meta={
'shard_idx': shard_idx,
'current': i + 1,
'total': len(shard_data),
'progress': int(((i + 1) / len(shard_data)) * 100),
'status': f'Shard {shard_idx} processing... ({i + 1}/{len(shard_data)})'
})
processed_item = process_single_item(item)
processed_data.append(processed_item)
return {
'shard_idx': shard_idx,
'processed_count': len(processed_data),
'data': processed_data
}
except Exception as exc:
raise self.retry(exc=exc, countdown=60, max_retries=3)
def split_into_shards(dataset: List[Any], shard_count: int) -> List[List[Any]]:
"""将数据集分片"""
shard_size = len(dataset) // shard_count
shards = []
for i in range(shard_count):
start_idx = i * shard_size
if i == shard_count - 1: # 最后一片包含余数
end_idx = len(dataset)
else:
end_idx = start_idx + shard_size
shards.append(dataset[start_idx:end_idx])
return shards
def merge_shard_results(shard_results: List[dict]) -> dict:
"""合并分片结果"""
merged_data = []
total_processed = 0
for result in shard_results:
merged_data.extend(result.get('data', []))
total_processed += result.get('processed_count', 0)
return {
'merged_data': merged_data,
'total_processed': total_processed,
'shard_count': len(shard_results)
}
def process_single_item(item: Any) -> Any:
"""处理单个项目"""
# 实现具体的处理逻辑
return item
# MapReduce模式任务
@celery_app.task(bind=True, name="tasks.map_reduce.execute")
def map_reduce_execute(self, dataset: List[Any], map_func: str, reduce_func: str):
"""MapReduce模式执行任务"""
try:
# Map阶段:并行处理数据
mapped_results = []
batch_size = 100
for i in range(0, len(dataset), batch_size):
batch = dataset[i:i + batch_size]
# 更新进度
self.update_state(state='PROGRESS', meta={
'phase': 'mapping',
'current_batch': i // batch_size + 1,
'total_batches': (len(dataset) + batch_size - 1) // batch_size,
'status': 'Executing Map phase...'
})
# 处理批次
batch_mapped = execute_map_function(batch, map_func)
mapped_results.extend(batch_mapped)
# Reduce阶段:聚合结果
final_result = execute_reduce_function(mapped_results, reduce_func)
return {
'status': 'completed',
'phase': 'completed',
'map_results_count': len(mapped_results),
'final_result': final_result
}
except Exception as exc:
raise self.retry(exc=exc, countdown=120, max_retries=2)
def execute_map_function(data_batch: List[Any], func_name: str) -> List[Any]:
"""执行映射函数"""
# 根据函数名执行相应的映射操作
if func_name == 'word_count':
return word_count_mapper(data_batch)
elif func_name == 'data_filter':
return data_filter_mapper(data_batch)
else:
return data_batch # 默认返回原数据
def execute_reduce_function(mapped_data: List[Any], func_name: str) -> Any:
"""执行归约函数"""
# 根据函数名执行相应的归约操作
if func_name == 'sum':
return sum(mapped_data)
elif func_name == 'count':
return len(mapped_data)
else:
return mapped_data
def word_count_mapper(data_batch: List[str]) -> List[tuple]:
"""词频统计映射器"""
word_counts = []
for text in data_batch:
words = text.lower().split()
for word in words:
word_counts.append((word, 1))
return word_counts
def data_filter_mapper(data_batch: List[dict]) -> List[dict]:
"""数据过滤映射器"""
# 示例:过滤出数值大于10的项目
return [item for item in data_batch if item.get('value', 0) > 10]
#监控与管理
#自定义监控任务
# tasks/monitoring.py - 监控任务
import psutil
import time
from celery.signals import task_prerun, task_postrun, task_revoked
from celery.utils.log import get_task_logger
from celery_app import celery_app
logger = get_task_logger(__name__)
@celery_app.task(bind=True, name="tasks.monitoring.system_monitor")
def system_monitor(self):
"""系统监控任务"""
try:
# 收集系统指标
metrics = {
"timestamp": time.time(),
"cpu_percent": psutil.cpu_percent(interval=1),
"memory_percent": psutil.virtual_memory().percent,
"disk_percent": psutil.disk_usage('/').percent,
"load_average": psutil.getloadavg(),
"process_count": len(psutil.pids()),
"network_io": psutil.net_io_counters()._asdict(),
"celery_stats": get_celery_stats()
}
# 检查是否需要告警
alerts = check_system_alerts(metrics)
return {
"status": "completed",
"metrics": metrics,
"alerts": alerts,
"healthy": len(alerts) == 0
}
except Exception as exc:
raise self.retry(exc=exc, countdown=60, max_retries=3)
def get_celery_stats():
"""获取Celery统计信息"""
inspect = celery_app.control.inspect()
return {
"active": inspect.active(),
"scheduled": inspect.scheduled(),
"reserved": inspect.reserved(),
"stats": inspect.stats(),
"registered": inspect.registered_tasks(),
"revoked": inspect.revoked()
}
def check_system_alerts(metrics: dict) -> list:
"""检查系统告警"""
alerts = []
# CPU使用率告警
if metrics["cpu_percent"] > 80:
alerts.append({
"type": "high_cpu",
"severity": "warning",
"message": f"CPU使用率过高: {metrics['cpu_percent']}%"
})
# 内存使用率告警
if metrics["memory_percent"] > 85:
alerts.append({
"type": "high_memory",
"severity": "warning",
"message": f"内存使用率过高: {metrics['memory_percent']}%"
})
# 磁盘使用率告警
if metrics["disk_percent"] > 90:
alerts.append({
"type": "high_disk",
"severity": "critical",
"message": f"磁盘使用率过高: {metrics['disk_percent']}%"
})
# 队列积压告警
if metrics["celery_stats"]["active"]:
active_count = sum(len(workers) for workers in metrics["celery_stats"]["active"].values())
if active_count > 50: # 假设超过50个活跃任务为积压
alerts.append({
"type": "queue_backlog",
"severity": "warning",
"message": f"任务队列积压: {active_count} 个活跃任务"
})
return alerts
@celery_app.task(bind=True, name="tasks.monitoring.queue_monitor")
def queue_monitor(self):
"""队列监控任务"""
try:
inspect = celery_app.control.inspect()
# 获取队列状态
active_tasks = inspect.active()
scheduled_tasks = inspect.scheduled()
reserved_tasks = inspect.reserved()
# 统计信息
stats = {
"active_workers": len(active_tasks) if active_tasks else 0,
"total_active_tasks": sum(len(tasks) for tasks in active_tasks.values()) if active_tasks else 0,
"total_scheduled_tasks": sum(len(tasks) for tasks in scheduled_tasks.values()) if scheduled_tasks else 0,
"total_reserved_tasks": sum(len(tasks) for tasks in reserved_tasks.values()) if reserved_tasks else 0,
"queue_sizes": get_queue_sizes(),
"task_rates": get_task_rates()
}
# 检查异常情况
issues = check_queue_issues(stats)
return {
"status": "completed",
"statistics": stats,
"issues": issues,
"healthy": len(issues) == 0
}
except Exception as exc:
raise self.retry(exc=exc, countdown=60, max_retries=3)
def get_queue_sizes():
"""获取队列大小"""
# 这里需要访问Redis或其他broker来获取队列长度
# 示例实现(需要根据实际broker调整)
return {
"default": 0,
"ai_tasks": 0,
"email_tasks": 0,
"file_processing": 0
}
def get_task_rates():
"""获取任务速率"""
# 获取最近一段时间内的任务处理速率
return {
"tasks_per_minute": 0,
"tasks_per_hour": 0,
"average_processing_time": 0
}
def check_queue_issues(stats: dict) -> list:
"""检查队列问题"""
issues = []
if stats["total_active_tasks"] > 100:
issues.append({
"type": "high_active_tasks",
"severity": "warning",
"message": f"活跃任务过多: {stats['total_active_tasks']} 个"
})
if stats["total_scheduled_tasks"] > 50:
issues.append({
"type": "high_scheduled_tasks",
"severity": "warning",
"message": f"调度任务过多: {stats['total_scheduled_tasks']} 个"
})
return issues
# 任务生命周期信号监听
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
"""任务运行前信号"""
logger.info(f"Task {task_id} ({task.name}) starting...")
# 可以在这里记录任务开始时间、分配资源等
@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
"""任务运行后信号"""
logger.info(f"Task {task_id} ({task.name}) completed with state: {state}")
# 可以在这里记录任务完成时间、释放资源、发送通知等
@task_revoked.connect
def task_revoked_handler(sender=None, task_id=None, terminated=None, signum=None, **kwds):
"""任务撤销信号"""
logger.warning(f"Task {task_id} was revoked (terminated: {terminated}, signal: {signum})")
# 可以在这里清理被撤销任务的资源
#Flower监控配置
# config/flower_config.py - Flower监控配置
"""
Flower监控配置和使用说明
安装:
pip install flower
启动:
celery -A celery_app flower --port=5555
配置选项:
"""
import os
class FlowerConfig:
"""Flower配置类"""
# 基础配置
port = 5555
address = '0.0.0.0'
debug = False
# 认证配置
basic_auth = os.getenv('FLOWER_BASIC_AUTH') # 格式: "username:password"
auth = os.getenv('FLOWER_OAUTH_CREDENTIALS') # OAuth配置
# Broker配置
broker_api = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
# 数据库配置(用于持久化监控数据)
persistent = True
db = os.getenv('FLOWER_DB_PATH', '/tmp/flower.db')
# 时间配置
natural_time = True
# API配置
max_workers = 100
max_tasks = 10000
# 跨域配置
cors_options = {
'origins': ['*'],
'credentials': True,
'methods': ['GET', 'POST'],
'headers': ['*'],
}
# 启动Flower命令示例
"""
celery -A celery_app flower \
--port=5555 \
--basic_auth=user:password \
--broker_api=redis://localhost:6379/0 \
--persistent=True \
--db=/var/lib/flower/flower.db
"""
#生产环境部署
#Docker部署方案
# Dockerfile - Celery Worker
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
libpq-dev \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建非root用户
RUN useradd --create-home --shell /bin/bash celery
USER celery
# 启动Celery Worker
CMD ["celery", "-A", "celery_app", "worker", "--loglevel=info", "--concurrency=4"]# Dockerfile.beat - Celery Beat
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 创建非root用户
RUN useradd --create-home --shell /bin/bash celery
USER celery
# 启动Celery Beat
CMD ["celery", "-A", "celery_app", "beat", "--loglevel=info"]# docker-compose.yml - 完整部署配置
version: '3.8'
services:
redis:
image: redis:7-alpine
restart: always
ports:
- "6379:6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
worker:
build:
context: .
dockerfile: Dockerfile
restart: always
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
volumes:
- ./logs:/app/logs
deploy:
replicas: 3 # 多个Worker实例
beat:
build:
context: .
dockerfile: Dockerfile.beat
restart: always
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379/0
- CELERY_BROKER_URL=redis://redis:6379/0
- CELERY_RESULT_BACKEND=redis://redis:6379/1
flower:
image: mher/flower:latest
restart: always
depends_on:
- redis
environment:
- CELERY_BROKER_URL=redis://redis:6379/0
- FLOWER_PORT=5555
ports:
- "5555:5555"
command: ["--broker=redis://redis:6379/0", "--port=5555"]
volumes:
redis_data:
#系统服务配置
# /etc/systemd/system/celery-worker.service - Worker服务
[Unit]
Description=Celery Worker
After=network.target redis.service
[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/opt/daoman
Environment="CELERY_CONFIG_MODULE=config.celery_config"
ExecStart=/opt/daoman/venv/bin/celery -A celery_app worker \
--pidfile=/var/run/celery/worker.pid \
--logfile=/var/log/celery/worker.log \
--loglevel=INFO \
--concurrency=8 \
--max-tasks-per-child=1000
ExecReload=/bin/kill -HUP $MAINPID
KillSignal=SIGTERM
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
# /etc/systemd/system/celery-beat.service - Beat服务
[Unit]
Description=Celery Beat
After=network.target redis.service
[Service]
Type=forking
User=www-data
Group=www-data
WorkingDirectory=/opt/daoman
Environment="CELERY_CONFIG_MODULE=config.celery_config"
ExecStart=/opt/daoman/venv/bin/celery -A celery_app beat \
--pidfile=/var/run/celery/beat.pid \
--logfile=/var/log/celery/beat.log \
--loglevel=INFO
ExecReload=/bin/kill -HUP $MAINPID
KillSignal=SIGTERM
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
#部署脚本
#!/bin/bash
# deploy_celery.sh - Celery部署脚本
set -e
APP_NAME="daoman"
APP_DIR="/opt/$APP_NAME"
VENV_DIR="$APP_DIR/venv"
LOG_DIR="/var/log/celery"
RUN_DIR="/var/run/celery"
echo "🚀 开始部署 Celery..."
# 创建必要目录
sudo mkdir -p $LOG_DIR $RUN_DIR
sudo chown www-data:www-data $LOG_DIR $RUN_DIR
# 更新代码
cd $APP_DIR
git pull origin main
# 更新依赖
$VENV_DIR/bin/pip install -r requirements.txt
# 重启服务
echo "🔄 重启 Celery Worker..."
sudo systemctl restart celery-worker
sudo systemctl status celery-worker
echo "🔄 重启 Celery Beat..."
sudo systemctl restart celery-beat
sudo systemctl status celery-beat
# 验证部署
sleep 5
if sudo systemctl is-active --quiet celery-worker && sudo systemctl is-active --quiet celery-beat; then
echo "✅ Celery 部署成功"
echo "📊 活跃 Worker: $(celery -A celery_app inspect active | grep -c '->') "
else
echo "❌ Celery 部署失败"
exit 1
fi
echo "🎉 部署完成!"#最佳实践指南
#1. 任务设计最佳实践
# best_practices/tasks.py - 任务设计最佳实践
from celery_app import celery_app
# ✅ 好的做法:任务幂等性
@celery_app.task(bind=True, name="tasks.update_user_profile")
def update_user_profile(self, user_id: int, profile_data: dict):
"""
更新用户资料 - 幂等操作
同样的输入多次执行结果一致
"""
try:
# 检查任务是否已执行过(防重复)
if has_task_executed_recently(self.request.id):
return {"status": "skipped", "reason": "task_already_executed"}
# 执行更新操作
result = perform_profile_update(user_id, profile_data)
# 记录执行历史
record_task_execution(self.request.id, user_id, profile_data)
return result
except Exception as exc:
raise self.retry(exc=exc, countdown=60, max_retries=3)
# ✅ 好的做法:任务原子性
@celery_app.task(bind=True, name="tasks.transfer_money")
def transfer_money(self, from_account: int, to_account: int, amount: float):
"""
转账操作 - 原子操作
使用数据库事务保证一致性
"""
from sqlalchemy.orm import sessionmaker
from database import engine
Session = sessionmaker(bind=engine)
session = Session()
try:
# 扣款
debit_result = debit_account(session, from_account, amount)
if not debit_result:
raise ValueError(f"Insufficient funds in account {from_account}")
# 入账
credit_result = credit_account(session, to_account, amount)
if not credit_result:
raise ValueError(f"Failed to credit account {to_account}")
# 提交事务
session.commit()
return {
"status": "success",
"from_account": from_account,
"to_account": to_account,
"amount": amount,
"transaction_id": generate_transaction_id()
}
except Exception as exc:
session.rollback()
raise self.retry(exc=exc, countdown=30, max_retries=2)
finally:
session.close()
# ✅ 好的做法:任务数据序列化
@celery_app.task(bind=True, name="tasks.process_complex_data")
def process_complex_data(self, data: dict):
"""
处理复杂数据 - 安全序列化
避免传递不可序列化的对象
"""
# 确保数据可以被序列化
serializable_data = ensure_serializable(data)
# 处理数据
result = perform_complex_processing(serializable_data)
# 确保结果可以被序列化
return ensure_serializable(result)
def ensure_serializable(obj):
"""确保对象可以被序列化"""
import json
from datetime import datetime, date
from decimal import Decimal
def serialize_helper(o):
if isinstance(o, (datetime, date)):
return o.isoformat()
elif isinstance(o, Decimal):
return float(o)
elif hasattr(o, '__dict__'):
return o.__dict__
else:
return str(o)
try:
json.dumps(obj, default=serialize_helper)
return obj
except TypeError:
# 如果无法序列化,转换为字典
return json.loads(json.dumps(obj, default=serialize_helper))
#2. 性能优化最佳实践
# best_practices/performance.py - 性能优化最佳实践
import json
from celery_app import celery_app
# ✅ 批处理优化
@celery_app.task(bind=True, name="tasks.batch_process_emails")
def batch_process_emails(self, email_data_list: list, batch_size: int = 50):
"""
批量处理邮件 - 减少网络请求次数
"""
total_emails = len(email_data_list)
for i in range(0, total_emails, batch_size):
batch = email_data_list[i:i + batch_size]
# 一次请求处理一批邮件
batch_result = process_email_batch(batch)
# 更新进度
processed = min(i + batch_size, total_emails)
self.update_state(state='PROGRESS', meta={
'current': processed,
'total': total_emails,
'progress': int((processed / total_emails) * 100),
'status': f'Processed {processed}/{total_emails} emails'
})
return {
'status': 'completed',
'total_emails': total_emails,
'batches_processed': (total_emails + batch_size - 1) // batch_size
}
# ✅ 缓存优化
from functools import lru_cache
import redis
redis_client = redis.Redis(host='localhost', port=6379, db=2)
@celery_app.task(bind=True, name="tasks.expensive_calculation")
def expensive_calculation(self, input_data: dict):
"""
昂贵计算 - 使用缓存避免重复计算
"""
# 生成缓存键
cache_key = f"calculation:{hash(str(sorted(input_data.items())))}"
# 尝试从缓存获取结果
cached_result = redis_client.get(cache_key)
if cached_result:
return json.loads(cached_result)
# 执行计算
result = perform_expensive_calculation(input_data)
# 缓存结果(1小时过期)
redis_client.setex(cache_key, 3600, json.dumps(result))
return result
# ✅ 数据库连接优化
from sqlalchemy.pool import StaticPool
from sqlalchemy import create_engine, text
import sqlite3
import threading
# 为每个Worker进程创建连接池
_local = threading.local()
def get_db_engine():
"""获取数据库引擎(每个线程一个)"""
if not hasattr(_local, 'engine'):
_local.engine = create_engine(
'sqlite:///example.db', # 实际使用时替换为真实数据库
poolclass=StaticPool,
creator=lambda: sqlite3.connect(':memory:')
)
return _local.engine
@celery_app.task(bind=True, name="tasks.database_intensive_task")
def database_intensive_task(self, query_params: dict):
"""
数据库密集型任务 - 优化连接使用
"""
engine = get_db_engine()
with engine.connect() as conn:
# 执行数据库操作
result = conn.execute(
text("SELECT * FROM table WHERE condition = :condition"),
{"condition": query_params.get('condition')}
)
return [dict(row._mapping) for row in result]
#3. 安全最佳实践
# best_practices/security.py - 安全最佳实践
from celery_app import celery_app
# ✅ 输入验证
from marshmallow import Schema, fields, ValidationError
class TaskInputSchema(Schema):
user_id = fields.Integer(required=True)
data = fields.Dict(required=True)
callback_url = fields.Url(required=False)
task_input_schema = TaskInputSchema()
@celery_app.task(bind=True, name="tasks.secure_task")
def secure_task(self, input_data: dict):
"""
安全任务 - 输入验证和清理
"""
try:
# 验证输入
validated_data = task_input_schema.load(input_data)
# 清理危险字符
sanitized_data = sanitize_input(validated_data)
# 执行任务
return process_secure_task(sanitized_data)
except ValidationError as exc:
raise ValueError(f"Invalid input: {exc.messages}")
def sanitize_input(data: dict) -> dict:
"""清理输入数据"""
import html
import re
def clean_value(value):
if isinstance(value, str):
# HTML转义
value = html.escape(value)
# 移除危险字符
value = re.sub(r'[<>"\']', '', value)
elif isinstance(value, dict):
return {k: clean_value(v) for k, v in value.items()}
elif isinstance(value, list):
return [clean_value(item) for item in value]
return value
return clean_value(data)
# ✅ 任务隔离
import subprocess
import tempfile
import os
import sys
import json
@celery_app.task(bind=True, name="tasks.isolated_execution")
def isolated_execution(self, code: str, inputs: dict):
"""
隔离执行 - 在沙箱中运行代码
"""
# 创建临时目录
with tempfile.TemporaryDirectory() as temp_dir:
# 写入代码到临时文件
code_file = os.path.join(temp_dir, "user_code.py")
with open(code_file, "w") as f:
f.write(code)
# 在隔离环境中执行
result = subprocess.run(
[sys.executable, code_file],
input=json.dumps(inputs),
text=True,
capture_output=True,
timeout=30, # 30秒超时
cwd=temp_dir
)
return {
"stdout": result.stdout,
"stderr": result.stderr,
"return_code": result.returncode
}
#常见问题解答
#Q1: 如何处理任务超时?
A: 可以通过设置task_time_limit和task_soft_time_limit来控制硬超时和软超时。硬超时会强制终止任务,软超时会抛出SoftTimeLimitExceeded异常,允许任务优雅地处理超时。
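下面给出一个处理软超时的简要示意(任务名与 do_heavy_work 均为演示用的假设):软超时到达时任务会收到 SoftTimeLimitExceeded 异常,可以清理资源或返回已完成的部分结果;硬超时则会直接终止任务。
from celery.exceptions import SoftTimeLimitExceeded
@celery_app.task(bind=True, soft_time_limit=240, time_limit=300, name="tasks.long_running_demo")
def long_running_demo(self, items: list):
    """演示软超时的优雅处理"""
    processed = []
    try:
        for item in items:
            processed.append(do_heavy_work(item))  # do_heavy_work 为假设的耗时处理函数
        return {"status": "completed", "count": len(processed)}
    except SoftTimeLimitExceeded:
        # 软超时触发:保存已完成的部分并返回,避免随后被硬超时强制杀死
        return {"status": "partial", "count": len(processed)}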
#Q2: 如何实现任务优先级?
A: 可以通过配置多个队列,将不同优先级的任务路由到不同的队列,并为高优先级队列分配独立或更多的Worker(也可结合Broker的消息优先级),如下面的示例所示。
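一个常见做法的示意(队列名、任务名均为假设):把高优先级任务路由到独立队列,并为该队列单独启动 Worker。
# 在 Celery 配置中为高优先级任务指定独立队列
celery_app.conf.task_routes.update({
    "tasks.urgent.*": {"queue": "high_priority"},
})
# 提交任务时也可以临时指定队列
# some_task.apply_async(args=[...], queue="high_priority")
# 为高优先级队列单独启动 Worker(shell 命令):
#   celery -A celery_app worker -Q high_priority --concurrency=8 -n high@%h
#   celery -A celery_app worker -Q default,ai_tasks,email_tasks --concurrency=4 -n general@%h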
#Q3: 如何处理任务失败和重试?
A: Celery提供了丰富的重试机制,可以通过retry()方法手动重试,或使用autoretry_for参数自动重试。建议根据错误类型决定是否重试。
#Q4: 如何监控任务执行情况?
A: 可以使用Flower进行实时监控,或通过Celery的inspect接口获取任务状态。也可以自定义信号处理器来记录任务执行情况。
#Q5: 如何处理大量任务积压?
A: 可以通过增加Worker数量、优化任务执行效率、使用任务分片等方式处理任务积压。也可以考虑使用优先级队列,确保重要任务优先执行。
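排查积压时,可以先查看 Broker 中各队列的待消费消息数(下面的示意假设使用 Redis 作为 Broker,Celery 默认把每个队列的消息存放在与队列同名的 Redis list 中),再决定是扩容 Worker 还是做任务分片:
import redis
r = redis.Redis(host="localhost", port=6379, db=0)  # 与 Broker 使用同一个 Redis 库
for queue in ("default", "ai_tasks", "email_tasks", "file_processing"):
    print(queue, r.llen(queue))  # 列表长度即该队列中待消费的任务数
# 若某个队列积压严重,可为其追加专用 Worker,例如:
#   celery -A celery_app worker -Q ai_tasks --concurrency=8 -n ai_extra@%h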
#总结
Celery为FastAPI应用提供了强大的异步任务处理能力,适用于各种耗时操作:
- 架构优势:生产者-消费者模式,解耦服务,提高响应性
- 扩展性:支持多Worker、多队列,轻松水平扩展
- 可靠性:任务持久化、失败重试、死信队列
- 灵活性:定时任务、动态调度、任务链式执行
💡 核心要点:合理设计任务粒度,优化任务执行效率,建立完善的监控体系,确保系统的稳定性和可维护性。
#SEO优化建议
为了提高这篇Celery教程在搜索引擎中的排名,以下是几个关键的SEO优化建议:
#标题优化
- 主标题:使用包含核心关键词的标题,如"FastAPI异步任务队列Celery完全指南"
- 二级标题:每个章节标题都包含相关的长尾关键词
- H1-H6层次结构:保持正确的标题层级,便于搜索引擎理解内容结构
#内容优化
- 关键词密度:在内容中自然地融入关键词如"Celery", "异步任务", "任务队列", "AI模型推理", "定时任务"等
- 元描述:在文章开头的元数据中包含吸引人的描述
- 内部链接:链接到其他相关教程,如流式响应 StreamingResponse等
- 外部权威链接:引用官方文档和权威资源
#技术SEO
- 页面加载速度:优化代码块和图片加载
- 移动端适配:确保在移动设备上良好显示
- 结构化数据:使用适当的HTML标签和语义化元素
#用户体验优化
- 内容可读性:使用清晰的段落结构和代码示例
- 互动元素:提供实际可运行的代码示例
- 更新频率:定期更新内容以保持时效性
#常见问题解答(FAQ)
#Q1: Celery和FastAPI如何集成?
A: 通过在FastAPI应用中调用Celery任务的delay()方法提交任务,然后通过AsyncResult获取任务状态。可以使用WebSocket实现实时状态推送。
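下面是一个客户端侧的调用示意(假设上文的 FastAPI 应用运行在 http://localhost:8000,并使用 requests 库):提交任务后用返回的 task_id 轮询状态接口,直到任务进入终态。
import time
import requests
resp = requests.post(
    "http://localhost:8000/tasks/image-generation",
    json={"prompt": "a cat in space", "style": "realistic"},
)
task_id = resp.json()["task_id"]
# 轮询任务状态,直到进入终态
while True:
    status = requests.get(f"http://localhost:8000/tasks/{task_id}").json()
    print(status["status"], status.get("info"))
    if status["status"] in ("completed", "failed", "cancelled"):
        break
    time.sleep(2)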
#Q2: 如何处理大量并发任务?
A: 需要合理配置Worker数量,使用多队列分离不同类型的任务,实施任务分片处理,监控系统资源使用情况。
#Q3: 任务失败如何处理?
A: 可以配置自动重试机制,使用死信队列处理永久失败的任务,记录失败日志便于排查问题。
#Q4: 如何监控Celery性能?
A: 使用Flower进行可视化监控,配置系统监控收集性能指标,自定义监控任务检查队列状态。
#Q5: 生产环境部署需要注意什么?
A: 配置持久化存储,设置合理的资源限制,实施安全措施,建立完整的监控告警体系。
🔗 相关教程推荐
- 流式响应 StreamingResponse - 实时数据推送技术
- Redis 集成 - 缓存与消息队列
- Nginx与Gunicorn生产部署 - 生产环境部署
- Docker容器化部署 - 容器化部署策略
- Pydantic Settings多环境配置 - 环境配置管理
🏷️ 标签云: FastAPI Celery 异步任务队列 AI模型推理 定时任务 任务调度 后台处理 消息队列 任务监控 性能优化

