FastAPI异步任务队列Celery完全指南

📂 所属阶段:第六阶段 — 2026 特色专题(AI 集成篇)
🔗 相关章节:流式响应 StreamingResponse · Redis 集成

目录

  1. 异步任务队列基础概念
  2. Celery安装与配置
  3. 任务定义与实现
  4. FastAPI集成方案
  5. 任务调度与定时任务
  6. 错误处理与重试机制
  7. 性能优化策略
  8. 监控与管理

异步任务队列基础概念

为什么需要任务队列?

在现代Web应用中,许多操作都是耗时的,比如AI模型推理、图像/视频处理、批量邮件发送、大文件处理等。如果在请求中同步执行这些操作,与使用任务队列的对比如下:

传统同步处理:
┌─────────────────────────────────────────────────────┐
│  用户请求 → 服务器 → 执行耗时任务 → 返回响应      │
│  (用户需等待任务完成,可能超时)                     │
└─────────────────────────────────────────────────────┘

异步任务队列处理:
┌─────────────────────────────────────────────────────┐
│  用户请求 → 服务器 → 添加到任务队列 → 立即响应    │
│  后台Worker → 执行任务 → 存储结果 → 通知用户      │
└─────────────────────────────────────────────────────┘
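
下面用一段最小示意代码对比两种方式在接口层的差异(假设 long_running_task 是一个已注册的Celery任务,模块路径仅为示例):

# 示意:同一个耗时操作,同步执行 vs 投递到任务队列
from fastapi import FastAPI
from tasks.basic_tasks import long_running_task  # 假设的任务模块与任务名

app = FastAPI()

@app.post("/report/sync")
def create_report_sync():
    # 传统同步处理:直接在请求中执行,用户必须等它跑完
    result = long_running_task()
    return {"result": result}

@app.post("/report/async")
def create_report_async():
    # 异步队列处理:仅把任务投递到队列,立即返回task_id
    task = long_running_task.delay()
    return {"task_id": task.id, "status": "submitted"}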

任务队列的核心优势

| 优势     | 说明                     | 应用场景               |
| -------- | ------------------------ | ---------------------- |
| 响应性   | 立即响应用户请求         | AI模型推理、图像生成   |
| 可扩展性 | 水平扩展Worker处理能力   | 大规模数据处理         |
| 可靠性   | 任务持久化、失败重试     | 邮件发送、文件处理     |
| 解耦性   | 服务间松耦合             | 微服务架构             |

Celery架构详解

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   Producer      │───→│   Message       │───→│   Celery        │
│   (FastAPI)     │    │   Broker        │    │   Workers       │
│   (提交任务)     │    │   (Redis/RabbitMQ)│    │   (执行任务)   │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         ↑                                              │
         └──────────────────────────────────────────────┘
                           ┌─────────────────┐
                           │   Results       │
                           │   Backend       │
                           │   (Redis/DB)    │
                           └─────────────────┘
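
把上图中的几个角色对应到代码,大致如下(最小示意,假设本地Redis同时充当Broker与Results Backend,且已有Worker在运行):

# demo.py - 架构角色与代码的对应关系(示意)
from celery import Celery

app = Celery(
    "demo",
    broker="redis://localhost:6379/0",    # Message Broker:存放待执行的任务消息
    backend="redis://localhost:6379/1",   # Results Backend:存放执行结果
)

@app.task
def add(x, y):
    # 该函数体由Celery Worker进程执行
    return x + y

# Producer(例如FastAPI进程)只负责把任务投递到Broker
async_result = add.delay(2, 3)
# 通过Results Backend取回结果(需要Worker已启动)
print(async_result.get(timeout=10))   # -> 5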

典型应用场景

  1. AI模型推理:批量处理AI模型预测任务
  2. 图像/视频处理:图像生成、视频转码
  3. 邮件发送:批量邮件、通知推送
  4. 数据处理:ETL作业、报表生成
  5. 文件操作:大文件上传、文档转换
  6. 第三方API调用:外部服务集成

Celery安装与配置

安装依赖

# 基础安装(zsh等shell下建议给方括号加引号)
pip install "celery[redis]" redis

# 包含更多可选功能的安装(extras按需选择,不同Celery版本支持的extras略有差异)
pip install "celery[redis,auth,msgpack,yaml,sqs,gevent,eventlet,couchbase,cassandra,mongodb]"

# 如果使用RabbitMQ作为Broker:Celery默认的amqp传输已随celery一起安装,
# 直接 pip install celery 即可(并不存在 celery[rabbitmq] 这个extra)
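
本地开发时,可以先用Docker快速拉起一个Redis实例充当Broker/Backend(示意命令):

# 启动本地Redis(同时用作Broker与Results Backend)
docker run -d --name celery-redis -p 6379:6379 redis:7

# 验证Redis可用,期望输出 PONG
docker exec celery-redis redis-cli ping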

Celery配置文件

# celery_app.py - Celery应用配置
from celery import Celery
from kombu import Queue
import os

def make_celery(app_name=__name__):
    """创建Celery应用实例"""
    redis_url = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
    
    celery = Celery(
        app_name,
        broker=redis_url,
        backend=redis_url.replace('/0', '/1'),  # 使用不同的数据库
        include=[
            'tasks.ai_tasks',
            'tasks.email_tasks', 
            'tasks.file_processing',
            'tasks.data_analysis'
        ]
    )
    
    # 详细配置
    celery.conf.update(
        # 序列化配置
        task_serializer='json',
        accept_content=['json'],
        result_serializer='json',
        timezone='Asia/Shanghai',
        enable_utc=True,
        
        # 任务跟踪
        task_track_started=True,
        
        # 任务超时设置
        task_time_limit=300,          # 硬超时:5分钟
        task_soft_time_limit=240,     # 软超时:4分钟
        
        # 工作进程配置
        worker_prefetch_multiplier=1,  # 防止任务积压
        worker_max_tasks_per_child=1000,  # 每个工作进程处理1000个任务后重启
        
        # 确认机制
        task_acks_late=True,           # 任务完成后才确认
        task_reject_on_worker_lost=True,
        
        # 结果存储
        result_expires=3600,          # 结果1小时后自动清除
        result_compression='gzip',    # 结果压缩
        
        # 注意:重试策略(autoretry_for、retry_kwargs、retry_backoff、retry_jitter等)
        # 属于任务级选项,应在 @task 装饰器上配置,Celery并没有对应的全局配置项,
        # 具体用法见下文"错误处理与重试机制"一节
        
        # 队列配置
        task_default_queue='default',
        task_routes={
            'tasks.ai_tasks.*': {'queue': 'ai_tasks'},
            'tasks.email_tasks.*': {'queue': 'email_tasks'},
            'tasks.file_processing.*': {'queue': 'file_processing'},
        },
        task_create_missing_queues=True,
        
        # 监控配置
        worker_send_task_events=True,
        task_send_sent_event=True,
        
        # 安全配置(仅在使用auth序列化并调用celery.setup_security()时生效)
        security_key=os.getenv('CELERY_SECURITY_KEY'),
        security_certificate=os.getenv('CELERY_CERT_PATH'),
        security_cert_store=os.getenv('CELERY_CERT_STORE_PATH'),
    )
    
    return celery

# 创建全局Celery实例
celery_app = make_celery('daoman_celery')

# 队列定义
celery_app.conf.task_queues = (
    Queue('default', routing_key='default'),
    Queue('ai_tasks', routing_key='ai.#'),
    Queue('email_tasks', routing_key='email.#'),
    Queue('file_processing', routing_key='file.#'),
)
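
配置就绪后,Worker与Beat需要作为独立进程启动。以下命令为示意,假设Celery实例位于项目根目录的 celery_app.py 中:

# 启动处理默认队列与AI队列的Worker
celery -A celery_app worker -l info -Q default,ai_tasks --concurrency=4

# 为邮件、文件处理队列单独启动Worker(可部署在不同机器上水平扩展)
celery -A celery_app worker -l info -Q email_tasks,file_processing

# 如使用定时任务,还需单独启动Beat进程
celery -A celery_app beat -l info

# 可选:启动Flower监控面板(需先 pip install flower)
celery -A celery_app flower --port=5555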

环境配置

# config/celery_config.py - 配置管理
import os
from kombu import Queue

class CeleryConfig:
    """Celery配置类"""
    
    # Broker配置
    BROKER_URL = os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0')
    RESULT_BACKEND = os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/1')
    
    # 序列化配置
    TASK_SERIALIZER = 'json'
    ACCEPT_CONTENT = ['json']
    RESULT_SERIALIZER = 'json'
    
    # 时区配置
    TIMEZONE = 'Asia/Shanghai'
    ENABLE_UTC = True
    
    # 任务配置
    TASK_TRACK_STARTED = True
    TASK_TIME_LIMIT = 300
    TASK_SOFT_TIME_LIMIT = 240
    
    # 注意:重试策略属于任务级选项(在@task装饰器上配置),
    # Celery并没有对应的全局配置项,此处不再设置
    
    # 结果配置
    RESULT_EXPIRES = 3600
    RESULT_COMPRESSION = 'gzip'
    
    # 工作进程配置
    WORKER_PREFETCH_MULTIPLIER = 1
    WORKER_MAX_TASKS_PER_CHILD = 1000
    WORKER_SEND_TASK_EVENTS = True
    TASK_SEND_SENT_EVENT = True
    
    # 队列路由
    TASK_ROUTES = {
        'tasks.ai_tasks.*': {'queue': 'ai_tasks'},
        'tasks.email_tasks.*': {'queue': 'email_tasks'},
        'tasks.file_processing.*': {'queue': 'file_processing'},
    }
    
    # 自定义队列
    TASK_QUEUES = (
        Queue('default', routing_key='default'),
        Queue('ai_tasks', routing_key='ai.#'),
        Queue('email_tasks', routing_key='email.#'),
        Queue('file_processing', routing_key='file.#'),
    )
    
    # 安全配置(仅在使用auth序列化并调用setup_security()时生效)
    SECURITY_KEY = os.getenv('CELERY_SECURITY_KEY')
    SECURITY_CERTIFICATE = os.getenv('CELERY_CERT_PATH')
    SECURITY_CERT_STORE = os.getenv('CELERY_CERT_STORE_PATH')

# 应用配置
celery_config = CeleryConfig()
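
配置类可以通过 config_from_object 挂载到Celery实例上(示意)。需要注意:Celery 5 的新式配置名是小写(broker_url、task_serializer 等),若沿用上面这种大写属性名,请先确认所用版本对新旧配置名的兼容情况,必要时把属性改为小写。

# 应用配置类(示意)
from celery import Celery
from config.celery_config import CeleryConfig

celery_app = Celery('daoman_celery')
celery_app.config_from_object(CeleryConfig)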

任务定义与实现

基础任务定义

# tasks/basic_tasks.py - 基础任务示例
from celery_app import celery_app
import time
import logging

logger = logging.getLogger(__name__)

@celery_app.task(bind=True, name="tasks.send_email")
def send_email(self, to_email: str, subject: str, body: str, attachments: list = None):
    """
    发送邮件任务
    
    Args:
        to_email: 收件人邮箱
        subject: 邮件主题
        body: 邮件正文
        attachments: 附件列表
    """
    try:
        # 模拟邮件发送过程
        logger.info(f"开始发送邮件到 {to_email}")
        
        # 更新任务状态
        self.update_state(state='PROGRESS', meta={
            'current': 10,
            'total': 100,
            'status': '正在准备邮件内容...'
        })
        
        # 模拟邮件准备时间
        time.sleep(2)
        
        # 更新状态
        self.update_state(state='PROGRESS', meta={
            'current': 50,
            'total': 100,
            'status': '正在发送邮件...'
        })
        
        # 模拟邮件发送时间
        time.sleep(3)
        
        # 模拟邮件发送成功
        logger.info(f"邮件已成功发送到 {to_email}")
        
        return {
            'status': 'sent',
            'to_email': to_email,
            'subject': subject,
            'sent_at': time.time()
        }
        
    except Exception as exc:
        logger.error(f"邮件发送失败: {str(exc)}")
        # 标记任务失败,触发重试
        raise self.retry(exc=exc, countdown=60, max_retries=3)

@celery_app.task(bind=True, name="tasks.process_data")
def process_data(self, data_list: list, operation: str = 'transform'):
    """
    数据处理任务
    
    Args:
        data_list: 待处理数据列表
        operation: 操作类型
    """
    total_items = len(data_list)
    results = []
    
    for i, item in enumerate(data_list):
        # 更新进度
        self.update_state(state='PROGRESS', meta={
            'current': i + 1,
            'total': total_items,
            'progress': int(((i + 1) / total_items) * 100),
            'status': f'正在处理第 {i + 1} 个项目'
        })
        
        # 模拟数据处理
        time.sleep(0.1)
        
        # 根据操作类型处理数据
        if operation == 'transform':
            processed_item = f"transformed_{item}"
        elif operation == 'validate':
            processed_item = f"validated_{item}"
        else:
            processed_item = item
        results.append(processed_item)
    
    return {
        'status': 'completed',
        'processed_count': total_items,
        'operation': operation,
        'results': results
    }
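
任务定义好之后,调用方可以通过 AsyncResult 轮询上面 update_state 写入的进度信息。下面是一段最小示意(假设Broker/Backend已配置好且Worker在运行):

# 提交任务并在调用方轮询进度(示意)
import time
from celery.result import AsyncResult
from celery_app import celery_app
from tasks.basic_tasks import process_data

task = process_data.delay(["a", "b", "c"], operation="transform")

while True:
    result = AsyncResult(task.id, app=celery_app)
    if result.state == "PROGRESS":
        print("进度:", result.info.get("progress"), "%")
    elif result.state in ("SUCCESS", "FAILURE"):
        print("最终状态:", result.state)
        break
    time.sleep(1)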

AI模型推理任务

# tasks/ai_tasks.py - AI模型推理任务
import time
import uuid
import random
import logging
from typing import Dict, Any, List

from celery_app import celery_app

logger = logging.getLogger(__name__)

@celery_app.task(bind=True, name="tasks.ai_image_generation")
def generate_image(self, prompt: str, style: str = "realistic", 
                  width: int = 512, height: int = 512, user_id: int = None):
    """
    AI图像生成任务
    
    Args:
        prompt: 图像生成提示词
        style: 图像风格
        width: 图像宽度
        height: 图像高度
        user_id: 用户ID
    """
    try:
        logger.info(f"开始生成图像: {prompt}")
        
        # 更新初始化状态
        self.update_state(state='PROGRESS', meta={
            'progress': 5,
            'status': '正在加载AI模型...',
            'step': 'model_loading'
        })
        
        # 模拟模型加载(实际应用中会加载预训练模型)
        time.sleep(2)
        
        # 更新生成状态
        self.update_state(state='PROGRESS', meta={
            'progress': 20,
            'status': '正在处理提示词...',
            'step': 'prompt_processing'
        })
        
        # 模拟提示词处理
        time.sleep(1)
        
        self.update_state(state='PROGRESS', meta={
            'progress': 40,
            'status': '正在生成图像...',
            'step': 'image_generation'
        })
        
        # 模拟图像生成过程(实际应用中会调用AI模型)
        for i in range(10):  # 模拟生成步骤
            time.sleep(0.5)  # 模拟每个步骤的耗时
            progress = 40 + int((i + 1) * 6)  # 从40%到100%
            
            self.update_state(state='PROGRESS', meta={
                'progress': progress,
                'status': f'正在生成图像... ({i + 1}/10)',
                'step': 'image_generation'
            })
        
        # 模拟生成图像文件路径
        import uuid
        image_filename = f"generated_{uuid.uuid4().hex[:8]}_{int(time.time())}.png"
        image_path = f"/uploads/images/{image_filename}"
        
        # 模拟保存图像
        self.update_state(state='PROGRESS', meta={
            'progress': 95,
            'status': '正在保存图像...',
            'step': 'saving'
        })
        
        time.sleep(1)
        
        # 保存记录到数据库(如果提供用户ID)
        # 注意:save_image_record 为需要自行实现的数据库写入函数
        if user_id:
            save_image_record(user_id, image_path, prompt, style)
        
        logger.info(f"图像生成完成: {image_path}")
        
        return {
            'status': 'success',
            'image_url': image_path,
            'prompt': prompt,
            'style': style,
            'dimensions': f"{width}x{height}",
            'generated_at': time.time()
        }
        
    except Exception as exc:
        logger.error(f"图像生成失败: {str(exc)}")
        raise self.retry(exc=exc, countdown=120, max_retries=2)

@celery_app.task(bind=True, name="tasks.ai_text_summarization")
def summarize_text(self, text: str, max_length: int = 100, 
                  model_name: str = "default_summary_model"):
    """
    文本摘要生成任务
    
    Args:
        text: 待摘要的文本
        max_length: 摘要最大长度
        model_name: 使用的模型名称
    """
    try:
        logger.info(f"开始文本摘要生成,原文长度: {len(text)}")
        
        self.update_state(state='PROGRESS', meta={
            'progress': 10,
            'status': '正在分析文本结构...',
            'original_length': len(text)
        })
        
        # 模拟文本分析
        time.sleep(1)
        
        self.update_state(state='PROGRESS', meta={
            'progress': 30,
            'status': '正在生成摘要...',
            'original_length': len(text)
        })
        
        # 模拟摘要生成过程
        # 在实际应用中,这里会调用预训练的摘要模型
        import random
        sentences = text.split('.')
        summary_sentences = random.sample(sentences, min(3, len(sentences)))
        summary = '.'.join(summary_sentences[:max_length//20])  # 简化摘要逻辑
        
        self.update_state(state='PROGRESS', meta={
            'progress': 80,
            'status': '正在优化摘要质量...',
            'original_length': len(text),
            'summary_length': len(summary)
        })
        
        time.sleep(1)
        
        return {
            'status': 'success',
            'original_text_length': len(text),
            'summary': summary,
            'summary_length': len(summary),
            'compression_ratio': round(len(summary) / len(text), 2),
            'model_used': model_name
        }
        
    except Exception as exc:
        logger.error(f"文本摘要生成失败: {str(exc)}")
        raise self.retry(exc=exc, countdown=60, max_retries=3)

@celery_app.task(bind=True, name="tasks.ai_batch_prediction")
def batch_predict(self, data_list: List[Dict], model_name: str = "default_model"):
    """
    批量AI预测任务
    
    Args:
        data_list: 待预测数据列表
        model_name: 使用的模型名称
    """
    total_items = len(data_list)
    predictions = []
    
    for i, data_item in enumerate(data_list):
        # 更新进度
        self.update_state(state='PROGRESS', meta={
            'current': i + 1,
            'total': total_items,
            'progress': int(((i + 1) / total_items) * 100),
            'status': f'正在处理第 {i + 1}/{total_items} 个项目'
        })
        
        try:
            # 模拟AI预测过程
            # 在实际应用中,这里会调用具体的AI模型
            prediction = {
                'input': data_item,
                'prediction': f"predicted_value_{i}",
                'confidence': round(random.uniform(0.7, 0.99), 2),
                'model': model_name,
                'timestamp': time.time()
            }
            predictions.append(prediction)
            
            time.sleep(0.1)  # 模拟预测耗时
            
        except Exception as item_exc:
            logger.warning(f"单项预测失败: {str(item_exc)}, 继续处理其他项目")
            predictions.append({
                'input': data_item,
                'error': str(item_exc),
                'status': 'failed'
            })
    
    success_count = len([p for p in predictions if 'error' not in p])
    
    return {
        'status': 'completed',
        'total_items': total_items,
        'successful_predictions': success_count,
        'failed_predictions': total_items - success_count,
        'predictions': predictions,
        'model_used': model_name
    }
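
除了 delay(),也可以用 apply_async() 在提交时覆盖队列、延迟执行时间等选项(示意,提示词等参数为演示值,部分高级选项是否生效取决于所用Broker):

# 用apply_async()精细控制任务投递(示意)
from tasks.ai_tasks import generate_image

task = generate_image.apply_async(
    kwargs={"prompt": "a cat in space", "style": "realistic"},
    queue="ai_tasks",   # 覆盖路由,投递到AI专用队列
    countdown=10,       # 延迟10秒开始执行
    expires=600,        # 600秒内未被Worker领取则作废
)
print(task.id)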

文件处理任务

# tasks/file_processing.py - 文件处理任务
import os
import shutil
from pathlib import Path
from celery_app import celery_app
import zipfile
import tempfile
from typing import List, Dict, Any

@celery_app.task(bind=True, name="tasks.process_uploaded_files")
def process_uploaded_files(self, file_paths: List[str], operations: List[str]):
    """
    批量文件处理任务
    
    Args:
        file_paths: 文件路径列表
        operations: 要执行的操作列表
    """
    total_files = len(file_paths)
    results = []
    
    for i, file_path in enumerate(file_paths):
        # 更新进度
        self.update_state(state='PROGRESS', meta={
            'current': i + 1,
            'total': total_files,
            'progress': int(((i + 1) / total_files) * 100),
            'current_file': os.path.basename(file_path),
            'status': f'正在处理文件 {i + 1}/{total_files}'
        })
        
        file_result = {
            'file_path': file_path,
            'original_size': os.path.getsize(file_path) if os.path.exists(file_path) else 0,
            'operations': [],
            'errors': []
        }
        
        try:
            for operation in operations:
                if operation == 'compress':
                    result = compress_file(file_path)
                elif operation == 'resize_image':
                    result = resize_image(file_path)
                elif operation == 'convert_format':
                    result = convert_file_format(file_path)
                elif operation == 'extract_metadata':
                    result = extract_file_metadata(file_path)
                else:
                    result = {'error': f'Unknown operation: {operation}'}
                
                file_result['operations'].append({
                    'operation': operation,
                    'result': result
                })
                
                # 检查操作是否出错
                if 'error' in result:
                    file_result['errors'].append(result['error'])
        
        except Exception as exc:
            file_result['errors'].append(str(exc))
        
        results.append(file_result)
    
    return {
        'status': 'completed',
        'total_files': total_files,
        'processed_files': len([r for r in results if not r['errors']]),
        'results': results
    }

def compress_file(file_path: str) -> Dict[str, Any]:
    """压缩文件"""
    try:
        original_size = os.path.getsize(file_path)
        compressed_path = f"{file_path}.zip"
        
        with zipfile.ZipFile(compressed_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
            zipf.write(file_path, os.path.basename(file_path))
        
        compressed_size = os.path.getsize(compressed_path)
        compression_ratio = round((original_size - compressed_size) / original_size * 100, 2)
        
        return {
            'compressed_path': compressed_path,
            'original_size': original_size,
            'compressed_size': compressed_size,
            'compression_ratio': compression_ratio
        }
    except Exception as e:
        return {'error': str(e)}

def resize_image(file_path: str) -> Dict[str, Any]:
    """调整图像大小(需要Pillow库)"""
    try:
        from PIL import Image
        
        if not file_path.lower().endswith(('.png', '.jpg', '.jpeg', '.gif', '.bmp')):
            return {'error': 'Not an image file'}
        
        with Image.open(file_path) as img:
            original_size = img.size
            # 调整到512x512以内(保持宽高比)
            img.thumbnail((512, 512), Image.Resampling.LANCZOS)
            resized_path = f"{file_path}_resized.jpg"
            # 转为RGB以兼容JPEG格式(PNG等可能带alpha通道)
            img.convert('RGB').save(resized_path, 'JPEG', quality=85)
            resized_size = img.size
            
        return {
            'resized_path': resized_path,
            'original_size': original_size,
            'resized_size': resized_size
        }
    except ImportError:
        return {'error': 'Pillow library not installed'}
    except Exception as e:
        return {'error': str(e)}

def convert_file_format(file_path: str) -> Dict[str, Any]:
    """转换文件格式(示例)"""
    try:
        # 这里只是一个示例,实际转换需要具体实现
        return {
            'converted': True,
            'original_format': os.path.splitext(file_path)[1],
            'new_path': file_path  # 实际应用中会创建新文件
        }
    except Exception as e:
        return {'error': str(e)}

def extract_file_metadata(file_path: str) -> Dict[str, Any]:
    """提取文件元数据"""
    try:
        stat_info = os.stat(file_path)
        return {
            'size': stat_info.st_size,
            'created': stat_info.st_ctime,
            'modified': stat_info.st_mtime,
            'permissions': oct(stat_info.st_mode)[-3:]
        }
    except Exception as e:
        return {'error': str(e)}
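
文件处理这类任务经常需要在完成后触发通知,可以用Celery canvas的 chain 把多个任务串起来(示意,send_email 即上文 basic_tasks 中的示例任务,文件路径与收件人均为演示值):

# "文件处理 -> 邮件通知"工作流(示意)
from celery import chain
from tasks.file_processing import process_uploaded_files
from tasks.basic_tasks import send_email

workflow = chain(
    process_uploaded_files.s(["/uploads/a.png"], ["compress"]),
    # si() 表示忽略上一步的返回值,不作为参数传入
    send_email.si("admin@example.com", "文件处理完成", "批量文件处理任务已结束"),
)
result = workflow.apply_async()
print(result.id)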

FastAPI集成方案

FastAPI应用集成

# main.py - FastAPI应用与Celery集成
import logging
from typing import List, Optional

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from celery.result import AsyncResult

from celery_app import celery_app          # 下文查询/撤销任务时需要
from tasks.ai_tasks import generate_image, summarize_text, batch_predict
from tasks.email_tasks import send_email   # 对应上文的send_email示例任务
from tasks.file_processing import process_uploaded_files

app = FastAPI(title="Daoman Celery Integration API")

logger = logging.getLogger(__name__)

class ImageGenerationRequest(BaseModel):
    """图像生成请求模型"""
    prompt: str
    style: str = "realistic"
    width: int = 512
    height: int = 512
    user_id: Optional[int] = None

class EmailRequest(BaseModel):
    """邮件发送请求模型"""
    to_email: str
    subject: str
    body: str
    attachments: Optional[List[str]] = []

class BatchPredictionRequest(BaseModel):
    """批量预测请求模型"""
    data_list: List[dict]
    model_name: str = "default_model"

class FileProcessingRequest(BaseModel):
    """文件处理请求模型"""
    file_paths: List[str]
    operations: List[str]

@app.post("/tasks/image-generation")
async def create_image_generation_task(request: ImageGenerationRequest):
    """创建图像生成任务"""
    try:
        task = generate_image.delay(
            prompt=request.prompt,
            style=request.style,
            width=request.width,
            height=request.height,
            user_id=request.user_id
        )
        
        logger.info(f"图像生成任务已创建: {task.id}")
        
        return {
            "task_id": task.id,
            "status": "submitted",
            "message": "图像生成任务已提交,正在后台处理",
            "estimated_completion": "30-60秒"
        }
    except Exception as e:
        logger.error(f"创建图像生成任务失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"任务创建失败: {str(e)}")

@app.post("/tasks/email-send")
async def create_email_task(request: EmailRequest):
    """创建邮件发送任务"""
    try:
        task = send_email.delay(
            to_email=request.to_email,
            subject=request.subject,
            body=request.body,
            attachments=request.attachments
        )
        
        logger.info(f"邮件发送任务已创建: {task.id}")
        
        return {
            "task_id": task.id,
            "status": "submitted",
            "message": "邮件发送任务已提交,正在后台处理"
        }
    except Exception as e:
        logger.error(f"创建邮件发送任务失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"任务创建失败: {str(e)}")

@app.post("/tasks/text-summarization")
async def create_text_summarization_task(text: str, max_length: int = 100):
    """创建文本摘要任务"""
    try:
        task = summarize_text.delay(text=text, max_length=max_length)
        
        logger.info(f"文本摘要任务已创建: {task.id}")
        
        return {
            "task_id": task.id,
            "status": "submitted",
            "message": "文本摘要任务已提交,正在后台处理"
        }
    except Exception as e:
        logger.error(f"创建文本摘要任务失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"任务创建失败: {str(e)}")

@app.post("/tasks/batch-predict")
async def create_batch_prediction_task(request: BatchPredictionRequest):
    """创建批量预测任务"""
    try:
        task = batch_predict.delay(
            data_list=request.data_list,
            model_name=request.model_name
        )
        
        logger.info(f"批量预测任务已创建: {task.id}, 数据量: {len(request.data_list)}")
        
        return {
            "task_id": task.id,
            "status": "submitted",
            "message": f"批量预测任务已提交,共{len(request.data_list)}条数据",
            "estimated_completion": f"{len(request.data_list) * 2}秒左右"
        }
    except Exception as e:
        logger.error(f"创建批量预测任务失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"任务创建失败: {str(e)}")

@app.post("/tasks/file-process")
async def create_file_processing_task(request: FileProcessingRequest):
    """创建文件处理任务"""
    try:
        task = process_uploaded_files.delay(
            file_paths=request.file_paths,
            operations=request.operations
        )
        
        logger.info(f"文件处理任务已创建: {task.id}, 文件数: {len(request.file_paths)}")
        
        return {
            "task_id": task.id,
            "status": "submitted",
            "message": f"文件处理任务已提交,共{len(request.file_paths)}个文件",
            "operations": request.operations
        }
    except Exception as e:
        logger.error(f"创建文件处理任务失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"任务创建失败: {str(e)}")

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """查询任务状态"""
    try:
        task_result = AsyncResult(task_id, app=celery_app)
        
        response = {
            "task_id": task_id,
            "status": task_result.state,
            "result": None,
            "traceback": None
        }
        
        if task_result.state == 'PENDING':
            response["status"] = "pending"
            response["message"] = "任务尚未开始执行"
        elif task_result.state == 'PROGRESS':
            response["status"] = "in_progress"
            response["info"] = task_result.info
            response["message"] = "任务正在执行中"
        elif task_result.state == 'SUCCESS':
            response["status"] = "completed"
            response["result"] = task_result.result
            response["message"] = "任务执行成功"
        elif task_result.state == 'FAILURE':
            response["status"] = "failed"
            response["error"] = str(task_result.info)
            response["traceback"] = task_result.traceback
            response["message"] = "任务执行失败"
        elif task_result.state == 'RETRY':
            response["status"] = "retrying"
            response["info"] = task_result.info
            response["message"] = "任务正在重试"
        elif task_result.state == 'REVOKED':
            response["status"] = "cancelled"
            response["message"] = "任务已被取消"
        
        return response
    except Exception as e:
        logger.error(f"查询任务状态失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"查询任务状态失败: {str(e)}")

@app.delete("/tasks/{task_id}/cancel")
async def cancel_task(task_id: str):
    """取消任务"""
    try:
        # revoke可以取消尚未执行的任务;terminate=True会进一步终止正在执行的任务(慎用)
        celery_app.control.revoke(task_id, terminate=True)
        
        logger.info(f"任务已取消: {task_id}")
        
        return {
            "task_id": task_id,
            "status": "cancelled",
            "message": "任务已成功取消"
        }
    except Exception as e:
        logger.error(f"取消任务失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"取消任务失败: {str(e)}")

@app.get("/tasks/{task_id}/result")
async def get_task_result(task_id: str):
    """获取任务结果(仅在完成时返回)"""
    try:
        task_result = AsyncResult(task_id, app=celery_app)
        
        if task_result.ready():
            if task_result.successful():
                return {
                    "task_id": task_id,
                    "status": "completed",
                    "result": task_result.result
                }
            else:
                return {
                    "task_id": task_id,
                    "status": "failed",
                    "error": str(task_result.info)
                }
        else:
            raise HTTPException(
                status_code=400,
                detail="任务尚未完成,请先检查任务状态"
            )
    except Exception as e:
        logger.error(f"获取任务结果失败: {str(e)}")
        raise HTTPException(status_code=500, detail=f"获取任务结果失败: {str(e)}")

@app.post("/tasks/batch-submit")
async def batch_submit_tasks(tasks_data: List[dict]):
    """批量提交任务"""
    submitted_tasks = []
    
    for task_data in tasks_data:
        try:
            task_type = task_data.get("type")
            task_params = task_data.get("params", {})
            
            if task_type == "image_generation":
                task = generate_image.delay(**task_params)
            elif task_type == "email_send":
                task = send_email.delay(**task_params)
            elif task_type == "text_summarization":
                task = summarize_text.delay(**task_params)
            elif task_type == "batch_predict":
                task = batch_predict.delay(**task_params)
            elif task_type == "file_process":
                task = process_uploaded_files.delay(**task_params)
            else:
                continue
            
            submitted_tasks.append({
                "task_id": task.id,
                "type": task_type,
                "status": "submitted"
            })
        except Exception as e:
            logger.error(f"批量提交任务失败: {str(e)}")
            submitted_tasks.append({
                "type": task_data.get("type"),
                "status": "failed",
                "error": str(e)
            })
    
    return {
        "status": "partial_success",
        "submitted_count": len(submitted_tasks),
        "tasks": submitted_tasks
    }

@app.get("/tasks/overview")
async def get_tasks_overview():
    """获取任务概览"""
    # 获取活跃任务
    inspect = celery_app.control.inspect()
    
    active_tasks = inspect.active()
    scheduled_tasks = inspect.scheduled()
    reserved_tasks = inspect.reserved()
    
    return {
        "overview": {
            "active_workers": len(active_tasks) if active_tasks else 0,
            "scheduled_tasks": len(list(scheduled_tasks.values())) if scheduled_tasks else 0,
            "reserved_tasks": len(list(reserved_tasks.values())) if reserved_tasks else 0,
        },
        "active_tasks": active_tasks,
        "scheduled_tasks": scheduled_tasks,
        "reserved_tasks": reserved_tasks
    }
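
客户端侧可以直接轮询上面的状态接口。下面是用 requests 编写的最小客户端示意(服务地址与请求文本均为假设值):

# 客户端轮询任务状态(示意)
import time
import requests

BASE_URL = "http://localhost:8000"

resp = requests.post(
    f"{BASE_URL}/tasks/text-summarization",
    params={"text": "FastAPI与Celery的集成……(示例文本)", "max_length": 50},
)
task_id = resp.json()["task_id"]

while True:
    status = requests.get(f"{BASE_URL}/tasks/{task_id}").json()
    print(status["status"], status.get("info", ""))
    if status["status"] in ("completed", "failed", "cancelled"):
        break
    time.sleep(2)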

任务状态轮询优化

# utils/task_polling.py - 任务状态轮询优化
import asyncio
import time
from typing import List, Dict, Any
from fastapi import WebSocket, WebSocketDisconnect
from celery.result import AsyncResult
from celery_app import celery_app

class TaskPollingManager:
    """任务轮询管理器"""
    
    def __init__(self):
        self.active_connections: Dict[str, WebSocket] = {}
        self.polling_tasks: Dict[str, asyncio.Task] = {}
    
    async def add_connection(self, task_id: str, websocket: WebSocket):
        """添加WebSocket连接"""
        await websocket.accept()
        self.active_connections[task_id] = websocket
        
        # 启动轮询任务
        polling_task = asyncio.create_task(self.poll_task_status(task_id))
        self.polling_tasks[task_id] = polling_task
    
    async def remove_connection(self, task_id: str):
        """移除连接"""
        if task_id in self.active_connections:
            await self.active_connections[task_id].close()
            del self.active_connections[task_id]
        
        if task_id in self.polling_tasks:
            self.polling_tasks[task_id].cancel()
            del self.polling_tasks[task_id]
    
    async def poll_task_status(self, task_id: str):
        """轮询任务状态并通过WebSocket推送"""
        try:
            while task_id in self.active_connections:
                task_result = AsyncResult(task_id, app=celery_app)
                
                status_data = {
                    "task_id": task_id,
                    "status": task_result.state,
                    "timestamp": time.time()
                }
                
                is_final = False
                if task_result.state == 'PROGRESS':
                    status_data["info"] = task_result.info
                elif task_result.state == 'SUCCESS':
                    status_data["result"] = task_result.result
                    is_final = True
                elif task_result.state == 'FAILURE':
                    status_data["error"] = str(task_result.info)
                    is_final = True
                
                # 先发送状态更新,再决定是否关闭连接
                try:
                    await self.active_connections[task_id].send_json(status_data)
                except WebSocketDisconnect:
                    await self.remove_connection(task_id)
                    break
                
                if is_final:
                    # 任务已结束,推送最终状态后移除连接
                    await self.remove_connection(task_id)
                    break
                
                # 等待下次轮询
                await asyncio.sleep(1)
                
        except asyncio.CancelledError:
            # 任务被取消
            pass
        except Exception as e:
            print(f"轮询任务状态出错: {str(e)}")
            await self.remove_connection(task_id)

# 全局实例
task_polling_manager = TaskPollingManager()

# 以下WebSocket路由应注册到FastAPI应用所在模块(如 main.py,app 即其中的FastAPI实例)
@app.websocket("/ws/task-status/{task_id}")
async def websocket_task_status(websocket: WebSocket, task_id: str):
    """WebSocket任务状态推送"""
    await task_polling_manager.add_connection(task_id, websocket)
    
    try:
        while True:
            # 保持连接
            data = await websocket.receive_text()
            # 可以处理来自客户端的消息
    except WebSocketDisconnect:
        await task_polling_manager.remove_connection(task_id)
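
可以用 websockets 库在本地快速验证推送效果(示意,需先 pip install websockets,task_id 为占位符):

# ws_client.py - WebSocket客户端测试脚本(示意)
import asyncio
import json
import websockets

async def watch(task_id: str):
    uri = f"ws://localhost:8000/ws/task-status/{task_id}"
    async with websockets.connect(uri) as ws:
        while True:
            message = json.loads(await ws.recv())
            print(message["status"], message.get("info", ""))
            if message["status"] in ("SUCCESS", "FAILURE"):
                break

asyncio.run(watch("your-task-id"))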

任务调度与定时任务

Celery Beat定时任务配置

# celery_beat_config.py - Celery Beat定时任务配置
from celery.schedules import crontab
from celery_app import celery_app

# 定时任务配置
celery_app.conf.beat_schedule = {
    # 每日凌晨3点清理临时文件
    "cleanup-temporary-files": {
        "task": "tasks.maintenance.cleanup_temporary_files",
        "schedule": crontab(hour=3, minute=0),
        "options": {
            "expires": 300,  # 任务过期时间5分钟
            "queue": "maintenance"
        }
    },
    
    # 每周一上午9点发送周报
    "send-weekly-report": {
        "task": "tasks.reports.send_weekly_report",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),
        "options": {
            "queue": "reports"
        }
    },
    
    # 每小时检查任务队列状态
    "check-task-queue-status": {
        "task": "tasks.monitoring.check_queue_status",
        "schedule": crontab(minute=0),  # 每小时执行
        "options": {
            "queue": "monitoring"
        }
    },
    
    # 每天下午2点备份数据
    "backup-daily-data": {
        "task": "tasks.backup.perform_daily_backup",
        "schedule": crontab(hour=14, minute=0),
        "options": {
            "queue": "backup",
            "expires": 3600  # 1小时过期
        }
    },
    
    # 每15分钟清理过期任务结果
    "cleanup-expired-results": {
        "task": "tasks.cleanup.cleanup_expired_results",
        "schedule": crontab(minute="*/15"),  # 每15分钟执行
        "options": {
            "queue": "cleanup"
        }
    },
    
    # 工作日早上8点发送提醒邮件
    "morning-reminders": {
        "task": "tasks.notifications.send_morning_reminders",
        "schedule": crontab(hour=8, minute=0, day_of_week="1-5"),  # 周一到周五
        "options": {
            "queue": "notifications"
        }
    },
    
    # 每月第一天生成月度统计
    "monthly-statistics": {
        "task": "tasks.analytics.generate_monthly_statistics",
        "schedule": crontab(hour=0, minute=0, day_of_month=1),  # 每月1号
        "options": {
            "queue": "analytics"
        }
    }
}

# 时区配置(与上文保持一致:指定本地时区即可,crontab会按该时区触发,无需关闭UTC)
celery_app.conf.timezone = 'Asia/Shanghai'
celery_app.conf.enable_utc = True
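
beat_schedule 里登记的只是任务名,任务本体仍需自行实现。以 cleanup_temporary_files 为例,给出一个最小示意实现(临时目录路径、过期阈值均为假设值):

# tasks/maintenance.py - 清理临时文件任务(示意实现)
import os
import time
from celery_app import celery_app

TMP_DIR = "/tmp/daoman"            # 假设的临时文件目录
MAX_AGE_SECONDS = 24 * 3600        # 超过24小时视为过期

@celery_app.task(name="tasks.maintenance.cleanup_temporary_files")
def cleanup_temporary_files():
    if not os.path.isdir(TMP_DIR):
        return {"status": "skipped", "reason": "tmp dir not found"}
    
    removed = 0
    now = time.time()
    for name in os.listdir(TMP_DIR):
        path = os.path.join(TMP_DIR, name)
        if os.path.isfile(path) and now - os.path.getmtime(path) > MAX_AGE_SECONDS:
            os.remove(path)
            removed += 1
    return {"status": "completed", "removed_files": removed}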

动态任务调度

# tasks/dynamic_scheduler.py - 动态任务调度
from celery_app import celery_app
from celery.schedules import crontab
from datetime import datetime, timedelta
import json

@celery_app.task(bind=True, name="tasks.dynamic_schedule.create_scheduled_task")
def create_scheduled_task(self, task_config: dict):
    """
    创建动态调度任务
    
    Args:
        task_config: 任务配置,包含任务类型、参数、调度时间等
    """
    try:
        task_type = task_config.get("task_type")
        task_params = task_config.get("params", {})
        schedule_time = task_config.get("schedule_time")  # ISO格式时间字符串
        recurring = task_config.get("recurring", False)
        cron_expression = task_config.get("cron_expression")
        
        if recurring and cron_expression:
            # 解析cron表达式并添加到beat_schedule
            # 注意:在Worker进程中修改conf.beat_schedule不会影响已在运行的Beat进程,
            # 需要重启Beat,或改用支持动态调度的Scheduler(如django-celery-beat、RedBeat)
            schedule = parse_cron_expression(cron_expression)
            task_name = f"scheduled_{task_type}_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
            
            celery_app.conf.beat_schedule[task_name] = {
                "task": f"tasks.{task_type}",
                "schedule": schedule,
                "args": [task_params],
                "options": {
                    "queue": "scheduled_tasks"
                }
            }
            
            return {
                "status": "success",
                "message": "周期性任务已创建",
                "task_name": task_name,
                "cron_expression": cron_expression
            }
        else:
            # 单次执行任务,使用eta参数
            if schedule_time:
                eta_time = datetime.fromisoformat(schedule_time.replace('Z', '+00:00'))
                task = celery_app.send_task(
                    f"tasks.{task_type}",
                    args=[task_params],
                    eta=eta_time,
                    queue="scheduled_tasks"
                )
                
                return {
                    "status": "success",
                    "message": "单次调度任务已创建",
                    "task_id": task.id,
                    "scheduled_time": schedule_time
                }
            else:
                # 立即执行
                task = celery_app.send_task(
                    f"tasks.{task_type}",
                    args=[task_params],
                    queue="scheduled_tasks"
                )
                
                return {
                    "status": "success",
                    "message": "即时任务已创建",
                    "task_id": task.id
                }
                
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=3)

def parse_cron_expression(cron_expr: str):
    """解析cron表达式"""
    parts = cron_expr.split()
    if len(parts) != 5:
        raise ValueError("Invalid cron expression format")
    
    minute, hour, day, month, weekday = parts
    
    return crontab(
        minute=minute,
        hour=hour,
        day_of_week=weekday,
        day_of_month=day,
        month_of_year=month
    )

@celery_app.task(bind=True, name="tasks.dynamic_schedule.list_scheduled_tasks")
def list_scheduled_tasks(self):
    """列出所有调度任务"""
    scheduled_tasks = []
    
    for task_name, task_config in celery_app.conf.beat_schedule.items():
        scheduled_tasks.append({
            "name": task_name,
            "task": task_config.get("task", ""),
            "schedule": str(task_config.get("schedule", "")),
            "args": task_config.get("args", []),
            "kwargs": task_config.get("kwargs", {}),
            "options": task_config.get("options", {})
        })
    
    return {
        "status": "success",
        "scheduled_tasks": scheduled_tasks,
        "total_count": len(scheduled_tasks)
    }

@celery_app.task(bind=True, name="tasks.dynamic_schedule.remove_scheduled_task")
def remove_scheduled_task(self, task_name: str):
    """移除调度任务"""
    if task_name in celery_app.conf.beat_schedule:
        del celery_app.conf.beat_schedule[task_name]
        
        # 这里可能需要重启beat服务才能生效
        # 或者使用信号机制通知beat重新加载配置
        
        return {
            "status": "success",
            "message": f"调度任务 {task_name} 已移除",
            "removed_task": task_name
        }
    else:
        return {
            "status": "error",
            "message": f"调度任务 {task_name} 不存在",
            "available_tasks": list(celery_app.conf.beat_schedule.keys())
        }

错误处理与重试机制

全面的错误处理策略

# tasks/error_handling.py - 错误处理策略
import time
import logging
import traceback
from typing import Any

from celery import Task
from celery_app import celery_app

logger = logging.getLogger(__name__)

class RobustTask(Task):
    """健壮的任务基类"""
    
    def on_success(self, retval, task_id, args, kwargs):
        """任务成功时的回调"""
        logger.info(f"Task {task_id} completed successfully")
        logger.info(f"Return value: {retval}")
        
        # 可以在这里添加成功后的处理逻辑
        # 例如:发送通知、更新数据库状态、清理临时资源等
    
    def on_failure(self, exc, task_id, args, kwargs, einfo):
        """任务失败时的回调"""
        logger.error(f"Task {task_id} failed: {str(exc)}")
        logger.error(f"Traceback: {einfo}")
        
        # 根据异常类型进行不同处理
        if isinstance(exc, (ConnectionError, TimeoutError)):
            logger.info(f"Network-related error, may retry: {task_id}")
        elif isinstance(exc, FileNotFoundError):
            logger.error(f"File not found error, not retrying: {task_id}")
            # 对于文件不存在等永久性错误,可以选择不重试
            return
        
        # 记录失败任务到监控系统
        self.record_failure(task_id, exc, args, kwargs)
    
    def record_failure(self, task_id: str, exc: Exception, args: tuple, kwargs: dict):
        """记录失败任务"""
        try:
            # 这里可以将失败的任务信息存储到数据库或监控系统
            failure_record = {
                "task_id": task_id,
                "exception": str(exc),
                "args": str(args),
                "kwargs": str(kwargs),
                "timestamp": time.time(),
                "traceback": traceback.format_exc()
            }
            
            # 存储到数据库或消息队列
            # save_failure_record(failure_record)
            
        except Exception as e:
            logger.error(f"Failed to record failure: {str(e)}")

@celery_app.task(base=RobustTask, bind=True, name="tasks.robust_example_task")
def robust_example_task(self, data: Any):
    """示例健壮任务"""
    try:
        # 模拟可能出错的操作
        if not data:
            raise ValueError("Data cannot be empty")
        
        # 模拟处理过程
        result = process_data_safely(data)
        
        return {
            "status": "success",
            "processed_data": result,
            "original_input": data
        }
        
    except (ValueError, TypeError) as e:
        # 对于参数错误等,不重试
        logger.error(f"Parameter error in task {self.request.id}: {str(e)}")
        raise
        
    except ConnectionError as e:
        # 对于网络错误,重试
        logger.warning(f"Network error in task {self.request.id}, retrying: {str(e)}")
        raise self.retry(exc=e, countdown=60, max_retries=3)
        
    except Exception as e:
        # 其他错误,记录并重试
        logger.error(f"Unexpected error in task {self.request.id}: {str(e)}")
        raise self.retry(exc=e, countdown=120, max_retries=2)

def process_data_safely(data: Any) -> Any:
    """安全的数据处理函数"""
    # 实现安全的数据处理逻辑
    # 包括输入验证、边界检查、资源管理等
    return data

# 重试策略配置
@celery_app.task(
    bind=True,
    autoretry_for=(Exception,),
    retry_kwargs={'max_retries': 5},
    retry_backoff=True,
    retry_backoff_max=700,
    retry_jitter=True,
    name="tasks.auto_retry_task"
)
def auto_retry_task(self, data: Any):
    """
    自动重试任务
    - 对所有异常自动重试
    - 指数退避策略
    - 随机抖动避免雪崩
    """
    # 模拟可能失败的操作
    import random
    if random.random() < 0.3:  # 30%概率失败
        raise ConnectionError("Random connection error for testing retry")
    
    return {"status": "success", "data": data, "attempts": self.request.retries + 1}

# 自定义重试逻辑
@celery_app.task(bind=True, name="tasks.custom_retry_task")
def custom_retry_task(self, data: Any, max_retries: int = 3):
    """自定义重试逻辑的任务"""
    try:
        # 模拟业务逻辑
        result = perform_business_operation(data)
        
        return {
            "status": "success",
            "result": result,
            "attempts": self.request.retries + 1
        }
        
    except TemporaryError as e:
        # 临时错误,重试
        if self.request.retries < max_retries:
            countdown = min(2 ** self.request.retries * 60, 300)  # 最大5分钟
            raise self.retry(exc=e, countdown=countdown, max_retries=max_retries)
        else:
            logger.error(f"Max retries exceeded for task {self.request.id}")
            raise
            
    except PermanentError as e:
        # 永久错误,不重试
        logger.error(f"Permanent error in task {self.request.id}: {str(e)}")
        raise
        
    except Exception as e:
        # 其他错误,根据类型决定是否重试
        if should_retry_error(e):
            if self.request.retries < max_retries:
                raise self.retry(exc=e, countdown=60, max_retries=max_retries)
            else:
                raise
        else:
            raise

class TemporaryError(Exception):
    """临时错误,可以重试"""
    pass

class PermanentError(Exception):
    """永久错误,不应重试"""
    pass

def perform_business_operation(data: Any) -> Any:
    """执行业务操作"""
    # 模拟业务逻辑
    return f"processed_{data}"

def should_retry_error(exception: Exception) -> bool:
    """判断是否应该重试错误"""
    retryable_errors = [
        ConnectionError,
        TimeoutError,
        ConnectionResetError,
        BrokenPipeError,
    ]
    
    return isinstance(exception, tuple(retryable_errors))

死信队列处理

# tasks/dead_letter_queue.py - 死信队列处理
import time
import logging
from celery.signals import task_failure
from celery_app import celery_app

logger = logging.getLogger(__name__)

@task_failure.connect
def task_failed_handler(sender=None, task_id=None, exception=None, traceback=None, einfo=None, **kwargs):
    """任务失败信号处理器"""
    logger.error(f"Task {task_id} failed: {str(exception)}")
    
    # 将失败任务信息发送到死信队列进行后续处理
    dead_letter_task.delay(
        task_id=task_id,
        exception=str(exception),
        traceback=str(traceback),
        original_task=sender.name if sender else None
    )

@celery_app.task(name="tasks.dead_letter_queue.handle_dead_letter")
def dead_letter_task(task_id: str, exception: str, traceback: str, original_task: str = None):
    """死信队列处理任务"""
    try:
        logger.warning(f"Processing dead letter task: {task_id}")
        
        # 记录到死信队列数据库
        record_dead_letter({
            "task_id": task_id,
            "exception": exception,
            "traceback": traceback,
            "original_task": original_task,
            "timestamp": time.time(),
            "attempts": get_task_attempt_count(task_id)
        })
        
        # 根据错误类型决定后续处理
        if "permanent" in exception.lower():
            # 永久错误,标记为已处理
            mark_as_handled(task_id, "permanent_error")
        else:
            # 临时错误,可以人工介入处理
            notify_admin_for_intervention(task_id, exception)
        
        return {
            "status": "dead_letter_processed",
            "task_id": task_id,
            "handled": True
        }
        
    except Exception as e:
        logger.error(f"Error processing dead letter task {task_id}: {str(e)}")
        raise

def record_dead_letter(dead_letter_info: dict):
    """记录死信队列信息"""
    # 实现记录到数据库或监控系统的逻辑
    pass

def get_task_attempt_count(task_id: str) -> int:
    """获取任务尝试次数"""
    # 实现获取尝试次数的逻辑
    return 0

def mark_as_handled(task_id: str, reason: str):
    """标记任务为已处理"""
    # 实现标记逻辑
    pass

def notify_admin_for_intervention(task_id: str, error: str):
    """通知管理员介入处理"""
    # 实现通知逻辑,如发送邮件、短信等
    pass
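
上面的"死信队列"是应用层的失败记录方案。如果Broker使用RabbitMQ,也可以在声明队列时利用其原生的dead-letter机制,让被拒绝/过期的消息自动转投到专门的队列(示意,仅适用于RabbitMQ,交换机与队列名均为假设值):

# RabbitMQ原生死信队列声明(示意)
from kombu import Exchange, Queue

dead_letter_exchange = Exchange("dead_letter", type="direct")

task_queues = (
    Queue(
        "ai_tasks",
        Exchange("ai_tasks", type="topic"),
        routing_key="ai.#",
        queue_arguments={
            # 被拒绝或过期的消息会由RabbitMQ转投到dead_letter交换机
            "x-dead-letter-exchange": "dead_letter",
            "x-dead-letter-routing-key": "dead_letter",
        },
    ),
    Queue("dead_letter", dead_letter_exchange, routing_key="dead_letter"),
)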

性能优化策略

Worker性能优化

# config/worker_optimization.py - Worker性能优化配置
from celery import Celery
from celery_app import celery_app  # 下方示例任务绑定的全局实例

def configure_optimized_celery():
    """配置优化的Celery实例"""
    celery = Celery('optimized_daoman')
    
    # 基础配置
    celery.conf.update(
        broker_url='redis://localhost:6379/0',
        result_backend='redis://localhost:6379/1',
        
        # 序列化优化
        task_serializer='msgpack',  # 比JSON更快
        result_serializer='msgpack',
        accept_content=['msgpack', 'json'],
        
        # 性能相关配置
        worker_prefetch_multiplier=1,  # 防止任务积压
        worker_max_tasks_per_child=1000,  # 防止内存泄漏
        worker_disable_rate_limits=True,  # 禁用速率限制以提高性能
        
        # 任务确认机制
        task_acks_late=True,
        task_reject_on_worker_lost=True,
        
        # 结果存储优化
        result_expires=3600,
        result_compression='zlib',  # 使用zlib压缩
        
        # 事件监控(生产环境可关闭)
        worker_send_task_events=False,  # 生产环境建议关闭
        task_send_sent_event=False,    # 生产环境建议关闭
        
        # 连接池优化
        broker_pool_limit=100,
        broker_connection_retry_on_startup=True,
        
        # 任务路由优化
        task_routes={
            'tasks.ai_tasks.*': {
                'queue': 'ai_tasks',
                'routing_key': 'ai.priority'
            },
            'tasks.email_tasks.*': {
                'queue': 'email_tasks',
                'routing_key': 'email.normal'
            },
        },
        
        # 队列配置
        task_default_queue='default',
        task_default_exchange='default',
        task_default_routing_key='default',
    )
    
    return celery

# 高性能任务装饰器:统一附加序列化、压缩等任务级选项
def high_performance_task(**extra_options):
    """返回带有高性能默认选项的celery_app.task装饰器,用法:@high_performance_task(bind=True, name=...)"""
    options = {
        'serializer': 'msgpack',
        'compression': 'zlib',
    }
    options.update(extra_options)
    # 这些选项必须传给@task装饰器才会生效,而不是注入任务函数的kwargs
    return celery_app.task(**options)

@celery_app.task(
    bind=True,
    serializer='msgpack',
    compression='zlib',
    delivery_mode=2,
    name="tasks.high_performance_ai_processing"
)
def high_performance_ai_processing(self, data_batch: list):
    """高性能AI批处理任务"""
    import numpy as np
    
    try:
        # 使用numpy进行批量处理以提高性能
        processed_batch = []
        
        for i, item in enumerate(data_batch):
            # 更新进度(批量更新以减少开销)
            if i % 10 == 0:  # 每处理10个项目更新一次
                self.update_state(state='PROGRESS', meta={
                    'current': i,
                    'total': len(data_batch),
                    'progress': int((i / len(data_batch)) * 100),
                    'status': f'Processing batch... ({i}/{len(data_batch)})'
                })
            
            # 高效的数据处理
            processed_item = efficient_process_item(item)
            processed_batch.append(processed_item)
        
        return {
            'status': 'completed',
            'processed_count': len(processed_batch),
            'batch_size': len(data_batch),
            'results': processed_batch
        }
        
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=2)

def efficient_process_item(item):
    """高效处理单个项目"""
    # 实现高效的处理逻辑
    # 使用向量化操作、缓存等技术
    return item

# 内存优化任务
@celery_app.task(bind=True, name="tasks.memory_efficient_processing")
def memory_efficient_processing(self, large_dataset: list):
    """内存优化处理任务"""
    processed_results = []
    batch_size = 100  # 批处理大小
    
    for i in range(0, len(large_dataset), batch_size):
        batch = large_dataset[i:i + batch_size]
        
        # 处理批次
        batch_results = process_batch(batch)
        processed_results.extend(batch_results)
        
        # 显式释放内存
        del batch, batch_results
        
        # 更新进度
        progress = min(i + batch_size, len(large_dataset))
        self.update_state(state='PROGRESS', meta={
            'current': progress,
            'total': len(large_dataset),
            'progress': int((progress / len(large_dataset)) * 100),
            'status': f'Memory-efficient processing... ({progress}/{len(large_dataset)})'
        })
    
    return {
        'status': 'completed',
        'processed_count': len(processed_results),
        'memory_efficiency': 'optimized'
    }

def process_batch(batch: list):
    """处理数据批次"""
    # 实现批次处理逻辑
    return [item for item in batch]  # 示例处理
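
除了配置项,Worker的启动参数对吞吐影响同样明显。以下命令为示意,按机器资源和任务类型调整:

# CPU密集型任务:prefork进程池,进程数约等于CPU核数
celery -A celery_app worker -l info --pool=prefork --concurrency=8 --max-tasks-per-child=1000

# IO密集型任务(大量网络/磁盘等待):可改用gevent协程池(需安装gevent)
celery -A celery_app worker -l info --pool=gevent --concurrency=200 -Q email_tasks

# 按负载自动伸缩进程数(最多10个,最少3个)
celery -A celery_app worker -l info --autoscale=10,3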

任务分片处理

# tasks/sharding.py - 任务分片处理
from celery import group, chord
from typing import List, Any
from celery_app import celery_app

@celery_app.task(bind=True, name="tasks.sharded_processing.master")
def sharded_processing_master(self, large_dataset: List[Any], shard_count: int = 4):
    """分片处理主任务"""
    try:
        # 将大数据集分片
        shards = split_into_shards(large_dataset, shard_count)
        
        # 创建分片处理任务组
        processing_tasks = [
            sharded_processing_worker.s(shard_data, shard_idx)
            for shard_idx, shard_data in enumerate(shards)
        ]
        
        # 使用group并行执行所有分片任务
        job = group(processing_tasks)
        result = job.apply_async()
        
        # 等待所有分片完成并合并结果
        # 注意:在任务内部调用result.get()属于反模式,Celery默认会抛出RuntimeError,
        # 这里显式放开仅用于演示;生产环境建议改用chord,见本节末尾的示例
        shard_results = result.get(propagate=True, disable_sync_subtasks=False)
        
        # 合并结果
        final_result = merge_shard_results(shard_results)
        
        return {
            'status': 'completed',
            'shard_count': shard_count,
            'total_items': len(large_dataset),
            'shard_results': shard_results,
            'final_result': final_result
        }
        
    except Exception as exc:
        raise self.retry(exc=exc, countdown=120, max_retries=2)

@celery_app.task(bind=True, name="tasks.sharded_processing.worker")
def sharded_processing_worker(self, shard_data: List[Any], shard_idx: int):
    """分片处理工作者任务"""
    try:
        processed_data = []
        
        for i, item in enumerate(shard_data):
            # 更新分片进度
            self.update_state(state='PROGRESS', meta={
                'shard_idx': shard_idx,
                'current': i + 1,
                'total': len(shard_data),
                'progress': int(((i + 1) / len(shard_data)) * 100),
                'status': f'Shard {shard_idx} processing... ({i + 1}/{len(shard_data)})'
            })
            
            processed_item = process_single_item(item)
            processed_data.append(processed_item)
        
        return {
            'shard_idx': shard_idx,
            'processed_count': len(processed_data),
            'data': processed_data
        }
        
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=3)

def split_into_shards(dataset: List[Any], shard_count: int) -> List[List[Any]]:
    """将数据集分片"""
    shard_size = len(dataset) // shard_count
    shards = []
    
    for i in range(shard_count):
        start_idx = i * shard_size
        if i == shard_count - 1:  # 最后一片包含余数
            end_idx = len(dataset)
        else:
            end_idx = start_idx + shard_size
        
        shards.append(dataset[start_idx:end_idx])
    
    return shards

def merge_shard_results(shard_results: List[dict]) -> dict:
    """合并分片结果"""
    merged_data = []
    total_processed = 0
    
    for result in shard_results:
        merged_data.extend(result.get('data', []))
        total_processed += result.get('processed_count', 0)
    
    return {
        'merged_data': merged_data,
        'total_processed': total_processed,
        'shard_count': len(shard_results)
    }

def process_single_item(item: Any) -> Any:
    """处理单个项目"""
    # 实现具体的处理逻辑
    return item

# MapReduce模式任务
@celery_app.task(bind=True, name="tasks.map_reduce.execute")
def map_reduce_execute(self, dataset: List[Any], map_func: str, reduce_func: str):
    """MapReduce模式执行任务"""
    try:
        # Map阶段:并行处理数据
        mapped_results = []
        batch_size = 100
        
        for i in range(0, len(dataset), batch_size):
            batch = dataset[i:i + batch_size]
            
            # 更新进度
            self.update_state(state='PROGRESS', meta={
                'phase': 'mapping',
                'current_batch': i // batch_size + 1,
                'total_batches': (len(dataset) + batch_size - 1) // batch_size,
                'status': 'Executing Map phase...'
            })
            
            # 处理批次
            batch_mapped = execute_map_function(batch, map_func)
            mapped_results.extend(batch_mapped)
        
        # Reduce阶段:聚合结果
        final_result = execute_reduce_function(mapped_results, reduce_func)
        
        return {
            'status': 'completed',
            'phase': 'completed',
            'map_results_count': len(mapped_results),
            'final_result': final_result
        }
        
    except Exception as exc:
        raise self.retry(exc=exc, countdown=120, max_retries=2)

def execute_map_function(data_batch: List[Any], func_name: str) -> List[Any]:
    """执行映射函数"""
    # 根据函数名执行相应的映射操作
    if func_name == 'word_count':
        return word_count_mapper(data_batch)
    elif func_name == 'data_filter':
        return data_filter_mapper(data_batch)
    else:
        return data_batch  # 默认返回原数据

def execute_reduce_function(mapped_data: List[Any], func_name: str) -> Any:
    """执行归约函数"""
    # 根据函数名执行相应的归约操作
    if func_name == 'sum':
        return sum(mapped_data)
    elif func_name == 'count':
        return len(mapped_data)
    else:
        return mapped_data

def word_count_mapper(data_batch: List[str]) -> List[tuple]:
    """词频统计映射器"""
    word_counts = []
    for text in data_batch:
        words = text.lower().split()
        for word in words:
            word_counts.append((word, 1))
    return word_counts

def data_filter_mapper(data_batch: List[dict]) -> List[dict]:
    """数据过滤映射器"""
    # 示例:过滤出数值大于10的项目
    return [item for item in data_batch if item.get('value', 0) > 10]
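
如前所述,在任务内部调用 result.get() 是反模式。更推荐的做法是用 chord 让Celery自己完成"并行分片 + 汇总回调"的编排(示意,可追加到上面的 tasks/sharding.py 中,merge_callback 等为本示例新增的假设任务;chord需要配置结果后端):

# 追加到 tasks/sharding.py:基于chord的分片编排(示意)
from celery import chord

@celery_app.task(name="tasks.sharded_processing.merge_callback")
def merge_callback(shard_results):
    """chord回调:所有分片成功后自动收到结果列表并合并"""
    return merge_shard_results(shard_results)

@celery_app.task(bind=True, name="tasks.sharded_processing.dispatch")
def dispatch_sharded_processing(self, large_dataset, shard_count: int = 4):
    shards = split_into_shards(large_dataset, shard_count)
    # header中的worker任务并行执行,全部完成后才触发merge_callback
    async_result = chord(
        [sharded_processing_worker.s(shard, idx) for idx, shard in enumerate(shards)]
    )(merge_callback.s())
    return {
        'status': 'dispatched',
        'shard_count': shard_count,
        'merge_task_id': async_result.id
    }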

监控与管理

自定义监控任务

# tasks/monitoring.py - 监控任务
import time
import psutil  # 需先 pip install psutil
from celery.signals import task_prerun, task_postrun, task_revoked
from celery.utils.log import get_task_logger
from celery_app import celery_app

logger = get_task_logger(__name__)

@celery_app.task(bind=True, name="tasks.monitoring.system_monitor")
def system_monitor(self):
    """系统监控任务"""
    try:
        # 收集系统指标
        metrics = {
            "timestamp": time.time(),
            "cpu_percent": psutil.cpu_percent(interval=1),
            "memory_percent": psutil.virtual_memory().percent,
            "disk_percent": psutil.disk_usage('/').percent,
            "load_average": psutil.getloadavg(),
            "process_count": len(psutil.pids()),
            "network_io": psutil.net_io_counters()._asdict(),
            "celery_stats": get_celery_stats()
        }
        
        # 检查是否需要告警
        alerts = check_system_alerts(metrics)
        
        return {
            "status": "completed",
            "metrics": metrics,
            "alerts": alerts,
            "healthy": len(alerts) == 0
        }
        
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=3)

def get_celery_stats():
    """获取Celery统计信息"""
    inspect = celery_app.control.inspect()
    
    return {
        "active": inspect.active(),
        "scheduled": inspect.scheduled(),
        "reserved": inspect.reserved(),
        "stats": inspect.stats(),
        "registered": inspect.registered_tasks(),
        "revoked": inspect.revoked()
    }

def check_system_alerts(metrics: dict) -> list:
    """检查系统告警"""
    alerts = []
    
    # CPU使用率告警
    if metrics["cpu_percent"] > 80:
        alerts.append({
            "type": "high_cpu",
            "severity": "warning",
            "message": f"CPU使用率过高: {metrics['cpu_percent']}%"
        })
    
    # 内存使用率告警
    if metrics["memory_percent"] > 85:
        alerts.append({
            "type": "high_memory",
            "severity": "warning", 
            "message": f"内存使用率过高: {metrics['memory_percent']}%"
        })
    
    # 磁盘使用率告警
    if metrics["disk_percent"] > 90:
        alerts.append({
            "type": "high_disk",
            "severity": "critical",
            "message": f"磁盘使用率过高: {metrics['disk_percent']}%"
        })
    
    # 队列积压告警
    if metrics["celery_stats"]["active"]:
        active_count = sum(len(tasks) for tasks in metrics["celery_stats"]["active"].values())
        if active_count > 50:  # 假设超过50个活跃任务为积压
            alerts.append({
                "type": "queue_backlog",
                "severity": "warning",
                "message": f"任务队列积压: {active_count} 个活跃任务"
            })
    
    return alerts

@celery_app.task(bind=True, name="tasks.monitoring.queue_monitor")
def queue_monitor(self):
    """队列监控任务"""
    try:
        inspect = celery_app.control.inspect()
        
        # 获取队列状态
        active_tasks = inspect.active()
        scheduled_tasks = inspect.scheduled()
        reserved_tasks = inspect.reserved()
        
        # 统计信息
        stats = {
            "active_workers": len(active_tasks) if active_tasks else 0,
            "total_active_tasks": sum(len(tasks) for tasks in active_tasks.values()) if active_tasks else 0,
            "total_scheduled_tasks": sum(len(tasks) for tasks in scheduled_tasks.values()) if scheduled_tasks else 0,
            "total_reserved_tasks": sum(len(tasks) for tasks in reserved_tasks.values()) if reserved_tasks else 0,
            "queue_sizes": get_queue_sizes(),
            "task_rates": get_task_rates()
        }
        
        # 检查异常情况
        issues = check_queue_issues(stats)
        
        return {
            "status": "completed",
            "statistics": stats,
            "issues": issues,
            "healthy": len(issues) == 0
        }
        
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=3)

def get_queue_sizes():
    """获取队列大小"""
    # 需要直接访问broker才能获取队列长度;以下为Redis broker下的示意实现:
    # Celery使用Redis时,每个队列对应一个同名的Redis列表,可用LLEN读取长度
    client = redis.Redis.from_url(os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0'))
    queues = ['default', 'ai_tasks', 'email_tasks', 'file_processing']
    return {queue: client.llen(queue) for queue in queues}

def get_task_rates():
    """获取任务速率"""
    # 获取最近一段时间内的任务处理速率
    return {
        "tasks_per_minute": 0,
        "tasks_per_hour": 0,
        "average_processing_time": 0
    }

def check_queue_issues(stats: dict) -> list:
    """检查队列问题"""
    issues = []
    
    if stats["total_active_tasks"] > 100:
        issues.append({
            "type": "high_active_tasks",
            "severity": "warning",
            "message": f"活跃任务过多: {stats['total_active_tasks']} 个"
        })
    
    if stats["total_scheduled_tasks"] > 50:
        issues.append({
            "type": "high_scheduled_tasks", 
            "severity": "warning",
            "message": f"调度任务过多: {stats['total_scheduled_tasks']} 个"
        })
    
    return issues

# 任务生命周期信号监听
@task_prerun.connect
def task_prerun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):
    """任务运行前信号"""
    logger.info(f"Task {task_id} ({task.name}) starting...")
    # 可以在这里记录任务开始时间、分配资源等

@task_postrun.connect
def task_postrun_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds):
    """任务运行后信号"""
    logger.info(f"Task {task_id} ({task.name}) completed with state: {state}")
    # 可以在这里记录任务完成时间、释放资源、发送通知等

@task_revoked.connect
def task_revoked_handler(sender=None, task_id=None, terminated=None, signum=None, **kwds):
    """任务撤销信号"""
    logger.warning(f"Task {task_id} was revoked (terminated: {terminated}, signal: {signum})")
    # 可以在这里清理被撤销任务的资源

Flower监控配置

# config/flower_config.py - Flower监控配置
"""
Flower监控配置和使用说明

安装:
pip install flower

启动:
celery -A celery_app flower --port=5555

配置选项:
"""
import os

class FlowerConfig:
    """Flower配置类"""
    
    # 基础配置
    port = 5555
    address = '0.0.0.0'
    debug = False
    
    # 认证配置
    basic_auth = os.getenv('FLOWER_BASIC_AUTH')  # 格式: "username:password"
    auth = os.getenv('FLOWER_OAUTH_CREDENTIALS')  # OAuth配置
    
    # Broker管理API配置(broker_api 仅在使用RabbitMQ管理插件时需要,
    # 例如 http://guest:guest@localhost:15672/api/;Redis作broker时无需设置)
    broker_api = os.getenv('FLOWER_BROKER_API')
    
    # 数据库配置(用于持久化监控数据)
    persistent = True
    db = os.getenv('FLOWER_DB_PATH', '/tmp/flower.db')
    
    # 时间配置
    natural_time = True
    
    # API配置
    max_workers = 100
    max_tasks = 10000
    
    # 跨域配置
    cors_options = {
        'origins': ['*'],
        'credentials': True,
        'methods': ['GET', 'POST'],
        'headers': ['*'],
    }

# 启动Flower命令示例
"""
celery -A celery_app flower \
    --port=5555 \
    --basic_auth=user:password \
    --persistent=True \
    --db=/var/lib/flower/flower.db
"""


生产环境部署

Docker部署方案

# Dockerfile - Celery Worker
FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    gcc \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建非root用户
RUN useradd --create-home --shell /bin/bash celery
USER celery

# 启动Celery Worker
CMD ["celery", "-A", "celery_app", "worker", "--loglevel=info", "--concurrency=4"]

# Dockerfile.beat - Celery Beat
FROM python:3.11-slim

WORKDIR /app

# 安装系统依赖
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# 复制应用代码
COPY . .

# 创建非root用户
RUN useradd --create-home --shell /bin/bash celery
USER celery

# 启动Celery Beat
CMD ["celery", "-A", "celery_app", "beat", "--loglevel=info"]

# docker-compose.yml - 完整部署配置
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    restart: always
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
    command: redis-server --appendonly yes

  worker:
    build:
      context: .
      dockerfile: Dockerfile
    restart: always
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379/0
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1
    volumes:
      - ./logs:/app/logs
    deploy:
      replicas: 3  # 多个Worker实例

  beat:
    build:
      context: .
      dockerfile: Dockerfile.beat
    restart: always
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379/0
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/1

  flower:
    image: mher/flower:latest
    restart: always
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - FLOWER_PORT=5555
    ports:
      - "5555:5555"
    command: ["--broker=redis://redis:6379/0", "--port=5555"]

volumes:
  redis_data:

系统服务配置

# /etc/systemd/system/celery-worker.service - Worker服务
[Unit]
Description=Celery Worker
After=network.target redis.service

[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/opt/daoman
Environment="CELERY_CONFIG_MODULE=config.celery_config"
ExecStart=/opt/daoman/venv/bin/celery -A celery_app worker \
    --pidfile=/var/run/celery/worker.pid \
    --logfile=/var/log/celery/worker.log \
    --loglevel=INFO \
    --concurrency=8 \
    --max-tasks-per-child=1000
ExecReload=/bin/kill -HUP $MAINPID
KillSignal=SIGTERM
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

# /etc/systemd/system/celery-beat.service - Beat服务
[Unit]
Description=Celery Beat
After=network.target redis.service

[Service]
Type=simple
User=www-data
Group=www-data
WorkingDirectory=/opt/daoman
Environment="CELERY_CONFIG_MODULE=config.celery_config"
ExecStart=/opt/daoman/venv/bin/celery -A celery_app beat \
    --pidfile=/var/run/celery/beat.pid \
    --logfile=/var/log/celery/beat.log \
    --loglevel=INFO
ExecReload=/bin/kill -HUP $MAINPID
KillSignal=SIGTERM
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

部署脚本

#!/bin/bash
# deploy_celery.sh - Celery部署脚本

set -e

APP_NAME="daoman"
APP_DIR="/opt/$APP_NAME"
VENV_DIR="$APP_DIR/venv"
LOG_DIR="/var/log/celery"
RUN_DIR="/var/run/celery"

echo "🚀 开始部署 Celery..."

# 创建必要目录
sudo mkdir -p $LOG_DIR $RUN_DIR
sudo chown www-data:www-data $LOG_DIR $RUN_DIR

# 更新代码
cd $APP_DIR
git pull origin main

# 更新依赖
$VENV_DIR/bin/pip install -r requirements.txt

# 重启服务
echo "🔄 重启 Celery Worker..."
sudo systemctl restart celery-worker
sudo systemctl status celery-worker

echo "🔄 重启 Celery Beat..."
sudo systemctl restart celery-beat
sudo systemctl status celery-beat

# 验证部署
sleep 5
if sudo systemctl is-active --quiet celery-worker && sudo systemctl is-active --quiet celery-beat; then
    echo "✅ Celery 部署成功"
    echo "📊 活跃 Worker: $(celery -A celery_app inspect active | grep -c '->') "
else
    echo "❌ Celery 部署失败"
    exit 1
fi

echo "🎉 部署完成!"

最佳实践指南

1. 任务设计最佳实践

# best_practices/tasks.py - 任务设计最佳实践

# ✅ 好的做法:任务幂等性
@celery_app.task(bind=True, name="tasks.update_user_profile")
def update_user_profile(self, user_id: int, profile_data: dict):
    """
    更新用户资料 - 幂等操作
    同样的输入多次执行结果一致
    """
    try:
        # 检查任务是否已执行过(防重复)
        if has_task_executed_recently(self.request.id):
            return {"status": "skipped", "reason": "task_already_executed"}
        
        # 执行更新操作
        result = perform_profile_update(user_id, profile_data)
        
        # 记录执行历史
        record_task_execution(self.request.id, user_id, profile_data)
        
        return result
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60, max_retries=3)
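
上例中的 has_task_executed_recently 与 record_task_execution 是示意用的辅助函数。下面给出一个基于 Redis SET NX 的最小实现草图(Redis 地址、db 编号与键名前缀均为假设值),仅用于说明"检查并占位"的幂等思路:

import redis

# 假设使用一个独立的Redis db存放去重与执行记录
dedup_client = redis.Redis(host='localhost', port=6379, db=3)

def has_task_executed_recently(task_id: str, ttl: int = 3600) -> bool:
    """利用 SET NX 原子地判断任务在ttl内是否已执行过(检查与占位合并为一步)"""
    # nx=True 表示仅当键不存在时写入;返回None说明键已存在,即任务已执行过
    return dedup_client.set(f"task_dedup:{task_id}", 1, nx=True, ex=ttl) is None

def record_task_execution(task_id: str, user_id: int, profile_data: dict) -> None:
    """记录任务执行历史,便于审计与排查"""
    dedup_client.hset(f"task_history:{task_id}", mapping={
        "user_id": str(user_id),
        "profile_keys": ",".join(profile_data.keys()),
    })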

# ✅ 好的做法:任务原子性
@celery_app.task(bind=True, name="tasks.transfer_money")
def transfer_money(self, from_account: int, to_account: int, amount: float):
    """
    转账操作 - 原子操作
    使用数据库事务保证一致性
    """
    from sqlalchemy.orm import sessionmaker
    from database import engine
    
    Session = sessionmaker(bind=engine)
    session = Session()
    
    try:
        # 扣款
        debit_result = debit_account(session, from_account, amount)
        if not debit_result:
            raise ValueError(f"Insufficient funds in account {from_account}")
        
        # 入账
        credit_result = credit_account(session, to_account, amount)
        if not credit_result:
            raise ValueError(f"Failed to credit account {to_account}")
        
        # 提交事务
        session.commit()
        
        return {
            "status": "success",
            "from_account": from_account,
            "to_account": to_account,
            "amount": amount,
            "transaction_id": generate_transaction_id()
        }
    except Exception as exc:
        session.rollback()
        raise self.retry(exc=exc, countdown=30, max_retries=2)
    finally:
        session.close()

# ✅ 好的做法:任务数据序列化
@celery_app.task(bind=True, name="tasks.process_complex_data")
def process_complex_data(self, data: dict):
    """
    处理复杂数据 - 安全序列化
    避免传递不可序列化的对象
    """
    # 确保数据可以被序列化
    serializable_data = ensure_serializable(data)
    
    # 处理数据
    result = perform_complex_processing(serializable_data)
    
    # 确保结果可以被序列化
    return ensure_serializable(result)

def ensure_serializable(obj):
    """确保对象可以被序列化"""
    import json
    from datetime import datetime, date
    from decimal import Decimal
    
    def serialize_helper(o):
        if isinstance(o, (datetime, date)):
            return o.isoformat()
        elif isinstance(o, Decimal):
            return float(o)
        elif hasattr(o, '__dict__'):
            return o.__dict__
        else:
            return str(o)
    
    try:
        json.dumps(obj, default=serialize_helper)
        return obj
    except TypeError:
        # 如果无法序列化,转换为字典
        return json.loads(json.dumps(obj, default=serialize_helper))

2. 性能优化最佳实践

# best_practices/performance.py - 性能优化最佳实践

# ✅ 批处理优化
@celery_app.task(bind=True, name="tasks.batch_process_emails")
def batch_process_emails(self, email_data_list: list, batch_size: int = 50):
    """
    批量处理邮件 - 减少网络请求次数
    """
    total_emails = len(email_data_list)
    
    for i in range(0, total_emails, batch_size):
        batch = email_data_list[i:i + batch_size]
        
        # 一次请求处理一批邮件
        batch_result = process_email_batch(batch)
        
        # 更新进度
        processed = min(i + batch_size, total_emails)
        self.update_state(state='PROGRESS', meta={
            'current': processed,
            'total': total_emails,
            'progress': int((processed / total_emails) * 100),
            'status': f'Processed {processed}/{total_emails} emails'
        })
    
    return {
        'status': 'completed',
        'total_emails': total_emails,
        'batches_processed': (total_emails + batch_size - 1) // batch_size
    }

# ✅ 缓存优化
import hashlib
import json

import redis

redis_client = redis.Redis(host='localhost', port=6379, db=2)

@celery_app.task(bind=True, name="tasks.expensive_calculation")
def expensive_calculation(self, input_data: dict):
    """
    昂贵计算 - 使用缓存避免重复计算
    """
    # 生成稳定的缓存键(内置hash()在不同进程间结果不同,不能用作跨Worker的缓存键)
    cache_key = "calculation:" + hashlib.md5(
        json.dumps(input_data, sort_keys=True).encode()
    ).hexdigest()
    
    # 尝试从缓存获取结果
    cached_result = redis_client.get(cache_key)
    if cached_result:
        return json.loads(cached_result)
    
    # 执行计算
    result = perform_expensive_calculation(input_data)
    
    # 缓存结果(1小时过期)
    redis_client.setex(cache_key, 3600, json.dumps(result))
    
    return result

# ✅ 数据库连接优化
import threading

from sqlalchemy import create_engine, text
from sqlalchemy.pool import StaticPool

# 为每个Worker线程维护独立的数据库引擎,避免跨线程共享连接
_local = threading.local()

def get_db_engine():
    """获取数据库引擎(每个线程一个)"""
    if not hasattr(_local, 'engine'):
        _local.engine = create_engine(
            'sqlite:///example.db',  # 实际使用时替换为真实数据库连接串
            poolclass=StaticPool,
            connect_args={'check_same_thread': False}  # SQLite跨线程访问所需
        )
    return _local.engine

@celery_app.task(bind=True, name="tasks.database_intensive_task")
def database_intensive_task(self, query_params: dict):
    """
    数据库密集型任务 - 优化连接使用
    """
    engine = get_db_engine()
    with engine.connect() as conn:
        # 执行数据库操作(SQLAlchemy 2.x 风格:参数以字典传入,行对象通过 ._mapping 转为字典)
        result = conn.execute(
            text("SELECT * FROM example_table WHERE condition = :condition"),
            {"condition": query_params.get('condition')}
        )
        return [dict(row._mapping) for row in result]

3. 安全最佳实践

# best_practices/security.py - 安全最佳实践

# ✅ 输入验证
from marshmallow import Schema, fields, ValidationError

class TaskInputSchema(Schema):
    user_id = fields.Integer(required=True)
    data = fields.Dict(required=True)
    callback_url = fields.Url(required=False)

task_input_schema = TaskInputSchema()

@celery_app.task(bind=True, name="tasks.secure_task")
def secure_task(self, input_data: dict):
    """
    安全任务 - 输入验证和清理
    """
    try:
        # 验证输入
        validated_data = task_input_schema.load(input_data)
        
        # 清理危险字符
        sanitized_data = sanitize_input(validated_data)
        
        # 执行任务
        return process_secure_task(sanitized_data)
        
    except ValidationError as exc:
        raise ValueError(f"Invalid input: {exc.messages}")

def sanitize_input(data: dict) -> dict:
    """清理输入数据"""
    import html
    import re
    
    def clean_value(value):
        if isinstance(value, str):
            # HTML转义
            value = html.escape(value)
            # 移除危险字符
            value = re.sub(r'[<>"\']', '', value)
        elif isinstance(value, dict):
            return {k: clean_value(v) for k, v in value.items()}
        elif isinstance(value, list):
            return [clean_value(item) for item in value]
        return value
    
    return clean_value(data)

# ✅ 任务隔离
import json
import os
import subprocess
import sys
import tempfile

@celery_app.task(bind=True, name="tasks.isolated_execution")
def isolated_execution(self, code: str, inputs: dict):
    """
    隔离执行 - 在沙箱中运行代码
    """
    # 创建临时目录
    with tempfile.TemporaryDirectory() as temp_dir:
        # 写入代码到临时文件
        code_file = os.path.join(temp_dir, "user_code.py")
        with open(code_file, "w") as f:
            f.write(code)
        
        # 在隔离环境中执行
        result = subprocess.run(
            [sys.executable, code_file],
            input=json.dumps(inputs),
            text=True,
            capture_output=True,
            timeout=30,  # 30秒超时
            cwd=temp_dir
        )
        
        return {
            "stdout": result.stdout,
            "stderr": result.stderr,
            "return_code": result.returncode
        }

常见问题解答

Q1: 如何处理任务超时?

A: 可以通过设置task_time_limit和task_soft_time_limit来控制硬超时和软超时。硬超时会强制终止任务,软超时会抛出SoftTimeLimitExceeded异常,允许任务优雅地处理超时。
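
下面是一个处理软超时的示意(任务名、超时时间以及 run_long_job、save_partial_progress 等函数均为假设):

from celery.exceptions import SoftTimeLimitExceeded

@celery_app.task(bind=True, soft_time_limit=240, time_limit=300,
                 name="tasks.examples.timeout_aware_task")
def timeout_aware_task(self, payload: dict):
    """软超时触发时保存进度并优雅退出,硬超时由Worker强制终止"""
    try:
        return run_long_job(payload)
    except SoftTimeLimitExceeded:
        save_partial_progress(payload)
        return {"status": "timeout", "partial": True}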

Q2: 如何实现任务优先级?

A: 可以通过配置多个队列,将不同优先级的任务路由到不同的队列。高优先级队列配置更高的消费速率。
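
下面是一个基于多队列的优先级路由示意(队列名与任务模块名均为假设值):

# 按任务模块把高/低优先级任务路由到不同队列
celery_app.conf.task_routes = {
    'tasks.urgent.*': {'queue': 'high_priority'},
    'tasks.batch.*': {'queue': 'low_priority'},
}

# 提交任务时也可以临时指定队列
# some_task.apply_async(args=[...], queue='high_priority')

# 为高优先级队列单独启动Worker并分配更多并发:
# celery -A celery_app worker -Q high_priority --concurrency=8
# celery -A celery_app worker -Q low_priority --concurrency=2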

Q3: 如何处理任务失败和重试?

A: Celery提供了丰富的重试机制,可以通过retry()方法手动重试,或使用autoretry_for参数自动重试。建议根据错误类型决定是否重试。
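
下面是一个结合 autoretry_for 与指数退避的自动重试示意(任务名与URL参数为假设值):

import requests

@celery_app.task(
    bind=True,
    name="tasks.examples.call_external_api",
    autoretry_for=(requests.RequestException,),  # 仅对网络类错误自动重试
    retry_backoff=True,        # 指数退避
    retry_backoff_max=600,     # 退避间隔上限(秒)
    retry_jitter=True,         # 加入随机抖动,避免重试风暴
    max_retries=5,
)
def call_external_api(self, url: str):
    """调用外部API;业务校验类错误不会触发自动重试,会直接抛出"""
    response = requests.get(url, timeout=10)
    response.raise_for_status()
    return response.json()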

Q4: 如何监控任务执行情况?

A: 可以使用Flower进行实时监控,或通过Celery的inspect接口获取任务状态。也可以自定义信号处理器来记录任务执行情况。

Q5: 如何处理大量任务积压?

A: 可以通过增加Worker数量、优化任务执行效率、使用任务分片等方式处理任务积压。也可以考虑使用优先级队列,确保重要任务优先执行。
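
下面是一个任务分片的示意(process_batch、do_process 与队列名均为假设):

from celery import group

@celery_app.task(name="tasks.examples.process_batch")
def process_batch(item_ids: list):
    """处理一批数据项(内部调用假设的业务函数 do_process)"""
    return [do_process(item_id) for item_id in item_ids]

def dispatch_in_chunks(item_ids: list, chunk_size: int = 100):
    """把大批量数据按块拆分后并行下发,避免单个任务过大或一次性压垮Worker"""
    chunks = [item_ids[i:i + chunk_size] for i in range(0, len(item_ids), chunk_size)]
    job = group(process_batch.s(chunk) for chunk in chunks)
    return job.apply_async(queue='low_priority')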

总结

Celery为FastAPI应用提供了强大的异步任务处理能力,适用于各种耗时操作:

  1. 架构优势:生产者-消费者模式,解耦服务,提高响应性
  2. 扩展性:支持多Worker、多队列,轻松水平扩展
  3. 可靠性:任务持久化、失败重试、死信队列
  4. 灵活性:定时任务、动态调度、任务链式执行

💡 核心要点:合理设计任务粒度,优化任务执行效率,建立完善的监控体系,确保系统的稳定性和可维护性。



更多常见问题(FAQ)

Q1: Celery和FastAPI如何集成?

A: 通过在FastAPI应用中调用Celery任务的delay()方法提交任务,然后通过AsyncResult获取任务状态。可以使用WebSocket实现实时状态推送。
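
下面是一个最小的集成示意(假设 celery_app.py 暴露了名为 celery_app 的实例,tasks.ai_tasks 中存在名为 run_model_inference 的任务,路由路径也仅为示例):

from celery.result import AsyncResult
from fastapi import FastAPI

from celery_app import celery_app
from tasks.ai_tasks import run_model_inference

app = FastAPI()

@app.post("/inference")
async def submit_inference(payload: dict):
    """提交推理任务,立即返回task_id"""
    task = run_model_inference.delay(payload)
    return {"task_id": task.id}

@app.get("/inference/{task_id}")
async def get_inference_status(task_id: str):
    """查询任务状态,任务成功完成后返回结果"""
    result = AsyncResult(task_id, app=celery_app)
    return {
        "task_id": task_id,
        "state": result.state,
        "result": result.result if result.successful() else None,
    }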

Q2: 如何处理大量并发任务?

A: 需要合理配置Worker数量,使用多队列分离不同类型的任务,实施任务分片处理,监控系统资源使用情况。

Q3: 任务失败如何处理?

A: 可以配置自动重试机制,使用死信队列处理永久失败的任务,记录失败日志便于排查问题。

Q4: 如何监控Celery性能?

A: 使用Flower进行可视化监控,配置系统监控收集性能指标,自定义监控任务检查队列状态。

Q5: 生产环境部署需要注意什么?

A: 配置持久化存储,设置合理的资源限制,实施安全措施,建立完整的监控告警体系。


🔗 相关教程推荐

🏷️ 标签云: FastAPI Celery 异步任务队列 AI模型推理 定时任务 任务调度 后台处理 消息队列 任务监控 性能优化