#FastAPI Nginx与Gunicorn生产部署完全指南
📂 所属阶段:第五阶段 — 工程化与部署(实战篇)
🔗 相关章节:Docker容器化部署 · Pydantic Settings多环境配置
#目录
#生产部署架构概述
#为什么需要Nginx + Gunicorn架构?
在生产环境中,直接使用Uvicorn运行FastAPI应用存在诸多局限性:
单一进程架构问题:
┌─────────────────────────────────────────────────────┐
│ 用户请求 → Uvicorn → FastAPI → 处理请求 │
│ (单进程,阻塞时所有请求等待) │
└─────────────────────────────────────────────────────┘
Nginx + Gunicorn架构优势:
┌─────────────────────────────────────────────────────┐
│ 用户请求 → Nginx → Gunicorn Workers → FastAPI │
│ (多进程,负载均衡,SSL终止,静态文件) │
└─────────────────────────────────────────────────────┘#核心组件作用
| 组件 | 职责 | 优势 |
|---|---|---|
| Nginx | 反向代理、SSL终止、静态文件服务 | 高并发、低内存占用、成熟稳定 |
| Gunicorn | WSGI服务器、进程管理 | 多进程、热重启、优雅关闭 |
| Uvicorn | ASGI服务器、异步处理 | 异步支持、高性能、低延迟 |
| FastAPI | 应用逻辑处理 | 类型提示、自动生成文档、依赖注入 |
#典型生产架构
互联网用户 → DNS → CDN → Nginx集群 → Gunicorn Workers → 数据库集群
↓
监控系统 ← 日志收集 ← 应用日志#Nginx反向代理配置
#基础反向代理配置
# /etc/nginx/sites-available/daoman-api
upstream daoman_api {
# Gunicorn workers负载均衡
server 127.0.0.1:8000 weight=3 max_fails=2 fail_timeout=30s;
server 127.0.0.1:8001 weight=3 max_fails=2 fail_timeout=30s;
server 127.0.0.1:8002 weight=2 max_fails=2 fail_timeout=30s;
server 127.0.0.1:8003 weight=2 max_fails=2 fail_timeout=30s;
# 保持连接
keepalive 32;
}
server {
listen 80;
server_name your-domain.com www.your-domain.com;
# HTTP自动重定向到HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name your-domain.com www.your-domain.com;
# SSL证书配置
ssl_certificate /etc/ssl/certs/your-domain.crt;
ssl_certificate_key /etc/ssl/private/your-domain.key;
ssl_trusted_certificate /etc/ssl/certs/ca-bundle.crt;
# SSL安全配置
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-SHA384;
ssl_prefer_server_ciphers off;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
ssl_session_tickets off;
# 安全头部
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
add_header X-XSS-Protection "1; mode=block" always;
add_header Referrer-Policy "no-referrer-when-downgrade" always;
add_header Content-Security-Policy "default-src 'self' http: https: data: blob: 'unsafe-inline'" always;
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;
# 日志配置
access_log /var/log/nginx/daoman-access.log main buffer=32k flush=5m;
error_log /var/log/nginx/daoman-error.log warn;
# 客户端请求限制
client_max_body_size 100M;
client_body_timeout 120s;
client_header_timeout 120s;
# Gzip压缩
gzip on;
gzip_vary on;
gzip_min_length 1024;
gzip_proxied expired no-cache no-store private must-revalidate auth;
gzip_types
application/atom+xml
application/javascript
application/json
application/ld+json
application/manifest+json
application/rss+xml
application/vnd.geo+json
application/vnd.ms-fontobject
application/wasm
application/x-font-ttf
application/x-web-app-manifest+json
application/xhtml+xml
application/xml
font/opentype
image/bmp
image/svg+xml
text/cache-manifest
text/calendar
text/css
text/javascript
text/plain
text/xml;
# 主API代理
location / {
proxy_pass http://daoman_api;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_set_header X-Forwarded-Host $host;
proxy_set_header X-Original-Forwarded-For $remote_addr;
# 超时配置
proxy_connect_timeout 60s;
proxy_send_timeout 60s;
proxy_read_timeout 60s;
# 缓冲配置
proxy_buffering on;
proxy_buffer_size 128k;
proxy_buffers 4 256k;
proxy_busy_buffers_size 256k;
# 重试和错误处理
proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504;
proxy_next_upstream_tries 3;
proxy_next_upstream_timeout 10s;
}
# 静态文件服务
location /static/ {
alias /var/www/daoman/static/;
expires 30d;
add_header Cache-Control "public, immutable";
access_log off;
}
# 媒体文件服务
location /media/ {
alias /var/www/daoman/media/;
expires 7d;
add_header Cache-Control "public";
access_log off;
}
# 健康检查端点
location /health {
access_log off;
proxy_pass http://daoman_api/health;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# WebSocket支持
location /ws/ {
proxy_pass http://daoman_api;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_read_timeout 86400;
proxy_send_timeout 86400;
}
# API文档
location /docs {
proxy_pass http://daoman_api;
proxy_http_version 1.1;
proxy_set_header Connection "";
}
location /redoc {
proxy_pass http://daoman_api;
proxy_http_version 1.1;
proxy_set_header Connection "";
}
location /openapi.json {
proxy_pass http://daoman_api;
proxy_http_version 1.1;
proxy_set_header Connection "";
}
}#Nginx性能优化配置
# /etc/nginx/nginx.conf - 主配置文件优化
user www-data;
worker_processes auto;
worker_rlimit_nofile 100000;
events {
worker_connections 2048;
multi_accept on;
use epoll;
}
http {
# 基础配置
sendfile on;
tcp_nopush on;
tcp_nodelay on;
keepalive_timeout 65;
keepalive_requests 100;
types_hash_max_size 2048;
# 客户端请求限制
client_body_timeout 12;
client_header_timeout 12;
send_timeout 10;
client_max_body_size 100m;
# 限制模块配置
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
limit_req_zone $binary_remote_addr zone=login:10m rate=1r/s;
limit_conn_zone $binary_remote_addr zone=perip:10m;
# 日志格式
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for" '
'rt=$request_time uct="$upstream_connect_time" '
'uht="$upstream_header_time" urt="$upstream_response_time"';
# Gzip配置
gzip on;
gzip_vary on;
gzip_min_length 1024;
gzip_comp_level 6;
gzip_types
text/plain
text/css
text/xml
text/javascript
application/json
application/javascript
application/xml+rss
application/atom+xml
image/svg+xml;
# 包含站点配置
include /etc/nginx/conf.d/*.conf;
include /etc/nginx/sites-enabled/*;
}#Gunicorn高性能配置
#Gunicorn配置文件
# gunicorn.conf.py - Gunicorn生产配置
import multiprocessing
import os
# 服务器配置
bind = "0.0.0.0:8000" # 绑定地址和端口
backlog = 2048 # 监听队列大小
# 进程配置
workers = multiprocessing.cpu_count() * 2 + 1 # 工作进程数 (推荐: CPU核心数 * 2 + 1)
worker_class = "uvicorn.workers.UvicornWorker" # 使用Uvicorn作为worker
worker_connections = 1000 # 每个worker的最大连接数
max_requests = 1000 # 每个worker处理的最大请求数后重启
max_requests_jitter = 100 # 随机抖动,避免同时重启
timeout = 300 # 请求超时时间
graceful_timeout = 30 # 优雅关闭超时时间
keepalive = 5 # HTTP长连接超时时间
# 进程命名
proc_name = "daoman_api"
# 服务器软件标识
server_software = "DaomanAPI/1.0"
# 重启配置
max_requests = 1000 # 每个worker处理1000个请求后重启
max_requests_jitter = 100 # 随机抖动避免同时重启
# 日志配置
accesslog = "/var/log/gunicorn/access.log" # 访问日志
errorlog = "/var/log/gunicorn/error.log" # 错误日志
loglevel = "info" # 日志级别
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'
# 进程管理
preload_app = True # 预加载应用
daemon = False # 不作为守护进程运行
enable_stdio_inheritance = True # 继承stdio
# 资源限制
limit_request_line = 4094 # HTTP请求行最大长度
limit_request_fields = 100 # HTTP头部字段最大数量
limit_request_field_size = 8190 # HTTP头部字段最大长度
# SSL配置(如果需要)
# keyfile = "/path/to/keyfile"
# certfile = "/path/to/certfile"#启动脚本优化
#!/bin/bash
# start_gunicorn.sh - Gunicorn启动脚本
# 设置环境变量
export PYTHONPATH="/opt/daoman:$PYTHONPATH"
export ENV="production"
export LOG_LEVEL="info"
# 创建日志目录
mkdir -p /var/log/gunicorn
chown www-data:www-data /var/log/gunicorn
# 获取CPU核心数
CPU_CORES=$(nproc)
WORKERS=$((CPU_CORES * 2 + 1))
echo "Starting Daoman API with $WORKERS workers..."
# 启动Gunicorn
exec gunicorn main:app \
--config gunicorn.conf.py \
--workers $WORKERS \
--bind 0.0.0.0:8000 \
--worker-class uvicorn.workers.UvicornWorker \
--worker-connections 1000 \
--max-requests 1000 \
--max-requests-jitter 100 \
--timeout 300 \
--graceful-timeout 30 \
--keep-alive 5 \
--preload-app \
--access-logfile /var/log/gunicorn/access.log \
--error-logfile /var/log/gunicorn/error.log \
--log-level info \
--capture-output \
--enable-stdio-inheritance#多端口Gunicorn配置
# multi_port_gunicorn.py - 多端口配置示例
import multiprocessing
import socket
from contextlib import closing
def find_free_port():
"""查找可用端口"""
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(('', 0))
s.listen(1)
port = s.getsockname()[1]
return port
# 生成多个Gunicorn配置
def generate_multi_port_config():
"""生成多端口配置"""
cpu_cores = multiprocessing.cpu_count()
ports = [find_free_port() for _ in range(min(4, cpu_cores))]
configs = []
for i, port in enumerate(ports):
config = {
'bind': f'0.0.0.0:{port}',
'workers': 1,
'worker_class': 'uvicorn.workers.UvicornWorker',
'worker_connections': 1000,
'max_requests': 1000,
'max_requests_jitter': 100,
'timeout': 300,
'graceful_timeout': 30,
'keepalive': 5,
'preload_app': True,
}
configs.append(config)
return configs, ports
# 使用supervisor管理多个Gunicorn实例
"""
[group:daoman_api]
programs:daoman_api_8000,daoman_api_8001,daoman_api_8002,daoman_api_8003
[program:daoman_api_8000]
command=gunicorn main:app --bind 0.0.0.0:8000 --workers 1 --worker-class uvicorn.workers.UvicornWorker
directory=/opt/daoman
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/gunicorn/daoman_api_8000.log
[program:daoman_api_8001]
command=gunicorn main:app --bind 0.0.0.0:8001 --workers 1 --worker-class uvicorn.workers.UvicornWorker
directory=/opt/daoman
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/gunicorn/daoman_api_8001.log
"""#SSL证书配置与HTTPS
#Let's Encrypt自动化配置
#!/bin/bash
# ssl_setup.sh - SSL证书自动化配置
# 安装Certbot
sudo apt update
sudo apt install certbot python3-certbot-nginx -y
# 获取SSL证书
sudo certbot --nginx -d your-domain.com -d www.your-domain.com --agree-tos --email your-email@example.com
# 测试自动续期
sudo certbot renew --dry-run
# 设置自动续期定时任务
(crontab -l 2>/dev/null; echo "0 12 * * * /usr/bin/certbot renew --quiet") | crontab -#SSL安全配置
# ssl_configuration.conf - SSL安全配置
# SSL协议和加密套件
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-RSA-AES128-SHA256:ECDHE-RSA-AES256-SHA384:ECDHE-RSA-CHACHA20-POLY1305;
ssl_prefer_server_ciphers off;
# 会话管理
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
ssl_session_tickets off;
# OCSP装订
ssl_stapling on;
ssl_stapling_verify on;
resolver 8.8.8.8 8.8.4.4 valid=300s;
resolver_timeout 5s;
# HSTS
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;
# SSL安全头部
add_header X-Frame-Options DENY always;
add_header X-Content-Type-Options nosniff always;
add_header X-XSS-Protection "1; mode=block" always;
add_header Referrer-Policy "strict-origin-when-cross-origin" always;#证书监控脚本
# ssl_monitor.py - SSL证书监控
import ssl
import socket
from datetime import datetime, timedelta
import smtplib
from email.mime.text import MIMEText
import subprocess
def check_ssl_expiry(domain, port=443, warning_days=30):
"""检查SSL证书到期时间"""
context = ssl.create_default_context()
with socket.create_connection((domain, port), timeout=10) as sock:
with context.wrap_socket(sock, server_hostname=domain) as ssock:
cert = ssock.getpeercert()
# 解析证书到期时间
expiry_date = datetime.strptime(cert['notAfter'], '%b %d %H:%M:%S %Y %Z')
days_until_expiry = (expiry_date - datetime.now()).days
return {
'domain': domain,
'expiry_date': expiry_date.strftime('%Y-%m-%d'),
'days_until_expiry': days_until_expiry,
'is_expired': days_until_expiry <= 0,
'needs_renewal': days_until_expiry <= warning_days
}
def renew_certificates(domains):
"""续订证书"""
for domain in domains:
result = subprocess.run([
'certbot', 'renew', '--cert-name', domain, '--quiet'
], capture_output=True, text=True)
if result.returncode == 0:
print(f"Successfully renewed certificate for {domain}")
else:
print(f"Failed to renew certificate for {domain}: {result.stderr}")
def send_alert_email(recipients, subject, body):
"""发送告警邮件"""
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = 'monitor@your-domain.com'
msg['To'] = ', '.join(recipients)
# 发送邮件(需要配置SMTP服务器)
# smtp_server = smtplib.SMTP('smtp.your-server.com', 587)
# smtp_server.starttls()
# smtp_server.login('username', 'password')
# smtp_server.send_message(msg)
# smtp_server.quit()
# 检查证书到期时间
domains_to_check = ['your-domain.com', 'www.your-domain.com']
for domain in domains_to_check:
cert_info = check_ssl_expiry(domain)
print(f"Domain: {cert_info['domain']}")
print(f"Expiry: {cert_info['expiry_date']}")
print(f"Days until expiry: {cert_info['days_until_expiry']}")
print(f"Needs renewal: {cert_info['needs_renewal']}")
print("-" * 50)#负载均衡与高可用
#Nginx负载均衡配置
# load_balancer.conf - 负载均衡配置
upstream daoman_backend {
# 加权轮询
server backend1.daoman.com:8000 weight=3 max_fails=2 fail_timeout=30s;
server backend2.daoman.com:8000 weight=3 max_fails=2 fail_timeout=30s;
server backend3.daoman.com:8000 weight=2 max_fails=2 fail_timeout=30s;
# IP哈希(保持会话)
ip_hash;
# 保持连接
keepalive 32;
}
# 健康检查
upstream daoman_backend_health {
server backend1.daoman.com:8000 max_fails=2 fail_timeout=30s;
server backend2.daoman.com:8000 max_fails=2 fail_timeout=30s;
server backend3.daoman.com:8000 max_fails=2 fail_timeout=30s;
# 健康检查配置
health_check interval=10 fails=2 passes=3 uri=/health;
}
# 主服务器配置
server {
listen 80;
server_name api.daoman.com;
location / {
proxy_pass http://daoman_backend;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# 负载均衡相关配置
proxy_next_upstream error timeout invalid_header http_500 http_502 http_503 http_504;
proxy_next_upstream_tries 3;
proxy_next_upstream_timeout 10s;
}
}#HAProxy配置(备选方案)
# haproxy.cfg - HAProxy负载均衡配置
global
log stdout local0
maxconn 4096
daemon
defaults
log global
mode http
option httplog
option dontlognull
retries 3
option redispatch
timeout connect 5000ms
timeout client 50000ms
timeout server 50000ms
frontend daoman_frontend
bind *:80
bind *:443 ssl crt /etc/ssl/private/daoman.pem
default_backend daoman_backend
option forwardfor
option httpclose
backend daoman_backend
balance roundrobin
option httpchk GET /health
server backend1 127.0.0.1:8000 check inter 2000 rise 2 fall 3
server backend2 127.0.0.1:8001 check inter 2000 rise 2 fall 3
server backend3 127.0.0.1:8002 check inter 2000 rise 2 fall 3
server backend4 127.0.0.1:8003 check inter 2000 rise 2 fall 3#应用层健康检查
# health_check.py - 应用层健康检查
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from datetime import datetime
import asyncio
import logging
import psutil
import platform
app = FastAPI()
class HealthStatus(BaseModel):
status: str
timestamp: str
uptime: float
version: str
services: dict
system: dict
@app.get("/health", response_model=HealthStatus)
async def health_check():
"""健康检查端点"""
import time
start_time = time.time()
# 检查各个服务状态
services_status = {
"database": await check_database_connection(),
"redis": await check_redis_connection(),
"external_api": await check_external_api(),
"disk_space": await check_disk_space(),
"memory": await check_memory_usage(),
}
# 获取系统信息
system_info = {
"cpu_percent": psutil.cpu_percent(interval=1),
"memory_percent": psutil.virtual_memory().percent,
"disk_percent": psutil.disk_usage('/').percent,
"platform": platform.platform(),
"python_version": platform.python_version(),
}
# 计算总状态
service_statuses = list(services_status.values())
overall_status = "healthy" if all(service_statuses) else "degraded"
return HealthStatus(
status=overall_status,
timestamp=datetime.now().isoformat(),
uptime=time.time() - start_time,
version="1.0.0",
services=services_status,
system=system_info
)
async def check_database_connection():
"""检查数据库连接"""
try:
# 实现数据库连接检查逻辑
await asyncio.sleep(0.1) # 模拟数据库调用
return True
except Exception as e:
logging.error(f"Database connection check failed: {e}")
return False
async def check_redis_connection():
"""检查Redis连接"""
try:
# 实现Redis连接检查逻辑
await asyncio.sleep(0.05) # 模拟Redis调用
return True
except Exception as e:
logging.error(f"Redis connection check failed: {e}")
return False
async def check_external_api():
"""检查外部API"""
try:
# 实现外部API检查逻辑
await asyncio.sleep(0.1) # 模拟外部调用
return True
except Exception as e:
logging.error(f"External API check failed: {e}")
return False
async def check_disk_space():
"""检查磁盘空间"""
try:
disk_usage = psutil.disk_usage('/')
# 如果磁盘使用率超过90%,返回False
return disk_usage.percent < 90
except Exception as e:
logging.error(f"Disk space check failed: {e}")
return False
async def check_memory_usage():
"""检查内存使用"""
try:
memory_percent = psutil.virtual_memory().percent
# 如果内存使用率超过90%,返回False
return memory_percent < 90
except Exception as e:
logging.error(f"Memory usage check failed: {e}")
return False
# 就绪探针(用于Kubernetes等编排系统)
@app.get("/ready")
async def readiness_check():
"""就绪检查端点"""
# 实现就绪检查逻辑
return {"status": "ready"}
# 存活探针
@app.get("/live")
async def liveness_check():
"""存活检查端点"""
return {"status": "alive"}#安全加固措施
#Nginx安全配置
# security_config.conf - Nginx安全配置
# 防止点击劫持
add_header X-Frame-Options "SAMEORIGIN" always;
# 防止MIME类型嗅探
add_header X-Content-Type-Options "nosniff" always;
# XSS防护
add_header X-XSS-Protection "1; mode=block" always;
# 内容安全策略
add_header Content-Security-Policy "default-src 'self'; script-src 'self' 'unsafe-inline' 'unsafe-eval'; style-src 'self' 'unsafe-inline'; img-src 'self' data: https:; font-src 'self' data:; connect-src 'self'; frame-ancestors 'none';" always;
# 引用策略
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
# HSTS
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;
# 限制请求方法
limit_except GET POST HEAD {
deny all;
}
# 限制请求大小
client_max_body_size 10M;
# 隐藏Nginx版本号
server_tokens off;
# 防止敏感文件访问
location ~ /\. {
deny all;
access_log off;
log_not_found off;
}
# 防止访问备份文件
location ~* \.(bak|backup|old|orig|save|swp|tmp)$ {
deny all;
access_log off;
log_not_found off;
}
# 防止访问配置文件
location ~* \.(conf|config|ini|log|sql|sh|env)$ {
deny all;
access_log off;
log_not_found off;
}
# 限制API访问频率
location /api/ {
# 限制IP请求频率
limit_req zone=api burst=10 nodelay;
limit_req_status 429;
# 限制并发连接数
limit_conn perip 10;
limit_conn_status 429;
}#应用层安全配置
# security_middleware.py - 安全中间件
from fastapi import FastAPI, Request, Response
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse
import time
import hashlib
import hmac
app = FastAPI()
class SecurityMiddleware(BaseHTTPMiddleware):
"""安全中间件"""
async def dispatch(self, request: Request, call_next):
# 添加安全头部
response = await call_next(request)
# 添加安全头部
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# 防止暴力破解
if request.method == "POST" and "/auth/" in request.url.path:
if await self.check_rate_limit(request):
return JSONResponse(
status_code=429,
content={"detail": "Too many requests"}
)
return response
async def check_rate_limit(self, request: Request):
"""检查请求频率限制"""
# 实现请求频率限制逻辑
# 这里只是示例
return False
# 添加安全中间件
app.add_middleware(SecurityMiddleware)
# 添加可信主机中间件
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["your-domain.com", "www.your-domain.com", "*.your-domain.com"]
)
# CSRF保护(如果需要)
class CSRFProtection:
"""CSRF保护"""
def __init__(self, secret_key: str):
self.secret_key = secret_key
def generate_token(self, request: Request) -> str:
"""生成CSRF令牌"""
timestamp = str(int(time.time()))
data = f"{request.client.host}:{timestamp}"
signature = hmac.new(
self.secret_key.encode(),
data.encode(),
hashlib.sha256
).hexdigest()
return f"{timestamp}.{signature}"
def verify_token(self, request: Request, token: str) -> bool:
"""验证CSRF令牌"""
try:
timestamp, signature = token.split(".", 1)
expected_timestamp = str(int(time.time()))
# 检查时间戳(令牌有效期5分钟)
if abs(int(expected_timestamp) - int(timestamp)) > 300:
return False
data = f"{request.client.host}:{timestamp}"
expected_signature = hmac.new(
self.secret_key.encode(),
data.encode(),
hashlib.sha256
).hexdigest()
return hmac.compare_digest(signature, expected_signature)
except:
return False#性能优化策略
#Gunicorn性能优化
# performance_config.py - Gunicorn性能配置
import multiprocessing
import os
def get_optimal_workers():
"""获取最优工作进程数"""
cpu_count = multiprocessing.cpu_count()
# 对于I/O密集型应用,通常设置为CPU核心数的2倍
# 对于CPU密集型应用,通常设置为CPU核心数
return min(cpu_count * 2, 8) # 最大不超过8个进程
def get_optimal_connections():
"""获取最优连接数"""
# 每个worker的连接数
return 1000
class PerformanceConfig:
"""性能配置类"""
@staticmethod
def get_gunicorn_config():
"""获取Gunicorn性能配置"""
cpu_count = multiprocessing.cpu_count()
return {
# 服务器配置
'bind': '0.0.0.0:8000',
'backlog': 2048,
# 进程配置
'workers': get_optimal_workers(),
'worker_class': 'uvicorn.workers.UvicornWorker',
'worker_connections': get_optimal_connections(),
'max_requests': 1000,
'max_requests_jitter': 100,
'timeout': 300,
'graceful_timeout': 30,
'keepalive': 5,
# 性能优化
'preload_app': True,
'worker_tmp_dir': '/dev/shm', # 使用内存临时目录
'max_keepalive_requests': 100, # 最大长连接请求数
'keepalive_timeout': 2, # 长连接超时时间
# 日志配置
'accesslog': '/var/log/gunicorn/access.log',
'errorlog': '/var/log/gunicorn/error.log',
'loglevel': 'info',
}
# 内存优化配置
"""
# 在gunicorn.conf.py中添加
worker_tmp_dir = '/dev/shm' # 使用内存临时目录,提高I/O性能
max_keepalive_requests = 100 # 限制长连接请求数,防止内存泄漏
keepalive = 5 # 合理的长连接超时时间
"""
# 预加载优化
"""
preload_app = True # 预加载应用,减少worker启动时间
worker_class = 'uvicorn.workers.UvicornWorker' # 使用高性能worker
"""#应用层性能优化
# performance_optimization.py - 应用层性能优化
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
import asyncio
import time
from functools import wraps
import aiocache
from aiocache import cached, Cache
from aiocache.serializers import JsonSerializer
app = FastAPI()
class PerformanceMiddleware(BaseHTTPMiddleware):
"""性能监控中间件"""
async def dispatch(self, request: Request, call_next):
start_time = time.time()
response = await call_next(request)
process_time = time.time() - start_time
response.headers["X-Process-Time"] = str(process_time)
# 记录慢查询
if process_time > 1.0: # 超过1秒的请求
print(f"Slow request: {request.url} took {process_time:.2f}s")
return response
app.add_middleware(PerformanceMiddleware)
# 缓存配置
cache = Cache(
aiocache.SimpleMemoryCache,
serializer=JsonSerializer(),
ttl=300 # 5分钟缓存
)
# 缓存装饰器
@cached(cache=cache, ttl=300)
async def get_expensive_data(param: str):
"""带缓存的昂贵操作"""
# 模拟昂贵的数据库查询或外部API调用
await asyncio.sleep(0.1)
return {"data": f"expensive result for {param}"}
# 数据库连接池优化
"""
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/db"
engine = create_async_engine(
DATABASE_URL,
pool_size=20, # 连接池大小
max_overflow=30, # 最大溢出连接数
pool_pre_ping=True, # 连接前ping检查
pool_recycle=300, # 连接回收时间
echo=False # 是否打印SQL语句
)
AsyncSessionLocal = sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
"""
# 响应压缩中间件
"""
from starlette.middleware.gzip import GZipMiddleware
app.add_middleware(GZipMiddleware, minimum_size=1000)
"""#监控与日志管理
#日志配置
# logging_config.py - 日志配置
import logging
import logging.handlers
import json
from datetime import datetime
import sys
from typing import Dict, Any
class JSONFormatter(logging.Formatter):
"""JSON格式化器"""
def format(self, record):
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'level': record.levelname,
'logger': record.name,
'message': record.getMessage(),
'module': record.module,
'function': record.funcName,
'line': record.lineno,
}
# 添加异常信息
if record.exc_info:
log_entry['exception'] = self.formatException(record.exc_info)
# 添加额外信息
if hasattr(record, 'extra'):
log_entry.update(record.extra)
return json.dumps(log_entry)
def setup_logging(level=logging.INFO):
"""设置日志配置"""
# 创建logger
logger = logging.getLogger()
logger.setLevel(level)
# 清除现有处理器
logger.handlers.clear()
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setLevel(level)
console_formatter = JSONFormatter()
console_handler.setFormatter(console_formatter)
logger.addHandler(console_handler)
# 文件处理器
file_handler = logging.handlers.RotatingFileHandler(
'/var/log/daoman/app.log',
maxBytes=100 * 1024 * 1024, # 100MB
backupCount=10
)
file_handler.setLevel(level)
file_formatter = JSONFormatter()
file_handler.setFormatter(file_formatter)
logger.addHandler(file_handler)
# 错误日志处理器
error_handler = logging.handlers.RotatingFileHandler(
'/var/log/daoman/error.log',
maxBytes=100 * 1024 * 1024,
backupCount=10
)
error_handler.setLevel(logging.ERROR)
error_formatter = JSONFormatter()
error_handler.setFormatter(error_formatter)
logger.addHandler(error_handler)
# 使用日志配置
setup_logging(logging.INFO)
# 结构化日志记录
def log_request(request, response_time, status_code):
"""记录请求日志"""
logger = logging.getLogger(__name__)
logger.info(
"Request processed",
extra={
'method': request.method,
'path': str(request.url),
'response_time': response_time,
'status_code': status_code,
'user_agent': request.headers.get('user-agent'),
'ip_address': request.client.host
}
)#监控配置
# prometheus.yml - Prometheus监控配置
global:
scrape_interval: 15s
evaluation_interval: 15s
rule_files:
- "alert_rules.yml"
scrape_configs:
- job_name: 'daoman-api'
static_configs:
- targets: ['localhost:8000']
metrics_path: '/metrics'
scrape_interval: 5s
- job_name: 'nginx'
static_configs:
- targets: ['localhost:9113']
scrape_interval: 5s
- job_name: 'postgres-exporter'
static_configs:
- targets: ['localhost:9187']
scrape_interval: 5s
- job_name: 'redis-exporter'
static_configs:
- targets: ['localhost:9121']
scrape_interval: 5s
alerting:
alertmanagers:
- static_configs:
- targets:
- alertmanager:9093#系统监控脚本
# system_monitor.py - 系统监控
import psutil
import time
import requests
from datetime import datetime
import smtplib
from email.mime.text import MIMEText
from typing import Dict, Any
class SystemMonitor:
"""系统监控类"""
def __init__(self, api_url: str, thresholds: Dict[str, float] = None):
self.api_url = api_url
self.thresholds = thresholds or {
'cpu_percent': 80.0,
'memory_percent': 85.0,
'disk_percent': 90.0,
'response_time': 2.0
}
def get_system_metrics(self) -> Dict[str, Any]:
"""获取系统指标"""
return {
'timestamp': datetime.now().isoformat(),
'cpu_percent': psutil.cpu_percent(interval=1),
'memory_percent': psutil.virtual_memory().percent,
'disk_percent': psutil.disk_usage('/').percent,
'load_average': psutil.getloadavg(),
'network_io': psutil.net_io_counters()._asdict(),
'process_count': len(psutil.pids()),
}
def check_api_health(self) -> Dict[str, Any]:
"""检查API健康状态"""
start_time = time.time()
try:
response = requests.get(f"{self.api_url}/health", timeout=10)
response_time = time.time() - start_time
return {
'status_code': response.status_code,
'response_time': response_time,
'is_healthy': response.status_code == 200,
'content_length': len(response.content)
}
except Exception as e:
return {
'status_code': 0,
'response_time': time.time() - start_time,
'is_healthy': False,
'error': str(e)
}
def check_alerts(self, metrics: Dict[str, Any]) -> Dict[str, bool]:
"""检查告警条件"""
alerts = {}
for metric, threshold in self.thresholds.items():
if metric in metrics:
alerts[metric] = metrics[metric] > threshold
return alerts
def send_alert(self, alert_message: str, recipients: list):
"""发送告警"""
print(f"ALERT: {alert_message}") # 实际部署时替换为邮件或短信通知
def run_monitoring_cycle(self, recipients: list):
"""运行监控周期"""
# 获取系统指标
metrics = self.get_system_metrics()
# 检查API健康
api_health = self.check_api_health()
metrics.update(api_health)
# 检查告警
alerts = self.check_alerts(metrics)
# 处理告警
for metric, is_alert in alerts.items():
if is_alert:
alert_msg = f"ALERT: {metric} exceeded threshold. Current value: {metrics[metric]}"
self.send_alert(alert_msg, recipients)
return metrics
# 使用示例
if __name__ == "__main__":
monitor = SystemMonitor(
api_url="https://your-domain.com",
thresholds={
'cpu_percent': 80.0,
'memory_percent': 85.0,
'disk_percent': 90.0,
'response_time': 2.0
}
)
recipients = ["admin@your-domain.com"]
while True:
try:
metrics = monitor.run_monitoring_cycle(recipients)
print(f"System metrics: {metrics}")
time.sleep(60) # 每分钟检查一次
except KeyboardInterrupt:
print("Monitoring stopped.")
break
except Exception as e:
print(f"Monitoring error: {e}")
time.sleep(60)#故障排查与调试
#常见问题诊断
#!/bin/bash
# troubleshooting.sh - 故障排查脚本
echo "🔍 Daoman API 故障排查工具"
echo "=========================="
# 1. 检查服务状态
echo "1. 服务状态检查:"
sudo systemctl status nginx
sudo systemctl status daoman-api
sudo systemctl status postgresql
sudo systemctl status redis
# 2. 检查端口占用
echo -e "\n2. 端口占用检查:"
sudo netstat -tlnp | grep :80
sudo netstat -tlnp | grep :443
sudo netstat -tlnp | grep :8000
# 3. 检查Nginx配置
echo -e "\n3. Nginx配置检查:"
sudo nginx -t
# 4. 检查日志文件
echo -e "\n4. 最近的日志:"
echo "Nginx访问日志 (最近10行):"
sudo tail -n 10 /var/log/nginx/access.log
echo -e "\nNginx错误日志 (最近10行):"
sudo tail -n 10 /var/log/nginx/error.log
echo -e "\nGunicorn访问日志 (最近10行):"
sudo tail -n 10 /var/log/gunicorn/access.log
echo -e "\nGunicorn错误日志 (最近10行):"
sudo tail -n 10 /var/log/gunicorn/error.log
# 5. 检查系统资源
echo -e "\n5. 系统资源检查:"
echo "CPU使用率:"
top -bn1 | head -n 5
echo -e "\n内存使用情况:"
free -h
echo -e "\n磁盘使用情况:"
df -h
# 6. 网络连通性测试
echo -e "\n6. 网络连通性测试:"
curl -I https://your-domain.com/health 2>/dev/null || echo "无法连接到API"
# 7. 数据库连接测试
echo -e "\n7. 数据库连接测试:"
PGPASSWORD=your_password psql -h localhost -U your_user -d your_db -c "SELECT version();" 2>/dev/null || echo "数据库连接失败"
# 8. Redis连接测试
echo -e "\n8. Redis连接测试:"
redis-cli ping 2>/dev/null || echo "Redis连接失败"
echo -e "\n✅ 故障排查完成!"#性能分析工具
# performance_analysis.py - 性能分析
import cProfile
import pstats
import io
from contextlib import redirect_stdout
import asyncio
import time
from functools import wraps
import tracemalloc
def profile_endpoint(func):
"""性能分析装饰器"""
@wraps(func)
async def wrapper(*args, **kwargs):
# 内存追踪
tracemalloc.start()
# CPU性能分析
pr = cProfile.Profile()
pr.enable()
start_time = time.time()
try:
result = await func(*args, **kwargs)
finally:
end_time = time.time()
pr.disable()
# 保存性能数据
s = io.StringIO()
ps = pstats.Stats(pr, stream=s)
ps.sort_stats('cumulative')
ps.print_stats(20) # 显示前20个最耗时的函数
# 内存使用情况
current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"\n=== 性能分析报告 ===")
print(f"函数: {func.__name__}")
print(f"执行时间: {end_time - start_time:.4f}秒")
print(f"当前内存使用: {current / 1024 / 1024:.2f} MB")
print(f"峰值内存使用: {peak / 1024 / 1024:.2f} MB")
print(f"\n最耗时的函数:")
print(s.getvalue())
return wrapper
# 使用示例
"""
@app.get("/profiled-endpoint")
@profile_endpoint
async def profiled_endpoint():
# 模拟一些处理
await asyncio.sleep(0.1)
return {"message": "Profiled response"}
"""
# 请求追踪
class RequestTracer:
"""请求追踪器"""
def __init__(self):
self.requests = {}
async def trace_request(self, request_id: str, operation: str, duration: float):
"""追踪请求"""
if request_id not in self.requests:
self.requests[request_id] = []
self.requests[request_id].append({
'operation': operation,
'duration': duration,
'timestamp': time.time()
})
def get_request_trace(self, request_id: str):
"""获取请求追踪信息"""
return self.requests.get(request_id, [])
# 异步性能监控
async def async_performance_monitor():
"""异步性能监控"""
import aiohttp
import asyncio
urls = [
"https://your-domain.com/health",
"https://your-domain.com/api/users",
"https://your-domain.com/api/items"
]
async def fetch(session, url):
start_time = time.time()
try:
async with session.get(url) as response:
response_time = time.time() - start_time
return {
'url': url,
'status': response.status,
'response_time': response_time,
'success': response.status == 200
}
except Exception as e:
return {
'url': url,
'status': 0,
'response_time': time.time() - start_time,
'success': False,
'error': str(e)
}
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print("=== 异步性能测试结果 ===")
for result in results:
status = "✓" if result['success'] else "✗"
print(f"{status} {result['url']} - {result['response_time']:.3f}s (Status: {result['status']})")
# 运行异步性能测试
# asyncio.run(async_performance_monitor())#自动化部署脚本
#部署脚本
#!/bin/bash
# deploy.sh - 自动化部署脚本
set -e # 遇到错误立即退出
echo "🚀 开始部署Daoman API..."
# 配置变量
APP_NAME="daoman-api"
APP_DIR="/opt/daoman"
BACKUP_DIR="/opt/backups"
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
# 创建备份
echo "📁 创建备份..."
sudo mkdir -p $BACKUP_DIR
sudo tar -czf $BACKUP_DIR/${APP_NAME}_backup_${TIMESTAMP}.tar.gz $APP_DIR/ || echo "备份失败,继续部署..."
# 拉取最新代码
echo "📥 拉取最新代码..."
cd $APP_DIR
sudo git fetch origin
sudo git reset --hard origin/main
# 更新依赖
echo "📦 更新Python依赖..."
sudo $APP_DIR/venv/bin/pip install --upgrade pip
sudo $APP_DIR/venv/bin/pip install -r $APP_DIR/requirements.txt
# 运行数据库迁移
echo "🔄 运行数据库迁移..."
sudo $APP_DIR/venv/bin/python $APP_DIR/manage.py migrate
# 收集静态文件
echo "📦 收集静态文件..."
sudo $APP_DIR/venv/bin/python $APP_DIR/manage.py collectstatic --noinput
# 重启Gunicorn服务
echo "🔄 重启Gunicorn服务..."
sudo systemctl stop $APP_NAME
sleep 5
sudo systemctl start $APP_NAME
# 等待服务启动
echo "⏳ 等待服务启动..."
sleep 10
# 检查服务状态
if sudo systemctl is-active --quiet $APP_NAME; then
echo "✅ $APP_NAME 服务启动成功"
else
echo "❌ $APP_NAME 服务启动失败"
sudo systemctl status $APP_NAME
exit 1
fi
# 重新加载Nginx配置
echo "🔄 重新加载Nginx配置..."
sudo nginx -t && sudo systemctl reload nginx
# 验证部署
echo "🔍 验证部署..."
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" https://your-domain.com/health)
if [ "$HTTP_CODE" -eq 200 ]; then
echo "✅ 部署验证成功 (HTTP $HTTP_CODE)"
else
echo "❌ 部署验证失败 (HTTP $HTTP_CODE)"
exit 1
fi
# 清理旧备份(保留最近3个)
echo "🧹 清理旧备份..."
sudo ls -t $BACKUP_DIR/${APP_NAME}_backup_*.tar.gz | tail -n +4 | xargs -r sudo rm
echo "🎉 部署完成!"
echo "📋 部署时间: $(date)"
echo "📋 服务状态: $(sudo systemctl is-active $APP_NAME)"
echo "📋 最后提交: $(cd $APP_DIR && git log -1 --format='%h - %s (%cr)')"#回滚脚本
#!/bin/bash
# rollback.sh - 回滚脚本
set -e
echo "🔄 开始回滚Daoman API..."
# 配置变量
APP_NAME="daoman-api"
APP_DIR="/opt/daoman"
BACKUP_DIR="/opt/backups"
# 检查是否有备份
if [ ! "$(ls -A $BACKUP_DIR/*.tar.gz 2>/dev/null)" ]; then
echo "❌ 没有找到备份文件"
exit 1
fi
# 获取最新的备份
LATEST_BACKUP=$(ls -t $BACKUP_DIR/*.tar.gz | head -n 1)
echo "📂 使用备份文件: $LATEST_BACKUP"
# 停止服务
echo "⏹️ 停止服务..."
sudo systemctl stop $APP_NAME
# 恢复应用文件
echo "📤 恢复应用文件..."
sudo rm -rf $APP_DIR/*
sudo tar -xzf $LATEST_BACKUP -C /opt/
# 重启服务
echo "🔄 重启服务..."
sudo systemctl start $APP_NAME
# 验证回滚
echo "🔍 验证回滚..."
sleep 10
if sudo systemctl is-active --quiet $APP_NAME; then
echo "✅ 回滚成功"
echo "📋 服务状态: $(sudo systemctl is-active $APP_NAME)"
else
echo "❌ 回滚失败"
sudo systemctl status $APP_NAME
exit 1
fi
echo "🎉 回滚完成!"#总结
Nginx + Gunicorn生产部署架构为FastAPI应用提供了:
- 高可用性:多进程、负载均衡、健康检查
- 高性能:反向代理、静态文件服务、连接复用
- 安全性:SSL终止、请求过滤、安全头部
- 可扩展性:水平扩展、垂直扩展、负载均衡
- 监控能力:日志记录、性能监控、告警系统
💡 关键要点:合理配置Gunicorn工作进程数、启用Nginx缓存、实施安全加固、建立监控告警系统。
#SEO优化建议
为了提高这篇Nginx与Gunicorn生产部署教程在搜索引擎中的排名,以下是几个关键的SEO优化建议:
#标题优化
- 主标题:使用包含核心关键词的标题,如"FastAPI Nginx与Gunicorn生产部署完全指南"
- 二级标题:每个章节标题都包含相关的长尾关键词
- H1-H6层次结构:保持正确的标题层级,便于搜索引擎理解内容结构
#内容优化
- 关键词密度:在内容中自然地融入关键词如"Nginx", "Gunicorn", "生产部署", "反向代理", "负载均衡"等
- 元描述:在文章开头的元数据中包含吸引人的描述
- 内部链接:链接到其他相关教程,如Docker容器化部署等
- 外部权威链接:引用官方文档和权威资源
#技术SEO
- 页面加载速度:优化代码块和图片加载
- 移动端适配:确保在移动设备上良好显示
- 结构化数据:使用适当的HTML标签和语义化元素
#用户体验优化
- 内容可读性:使用清晰的段落结构和代码示例
- 互动元素:提供实际可运行的代码示例
- 更新频率:定期更新内容以保持时效性
#常见问题解答(FAQ)
#Q1: 为什么需要Nginx配合Gunicorn部署?
A: Nginx作为反向代理提供静态文件服务、SSL终止、负载均衡、请求过滤等功能,而Gunicorn负责Python应用的多进程管理,两者配合可以提供更好的性能和安全性。
#Q2: 如何配置Gunicorn的工作进程数?
A: 通常设置为CPU核心数的2倍加1,对于I/O密集型应用可以适当增加,对于CPU密集型应用应谨慎设置,避免过度消耗CPU资源。
#Q3: 如何实现SSL证书的自动化续期?
A: 使用Let's Encrypt的Certbot工具,配置自动续期脚本和定时任务,确保证书持续有效。
#Q4: 如何进行负载均衡配置?
A: 在Nginx中配置upstream块定义多个后端服务器,使用weight参数设置权重,配置健康检查和故障转移策略。
#Q5: 如何监控生产环境的性能?
A: 使用Prometheus + Grafana进行指标监控,配置日志收集系统,设置性能告警阈值,定期分析性能数据。
🔗 相关教程推荐
- Docker容器化部署 - 容器化部署策略
- Pydantic Settings多环境配置 - 环境配置管理
- 异步任务队列Celery - 异步任务处理
- WebSocket实时通信 - 实时通信技术
- 流式响应StreamingResponse - 流式数据处理
🏷️ 标签云: FastAPI部署 Nginx Gunicorn 生产环境 反向代理 负载均衡 SSL证书 性能优化 安全配置 监控告警

