异步任务与 Celery:处理耗时的后台任务

📂 所属阶段:第五阶段 — 高级进阶(性能与架构)
🔗 相关章节:Redis 集成 · RESTful API 开发


1. 为什么需要后台任务?

用户上传图片 → 压缩处理(3秒)→ 保存
问题:用户等待 3 秒才能看到上传结果,体验差

解决方案:用户上传图片 → 立即返回 → 后台压缩 → 完成

2. Celery 安装与配置

pip install celery redis
# app/celery_app.py
from celery import Celery

celery_app = Celery(
    "daoman",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
)
celery_app.conf.update(
    task_serializer="json",
    result_serializer="json",
    timezone="Asia/Shanghai",
    task_track_started=True,
    task_time_limit=300,
)

3. 定义后台任务

# app/tasks/image_tasks.py
from app.celery_app import celery_app
from PIL import Image
import os

@celery_app.task(bind=True, name="tasks.compress_image")
def compress_image(self, image_path, output_path, quality=85):
    """压缩图片(后台执行)"""
    img = Image.open(image_path)
    img = img.convert("RGB")
    img.save(output_path, "JPEG", quality=quality, optimize=True)
    os.remove(image_path)  # 删除原图
    return output_path

@celery_app.task(name="tasks.send_welcome_email")
def send_welcome_email(user_email, username):
    """发送欢迎邮件"""
    from flask_mail import Message
    from app.extensions import mail

    msg = Message(
        subject="欢迎来到道满博客!",
        recipients=[user_email],
        body=f"亲爱的 {username},欢迎注册道满博客!"
    )
    mail.send(msg)
    return True

4. 调用任务

# app/routes/upload.py
@upload_bp.route("/upload", methods=["POST"])
@login_required
def upload():
    file = request.files["image"]
    filepath = save_image(file)

    # 提交后台压缩任务
    task = compress_image.delay(
        image_path=filepath,
        output_path=filepath.replace(".png", "_compressed.jpg"),
    )

    # 立即返回任务 ID,前端可轮询
    return jsonify({
        "task_id": task.id,
        "status": "processing"
    })

# 查询任务状态
@api_bp.route("/task/<task_id>")
def get_task_status(task_id):
    task = compress_image.AsyncResult(task_id)
    return jsonify({
        "task_id": task.id,
        "state": task.state,
        "result": task.result if task.ready() else None,
    })

5. 启动 Worker

# 启动 Celery Worker
celery -A app.celery_app worker --loglevel=info --concurrency=4

# 生产环境守护进程
celery -A app.celery_app beat --loglevel=info

6. 小结

# Celery 流程

1. 安装:pip install celery redis
2. 配置 broker=redis://...
3. 定义:@celery_app.task
4. 调用:task.delay(*args)
5. 启动:celery -A app.celery_app worker

💡 适用场景:图片/视频处理、批量导出、邮件发送、数据统计、外部 API 调用。所有超过 1 秒的操作都建议后台化。


🔗 扩展阅读