The Complete Guide to Deploying FastAPI in Production with Nginx and Gunicorn

📂 Stage: Stage 5 — Engineering and Deployment (Hands-On)
🔗 Related chapters: Docker Containerized Deployment · Multi-Environment Configuration with Pydantic Settings

Table of Contents

Production Deployment Architecture Overview

Why the Nginx + Gunicorn Architecture?

In production, running a FastAPI application directly on a single Uvicorn process has several limitations:

The single-process problem:
┌─────────────────────────────────────────────────────┐
│  User request → Uvicorn → FastAPI → handle request  │
│  (single process: when it blocks, all requests wait)│
└─────────────────────────────────────────────────────┘

The Nginx + Gunicorn advantage:
┌─────────────────────────────────────────────────────┐
│  User request → Nginx → Gunicorn workers → FastAPI  │
│  (multi-process, load balancing, SSL, static files) │
└─────────────────────────────────────────────────────┘
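The difference can be put into rough numbers. A back-of-envelope sketch, with purely illustrative figures (100 queued requests at 50 ms each, and a worker count assumed from the "cores * 2 + 1" rule used later in this guide):

```python
# back_of_envelope.py - rough throughput comparison (illustrative numbers only)
import math

def total_time(n_requests: int, seconds_per_request: float, n_workers: int) -> float:
    """Wall-clock time to drain a backlog when each worker handles one request at a time."""
    rounds = math.ceil(n_requests / n_workers)
    return rounds * seconds_per_request

# 100 queued requests at 50 ms each:
single = total_time(100, 0.05, 1)   # one blocking process
multi = total_time(100, 0.05, 9)    # 9 Gunicorn workers (4 cores * 2 + 1)

print(f"1 worker:  {single:.2f}s")   # 5.00s
print(f"9 workers: {multi:.2f}s")    # 0.60s
```

This ignores context-switch and proxy overhead; the point is only that a pool of workers drains a backlog roughly in proportion to its size.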

Core Component Responsibilities

Component | Responsibilities | Strengths
Nginx | Reverse proxy, SSL termination, static file serving | High concurrency, low memory footprint, mature and stable
Gunicorn | Process manager (a WSGI server, here running ASGI workers) | Multi-process, hot restart, graceful shutdown
Uvicorn | ASGI server, async request handling | Async support, high performance, low latency
FastAPI | Application logic | Type hints, auto-generated docs, dependency injection

Typical Production Architecture

Internet users → DNS → CDN → Nginx cluster → Gunicorn workers → Database cluster

                 Monitoring system ← Log collection ← Application logs

Nginx Reverse Proxy Configuration

Basic Reverse Proxy Configuration

# /etc/nginx/sites-available/daoman-api
upstream daoman_api {
    # Load balancing across local Gunicorn instances
    server 127.0.0.1:8000 weight=3 max_fails=2 fail_timeout=30s;
    server 127.0.0.1:8001 weight=3 max_fails=2 fail_timeout=30s;
    server 127.0.0.1:8002 weight=2 max_fails=2 fail_timeout=30s;
    server 127.0.0.1:8003 weight=2 max_fails=2 fail_timeout=30s;
    
    # Keep idle connections to the upstreams open
    keepalive 32;
}

server {
    listen 80;
    server_name your-domain.com www.your-domain.com;
    
    # Redirect HTTP to HTTPS ($host preserves whichever name was requested)
    return 301 https://$host$request_uri;
}

server {
    listen 443 ssl http2;
    server_name your-domain.com www.your-domain.com;
    
    # SSL certificate files
    ssl_certificate /etc/ssl/certs/your-domain.crt;
    ssl_certificate_key /etc/ssl/private/your-domain.key;
    ssl_trusted_certificate /etc/ssl/certs/ca-bundle.crt;
    
    # SSL security settings
    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-SHA384;
    ssl_prefer_server_ciphers off;
    ssl_session_cache shared:SSL:10m;
    ssl_session_timeout 10m;
    ssl_session_tickets off;
    
    # Security headers
    add_header X-Frame-Options "SAMEORIGIN" always;
    add_header X-Content-Type-Options "nosniff" always;
    add_header X-XSS-Protection "1; mode=block" always;
    add_header Referrer-Policy "no-referrer-when-downgrade" always;
    add_header Content-Security-Policy "default-src 'self' http: https: data: blob: 'unsafe-inline'" always;
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;
    
    # Logging
    access_log /var/log/nginx/daoman-access.log main buffer=32k flush=5m;
    error_log /var/log/nginx/daoman-error.log warn;
    
    # Client request limits
    client_max_body_size 100M;
    client_body_timeout 120s;
    client_header_timeout 120s;
    
    # Gzip compression
    gzip on;
    gzip_vary on;
    gzip_min_length 1024;
    gzip_proxied expired no-cache no-store private must-revalidate auth;
    gzip_types
        application/atom+xml
        application/javascript
        application/json
        application/ld+json
        application/manifest+json
        application/rss+xml
        application/vnd.geo+json
        application/vnd.ms-fontobject
        application/wasm
        application/x-font-ttf
        application/x-web-app-manifest+json
        application/xhtml+xml
        application/xml
        font/opentype
        image/bmp
        image/svg+xml
        text/cache-manifest
        text/calendar
        text/css
        text/javascript
        text/plain
        text/xml;
    
    # Main API proxy
    location / {
        proxy_pass http://daoman_api;
        proxy_http_version 1.1;
        # Clear the Connection header so upstream keepalive works;
        # WebSocket upgrades are handled by the /ws/ location below
        proxy_set_header Connection "";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_set_header X-Forwarded-Host $host;
        proxy_set_header X-Original-Forwarded-For $remote_addr;
        
        # Timeouts
        proxy_connect_timeout 60s;
        proxy_send_timeout 60s;
        proxy_read_timeout 60s;
        
        # Buffering
        proxy_buffering on;
        proxy_buffer_size 128k;
        proxy_buffers 4 256k;
        proxy_busy_buffers_size 256k;
        
        # Retries and failover
        proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504;
        proxy_next_upstream_tries 3;
        proxy_next_upstream_timeout 10s;
    }
    
    # Static files
    location /static/ {
        alias /var/www/daoman/static/;
        expires 30d;
        add_header Cache-Control "public, immutable";
        access_log off;
    }
    
    # Media files
    location /media/ {
        alias /var/www/daoman/media/;
        expires 7d;
        add_header Cache-Control "public";
        access_log off;
    }
    
    # Health check endpoint
    location /health {
        access_log off;
        proxy_pass http://daoman_api/health;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
    
    # WebSocket support
    location /ws/ {
        proxy_pass http://daoman_api;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_read_timeout 86400;
        proxy_send_timeout 86400;
    }
    
    # API documentation
    location /docs {
        proxy_pass http://daoman_api;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }
    
    location /redoc {
        proxy_pass http://daoman_api;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }
    
    location /openapi.json {
        proxy_pass http://daoman_api;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
    }
}
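The weight= values in the upstream block control how requests are spread: over each cycle of ten requests (the sum of the weights), the 8000/8001 backends receive three each and 8002/8003 two each. Open-source Nginx interleaves these picks with a smooth weighted round-robin; a small Python sketch of that algorithm (a simplified model, not Nginx's actual source) shows the behavior:

```python
# Smooth weighted round-robin, modeled after Nginx's weighted upstream balancing.
# Servers and weights mirror the upstream block above.

def smooth_wrr(servers, n_picks):
    """servers: list of (name, weight); returns the sequence of picks."""
    current = {name: 0 for name, _ in servers}
    total = sum(w for _, w in servers)
    picks = []
    for _ in range(n_picks):
        # Each round, every server gains its weight...
        for name, weight in servers:
            current[name] += weight
        # ...the current leader is picked and pays back the total weight.
        best = max(current, key=current.get)
        current[best] -= total
        picks.append(best)
    return picks

upstreams = [("127.0.0.1:8000", 3), ("127.0.0.1:8001", 3),
             ("127.0.0.1:8002", 2), ("127.0.0.1:8003", 2)]

picks = smooth_wrr(upstreams, 10)
# Over one full cycle (total weight = 10), each server is picked exactly
# `weight` times, interleaved rather than in bursts.
print(picks)
```

The "pay back the total" step is what spreads the picks out instead of sending three requests in a row to the same backend.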

Nginx Performance Tuning

# /etc/nginx/nginx.conf - main configuration file tuning
user www-data;
worker_processes auto;
worker_rlimit_nofile 100000;

events {
    worker_connections 2048;
    multi_accept on;
    use epoll;
}

http {
    # Core settings
    sendfile on;
    tcp_nopush on;
    tcp_nodelay on;
    keepalive_timeout 65;
    keepalive_requests 100;
    types_hash_max_size 2048;
    
    # Client request limits
    client_body_timeout 12;
    client_header_timeout 12;
    send_timeout 10;
    client_max_body_size 100m;
    
    # Rate and connection limiting zones
    limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
    limit_req_zone $binary_remote_addr zone=login:10m rate=1r/s;
    limit_conn_zone $binary_remote_addr zone=perip:10m;
    
    # Log format
    log_format main '$remote_addr - $remote_user [$time_local] "$request" '
                   '$status $body_bytes_sent "$http_referer" '
                   '"$http_user_agent" "$http_x_forwarded_for" '
                   'rt=$request_time uct="$upstream_connect_time" '
                   'uht="$upstream_header_time" urt="$upstream_response_time"';
    
    # Gzip
    gzip on;
    gzip_vary on;
    gzip_min_length 1024;
    gzip_comp_level 6;
    gzip_types
        text/plain
        text/css
        text/xml
        text/javascript
        application/json
        application/javascript
        application/xml+rss
        application/atom+xml
        image/svg+xml;
    
    # Include site configurations
    include /etc/nginx/conf.d/*.conf;
    include /etc/nginx/sites-enabled/*;
}
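With worker_processes auto, Nginx starts one worker per CPU core, and worker_connections caps the simultaneous connections per worker. A rough capacity estimate can be derived from the two numbers; the rule of thumb that a reverse-proxied request consumes two connections (one to the client, one to the upstream) is an assumption here, and the 4-core machine is hypothetical:

```python
# Rough Nginx connection-capacity estimate for the settings above.

def nginx_capacity(worker_processes: int, worker_connections: int, proxied: bool = True) -> int:
    """Estimated max concurrent clients; a proxied request uses ~2 connections."""
    per_request = 2 if proxied else 1
    return worker_processes * worker_connections // per_request

# worker_processes auto -> one per core; assume a 4-core machine here
print(nginx_capacity(4, 2048))          # 4096 concurrent proxied clients
print(nginx_capacity(4, 2048, False))   # 8192 when serving static files directly
```

worker_rlimit_nofile needs headroom above this total, since every connection holds at least one file descriptor.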

High-Performance Gunicorn Configuration

Gunicorn Configuration File

# gunicorn.conf.py - production Gunicorn configuration
import multiprocessing

# Server socket
bind = "0.0.0.0:8000"  # bind address and port
backlog = 2048  # listen queue size

# Worker processes
workers = multiprocessing.cpu_count() * 2 + 1  # rule of thumb: CPU cores * 2 + 1
worker_class = "uvicorn.workers.UvicornWorker"  # run FastAPI via Uvicorn workers
worker_connections = 1000  # max simultaneous connections per worker
max_requests = 1000  # restart a worker after this many requests (contains leaks)
max_requests_jitter = 100  # random jitter so workers do not all restart at once
timeout = 300  # worker timeout in seconds
graceful_timeout = 30  # grace period for in-flight requests on shutdown
keepalive = 5  # seconds to hold HTTP keep-alive connections

# Process naming
proc_name = "daoman_api"

# Logging
accesslog = "/var/log/gunicorn/access.log"  # access log
errorlog = "/var/log/gunicorn/error.log"    # error log
loglevel = "info"  # log level
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'

# Process management
preload_app = True  # load the application before forking workers
daemon = False  # run in the foreground (let systemd/supervisor manage the process)
enable_stdio_inheritance = True  # inherit stdio file descriptors

# Request limits
limit_request_line = 4094  # max size of the HTTP request line
limit_request_fields = 100  # max number of header fields
limit_request_field_size = 8190  # max size of a single header field

# SSL (only if TLS terminates at Gunicorn instead of Nginx)
# keyfile = "/path/to/keyfile"
# certfile = "/path/to/certfile"
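With max_requests and max_requests_jitter set as above, each worker chooses its own restart threshold by adding a random offset to max_requests, so recycles are staggered instead of simultaneous. A simplified model of that behavior (an illustration, not Gunicorn's internal code):

```python
# Simplified model of how max_requests_jitter staggers worker restarts.
import random

def worker_restart_limits(n_workers: int, max_requests: int, jitter: int, seed: int = 0):
    """Each worker gets max_requests plus a random offset in [0, jitter]."""
    rng = random.Random(seed)  # fixed seed only to make this demo reproducible
    return [max_requests + rng.randint(0, jitter) for _ in range(n_workers)]

limits = worker_restart_limits(n_workers=9, max_requests=1000, jitter=100)
print(limits)  # nine thresholds spread between 1000 and 1100
```

Without the jitter, all workers would hit their limit around the same request count and restart together, briefly leaving no worker to accept connections.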

Startup Script

#!/bin/bash
# start_gunicorn.sh - Gunicorn startup script

# Set environment variables
export PYTHONPATH="/opt/daoman:$PYTHONPATH"
export ENV="production"
export LOG_LEVEL="info"

# Create the log directory
mkdir -p /var/log/gunicorn
chown www-data:www-data /var/log/gunicorn

# Derive the worker count from the CPU core count
CPU_CORES=$(nproc)
WORKERS=$((CPU_CORES * 2 + 1))

echo "Starting Daoman API with $WORKERS workers..."

# Start Gunicorn (command-line flags override gunicorn.conf.py)
exec gunicorn main:app \
    --config gunicorn.conf.py \
    --workers $WORKERS \
    --bind 0.0.0.0:8000 \
    --worker-class uvicorn.workers.UvicornWorker \
    --worker-connections 1000 \
    --max-requests 1000 \
    --max-requests-jitter 100 \
    --timeout 300 \
    --graceful-timeout 30 \
    --keep-alive 5 \
    --preload \
    --access-logfile /var/log/gunicorn/access.log \
    --error-logfile /var/log/gunicorn/error.log \
    --log-level info \
    --capture-output \
    --enable-stdio-inheritance

Multi-Port Gunicorn Configuration

# multi_port_gunicorn.py - multi-port configuration example
import multiprocessing
import socket
from contextlib import closing

def find_free_port():
    """Ask the OS for a free TCP port (note: another process may grab it before use)"""
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
        s.bind(('', 0))
        s.listen(1)
        port = s.getsockname()[1]
    return port

# Generate several Gunicorn configurations
def generate_multi_port_config():
    """Generate one single-worker config per port.

    Random ports are for illustration only; with the fixed Nginx upstream
    above (8000-8003) you would assign the ports explicitly.
    """
    cpu_cores = multiprocessing.cpu_count()
    ports = [find_free_port() for _ in range(min(4, cpu_cores))]
    
    configs = []
    for port in ports:
        config = {
            'bind': f'0.0.0.0:{port}',
            'workers': 1,
            'worker_class': 'uvicorn.workers.UvicornWorker',
            'worker_connections': 1000,
            'max_requests': 1000,
            'max_requests_jitter': 100,
            'timeout': 300,
            'graceful_timeout': 30,
            'keepalive': 5,
            'preload_app': True,
        }
        configs.append(config)
    
    return configs, ports
# Managing multiple Gunicorn instances with supervisor
"""
[group:daoman_api]
programs=daoman_api_8000,daoman_api_8001,daoman_api_8002,daoman_api_8003

[program:daoman_api_8000]
command=gunicorn main:app --bind 0.0.0.0:8000 --workers 1 --worker-class uvicorn.workers.UvicornWorker
directory=/opt/daoman
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/gunicorn/daoman_api_8000.log

[program:daoman_api_8001]
command=gunicorn main:app --bind 0.0.0.0:8001 --workers 1 --worker-class uvicorn.workers.UvicornWorker
directory=/opt/daoman
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/gunicorn/daoman_api_8001.log
"""

SSL Certificates and HTTPS

Automated Setup with Let's Encrypt

#!/bin/bash
# ssl_setup.sh - automated SSL certificate setup

# Install Certbot
sudo apt update
sudo apt install certbot python3-certbot-nginx -y

# Obtain the certificate and configure Nginx
sudo certbot --nginx -d your-domain.com -d www.your-domain.com --agree-tos --email your-email@example.com

# Dry-run the automatic renewal
sudo certbot renew --dry-run

# Add a cron job for automatic renewal
(crontab -l 2>/dev/null; echo "0 12 * * * /usr/bin/certbot renew --quiet") | crontab -

SSL Security Configuration

# ssl_configuration.conf - SSL security configuration
# Protocols and cipher suites
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-SHA384:ECDHE-RSA-CHACHA20-POLY1305;
ssl_prefer_server_ciphers off;

# Session management
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
ssl_session_tickets off;

# OCSP stapling
ssl_stapling on;
ssl_stapling_verify on;
resolver 8.8.8.8 8.8.4.4 valid=300s;
resolver_timeout 5s;

# HSTS
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;

# Additional security headers
add_header X-Frame-Options DENY always;
add_header X-Content-Type-Options nosniff always;
add_header X-XSS-Protection "1; mode=block" always;
add_header Referrer-Policy "strict-origin-when-cross-origin" always;

Certificate Monitoring Script

# ssl_monitor.py - SSL certificate monitoring
import ssl
import socket
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
import subprocess

def check_ssl_expiry(domain, port=443, warning_days=30):
    """Check when a domain's SSL certificate expires"""
    context = ssl.create_default_context()
    
    with socket.create_connection((domain, port), timeout=10) as sock:
        with context.wrap_socket(sock, server_hostname=domain) as ssock:
            cert = ssock.getpeercert()
            
    # Parse the certificate expiry time, e.g. 'Jun  1 12:00:00 2025 GMT'
    expiry_date = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
    days_until_expiry = (expiry_date - datetime.now()).days
    
    return {
        'domain': domain,
        'expiry_date': expiry_date.strftime('%Y-%m-%d'),
        'days_until_expiry': days_until_expiry,
        'is_expired': days_until_expiry <= 0,
        'needs_renewal': days_until_expiry <= warning_days
    }

def renew_certificates(domains):
    """Renew certificates via certbot"""
    for domain in domains:
        result = subprocess.run([
            'certbot', 'renew', '--cert-name', domain, '--quiet'
        ], capture_output=True, text=True)
        
        if result.returncode == 0:
            print(f"Successfully renewed certificate for {domain}")
        else:
            print(f"Failed to renew certificate for {domain}: {result.stderr}")

def send_alert_email(recipients, subject, body):
    """Send an alert email"""
    msg = MIMEText(body)
    msg['Subject'] = subject
    msg['From'] = 'monitor@your-domain.com'
    msg['To'] = ', '.join(recipients)
    
    # Send the message (requires a configured SMTP server)
    # smtp_server = smtplib.SMTP('smtp.your-server.com', 587)
    # smtp_server.starttls()
    # smtp_server.login('username', 'password')
    # smtp_server.send_message(msg)
    # smtp_server.quit()

# Check expiry for each domain
domains_to_check = ['your-domain.com', 'www.your-domain.com']
for domain in domains_to_check:
    cert_info = check_ssl_expiry(domain)
    print(f"Domain: {cert_info['domain']}")
    print(f"Expiry: {cert_info['expiry_date']}")
    print(f"Days until expiry: {cert_info['days_until_expiry']}")
    print(f"Needs renewal: {cert_info['needs_renewal']}")
    print("-" * 50)

Load Balancing and High Availability

Nginx Load Balancing Configuration

# load_balancer.conf - load balancing configuration
upstream daoman_backend {
    # Weighted round-robin
    server backend1.daoman.com:8000 weight=3 max_fails=2 fail_timeout=30s;
    server backend2.daoman.com:8000 weight=3 max_fails=2 fail_timeout=30s;
    server backend3.daoman.com:8000 weight=2 max_fails=2 fail_timeout=30s;
    
    # Session affinity: uncomment to pin clients to a backend
    # (note that ip_hash replaces weighted round-robin)
    # ip_hash;
    
    # Keep upstream connections alive
    keepalive 32;
}

# Active health checks: the health_check directive requires NGINX Plus and
# belongs in a location block; open-source Nginx only does passive checks
# via max_fails/fail_timeout as below
upstream daoman_backend_health {
    server backend1.daoman.com:8000 max_fails=2 fail_timeout=30s;
    server backend2.daoman.com:8000 max_fails=2 fail_timeout=30s;
    server backend3.daoman.com:8000 max_fails=2 fail_timeout=30s;
    
    # health_check interval=10 fails=2 passes=3 uri=/health;  # NGINX Plus only
}

# Main server
server {
    listen 80;
    server_name api.daoman.com;
    
    location / {
        proxy_pass http://daoman_backend;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        
        # Failover settings
        proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504;
        proxy_next_upstream_tries 3;
        proxy_next_upstream_timeout 10s;
    }
}

HAProxy Configuration (Alternative)

# haproxy.cfg - HAProxy load balancing configuration
global
    log stdout local0
    maxconn 4096
    daemon

defaults
    log global
    mode http
    option httplog
    option dontlognull
    retries 3
    option redispatch
    timeout connect 5000ms
    timeout client 50000ms
    timeout server 50000ms

frontend daoman_frontend
    bind *:80
    bind *:443 ssl crt /etc/ssl/private/daoman.pem
    default_backend daoman_backend
    option forwardfor
    option httpclose

backend daoman_backend
    balance roundrobin
    option httpchk GET /health
    server backend1 127.0.0.1:8000 check inter 2000 rise 2 fall 3
    server backend2 127.0.0.1:8001 check inter 2000 rise 2 fall 3
    server backend3 127.0.0.1:8002 check inter 2000 rise 2 fall 3
    server backend4 127.0.0.1:8003 check inter 2000 rise 2 fall 3

Application-Level Health Checks

# health_check.py - application-level health checks
from fastapi import FastAPI
from pydantic import BaseModel
from datetime import datetime
import asyncio
import logging
import time
import psutil
import platform

app = FastAPI()
APP_START_TIME = time.time()  # recorded at import, used for the uptime field

class HealthStatus(BaseModel):
    status: str
    timestamp: str
    uptime: float
    version: str
    services: dict
    system: dict

@app.get("/health", response_model=HealthStatus)
async def health_check():
    """Health check endpoint"""
    # Check each dependent service
    services_status = {
        "database": await check_database_connection(),
        "redis": await check_redis_connection(),
        "external_api": await check_external_api(),
        "disk_space": await check_disk_space(),
        "memory": await check_memory_usage(),
    }
    
    # Gather system information
    system_info = {
        "cpu_percent": psutil.cpu_percent(interval=1),
        "memory_percent": psutil.virtual_memory().percent,
        "disk_percent": psutil.disk_usage('/').percent,
        "platform": platform.platform(),
        "python_version": platform.python_version(),
    }
    
    # Compute the overall status
    service_statuses = list(services_status.values())
    overall_status = "healthy" if all(service_statuses) else "degraded"
    
    return HealthStatus(
        status=overall_status,
        timestamp=datetime.now().isoformat(),
        uptime=time.time() - APP_START_TIME,
        version="1.0.0",
        services=services_status,
        system=system_info
    )

async def check_database_connection():
    """Check database connectivity"""
    try:
        # Put the real database check here
        await asyncio.sleep(0.1)  # simulate a database call
        return True
    except Exception as e:
        logging.error(f"Database connection check failed: {e}")
        return False

async def check_redis_connection():
    """Check Redis connectivity"""
    try:
        # Put the real Redis check here
        await asyncio.sleep(0.05)  # simulate a Redis call
        return True
    except Exception as e:
        logging.error(f"Redis connection check failed: {e}")
        return False

async def check_external_api():
    """Check external API availability"""
    try:
        # Put the real external API check here
        await asyncio.sleep(0.1)  # simulate an external call
        return True
    except Exception as e:
        logging.error(f"External API check failed: {e}")
        return False

async def check_disk_space():
    """Check disk space"""
    try:
        disk_usage = psutil.disk_usage('/')
        # Fail the check above 90% disk usage
        return disk_usage.percent < 90
    except Exception as e:
        logging.error(f"Disk space check failed: {e}")
        return False

async def check_memory_usage():
    """Check memory usage"""
    try:
        memory_percent = psutil.virtual_memory().percent
        # Fail the check above 90% memory usage
        return memory_percent < 90
    except Exception as e:
        logging.error(f"Memory usage check failed: {e}")
        return False

# Readiness probe (for Kubernetes and similar orchestrators)
@app.get("/ready")
async def readiness_check():
    """Readiness endpoint"""
    # Put real readiness logic here (e.g. migrations applied, caches warm)
    return {"status": "ready"}

# Liveness probe
@app.get("/live")
async def liveness_check():
    """Liveness endpoint"""
    return {"status": "alive"}

Security Hardening

Nginx Security Configuration

# security_config.conf - Nginx security settings
# Clickjacking protection
add_header X-Frame-Options "SAMEORIGIN" always;

# Prevent MIME type sniffing
add_header X-Content-Type-Options "nosniff" always;

# XSS protection (legacy header; modern browsers rely on CSP instead)
add_header X-XSS-Protection "1; mode=block" always;

# Content Security Policy
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; font-src 'self' data:; connect-src 'self'; frame-ancestors 'none';" always;

# Referrer policy
add_header Referrer-Policy "strict-origin-when-cross-origin" always;

# HSTS
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;

# Restrict request methods
# (limit_except is only valid inside a location block)
limit_except GET POST HEAD {
    deny all;
}

# Limit request body size
client_max_body_size 10M;

# Hide the Nginx version string
server_tokens off;

# Block access to dotfiles
location ~ /\. {
    deny all;
    access_log off;
    log_not_found off;
}

# Block access to backup files
location ~* \.(bak|backup|old|orig|save|swp|tmp)$ {
    deny all;
    access_log off;
    log_not_found off;
}

# Block access to configuration files
location ~* \.(conf|config|ini|log|sql|sh|env)$ {
    deny all;
    access_log off;
    log_not_found off;
}

# Rate-limit the API (zones are defined in nginx.conf via limit_req_zone/limit_conn_zone)
location /api/ {
    # Per-IP request rate
    limit_req zone=api burst=10 nodelay;
    limit_req_status 429;
    
    # Per-IP concurrent connections
    limit_conn perip 10;
    limit_conn_status 429;
    
    # proxy_pass http://daoman_api;  # plus the usual proxy headers
}
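limit_req with rate=10r/s and burst=10 behaves roughly like a token bucket: up to 10 requests can arrive back to back, after which requests pass only as fast as tokens refill. A simplified Python model (not Nginx's exact leaky-bucket implementation) shows why the 11th instantaneous request is rejected while a later one passes:

```python
# Simplified token-bucket model of limit_req rate=10r/s burst=10.

class TokenBucket:
    """`rate` tokens refill per second, up to `burst` capacity."""

    def __init__(self, rate: float, burst: int):
        self.rate = rate
        self.capacity = burst
        self.tokens = float(burst)
        self.last = 0.0

    def allow(self, now: float) -> bool:
        # Refill tokens for the elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False

bucket = TokenBucket(rate=10, burst=10)  # mirrors rate=10r/s burst=10
burst_results = [bucket.allow(now=0.0) for _ in range(11)]
print(burst_results.count(True))   # 10 requests pass, the 11th is dropped
print(bucket.allow(now=0.5))       # 0.5s later, 5 tokens refilled -> True
```

With nodelay, as configured above, Nginx serves the burst immediately instead of queuing it; without nodelay the excess requests would be delayed to the nominal rate.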

Application-Level Security

# security_middleware.py - security middleware
from fastapi import FastAPI, Request
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
import time
import hashlib
import hmac

app = FastAPI()

class SecurityMiddleware(BaseHTTPMiddleware):
    """Security middleware"""
    
    async def dispatch(self, request: Request, call_next):
        # Throttle brute-force attempts on auth endpoints
        # (checked before the request is processed)
        if request.method == "POST" and "/auth/" in request.url.path:
            if await self.check_rate_limit(request):
                return JSONResponse(
                    status_code=429,
                    content={"detail": "Too many requests"}
                )
        
        response = await call_next(request)
        
        # Attach security headers to every response
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        
        return response
    
    async def check_rate_limit(self, request: Request):
        """Check the per-client request rate"""
        # Implement real rate limiting here (e.g. backed by Redis);
        # this placeholder never throttles.
        return False

# Register the security middleware
app.add_middleware(SecurityMiddleware)

# Only accept requests for known hosts
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["your-domain.com", "www.your-domain.com", "*.your-domain.com"]
)

# CSRF protection (if needed)
class CSRFProtection:
    """HMAC-based CSRF token helper"""
    
    def __init__(self, secret_key: str):
        self.secret_key = secret_key
    
    def generate_token(self, request: Request) -> str:
        """Generate a CSRF token bound to the client address"""
        timestamp = str(int(time.time()))
        data = f"{request.client.host}:{timestamp}"
        signature = hmac.new(
            self.secret_key.encode(),
            data.encode(),
            hashlib.sha256
        ).hexdigest()
        return f"{timestamp}.{signature}"
    
    def verify_token(self, request: Request, token: str) -> bool:
        """Verify a CSRF token"""
        try:
            timestamp, signature = token.split(".", 1)
            
            # Reject tokens older than 5 minutes
            if abs(int(time.time()) - int(timestamp)) > 300:
                return False
            
            data = f"{request.client.host}:{timestamp}"
            expected_signature = hmac.new(
                self.secret_key.encode(),
                data.encode(),
                hashlib.sha256
            ).hexdigest()
            
            return hmac.compare_digest(signature, expected_signature)
        except (ValueError, AttributeError):
            return False
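The CSRF scheme can be exercised without a real FastAPI request by lifting the token logic into standalone functions that take the client address directly (the secret and IP addresses below are placeholders):

```python
# Roundtrip demo of the timestamp + HMAC CSRF token scheme.
import hashlib
import hmac
import time

SECRET = "demo-secret"  # placeholder; use a real secret in production

def generate_token(client_host: str) -> str:
    """Same scheme as CSRFProtection.generate_token."""
    timestamp = str(int(time.time()))
    data = f"{client_host}:{timestamp}"
    signature = hmac.new(SECRET.encode(), data.encode(), hashlib.sha256).hexdigest()
    return f"{timestamp}.{signature}"

def verify_token(client_host: str, token: str, max_age: int = 300) -> bool:
    """Mirror of CSRFProtection.verify_token."""
    try:
        timestamp, signature = token.split(".", 1)
        if abs(int(time.time()) - int(timestamp)) > max_age:
            return False  # token expired
        data = f"{client_host}:{timestamp}"
        expected = hmac.new(SECRET.encode(), data.encode(), hashlib.sha256).hexdigest()
        return hmac.compare_digest(signature, expected)
    except ValueError:
        return False  # malformed token

token = generate_token("203.0.113.7")
print(verify_token("203.0.113.7", token))      # True: same client, fresh token
print(verify_token("198.51.100.9", token))     # False: token bound to another IP
print(verify_token("203.0.113.7", "garbage"))  # False: malformed token
```

Binding the token to the client IP is a design trade-off: it blocks token reuse from another address, but breaks for clients behind rotating proxies.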

Performance Optimization

Gunicorn Performance Tuning

# performance_config.py - Gunicorn performance configuration
import multiprocessing

def get_optimal_workers():
    """Pick a worker count for this machine"""
    cpu_count = multiprocessing.cpu_count()
    # I/O-bound apps: roughly 2x CPU cores; CPU-bound apps: 1x CPU cores
    return min(cpu_count * 2, 8)  # cap at 8 processes

def get_optimal_connections():
    """Connections per worker"""
    return 1000

class PerformanceConfig:
    """Performance configuration helper"""
    
    @staticmethod
    def get_gunicorn_config():
        """Build a Gunicorn settings dict"""
        return {
            # Server socket
            'bind': '0.0.0.0:8000',
            'backlog': 2048,
            
            # Worker processes
            'workers': get_optimal_workers(),
            'worker_class': 'uvicorn.workers.UvicornWorker',
            'worker_connections': get_optimal_connections(),
            'max_requests': 1000,
            'max_requests_jitter': 100,
            'timeout': 300,
            'graceful_timeout': 30,
            'keepalive': 5,  # Gunicorn's only keep-alive setting
            
            # Performance tweaks
            'preload_app': True,
            'worker_tmp_dir': '/dev/shm',  # heartbeat file on tmpfs avoids disk I/O
            
            # Logging
            'accesslog': '/var/log/gunicorn/access.log',
            'errorlog': '/var/log/gunicorn/error.log',
            'loglevel': 'info',
        }

# Memory-related settings
"""
# In gunicorn.conf.py:
worker_tmp_dir = '/dev/shm'  # keep the worker heartbeat file in memory
max_requests = 1000          # recycle workers periodically to contain leaks
keepalive = 5                # a modest keep-alive timeout
"""

# Preloading
"""
preload_app = True  # load the app once before forking, for faster worker startup
worker_class = 'uvicorn.workers.UvicornWorker'  # async worker for FastAPI
"""

Application-Level Performance Optimization

# performance_optimization.py - application-level optimizations
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
import asyncio
import time
from aiocache import cached
from aiocache.serializers import JsonSerializer

app = FastAPI()

class PerformanceMiddleware(BaseHTTPMiddleware):
    """Request timing middleware"""
    
    async def dispatch(self, request: Request, call_next):
        start_time = time.time()
        
        response = await call_next(request)
        
        process_time = time.time() - start_time
        response.headers["X-Process-Time"] = str(process_time)
        
        # Flag slow requests
        if process_time > 1.0:  # anything over one second
            print(f"Slow request: {request.url} took {process_time:.2f}s")
        
        return response

app.add_middleware(PerformanceMiddleware)

# Cache decorator (aiocache uses an in-memory cache by default)
@cached(ttl=300, serializer=JsonSerializer())  # cache results for 5 minutes
async def get_expensive_data(param: str):
    """An expensive operation, cached"""
    # Simulate a costly database query or external API call
    await asyncio.sleep(0.1)
    return {"data": f"expensive result for {param}"}

# Database connection pool tuning
"""
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"

engine = create_async_engine(
    DATABASE_URL,
    pool_size=20,  # base connection pool size
    max_overflow=30,  # extra connections allowed beyond the pool
    pool_pre_ping=True,  # ping connections before handing them out
    pool_recycle=300,  # recycle connections after this many seconds
    echo=False  # do not log SQL statements
)

AsyncSessionLocal = sessionmaker(
    engine, 
    class_=AsyncSession, 
    expire_on_commit=False
)
"""

# Response compression middleware
"""
from starlette.middleware.gzip import GZipMiddleware

app.add_middleware(GZipMiddleware, minimum_size=1000)
"""

Monitoring and Log Management

Logging Configuration

# logging_config.py - logging configuration
import logging
import logging.handlers
import json
from datetime import datetime
import sys

# Attribute names every LogRecord carries; anything else came from `extra=`
_STANDARD_ATTRS = set(logging.LogRecord('', 0, '', 0, '', (), None).__dict__) | {'message', 'asctime'}

class JSONFormatter(logging.Formatter):
    """Format log records as JSON lines"""
    
    def format(self, record):
        log_entry = {
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
            'module': record.module,
            'function': record.funcName,
            'line': record.lineno,
        }
        
        # Attach exception info
        if record.exc_info:
            log_entry['exception'] = self.formatException(record.exc_info)
        
        # Attach custom fields passed via `extra=` (logging merges them into
        # the record's attributes rather than a single `extra` attribute)
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS and key not in log_entry:
                log_entry[key] = value
        
        return json.dumps(log_entry, default=str)

def setup_logging(level=logging.INFO):
    """Configure root logging"""
    # Root logger
    logger = logging.getLogger()
    logger.setLevel(level)
    
    # Remove existing handlers
    logger.handlers.clear()
    
    # Console handler
    console_handler = logging.StreamHandler(sys.stdout)
    console_handler.setLevel(level)
    console_handler.setFormatter(JSONFormatter())
    logger.addHandler(console_handler)
    
    # Rotating file handler
    file_handler = logging.handlers.RotatingFileHandler(
        '/var/log/daoman/app.log',
        maxBytes=100 * 1024 * 1024,  # 100MB per file
        backupCount=10
    )
    file_handler.setLevel(level)
    file_handler.setFormatter(JSONFormatter())
    logger.addHandler(file_handler)
    
    # Error-only file handler
    error_handler = logging.handlers.RotatingFileHandler(
        '/var/log/daoman/error.log',
        maxBytes=100 * 1024 * 1024,
        backupCount=10
    )
    error_handler.setLevel(logging.ERROR)
    error_handler.setFormatter(JSONFormatter())
    logger.addHandler(error_handler)

# Apply the logging configuration
setup_logging(logging.INFO)

# Structured request logging
def log_request(request, response_time, status_code):
    """Log a processed request"""
    logger = logging.getLogger(__name__)
    logger.info(
        "Request processed",
        extra={
            'method': request.method,
            'path': str(request.url),
            'response_time': response_time,
            'status_code': status_code,
            'user_agent': request.headers.get('user-agent'),
            'ip_address': request.client.host
        }
    )
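A quick way to check the formatter's output is to point a logger at an in-memory stream and parse the result; the class below is a compact copy of the formatter above, trimmed to a few fields for the demo:

```python
# Exercise a JSON log formatter against an in-memory stream.
import io
import json
import logging
from datetime import datetime

class JSONFormatter(logging.Formatter):
    """Compact copy of the formatter above, enough to demo the output."""
    def format(self, record):
        return json.dumps({
            'timestamp': datetime.utcnow().isoformat(),
            'level': record.levelname,
            'logger': record.name,
            'message': record.getMessage(),
        })

stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(JSONFormatter())

logger = logging.getLogger("demo")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False  # keep the demo output out of the root logger

logger.info("Request processed")

entry = json.loads(stream.getvalue())
print(entry['level'], entry['message'])  # INFO Request processed
```

One JSON object per line is exactly what log shippers like Filebeat or Fluentd expect, which is why the formatter emits a single json.dumps per record.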

Monitoring Configuration

# prometheus.yml - Prometheus monitoring configuration
global:
  scrape_interval: 15s
  evaluation_interval: 15s

rule_files:
  - "alert_rules.yml"

scrape_configs:
  - job_name: 'daoman-api'
    static_configs:
      - targets: ['localhost:8000']
    metrics_path: '/metrics'
    scrape_interval: 5s
    
  - job_name: 'nginx'
    static_configs:
      - targets: ['localhost:9113']
    scrape_interval: 5s
    
  - job_name: 'postgres-exporter'
    static_configs:
      - targets: ['localhost:9187']
    scrape_interval: 5s
    
  - job_name: 'redis-exporter'
    static_configs:
      - targets: ['localhost:9121']
    scrape_interval: 5s

alerting:
  alertmanagers:
    - static_configs:
        - targets:
          - alertmanager:9093

System Monitoring Script

# system_monitor.py - 系统监控
import psutil
import time
import requests
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from typing import Dict, Any

class SystemMonitor:
    """系统监控类"""
    
    def __init__(self, api_url: str, thresholds: Dict[str, float] = None):
        self.api_url = api_url
        self.thresholds = thresholds or {
            'cpu_percent': 80.0,
            'memory_percent': 85.0,
            'disk_percent': 90.0,
            'response_time': 2.0
        }
    
    def get_system_metrics(self) -> Dict[str, Any]:
        """获取系统指标"""
        return {
            'timestamp': datetime.now().isoformat(),
            'cpu_percent': psutil.cpu_percent(interval=1),
            'memory_percent': psutil.virtual_memory().percent,
            'disk_percent': psutil.disk_usage('/').percent,
            'load_average': psutil.getloadavg(),
            'network_io': psutil.net_io_counters()._asdict(),
            'process_count': len(psutil.pids()),
        }
    
    def check_api_health(self) -> Dict[str, Any]:
        """检查API健康状态"""
        start_time = time.time()
        try:
            response = requests.get(f"{self.api_url}/health", timeout=10)
            response_time = time.time() - start_time
            
            return {
                'status_code': response.status_code,
                'response_time': response_time,
                'is_healthy': response.status_code == 200,
                'content_length': len(response.content)
            }
        except Exception as e:
            return {
                'status_code': 0,
                'response_time': time.time() - start_time,
                'is_healthy': False,
                'error': str(e)
            }
    
    def check_alerts(self, metrics: Dict[str, Any]) -> Dict[str, bool]:
        """检查告警条件"""
        alerts = {}
        
        for metric, threshold in self.thresholds.items():
            if metric in metrics:
                alerts[metric] = metrics[metric] > threshold
        
        return alerts
    
    def send_alert(self, alert_message: str, recipients: list):
        """发送告警"""
        print(f"ALERT: {alert_message}")  # 实际部署时替换为邮件或短信通知
        
    def run_monitoring_cycle(self, recipients: list):
        """运行监控周期"""
        # 获取系统指标
        metrics = self.get_system_metrics()
        
        # 检查API健康
        api_health = self.check_api_health()
        metrics.update(api_health)
        
        # 检查告警
        alerts = self.check_alerts(metrics)
        
        # 处理告警
        for metric, is_alert in alerts.items():
            if is_alert:
                alert_msg = f"ALERT: {metric} exceeded threshold. Current value: {metrics[metric]}"
                self.send_alert(alert_msg, recipients)
        
        return metrics

# Usage example
if __name__ == "__main__":
    monitor = SystemMonitor(
        api_url="https://your-domain.com",
        thresholds={
            'cpu_percent': 80.0,
            'memory_percent': 85.0,
            'disk_percent': 90.0,
            'response_time': 2.0
        }
    )
    
    recipients = ["admin@your-domain.com"]
    
    while True:
        try:
            metrics = monitor.run_monitoring_cycle(recipients)
            print(f"System metrics: {metrics}")
            time.sleep(60)  # check once per minute
        except KeyboardInterrupt:
            print("Monitoring stopped.")
            break
        except Exception as e:
            print(f"Monitoring error: {e}")
            time.sleep(60)

Troubleshooting and Debugging

Common Issue Diagnosis

#!/bin/bash
# troubleshooting.sh - troubleshooting script

echo "🔍 Daoman API Troubleshooting Tool"
echo "=========================="

# 1. Check service status
echo "1. Service status:"
sudo systemctl status nginx
sudo systemctl status daoman-api
sudo systemctl status postgresql
sudo systemctl status redis

# 2. Check port usage (use `ss -tlnp` on systems without net-tools)
echo -e "\n2. Port usage:"
sudo netstat -tlnp | grep :80
sudo netstat -tlnp | grep :443
sudo netstat -tlnp | grep :8000

# 3. Validate Nginx configuration
echo -e "\n3. Nginx configuration check:"
sudo nginx -t

# 4. Inspect log files
echo -e "\n4. Recent logs:"
echo "Nginx access log (last 10 lines):"
sudo tail -n 10 /var/log/nginx/access.log

echo -e "\nNginx error log (last 10 lines):"
sudo tail -n 10 /var/log/nginx/error.log

echo -e "\nGunicorn access log (last 10 lines):"
sudo tail -n 10 /var/log/gunicorn/access.log

echo -e "\nGunicorn error log (last 10 lines):"
sudo tail -n 10 /var/log/gunicorn/error.log

# 5. Check system resources
echo -e "\n5. System resources:"
echo "CPU usage:"
top -bn1 | head -n 5

echo -e "\nMemory usage:"
free -h

echo -e "\nDisk usage:"
df -h

# 6. Network connectivity test
echo -e "\n6. Network connectivity:"
curl -I https://your-domain.com/health 2>/dev/null || echo "Cannot reach the API"

# 7. Database connection test
echo -e "\n7. Database connection:"
PGPASSWORD=your_password psql -h localhost -U your_user -d your_db -c "SELECT version();" 2>/dev/null || echo "Database connection failed"

# 8. Redis connection test
echo -e "\n8. Redis connection:"
redis-cli ping 2>/dev/null || echo "Redis connection failed"

echo -e "\n✅ Troubleshooting complete!"

Performance Analysis Tools

# performance_analysis.py - performance analysis helpers
import cProfile
import pstats
import io
import asyncio
import time
from functools import wraps
import tracemalloc

def profile_endpoint(func):
    """Profiling decorator for async endpoints."""
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # Memory tracing
        tracemalloc.start()
        
        # CPU profiling
        pr = cProfile.Profile()
        pr.enable()
        
        start_time = time.time()
        
        try:
            result = await func(*args, **kwargs)
        finally:
            end_time = time.time()
            
            pr.disable()
            
            # Capture profiling statistics
            s = io.StringIO()
            ps = pstats.Stats(pr, stream=s)
            ps.sort_stats('cumulative')
            ps.print_stats(20)  # show the 20 most expensive functions
            
            # Memory usage
            current, peak = tracemalloc.get_traced_memory()
            tracemalloc.stop()
            
            print("\n=== Profiling Report ===")
            print(f"Function: {func.__name__}")
            print(f"Execution time: {end_time - start_time:.4f}s")
            print(f"Current memory usage: {current / 1024 / 1024:.2f} MB")
            print(f"Peak memory usage: {peak / 1024 / 1024:.2f} MB")
            print("\nMost expensive functions:")
            print(s.getvalue())
        
        return result  # the original code dropped the response; return it to the caller
    
    return wrapper

# Usage example
"""
@app.get("/profiled-endpoint")
@profile_endpoint
async def profiled_endpoint():
    # Simulate some work
    await asyncio.sleep(0.1)
    return {"message": "Profiled response"}
"""

# Request tracing
class RequestTracer:
    """Request tracer"""
    
    def __init__(self):
        self.requests = {}
    
    async def trace_request(self, request_id: str, operation: str, duration: float):
        """Record one traced operation for a request"""
        if request_id not in self.requests:
            self.requests[request_id] = []
        
        self.requests[request_id].append({
            'operation': operation,
            'duration': duration,
            'timestamp': time.time()
        })
    
    def get_request_trace(self, request_id: str):
        """Return the recorded trace for a request"""
        return self.requests.get(request_id, [])

# Async performance monitoring
async def async_performance_monitor():
    """Probe several endpoints concurrently and report latency."""
    import aiohttp  # third-party: pip install aiohttp
    
    urls = [
        "https://your-domain.com/health",
        "https://your-domain.com/api/users",
        "https://your-domain.com/api/items"
    ]
    
    async def fetch(session, url):
        start_time = time.time()
        try:
            async with session.get(url) as response:
                response_time = time.time() - start_time
                return {
                    'url': url,
                    'status': response.status,
                    'response_time': response_time,
                    'success': response.status == 200
                }
        except Exception as e:
            return {
                'url': url,
                'status': 0,
                'response_time': time.time() - start_time,
                'success': False,
                'error': str(e)
            }
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        print("=== Async Performance Test Results ===")
        for result in results:
            status = "✓" if result['success'] else "✗"
            print(f"{status} {result['url']} - {result['response_time']:.3f}s (Status: {result['status']})")

# Run the async performance test
# asyncio.run(async_performance_monitor())

Automated Deployment Scripts

Deployment Script

#!/bin/bash
# deploy.sh - automated deployment script

set -e  # exit immediately on error

echo "🚀 Deploying Daoman API..."

# Configuration variables
APP_NAME="daoman-api"
APP_DIR="/opt/daoman"
BACKUP_DIR="/opt/backups"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)

# Create a backup
echo "📁 Creating backup..."
sudo mkdir -p $BACKUP_DIR
sudo tar -czf $BACKUP_DIR/${APP_NAME}_backup_${TIMESTAMP}.tar.gz $APP_DIR/ || echo "Backup failed, continuing deployment..."

# Pull the latest code
echo "📥 Pulling latest code..."
cd $APP_DIR
sudo git fetch origin
sudo git reset --hard origin/main

# Update dependencies
echo "📦 Updating Python dependencies..."
sudo $APP_DIR/venv/bin/pip install --upgrade pip
sudo $APP_DIR/venv/bin/pip install -r $APP_DIR/requirements.txt

# Run database migrations
# (FastAPI projects typically use Alembic; adjust for your migration tool.
#  The original manage.py/collectstatic steps were Django-specific and do not apply here —
#  static files are served directly by Nginx.)
echo "🔄 Running database migrations..."
sudo $APP_DIR/venv/bin/alembic upgrade head

# Restart the Gunicorn service
echo "🔄 Restarting Gunicorn service..."
sudo systemctl stop $APP_NAME
sleep 5
sudo systemctl start $APP_NAME

# Wait for the service to start
echo "⏳ Waiting for service startup..."
sleep 10

# Check service status
if sudo systemctl is-active --quiet $APP_NAME; then
    echo "✅ $APP_NAME started successfully"
else
    echo "❌ $APP_NAME failed to start"
    sudo systemctl status $APP_NAME
    exit 1
fi

# Reload Nginx configuration
echo "🔄 Reloading Nginx configuration..."
sudo nginx -t && sudo systemctl reload nginx

# Verify the deployment
echo "🔍 Verifying deployment..."
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" https://your-domain.com/health)
if [ "$HTTP_CODE" -eq 200 ]; then
    echo "✅ Deployment verified (HTTP $HTTP_CODE)"
else
    echo "❌ Deployment verification failed (HTTP $HTTP_CODE)"
    exit 1
fi

# Clean up old backups (keep the 3 most recent)
echo "🧹 Cleaning up old backups..."
sudo ls -t $BACKUP_DIR/${APP_NAME}_backup_*.tar.gz | tail -n +4 | xargs -r sudo rm

echo "🎉 Deployment complete!"
echo "📋 Deployed at: $(date)"
echo "📋 Service status: $(sudo systemctl is-active $APP_NAME)"
echo "📋 Last commit: $(cd $APP_DIR && git log -1 --format='%h - %s (%cr)')"

Rollback Script

#!/bin/bash
# rollback.sh - rollback script

set -e

echo "🔄 Rolling back Daoman API..."

# Configuration variables
APP_NAME="daoman-api"
APP_DIR="/opt/daoman"
BACKUP_DIR="/opt/backups"

# Check that a backup exists
if [ ! "$(ls -A $BACKUP_DIR/*.tar.gz 2>/dev/null)" ]; then
    echo "❌ No backup files found"
    exit 1
fi

# Pick the most recent backup
LATEST_BACKUP=$(ls -t $BACKUP_DIR/*.tar.gz | head -n 1)
echo "📂 Using backup file: $LATEST_BACKUP"

# Stop the service
echo "⏹️  Stopping service..."
sudo systemctl stop $APP_NAME

# Restore application files
# (tar stores member paths without the leading slash, e.g. opt/daoman/...,
#  so extract from / rather than /opt/ to land files back in $APP_DIR)
echo "📤 Restoring application files..."
sudo rm -rf $APP_DIR/*
sudo tar -xzf $LATEST_BACKUP -C /

# Restart the service
echo "🔄 Restarting service..."
sudo systemctl start $APP_NAME

# Verify the rollback
echo "🔍 Verifying rollback..."
sleep 10

if sudo systemctl is-active --quiet $APP_NAME; then
    echo "✅ Rollback succeeded"
    echo "📋 Service status: $(sudo systemctl is-active $APP_NAME)"
else
    echo "❌ Rollback failed"
    sudo systemctl status $APP_NAME
    exit 1
fi

echo "🎉 Rollback complete!"

Summary

The Nginx + Gunicorn production architecture gives a FastAPI application:

  1. High availability: multiple worker processes, load balancing, health checks
  2. High performance: reverse proxying, static file serving, connection reuse
  3. Security: SSL termination, request filtering, security headers
  4. Scalability: horizontal and vertical scaling behind the load balancer
  5. Observability: logging, performance monitoring, alerting

💡 Key takeaways: size the Gunicorn worker count sensibly, enable Nginx caching, apply security hardening, and build out monitoring and alerting.



Frequently Asked Questions (FAQ)

Q1: Why pair Nginx with Gunicorn for deployment?

A: Nginx acts as a reverse proxy, handling static files, SSL termination, load balancing, and request filtering, while Gunicorn manages the Python application's worker processes. Together they provide better performance and security than either component alone.

Q2: How should the number of Gunicorn workers be configured?

A: A common rule of thumb is twice the number of CPU cores plus one. I/O-bound applications can go somewhat higher; CPU-bound applications should be sized conservatively to avoid oversubscribing the CPU.
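The rule of thumb above can be sketched as a small helper. The function name and the `max_workers` cap are illustrative choices, not part of Gunicorn's API:

```python
import multiprocessing

def recommended_workers(cpu_count=None, max_workers=32):
    """Gunicorn rule of thumb: (2 x cores) + 1, capped to avoid runaway values."""
    cores = cpu_count if cpu_count is not None else multiprocessing.cpu_count()
    return min(cores * 2 + 1, max_workers)

# e.g. a 4-core machine
print(recommended_workers(cpu_count=4))  # → 9
```

The result can be passed straight to the `workers` setting in a `gunicorn.conf.py`.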

Q3: How can SSL certificate renewal be automated?

A: Use Let's Encrypt's Certbot with a renewal script and a scheduled job (cron or a systemd timer) so certificates stay valid without manual intervention.
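Alongside Certbot's automated renewal, it is prudent to monitor certificate expiry independently. A minimal sketch using only the standard library (the helper names are illustrative; the `notAfter` format is what `ssl.getpeercert()` returns):

```python
import datetime
import socket
import ssl

def days_until_expiry(not_after, now=None):
    """Parse the notAfter field from ssl.getpeercert() and return days remaining."""
    expires = datetime.datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
    now = now or datetime.datetime.utcnow()
    return (expires - now).days

def check_certificate(host, port=443):
    """Fetch the live certificate over TLS and report days until it expires."""
    ctx = ssl.create_default_context()
    with socket.create_connection((host, port), timeout=5) as sock:
        with ctx.wrap_socket(sock, server_hostname=host) as tls:
            cert = tls.getpeercert()
    return days_until_expiry(cert["notAfter"])

# check_certificate("your-domain.com")  # alert when this drops below ~14 days
```

Run from the monitoring cycle above, this catches the case where Certbot's timer silently stops firing.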

Q4: How is load balancing configured?

A: Define multiple backend servers in an Nginx upstream block, set relative weights with the weight parameter, and configure health-check and failover behavior (max_fails, fail_timeout).
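The effect of the weight parameter can be illustrated with a simplified Python model. Note this is only a sketch: nginx actually uses a smooth weighted round-robin algorithm, so the interleaving differs, but the long-run proportions are the same:

```python
import itertools

def weighted_cycle(servers):
    """Yield backends in proportion to their weights (simplified round-robin model)."""
    pool = []
    for host, weight in servers:
        pool.extend([host] * weight)
    return itertools.cycle(pool)

# weights mirror the upstream block: 8000 gets 3x the traffic of 8001
backends = [("127.0.0.1:8000", 3), ("127.0.0.1:8001", 1)]
rr = weighted_cycle(backends)
first_eight = [next(rr) for _ in range(8)]
# over 8 requests: 6 go to :8000, 2 go to :8001
```

The 3:1 split here corresponds to the `weight=3` / `weight=1` ratio one would write in the upstream block.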

Q5: How should production performance be monitored?

A: Use Prometheus + Grafana for metrics, set up centralized log collection, define alert thresholds, and review performance data regularly.
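When reviewing latency data, percentile summaries are far more informative than averages, since a few slow requests can hide behind a healthy mean. A minimal sketch (nearest-rank is one of several percentile definitions; Prometheus's `histogram_quantile` interpolates instead):

```python
import math

def percentile(samples, p):
    """Nearest-rank percentile: smallest value with at least p% of samples at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = math.ceil(p / 100 * len(ordered))
    return ordered[max(rank - 1, 0)]

latencies_ms = [12, 15, 11, 240, 13, 14, 16, 13, 500, 12]
# p50 stays small while p95 exposes the slow outliers
print(percentile(latencies_ms, 50), percentile(latencies_ms, 95))  # → 13 500
```

This is why alert thresholds are usually set on p95/p99 rather than the mean.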


🔗 Related Tutorials
