FastAPIasync-task-queue-celery完全指南

📂 所属阶段:第六阶段 — 2026 特色专题(AI 集成篇)
🔗 相关章节:流式响应 StreamingResponse · Redis 集成


异步任务队列:为什么我们需要它?

假设你的用户请求背后藏着这些操作:

  • 🎨 AI一秒生成头像,模型加载就要5秒
  • 📧 批量发送5000封周报邮件
  • 🎥 转码一段10GB的高清视频

如果直接用普通的 async def 或同步函数处理,会发生什么?

  • 用户盯着空白的浏览器等待,甚至超时报错
  • 服务器资源被单个请求严重占用,后面的请求排起长队
  • 一旦中途失败,连重试都无从下手

异步任务队列正是为了解决这类“耗时/高风险操作”而生的。它的核心思想非常简单:把复杂的工作从请求线程里抽出来,交给后台的专职工人(Worker)去慢慢消化

用 Celery + FastAPI 实现这个流程,大体是这样:

sequenceDiagram
    participant User as 用户
    participant FastAPI as FastAPI(Producer)
    participant Broker as Broker(Redis/RabbitMQ)
    participant Worker as Celery Worker
    participant Backend as Backend(Redis)
    
    User->>FastAPI: 生成一张动漫头像
    FastAPI->>Broker: 提交任务到队列
    FastAPI->>User: 立即返回「任务ID + 预估完成时间」
    Broker->>Worker: 分配空闲Worker
    Worker->>Backend: 存储进度/结果
    User->>FastAPI: 轮询或WebSocket查询状态
    FastAPI->>Backend: 获取最新状态
    Backend->>FastAPI: 返回头像URL
    FastAPI->>User: 展示结果

整个过程中,用户只是提交了一个任务描述,然后立刻拿到一个“取货单号”,后续可以随时来问进度,完全不用傻等。而服务器端的FastAPI应用始终保持轻量、响应迅速。


快速上手:三步搭好基础架构

1. 安装核心依赖

使用 Redis 作为任务队列的 Broker(消息代理)和 Backend(结果存储)是目前最简洁、高效的方式:

pip install fastapi uvicorn celery[redis] redis

其中 celery[redis] 会自动安装 Celery 与 Redis 交互所需要的所有组件。

2. 编写 Celery 应用与一个示例任务

创建文件 celery_app.py,这是整个后台任务的大脑:

# celery_app.py
from celery import Celery
import time

# 创建 Celery 实例,broker 和 backend 可以分别指定不同的 Redis 数据库
celery = Celery(
    'daoman_fastapi_celery',
    broker='redis://localhost:6379/0',   # 任务队列存储在 Redis 0 号库
    backend='redis://localhost:6379/1'   # 结果存放在 Redis 1 号库(隔离,便于管理)
)

# 定义一个模拟邮件发送任务
@celery.task(bind=True, name="tasks.send_email")
def send_email(self, to: str, subject: str, body: str):
    """
    异步发送邮件,附带进度上报与自动重试
    """
    try:
        # 更新任务状态为 PROGRESS,并告知当前步骤
        self.update_state(state='PROGRESS', meta={'step': 'preparing'})
        time.sleep(1)   # 模拟准备阶段耗时

        self.update_state(state='PROGRESS', meta={'step': 'sending'})
        time.sleep(2)   # 模拟实际发送

        # 成功完成,返回结果字典
        return {'status': 'success', 'to': to, 'time': time.time()}
    except Exception as exc:
        # 失败时自动重试:延迟60秒,最多重试3次
        raise self.retry(exc=exc, countdown=60, max_retries=3)

3. 集成 FastAPI 并启动整个服务

新建 main.py,提供两个接口:提交任务与查询状态。

# main.py
from fastapi import FastAPI, HTTPException
from celery.result import AsyncResult
from celery_app import celery, send_email
from pydantic import BaseModel, EmailStr

app = FastAPI(title="道满FastAPI-Celery教程")

# 请求体模型,利用 Pydantic 自动校验入参
class EmailReq(BaseModel):
    to: EmailStr
    subject: str
    body: str

@app.post("/tasks/email")
async def create_email_task(req: EmailReq):
    # 将任务推入队列,传入收件人、主题、正文
    task = send_email.delay(req.to, req.subject, req.body)
    return {
        "task_id": task.id,
        "status": "submitted",
        "message": "邮件已进入队列,预计3秒左右完成"
    }

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    task = AsyncResult(task_id, app=celery)
    # 根据任务状态返回不同的信息
    return {
        "task_id": task_id,
        "state": task.state,
        "info": task.info if task.state in ['PROGRESS', 'FAILURE'] else None,
        "result": task.result if task.state == 'SUCCESS' else None
    }

依次启动三个进程(本地调试推荐使用三个终端窗口):

  1. 启动 Redis 服务(若未启动):

    redis-server
  2. 启动 Celery Worker,并指定并发数:

    celery -A celery_app worker --loglevel=INFO --concurrency=4

    💡 --concurrency=4 表示同时处理4个任务,可根据CPU核心数调整。

  3. 启动 FastAPI 应用(开启热重载方便开发):

    uvicorn main:app --reload

此时,访问 http://127.0.0.1:8000/docs 就能看到 Swagger 文档,可以直接测试提交任务和查询状态。


核心进阶:企业级常用配置

前面的示例虽然能跑,但生产环境需要考虑更多细节:超时、重试策略、队列隔离、内存泄漏保护等。我们把这些最佳实践汇总成一个统一的配置类。

1. 完整的 Celery 配置文件

新建 config/celery_config.py

# config/celery_config.py
from kombu import Queue
import os

class CeleryConfig:
    # ---------- Broker & Backend ----------
    BROKER_URL = os.getenv('CELERY_BROKER', 'redis://localhost:6379/0')
    RESULT_BACKEND = os.getenv('CELERY_BACKEND', 'redis://localhost:6379/1')
    
    # ---------- 序列化方式 ----------
    TASK_SERIALIZER = 'json'
    ACCEPT_CONTENT = ['json']
    RESULT_SERIALIZER = 'json'
    
    # ---------- 时区 ----------
    TIMEZONE = 'Asia/Shanghai'
    ENABLE_UTC = False
    
    # ---------- 任务执行控制 ----------
    TASK_TRACK_STARTED = True          # 开启 STARTED 状态跟踪
    TASK_TIME_LIMIT = 300               # 硬超时5分钟(强制杀死子进程)
    TASK_SOFT_TIME_LIMIT = 240          # 软超时4分钟(会抛出 SoftTimeLimitExceeded 异常,可在任务内处理)
    
    # ---------- 重试策略 ----------
    TASK_RETRY_BACKOFF = True           # 指数退避:重试间隔逐渐增加,防止雪崩
    TASK_RETRY_BACKOFF_MAX = 700        # 最大退避间隔700秒
    TASK_RETRY_JITTER = True            # 随机抖动:避免大量任务在同一时刻集中重试
    
    # ---------- Worker 优化 ----------
    WORKER_PREFETCH_MULTIPLIER = 1      # 每个 Worker 一次只预取1个任务,防止任务积压在某个 Worker 上
    WORKER_MAX_TASKS_PER_CHILD = 1000   # 子进程处理1000个任务后重启,避免内存泄漏
    WORKER_DISABLE_RATE_LIMITS = True   # 在应用层不限制速率(利用 Redis 更快)
    
    # ---------- 队列路由:按任务类型分流 ----------
    TASK_QUEUES = (
        Queue('default', routing_key='default'),
        Queue('ai_tasks', routing_key='ai.#'),
        Queue('email_tasks', routing_key='email.#'),
    )
    TASK_ROUTES = {
        'tasks.ai.*': {'queue': 'ai_tasks'},      # AI相关任务进入 ai_tasks 队列
        'tasks.email.*': {'queue': 'email_tasks'}, # 邮件任务进入 email_tasks 队列
    }

celery_app.py 中引入配置:

from config.celery_config import CeleryConfig

celery = Celery('daoman_fastapi_celery')
celery.config_from_object(CeleryConfig)

这样一来,你的 Celery 就具备了生产级别的可靠性和灵活性。

2. 定时任务配置(Celery Beat)

许多业务需要定期执行的工作,比如每天凌晨清理临时文件、每隔15分钟检查队列健康状态。这可以通过 Celery Beat + 定时调度实现。

celery_beat.py 中配置节拍:

# celery_beat.py
from celery.schedules import crontab
from celery_app import celery
import os

celery.conf.beat_schedule = {
    # 每天凌晨3点清理过期临时文件
    "cleanup-temp-files": {
        "task": "tasks.maintenance.cleanup_temp",
        "schedule": crontab(hour=3, minute=0),
        "options": {"queue": "maintenance"}
    },
    # 每15分钟检查队列健康状态
    "check-queue-health": {
        "task": "tasks.monitoring.check_queue",
        "schedule": crontab(minute="*/15"),
        "options": {"queue": "monitoring"}
    }
}

启动 Beat 服务(需要单独进程):

celery -A celery_app beat --loglevel=INFO

⚠️ 注意:不要和 Worker 共用同一个进程,Beat 只负责按时发送任务,Worker 负责执行。


生产环境关键要点

1. 监控告警 —— Flower

Celery 官方提供了一个极其好用的实时监控面板 Flower,你可以直观地看到所有任务的状态、Worker 的负载、成功率等关键指标。

pip install flower
celery -A celery_app flower --port=5555 --basic_auth=admin:你的强密码

启动后访问 http://your-server:5555,输入你设置的用户名密码,就能一览任务全局。
强烈建议在生产环境配合反向代理(如 Nginx)并启用 HTTPS。

2. Docker 化部署

容器化部署能让 Worker 的环境保持统一、快速扩容。一个安全的 Dockerfile 示例如下:

# Dockerfile.worker
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

# 创建非 root 用户,提升安全性
RUN useradd -m celery
USER celery

# 启动命令,指定监听的队列
CMD ["celery", "-A", "celery_app", "worker", \
     "--loglevel=INFO", "--queues=default,ai_tasks"]

💡 通过 --queues 参数,你可以让专门的 Worker 只消费某些队列,例如让具有 GPU 的机器只跑 ai_tasks


总结

Celery 是 FastAPI 生态中最成熟、最灵活的异步任务队列方案,它的价值可以浓缩为:

  • ✅ 将用户请求与耗时操作彻底解耦
  • ✅ Worker 支持横向扩展,随流量轻松增减
  • ✅ 强大的重试、死信队列、定时任务功能
  • ✅ 成熟的监控工具(Flower / Prometheus 集成)
  • ✅ 与 FastAPI 的集成优雅、自然

当你面对AI 推理、图像/视频处理、批量邮件、报表生成等场景时,Celery + FastAPI 的组合会让你事半功倍。


常见问题FAQ

Q1: 任务提交后一直没有被执行?

可能的原因:

  • Redis 服务未启动,或 Worker 连接的 Broker 地址不正确
  • Worker 多队列配置错误,任务去了某个队列但没有 Worker 在监听
  • 任务路由 key 写错,导致任务被丢弃到 celery 默认队列,而 Worker 只监听特定队列

快速排查:

  1. 在 Worker 日志中查看是否出现 received task 字样
  2. 使用 Flower 面板检查队列堆积情况
  3. 临时将所有 Worker 改为监听 default 队列测试

Q2: 如何实现任务优先级?

Celery 原生没有直接的任务优先级字段,但可以通过多队列 + 不同数量的 Worker 来间接实现:

  • 为高优先级任务创建一个 high_priority 队列
  • 为该队列分配更多的 Worker 进程或独立的高配服务器
  • 低优先级队列可以稍后消费甚至限流

另一种更精细的控制方式是通过 Redis 的 BRPOP 配合 LPUSH 手动插入队列头部,但一般不推荐过度依赖,多队列方案已经足够应对绝大多数场景。

Q3: 任务超时后还能重试吗?

  • 软超时(SOFT_TIME_LIMIT:任务内可以捕获 SoftTimeLimitExceeded 异常,自行决定是否重试或做清理工作。
  • 硬超时(TIME_LIMIT:操作系统直接 SIGKILL 杀死子进程,子进程无法感知,因此无法在任务内重试。但这种情况下,Celery 本身会认为任务失败,你可以根据配置的 autoretry_for 或全局重试策略让 Worker 自动重试。

建议始终设置一个略长于软超时的硬超时,作为最后的保底手段。


🔗 相关教程