FastAPIasync-task-queue-celery完全指南
📂 所属阶段:第六阶段 — 2026 特色专题(AI 集成篇)
🔗 相关章节:流式响应 StreamingResponse · Redis 集成
异步任务队列:为什么我们需要它?
假设你的用户请求背后藏着这些操作:
- 🎨 AI一秒生成头像,模型加载就要5秒
- 📧 批量发送5000封周报邮件
- 🎥 转码一段10GB的高清视频
如果直接用普通的 async def 或同步函数处理,会发生什么?
- 用户盯着空白的浏览器等待,甚至超时报错
- 服务器资源被单个请求严重占用,后面的请求排起长队
- 一旦中途失败,连重试都无从下手
异步任务队列正是为了解决这类“耗时/高风险操作”而生的。它的核心思想非常简单:把复杂的工作从请求线程里抽出来,交给后台的专职工人(Worker)去慢慢消化。
用 Celery + FastAPI 实现这个流程,大体是这样:
sequenceDiagram
participant User as 用户
participant FastAPI as FastAPI(Producer)
participant Broker as Broker(Redis/RabbitMQ)
participant Worker as Celery Worker
participant Backend as Backend(Redis)
User->>FastAPI: 生成一张动漫头像
FastAPI->>Broker: 提交任务到队列
FastAPI->>User: 立即返回「任务ID + 预估完成时间」
Broker->>Worker: 分配空闲Worker
Worker->>Backend: 存储进度/结果
User->>FastAPI: 轮询或WebSocket查询状态
FastAPI->>Backend: 获取最新状态
Backend->>FastAPI: 返回头像URL
FastAPI->>User: 展示结果
整个过程中,用户只是提交了一个任务描述,然后立刻拿到一个“取货单号”,后续可以随时来问进度,完全不用傻等。而服务器端的FastAPI应用始终保持轻量、响应迅速。
快速上手:三步搭好基础架构
1. 安装核心依赖
使用 Redis 作为任务队列的 Broker(消息代理)和 Backend(结果存储)是目前最简洁、高效的方式:
pip install fastapi uvicorn celery[redis] redis
其中 celery[redis] 会自动安装 Celery 与 Redis 交互所需要的所有组件。
2. 编写 Celery 应用与一个示例任务
创建文件 celery_app.py,这是整个后台任务的大脑:
# celery_app.py
from celery import Celery
import time
# 创建 Celery 实例,broker 和 backend 可以分别指定不同的 Redis 数据库
celery = Celery(
'daoman_fastapi_celery',
broker='redis://localhost:6379/0', # 任务队列存储在 Redis 0 号库
backend='redis://localhost:6379/1' # 结果存放在 Redis 1 号库(隔离,便于管理)
)
# 定义一个模拟邮件发送任务
@celery.task(bind=True, name="tasks.send_email")
def send_email(self, to: str, subject: str, body: str):
"""
异步发送邮件,附带进度上报与自动重试
"""
try:
# 更新任务状态为 PROGRESS,并告知当前步骤
self.update_state(state='PROGRESS', meta={'step': 'preparing'})
time.sleep(1) # 模拟准备阶段耗时
self.update_state(state='PROGRESS', meta={'step': 'sending'})
time.sleep(2) # 模拟实际发送
# 成功完成,返回结果字典
return {'status': 'success', 'to': to, 'time': time.time()}
except Exception as exc:
# 失败时自动重试:延迟60秒,最多重试3次
raise self.retry(exc=exc, countdown=60, max_retries=3)
3. 集成 FastAPI 并启动整个服务
新建 main.py,提供两个接口:提交任务与查询状态。
# main.py
from fastapi import FastAPI, HTTPException
from celery.result import AsyncResult
from celery_app import celery, send_email
from pydantic import BaseModel, EmailStr
app = FastAPI(title="道满FastAPI-Celery教程")
# 请求体模型,利用 Pydantic 自动校验入参
class EmailReq(BaseModel):
to: EmailStr
subject: str
body: str
@app.post("/tasks/email")
async def create_email_task(req: EmailReq):
# 将任务推入队列,传入收件人、主题、正文
task = send_email.delay(req.to, req.subject, req.body)
return {
"task_id": task.id,
"status": "submitted",
"message": "邮件已进入队列,预计3秒左右完成"
}
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
task = AsyncResult(task_id, app=celery)
# 根据任务状态返回不同的信息
return {
"task_id": task_id,
"state": task.state,
"info": task.info if task.state in ['PROGRESS', 'FAILURE'] else None,
"result": task.result if task.state == 'SUCCESS' else None
}
依次启动三个进程(本地调试推荐使用三个终端窗口):
-
启动 Redis 服务(若未启动):
-
启动 Celery Worker,并指定并发数:
celery -A celery_app worker --loglevel=INFO --concurrency=4
💡 --concurrency=4 表示同时处理4个任务,可根据CPU核心数调整。
-
启动 FastAPI 应用(开启热重载方便开发):
uvicorn main:app --reload
此时,访问 http://127.0.0.1:8000/docs 就能看到 Swagger 文档,可以直接测试提交任务和查询状态。
核心进阶:企业级常用配置
前面的示例虽然能跑,但生产环境需要考虑更多细节:超时、重试策略、队列隔离、内存泄漏保护等。我们把这些最佳实践汇总成一个统一的配置类。
1. 完整的 Celery 配置文件
新建 config/celery_config.py:
# config/celery_config.py
from kombu import Queue
import os
class CeleryConfig:
# ---------- Broker & Backend ----------
BROKER_URL = os.getenv('CELERY_BROKER', 'redis://localhost:6379/0')
RESULT_BACKEND = os.getenv('CELERY_BACKEND', 'redis://localhost:6379/1')
# ---------- 序列化方式 ----------
TASK_SERIALIZER = 'json'
ACCEPT_CONTENT = ['json']
RESULT_SERIALIZER = 'json'
# ---------- 时区 ----------
TIMEZONE = 'Asia/Shanghai'
ENABLE_UTC = False
# ---------- 任务执行控制 ----------
TASK_TRACK_STARTED = True # 开启 STARTED 状态跟踪
TASK_TIME_LIMIT = 300 # 硬超时5分钟(强制杀死子进程)
TASK_SOFT_TIME_LIMIT = 240 # 软超时4分钟(会抛出 SoftTimeLimitExceeded 异常,可在任务内处理)
# ---------- 重试策略 ----------
TASK_RETRY_BACKOFF = True # 指数退避:重试间隔逐渐增加,防止雪崩
TASK_RETRY_BACKOFF_MAX = 700 # 最大退避间隔700秒
TASK_RETRY_JITTER = True # 随机抖动:避免大量任务在同一时刻集中重试
# ---------- Worker 优化 ----------
WORKER_PREFETCH_MULTIPLIER = 1 # 每个 Worker 一次只预取1个任务,防止任务积压在某个 Worker 上
WORKER_MAX_TASKS_PER_CHILD = 1000 # 子进程处理1000个任务后重启,避免内存泄漏
WORKER_DISABLE_RATE_LIMITS = True # 在应用层不限制速率(利用 Redis 更快)
# ---------- 队列路由:按任务类型分流 ----------
TASK_QUEUES = (
Queue('default', routing_key='default'),
Queue('ai_tasks', routing_key='ai.#'),
Queue('email_tasks', routing_key='email.#'),
)
TASK_ROUTES = {
'tasks.ai.*': {'queue': 'ai_tasks'}, # AI相关任务进入 ai_tasks 队列
'tasks.email.*': {'queue': 'email_tasks'}, # 邮件任务进入 email_tasks 队列
}
在 celery_app.py 中引入配置:
from config.celery_config import CeleryConfig
celery = Celery('daoman_fastapi_celery')
celery.config_from_object(CeleryConfig)
这样一来,你的 Celery 就具备了生产级别的可靠性和灵活性。
2. 定时任务配置(Celery Beat)
许多业务需要定期执行的工作,比如每天凌晨清理临时文件、每隔15分钟检查队列健康状态。这可以通过 Celery Beat + 定时调度实现。
在 celery_beat.py 中配置节拍:
# celery_beat.py
from celery.schedules import crontab
from celery_app import celery
import os
celery.conf.beat_schedule = {
# 每天凌晨3点清理过期临时文件
"cleanup-temp-files": {
"task": "tasks.maintenance.cleanup_temp",
"schedule": crontab(hour=3, minute=0),
"options": {"queue": "maintenance"}
},
# 每15分钟检查队列健康状态
"check-queue-health": {
"task": "tasks.monitoring.check_queue",
"schedule": crontab(minute="*/15"),
"options": {"queue": "monitoring"}
}
}
启动 Beat 服务(需要单独进程):
celery -A celery_app beat --loglevel=INFO
⚠️ 注意:不要和 Worker 共用同一个进程,Beat 只负责按时发送任务,Worker 负责执行。
生产环境关键要点
1. 监控告警 —— Flower
Celery 官方提供了一个极其好用的实时监控面板 Flower,你可以直观地看到所有任务的状态、Worker 的负载、成功率等关键指标。
pip install flower
celery -A celery_app flower --port=5555 --basic_auth=admin:你的强密码
启动后访问 http://your-server:5555,输入你设置的用户名密码,就能一览任务全局。
强烈建议在生产环境配合反向代理(如 Nginx)并启用 HTTPS。
2. Docker 化部署
容器化部署能让 Worker 的环境保持统一、快速扩容。一个安全的 Dockerfile 示例如下:
# Dockerfile.worker
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
# 创建非 root 用户,提升安全性
RUN useradd -m celery
USER celery
# 启动命令,指定监听的队列
CMD ["celery", "-A", "celery_app", "worker", \
"--loglevel=INFO", "--queues=default,ai_tasks"]
💡 通过 --queues 参数,你可以让专门的 Worker 只消费某些队列,例如让具有 GPU 的机器只跑 ai_tasks。
总结
Celery 是 FastAPI 生态中最成熟、最灵活的异步任务队列方案,它的价值可以浓缩为:
- ✅ 将用户请求与耗时操作彻底解耦
- ✅ Worker 支持横向扩展,随流量轻松增减
- ✅ 强大的重试、死信队列、定时任务功能
- ✅ 成熟的监控工具(Flower / Prometheus 集成)
- ✅ 与 FastAPI 的集成优雅、自然
当你面对AI 推理、图像/视频处理、批量邮件、报表生成等场景时,Celery + FastAPI 的组合会让你事半功倍。
常见问题FAQ
Q1: 任务提交后一直没有被执行?
可能的原因:
- Redis 服务未启动,或 Worker 连接的 Broker 地址不正确
- Worker 多队列配置错误,任务去了某个队列但没有 Worker 在监听
- 任务路由 key 写错,导致任务被丢弃到
celery 默认队列,而 Worker 只监听特定队列
快速排查:
- 在 Worker 日志中查看是否出现
received task 字样
- 使用 Flower 面板检查队列堆积情况
- 临时将所有 Worker 改为监听
default 队列测试
Q2: 如何实现任务优先级?
Celery 原生没有直接的任务优先级字段,但可以通过多队列 + 不同数量的 Worker 来间接实现:
- 为高优先级任务创建一个
high_priority 队列
- 为该队列分配更多的 Worker 进程或独立的高配服务器
- 低优先级队列可以稍后消费甚至限流
另一种更精细的控制方式是通过 Redis 的 BRPOP 配合 LPUSH 手动插入队列头部,但一般不推荐过度依赖,多队列方案已经足够应对绝大多数场景。
Q3: 任务超时后还能重试吗?
- 软超时(
SOFT_TIME_LIMIT):任务内可以捕获 SoftTimeLimitExceeded 异常,自行决定是否重试或做清理工作。
- 硬超时(
TIME_LIMIT):操作系统直接 SIGKILL 杀死子进程,子进程无法感知,因此无法在任务内重试。但这种情况下,Celery 本身会认为任务失败,你可以根据配置的 autoretry_for 或全局重试策略让 Worker 自动重试。
建议始终设置一个略长于软超时的硬超时,作为最后的保底手段。
🔗 相关教程