#异步任务与 Celery:处理耗时的后台任务
📂 所属阶段:第五阶段 — 高级进阶(性能与架构)
🔗 相关章节:Redis 集成 · RESTful API 开发
#1. 为什么需要后台任务?
用户上传图片 → 压缩处理(3秒)→ 保存
问题:用户等待 3 秒才能看到上传结果,体验差
解决方案:用户上传图片 → 立即返回 → 后台压缩 → 完成#2. Celery 安装与配置
pip install celery redis# app/celery_app.py
from celery import Celery
celery_app = Celery(
"daoman",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
celery_app.conf.update(
task_serializer="json",
result_serializer="json",
timezone="Asia/Shanghai",
task_track_started=True,
task_time_limit=300,
)#3. 定义后台任务
# app/tasks/image_tasks.py
from app.celery_app import celery_app
from PIL import Image
import os
@celery_app.task(bind=True, name="tasks.compress_image")
def compress_image(self, image_path, output_path, quality=85):
"""压缩图片(后台执行)"""
img = Image.open(image_path)
img = img.convert("RGB")
img.save(output_path, "JPEG", quality=quality, optimize=True)
os.remove(image_path) # 删除原图
return output_path
@celery_app.task(name="tasks.send_welcome_email")
def send_welcome_email(user_email, username):
"""发送欢迎邮件"""
from flask_mail import Message
from app.extensions import mail
msg = Message(
subject="欢迎来到道满博客!",
recipients=[user_email],
body=f"亲爱的 {username},欢迎注册道满博客!"
)
mail.send(msg)
return True#4. 调用任务
# app/routes/upload.py
@upload_bp.route("/upload", methods=["POST"])
@login_required
def upload():
file = request.files["image"]
filepath = save_image(file)
# 提交后台压缩任务
task = compress_image.delay(
image_path=filepath,
output_path=filepath.replace(".png", "_compressed.jpg"),
)
# 立即返回任务 ID,前端可轮询
return jsonify({
"task_id": task.id,
"status": "processing"
})
# 查询任务状态
@api_bp.route("/task/<task_id>")
def get_task_status(task_id):
task = compress_image.AsyncResult(task_id)
return jsonify({
"task_id": task.id,
"state": task.state,
"result": task.result if task.ready() else None,
})#5. 启动 Worker
# 启动 Celery Worker
celery -A app.celery_app worker --loglevel=info --concurrency=4
# 生产环境守护进程
celery -A app.celery_app beat --loglevel=info#6. 小结
# Celery 流程
1. 安装:pip install celery redis
2. 配置 broker=redis://...
3. 定义:@celery_app.task
4. 调用:task.delay(*args)
5. 启动:celery -A app.celery_app worker💡 适用场景:图片/视频处理、批量导出、邮件发送、数据统计、外部 API 调用。所有超过 1 秒的操作都建议后台化。
🔗 扩展阅读

