异步任务与 Celery:处理耗时的后台任务
📂 所属阶段:第五阶段 — 高级进阶(性能与架构)
🔗 相关章节:Redis 集成 · RESTful API 开发
0. 一个真实的卡顿现场
上周给博客后台上传头像时,我顺手甩了一张用 iPhone 拍的 12MB HEIC 照片(转成 PNG 后提交)。结果前端图标硬生生转了 3 秒多才拿到保存链接,差点以为接口挂了。查日志才发现,罪魁祸首是 PIL 图片压缩——处理这张图整整花了 2.8 秒。
其实很多 Web 请求里都藏着类似的“时间刺客”:压缩图片、发送邮件、导出报表、调用第三方 API……这些操作一旦同步执行,用户的感受就是无尽的 loading,服务器资源也会被慢慢耗光。
解决这个问题的标准姿势,就是引入 异步任务队列 Celery。
1. 核心思路:把“等人”变成“先回、后干”
用一句话概括:把“用户必须等结果”的同步流程,拆成“立即返回受理回执,后台暗搓搓干活”的异步流程。
以前面提到的图片上传为例,改造后的流程是这样:
flowchart LR
A[用户上传原图] --> B{Web 服务器接收}
B --> C[保存临时原图]
C --> D[提交 「压缩任务」到消息队列]
D --> E[立即返回 「任务ID + 处理中状态」 给前端]
F[Celery Worker 监听队列] --> G[拿到任务ID执行压缩]
G --> H[删除临时图/更新数据库]
E --> I[前端用任务ID轮询状态接口]
H --> J[轮询接口返回「完成状态 + 压缩后链接」]
对比改造前,体验上的改变是:从“傻等 3 秒”变成“100ms 内收到提交反馈”;对服务器来说,Web 进程能立刻释放去处理下一个请求,不会被阻塞住的耗时任务拖垮。
2. 基础设施搭建
我们选用 Redis 同时作为 消息中间件(Broker) 和 结果存储(Backend)。理由很实际:绝大多数项目已经用 Redis 做缓存了,没必要为 Celery 单独引入 RabbitMQ 或 Kafka,徒增维护复杂度。
2.1 安装依赖
在虚拟环境中直接安装:
pip install celery redis pillow flask-mail
(pillow 和 flask-mail 是本次示例用到的额外库,按需增减)
2.2 最小化 Celery 配置
新建 app/celery_app.py,将 Celery 实例独立出来,既能避免循环导入,也方便生产环境的管理:
# app/celery_app.py
from celery import Celery
# 1. 初始化 Celery 核心实例
celery_app = Celery(
main="daoman", # 项目名,会在日志和任务命名空间中用到
broker="redis://localhost:6379/0", # Broker:存放待处理的任务
backend="redis://localhost:6379/1", # Backend:存放任务结果和状态
)
# 2. 补充通用配置(生产环境建议抽取为独立配置类)
celery_app.conf.update(
# 序列化格式:JSON 兼容性最好,但不能传函数、类等复杂对象
task_serializer="json",
result_serializer="json",
accept_content=["json"],
# 时区:与项目保持一致
timezone="Asia/Shanghai",
enable_utc=False,
# 追踪任务开始状态
task_track_started=True,
# 超时保护,避免某个任务卡死 Worker
task_time_limit=300, # 硬超时:5 分钟后强制终止
task_soft_time_limit=270, # 软超时:4.5 分钟时抛出异常,可捕获处理
)
3. 编写可复用的后台任务
Celery 任务本质上就是给普通函数加个装饰器,我们可以把常见的耗时操作抽成独立的任务模块。
3.1 图片压缩任务
新建 app/tasks/image_tasks.py,带上必要的容错逻辑,比如处理损坏图片、删除失败清理等:
# app/tasks/image_tasks.py
import os
from PIL import Image
from PIL.UnidentifiedImageError import UnidentifiedImageError
from app.celery_app import celery_app
@celery_app.task(bind=True, name="tasks.compress_image") # bind=True 能拿到 self(当前任务实例)
def compress_image(
self,
temp_path: str,
output_path: str,
quality: int = 85,
max_width: int = 1920,
max_height: int = 1080,
):
"""后台压缩图片:统一转 JPG、限制尺寸、删除临时原图"""
try:
# 1. 读取并转换格式
with Image.open(temp_path) as img:
# 统一转成 RGB(丢弃 Alpha 通道,适配 JPG)
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
# 等比例缩放,不超过最大宽高
img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
# 保存压缩后的图片
img.save(output_path, "JPEG", quality=quality, optimize=True)
# 2. 压缩成功后删除临时原图
if os.path.exists(temp_path):
os.remove(temp_path)
# 3. 返回最终结果(会存入 Backend)
return {"status": "success", "path": output_path}
except UnidentifiedImageError as e:
# 遇到损坏或非图片格式,5 秒后重试,最多 2 次
self.retry(exc=e, countdown=5, max_retries=2)
except Exception as e:
# 其他错误(磁盘满、权限不足……)直接返回错误信息
return {"status": "error", "msg": str(e)}
3.2 发送欢迎邮件
新建 app/tasks/mail_tasks.py。注意,Worker 进程和 Web 进程是分离的,所以使用 Flask-Mail 等需要应用上下文的扩展时,必须手动激活:
# app/tasks/mail_tasks.py
from flask import current_app
from flask_mail import Message
from app.celery_app import celery_app
from app.extensions import mail
@celery_app.task(name="tasks.send_welcome_email")
def send_welcome_email(user_email: str, username: str):
"""后台发送注册欢迎邮件"""
try:
with current_app.app_context(): # 手动激活 Flask 应用上下文
msg = Message(
subject=f"🎉 欢迎来到{current_app.config['SITE_NAME']}!",
recipients=[user_email],
body=f"亲爱的 {username},\n\n感谢注册道满博客!\n\n你可以开始发布文章、评论互动了~\n\n道满博客团队",
)
mail.send(msg)
return {"status": "success", "to": user_email}
except Exception as e:
return {"status": "error", "msg": str(e)}
4. 在 Web 路由中调用任务
调用 Celery 任务不能用 function(),而得用:
function.delay() — 简化版,只支持位置参数和关键字参数
function.apply_async() — 完整版,可以设置倒计时、重试策略、指定队列等
4.1 提交压缩任务
假设有一个上传蓝图 app/routes/upload.py:
# app/routes/upload.py
import os, uuid
from flask import Blueprint, request, jsonify, current_app
from flask_login import login_required, current_user
from app.tasks.image_tasks import compress_image
upload_bp = Blueprint("upload", __name__, url_prefix="/api/upload")
@upload_bp.route("/avatar", methods=["POST"])
@login_required
def upload_avatar():
# 1. 接收并保存临时原图
file = request.files.get("avatar")
if not file or not file.filename:
return jsonify({"code": 400, "msg": "请上传头像"}), 400
# 生成临时文件名(用 UUID 防止重复)
temp_name = f"temp_{current_user.id}_{uuid.uuid4().hex}{os.path.splitext(file.filename)[0]}.png"
temp_path = os.path.join(current_app.config["TEMP_UPLOAD_FOLDER"], temp_name)
file.save(temp_path)
# 2. 生成压缩后的文件名
compressed_name = f"avatar_{current_user.id}_{uuid.uuid4().hex}.jpg"
compressed_path = os.path.join(current_app.config["AVATAR_UPLOAD_FOLDER"], compressed_name)
# 3. 提交后台压缩任务(delay 简化调用)
task = compress_image.delay(
temp_path=temp_path,
output_path=compressed_path,
quality=90,
max_width=256,
max_height=256,
)
# 4. 立即返回任务 ID 和当前状态
return jsonify({
"code": 200,
"msg": "头像上传中,请稍后刷新",
"data": {"task_id": task.id, "state": task.state},
})
4.2 查询任务状态
前端拿到任务 ID 后,可以每 200ms~1s 轮询以下接口:
# app/routes/task.py
from flask import Blueprint, jsonify
from app.celery_app import celery_app
task_bp = Blueprint("task", __name__, url_prefix="/api/task")
@task_bp.route("/<task_id>", methods=["GET"])
def get_task_status(task_id):
"""查询任意任务的状态与结果"""
task = celery_app.AsyncResult(task_id)
resp = {
"code": 200,
"data": {
"task_id": task.id,
"state": task.state, # PENDING / STARTED / SUCCESS / FAILURE / RETRY
},
}
# 只有任务执行完毕后才返回结果
if task.ready():
resp["data"]["result"] = task.result
return jsonify(resp)
5. 启动 Celery 服务
切记:Worker 和 Web 进程必须分开启动,不能挤在同一个终端里。
5.1 开发环境
# 注意:Celery 5.x 官方不再支持 Windows,建议使用 WSL2 或 Docker
# 确实需要在 Windows 上调试,可降级到 Celery 4.4.7 + gevent
celery -A app.celery_app worker --loglevel=info --concurrency=2
-A 指定 Celery 实例位置(会自动找 celery_app)
--loglevel=info 输出详情,方便调试
--concurrency=2 启动 2 个 Worker 进程(一般设为 CPU 核心数的 1~2 倍)
5.2 生产环境
生产环境不能直接终端启动,推荐用 systemd(Linux)或 Docker 进行守护进程化。下面是一个简单的 Docker Compose 配置片段:
# docker-compose.yml
version: '3.8'
services:
# 你的 Web 服务和 Redis 等省略...
celery-worker:
build: .
command: celery -A app.celery_app worker --loglevel=warning --concurrency=4
volumes:
- ./uploads:/app/uploads # 挂载上传目录,保证 Worker 能操作文件
depends_on:
- redis
environment:
- FLASK_ENV=production
6. 核心流程速记
一张极简清单,帮你把整个流程刻进脑子里:
Celery 极简流程
1. 准备环境:安装 celery + redis
2. 抽离配置:单独建 celery_app.py
3. 封装任务:用 @celery_app.task 装饰普通函数
4. 提交任务:用 task.delay() 或 apply_async()
5. 启动监听:终端 / 守护进程启动 Worker
6. 查询结果:用 AsyncResult(task_id)
7. 场合适配 & 避坑指南
✅ 强烈建议走后台任务的场景
- 图片、视频、音频等媒体处理
- 批量数据导出(Excel、PDF)
- 邮件、短信、推送等通知发送
- 耗时超过 1 秒的第三方 API 调用
- 后台统计、数据清洗等定时任务
❌ 不要用后台任务的场景
- 必须立即返回结果的操作(比如登录验证、简单查询)
- 逻辑极简单、耗时 < 50ms 的操作(提交到 Redis 本身也有开销)
🔗 扩展阅读