# Async Task Queues with Celery: Handling Slow AI Model Inference and Image Generation

📂 Part of: Stage 6, 2026 Special Topics (AI Integration)
🔗 Related chapters: Streaming responses with StreamingResponse · Redis integration
## 1. Why do we need a task queue?

### 1.1 The scenario
A user requests an AI-generated image, which takes about 30 seconds to produce.

Synchronous handling: the HTTP request hangs while the user waits 30 seconds for the result.
❌ Frontend timeouts, dropped connections, poor user experience.

With Celery: return a task ID immediately, process in the background, and let the user poll (or receive a callback) for the result.
✅ Instant response, background execution, real-time status queries.

### 1.2 Celery architecture
```text
┌───────────┐     ┌───────────┐     ┌───────────┐     ┌───────────┐
│  FastAPI  │────→│  Broker   │────→│  Worker   │────→│ Redis/DB  │
│  (submit  │     │  (Redis/  │     │ (execute  │     │  (store   │
│   tasks)  │     │ RabbitMQ) │     │  tasks)   │     │  results) │
└───────────┘     └───────────┘     └───────────┘     └───────────┘
      ↑                                                     │
      └─────────────────── poll task status ←───────────────┘
```

## 2. Installation and configuration
### 2.1 Install
```bash
pip install "celery[redis]" redis
```

### 2.2 Celery configuration

```python
# celery_app.py
from celery import Celery

celery_app = Celery(
    "daoman",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    include=["tasks.ai_tasks", "tasks.email_tasks"],
)

# Configuration
celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Shanghai",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,           # hard limit: a task may run at most 5 minutes
    task_soft_time_limit=240,      # soft limit: SoftTimeLimitExceeded is raised at 4 minutes
    worker_prefetch_multiplier=1,  # prevent one worker from hoarding queued tasks
    task_acks_late=True,           # acknowledge only after the task completes
    task_reject_on_worker_lost=True,
    result_expires=3600,           # results are purged after 1 hour
)
```
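By default, heavy AI tasks and quick email tasks compete for the same workers. One way to keep them apart is Celery's `task_routes` setting plus dedicated per-queue workers. A minimal sketch; the `ai` and `default` queue names are assumptions, not part of the original config:

```python
# celery_app.py (continued): route heavy AI tasks to their own queue
# so short tasks aren't stuck behind 30-second generations
celery_app.conf.task_routes = {
    "tasks.generate_image": {"queue": "ai"},
    "tasks.batch_process_articles": {"queue": "ai"},
    "tasks.send_email": {"queue": "default"},
}
```

A worker is then pinned to one queue with `celery -A celery_app worker -Q ai`.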
## 3. Defining tasks

### 3.1 A basic task
```python
# tasks/email_tasks.py
import time

from celery_app import celery_app


@celery_app.task(bind=True, name="tasks.send_email")
def send_email(self, to: str, subject: str, body: str):
    """Send an email in the background without blocking the request."""
    print(f"Sending email to {to}...")
    time.sleep(5)  # simulate a slow email send
    print(f"Email sent to {to}")
    return {"status": "sent", "to": to, "subject": subject}
```
### 3.2 AI image generation task

```python
# tasks/ai_tasks.py
from celery_app import celery_app


@celery_app.task(bind=True, name="tasks.generate_image")
def generate_image(self, prompt: str, style: str = "realistic", user_id: int | None = None):
    """AI image generation task with progress reporting."""
    # Imported lazily so only the worker process loads the model code
    from services.image_generator import ImageGenerator

    self.update_state(
        state="PROGRESS",
        meta={"progress": 10, "status": "Initializing model..."},
    )
    generator = ImageGenerator()

    self.update_state(
        state="PROGRESS",
        meta={"progress": 30, "status": "Generating image..."},
    )
    image_path = generator.generate(prompt=prompt, style=style)

    self.update_state(
        state="PROGRESS",
        meta={"progress": 90, "status": "Saving to storage..."},
    )
    # Persist a record to the database (save_image_record is a project helper)
    if user_id:
        save_image_record(user_id, image_path, prompt)

    return {
        "image_url": image_path,
        "prompt": prompt,
        "style": style,
    }


@celery_app.task(bind=True, name="tasks.batch_process_articles")
def batch_process_articles(self, article_ids: list[int]):
    """Process articles in bulk (summary generation, SEO optimization, etc.)."""
    total = len(article_ids)
    results = []
    for i, article_id in enumerate(article_ids):
        # Report progress so polling clients can render a progress bar
        self.update_state(
            state="PROGRESS",
            meta={"current": i + 1, "total": total, "progress": int((i / total) * 100)},
        )
        # process_single_article is a project helper
        result = process_single_article(article_id)
        results.append(result)
    return {"processed": len(results), "results": results}
```
## 4. Calling tasks from FastAPI

### 4.1 Submitting a task
```python
# routers/ai.py
from celery.result import AsyncResult
from fastapi import APIRouter, Query
from pydantic import BaseModel

from celery_app import celery_app
from tasks.ai_tasks import generate_image

router = APIRouter()


class ImageRequest(BaseModel):
    prompt: str
    style: str = "realistic"
    user_id: int | None = None


# Submit a task (returns the task ID immediately)
@router.post("/generate-image")
async def create_image_task(request: ImageRequest):
    task = generate_image.delay(
        prompt=request.prompt,
        style=request.style,
        user_id=request.user_id,
    )
    return {
        "task_id": task.id,
        "status": "pending",
        "message": "Task submitted; poll for the result",
    }
```
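`.delay(...)` is shorthand for `.apply_async(...)`, which also exposes routing and scheduling options. A brief sketch; the `ai` queue name is the assumption from the routing example in Section 2:

```python
from tasks.ai_tasks import generate_image

# Equivalent to .delay(), with routing and scheduling options exposed
task = generate_image.apply_async(
    kwargs={"prompt": "a red fox in snow", "style": "realistic"},
    queue="ai",   # assumed queue name; see the task_routes sketch above
    countdown=5,  # do not start earlier than 5 seconds from now
)
print(task.id)
```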
```python
# Query task status
@router.get("/task/{task_id}")
async def get_task_status(task_id: str):
    task_result = AsyncResult(task_id, app=celery_app)
    response = {
        "task_id": task_id,
        "status": task_result.state,
    }
    if task_result.state == "PROGRESS":
        response["info"] = task_result.info
    elif task_result.state == "SUCCESS":
        response["result"] = task_result.result
    elif task_result.state == "FAILURE":
        response["error"] = str(task_result.info)
    return response
```
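On the client side, the polling loop is symmetric. A minimal sketch using `httpx`; the base URL and the 2-second interval are arbitrary choices:

```python
import time

import httpx


def wait_for_result(task_id: str, base_url: str = "http://localhost:8000") -> dict:
    """Poll the status endpoint until the task finishes."""
    while True:
        data = httpx.get(f"{base_url}/task/{task_id}").json()
        if data["status"] in ("SUCCESS", "FAILURE"):
            return data
        time.sleep(2)  # back off between polls
```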
@router.post("/batch-generate")
async def batch_generate(prompts: list[str]):
"""一次性提交多个任务"""
tasks = [generate_image.delay(prompt=p) for p in prompts]
task_ids = [t.id for t in tasks]
return {"task_ids": task_ids, "total": len(task_ids)}
@router.get("/batch-status")
async def batch_status(task_ids: list[str]):
"""批量查询任务状态"""
results = {}
for tid in task_ids:
result = AsyncResult(tid, app=celery_app)
results[tid] = {
"state": result.state,
"result": result.result if result.ready() else None,
}
return results#5. 启动 Worker
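Celery also has a first-class primitive for this fan-out pattern: `group`. Its `GroupResult` can be saved to the result backend and restored later by ID, replacing the hand-rolled task-ID bookkeeping above. A sketch, assuming the Redis result backend from Section 2 (which supports `save()`/`restore()`):

```python
from celery import group

from celery_app import celery_app
from tasks.ai_tasks import generate_image


def submit_batch(prompts: list[str]) -> str:
    """Fan out one generate_image task per prompt; return the group ID."""
    job = group(generate_image.s(prompt=p) for p in prompts)
    result = job.apply_async()
    result.save()  # persist group metadata in the result backend
    return result.id


def batch_progress(group_id: str) -> dict:
    """Restore the group by ID and report how many subtasks finished."""
    result = celery_app.GroupResult.restore(group_id)
    return {"completed": result.completed_count(), "total": len(result.results)}
```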
## 5. Starting the worker

### 5.1 Start commands
```bash
# Development
celery -A celery_app worker --loglevel=info --concurrency=4

# Production (daemonized, with a log file)
celery -A celery_app worker \
    --loglevel=info \
    --concurrency=8 \
    --max-tasks-per-child=1000 \
    --detach \
    --logfile=/var/log/celery/worker.log \
    --pidfile=/var/run/celery/worker.pid

# Flower monitoring (live view of task state)
pip install flower
celery -A celery_app flower --port=5555
# Open http://localhost:5555 for the task dashboard
```

### 5.2 Managing with systemd
```ini
# /etc/systemd/system/celery-worker.service
[Unit]
Description=Celery Worker
After=network.target redis.service

[Service]
# the worker runs in the foreground here, so "simple" is the correct type
Type=simple
User=www-data
WorkingDirectory=/opt/daoman
Environment="ENV=production"
ExecStart=/opt/daoman/venv/bin/celery -A celery_app worker \
    --concurrency=8 --loglevel=info
Restart=always

[Install]
WantedBy=multi-user.target
```
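Once the unit file is installed, the worker is managed like any other service: `sudo systemctl daemon-reload`, then `sudo systemctl enable --now celery-worker`, and `journalctl -u celery-worker` to inspect logs.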
## 6. Scheduled tasks with Celery Beat

```python
# celery_beat_config.py
from celery.schedules import crontab

from celery_app import celery_app

celery_app.conf.beat_schedule = {
    "daily-cleanup": {
        "task": "tasks.cleanup_old_files",
        "schedule": crontab(hour=3, minute=0),  # every day at 3:00 AM
    },
    "weekly-report": {
        "task": "tasks.send_weekly_report",
        "schedule": crontab(hour=9, minute=0, day_of_week=1),  # every Monday at 9:00 AM
    },
}
```
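The schedule refers to tasks by name, so a task such as `tasks.cleanup_old_files` still has to be defined somewhere the worker imports. A minimal sketch; the 7-day cutoff and the `uploads/tmp` directory are assumptions:

```python
# tasks/maintenance.py: sketch of a scheduled cleanup task
import time
from pathlib import Path

from celery_app import celery_app


@celery_app.task(name="tasks.cleanup_old_files")
def cleanup_old_files(max_age_days: int = 7):
    """Delete temporary files older than max_age_days."""
    cutoff = time.time() - max_age_days * 86400
    removed = 0
    for path in Path("uploads/tmp").glob("*"):  # assumed temp directory
        if path.is_file() and path.stat().st_mtime < cutoff:
            path.unlink()
            removed += 1
    return {"removed": removed}
```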
```bash
# Start Beat
celery -A celery_app beat --loglevel=info
```

## 7. Summary
Celery in five steps:

1. Install `celery[redis]` and point the broker and result backend at Redis.
2. Define tasks with `@celery_app.task`, reporting progress via `update_state`.
3. Submit from FastAPI with `.delay()` / `.apply_async()` and return the task ID immediately.
4. Let clients poll the status endpoint, which wraps `AsyncResult`.
5. Run workers in production (systemd), monitor with Flower, and schedule recurring jobs with Beat.