异步任务队列 Celery:处理耗时的 AI 模型推理与图像生成任务

📂 所属阶段:第六阶段 — 2026 特色专题(AI 集成篇)
🔗 相关章节:流式响应 StreamingResponse · Redis 集成


1. 为什么需要任务队列?

1.1 场景问题

用户请求:生成一张 AI 图片
耗时:30 秒

同步处理:HTTP 请求超时 → 用户等 30 秒 → 返回结果
         ❌ 前端超时、连接断开、用户体验差

Celery 异步:立即返回任务 ID → 后台处理 → 用户轮询/回调通知
             ✅ 立即响应、后台执行、实时查询

1.2 Celery 架构

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌──────────┐
│  FastAPI │───→│  Broker  │───→│  Worker  │───→│  Redis/  │
│ (提交任务)│    │ (Redis/  │    │ (执行任务)│    │  DB     │
│          │    │  RabbitMQ)│    │          │    │ (存储结果)│
└──────────┘    └──────────┘    └──────────┘    └──────────┘
     ↑                                        │
     └──────────── 轮询任务状态 ←─────────────┘

2. 安装与配置

2.1 安装

pip install celery[redis] redis

2.2 Celery 配置

# celery_app.py
from celery import Celery

celery_app = Celery(
    "daoman",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    include=["tasks.ai_tasks", "tasks.email_tasks"],
)

# 配置
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,          # 任务最多运行 5 分钟
    task_soft_time_limit=240,     # 4 分钟时发出警告
    worker_prefetch_multiplier=1,  # 防止积压
    task_acks_late=True,           # 任务完成后才确认
    task_reject_on_worker_lost=True,
    result_expires=3600,          # 结果 1 小时后自动清除
)

3. 定义任务

3.1 基础任务

# tasks/email_tasks.py
from celery_app import celery_app
import time

@celery_app.task(bind=True, name="tasks.send_email")
def send_email(self, to: str, subject: str, body: str):
    """发送邮件任务(后台执行,不阻塞请求)"""
    print(f"正在发送邮件到 {to}...")
    time.sleep(5)  # 模拟邮件发送耗时
    print(f"邮件已发送到 {to}")
    return {"status": "sent", "to": to, "subject": subject}

3.2 AI 图片生成任务

# tasks/ai_tasks.py
from celery_app import celery_app
import asyncio
from typing import Optional

@celery_app.task(bind=True, name="tasks.generate_image")
def generate_image(self, prompt: str, style: str = "realistic", user_id: int = None):
    """
    AI 图片生成任务
    """
    from services.image_generator import ImageGenerator

    self.update_state(
        state="PROGRESS",
        meta={"progress": 10, "status": "正在初始化模型..."}
    )

    generator = ImageGenerator()

    self.update_state(
        state="PROGRESS",
        meta={"progress": 30, "status": "正在生成图片..."}
    )

    image_path = generator.generate(prompt=prompt, style=style)

    self.update_state(
        state="PROGRESS",
        meta={"progress": 90, "status": "保存到存储..."}
    )

    # 保存记录到数据库
    if user_id:
        save_image_record(user_id, image_path, prompt)

    return {
        "image_url": image_path,
        "prompt": prompt,
        "style": style,
    }


@celery_app.task(bind=True, name="tasks.batch_process_articles")
def batch_process_articles(self, article_ids: list[int]):
    """批量处理文章(摘要生成、SEO 优化等)"""
    total = len(article_ids)
    results = []

    for i, article_id in enumerate(article_ids):
        # 更新进度
        self.update_state(
            state="PROGRESS",
            meta={"current": i + 1, "total": total, "progress": int((i / total) * 100)}
        )

        # 处理每篇文章
        result = process_single_article(article_id)
        results.append(result)

    return {"processed": len(results), "results": results}

4. FastAPI 调用任务

4.1 提交任务

# routers/ai.py
from fastapi import APIRouter, HTTPException, BackgroundTasks
from celery.result import AsyncResult
from tasks.ai_tasks import generate_image, batch_process_articles
from pydantic import BaseModel

router = APIRouter()

class ImageRequest(BaseModel):
    prompt: str
    style: str = "realistic"
    user_id: int | None = None

# 提交任务(立即返回任务 ID)
@router.post("/generate-image")
async def create_image_task(request: ImageRequest):
    task = generate_image.delay(
        prompt=request.prompt,
        style=request.style,
        user_id=request.user_id,
    )
    return {
        "task_id": task.id,
        "status": "pending",
        "message": "任务已提交,请稍后查询结果",
    }

# 查询任务状态
@router.get("/task/{task_id}")
async def get_task_status(task_id: str):
    task_result = AsyncResult(task_id, app=celery_app)

    response = {
        "task_id": task_id,
        "status": task_result.state,
    }

    if task_result.state == "PROGRESS":
        response["info"] = task_result.info

    elif task_result.state == "SUCCESS":
        response["result"] = task_result.result

    elif task_result.state == "FAILURE":
        response["error"] = str(task_result.info)

    return response

4.2 批量提交 + 汇总结果

@router.post("/batch-generate")
async def batch_generate(prompts: list[str]):
    """一次性提交多个任务"""
    tasks = [generate_image.delay(prompt=p) for p in prompts]
    task_ids = [t.id for t in tasks]
    return {"task_ids": task_ids, "total": len(task_ids)}

@router.get("/batch-status")
async def batch_status(task_ids: list[str]):
    """批量查询任务状态"""
    results = {}
    for tid in task_ids:
        result = AsyncResult(tid, app=celery_app)
        results[tid] = {
            "state": result.state,
            "result": result.result if result.ready() else None,
        }
    return results

5. 启动 Worker

5.1 启动命令

# 开发环境
celery -A celery_app worker --loglevel=info --concurrency=4

# 生产环境(守护进程 + 日志)
celery -A celery_app worker \
    --loglevel=info \
    --concurrency=8 \
    --max-tasks-per-child=1000 \
    --detach \
    --logfile=/var/log/celery/worker.log \
    --pidfile=/var/run/celery/worker.pid

# Flower 监控(实时查看任务状态)
pip install flower
celery -A celery_app flower --port=5555
# 打开 http://localhost:5555 查看任务面板

5.2 Systemd 管理

# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker
After=network.target redis.service

[Service]
Type=forking
User=www-data
WorkingDirectory=/opt/daoman
Environment="ENV=production"
ExecStart=/opt/daoman/venv/bin/celery -A celery_app worker \
    --concurrency=8 --loglevel=info
Restart=always

[Install]
WantedBy=multi-user.target

6. Celery Beat 定时任务

# celery_beat_config.py
from celery.schedules import crontab

celery_app.conf.beat_schedule = {
    "daily-cleanup": {
        "task": "tasks.cleanup_old_files",
        "schedule": crontab(hour=3, minute=0),  # 每天凌晨 3 点
    },
    "weekly-report": {
        "task": "tasks.send_weekly_report",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),  # 每周一早上 9 点
    },
}

# 启动 Beat
celery -A celery_app beat --loglevel=info

7. 小结

# Celery 五步走