#FastAPI请求体高级处理 - 2026年完整指南
#目录
#嵌套模型与复杂数据
#嵌套请求模型
from datetime import datetime
from typing import Dict, List, Optional

from pydantic import BaseModel, Field, model_validator
class Address(BaseModel):
    """Postal address model.

    ``postal_code`` must be five digits, optionally followed by a hyphen and
    four more digits.
    """

    street: str = Field(..., min_length=5, max_length=100)
    city: str = Field(..., min_length=2, max_length=50)
    state: str = Field(..., min_length=2, max_length=50)
    country: str = Field(..., min_length=2, max_length=50)
    # Pydantic v2 removed the ``regex`` keyword; ``pattern`` replaces it.
    postal_code: str = Field(..., pattern=r"^[0-9]{5}(-[0-9]{4})?$")
class ContactInfo(BaseModel):
    """Contact details: an e-mail address, a phone number and an optional
    secondary phone number, each checked against a regular expression.
    """

    # Pydantic v2 removed the ``regex`` keyword; ``pattern`` replaces it.
    email: str = Field(..., pattern=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
    phone: str = Field(..., pattern=r"^\+?1?[-.\s]?\(?(\d{3})\)?[-.\s]?(\d{3})[-.\s]?(\d{4})$")
    secondary_phone: Optional[str] = Field(None, pattern=r"^\+?1?[-.\s]?\(?(\d{3})\)?[-.\s]?(\d{3})[-.\s]?(\d{4})$")
class OrderItem(BaseModel):
    """A single order line.

    ``total_price`` may be omitted or sent as ``None``; it is then derived as
    ``quantity * unit_price`` after the other fields have been validated.
    """

    product_id: int = Field(..., gt=0)
    product_name: str = Field(..., min_length=1, max_length=200)
    quantity: int = Field(..., gt=0, le=1000)
    unit_price: float = Field(..., gt=0)
    # Optional on input: a required field would be rejected during validation
    # before the derived value could ever be filled in.
    total_price: Optional[float] = Field(None, gt=0)

    @model_validator(mode="after")
    def fill_total_price(self):
        """Compute the line total when the caller did not provide one."""
        # A validator is used instead of an ``__init__`` override so the rule
        # also applies when the model is built by ``model_validate`` or as a
        # nested field of another model.
        if self.total_price is None:
            self.total_price = self.quantity * self.unit_price
        return self
class CustomerOrder(BaseModel):
    """Customer order built from nested models.

    When no shipping address is supplied, the billing address is used for
    shipping as well (the same ``Address`` object is shared).
    """

    # Pydantic v2 names: ``pattern`` replaces ``regex`` and, for lists,
    # ``min_length``/``max_length`` replace ``min_items``/``max_items``.
    order_number: str = Field(..., pattern=r"^ORD\d{8}$", description="订单号格式:ORD+8位数字")
    customer_name: str = Field(..., min_length=2, max_length=100)
    customer_email: str = Field(..., pattern=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
    billing_address: Address
    shipping_address: Optional[Address] = None
    contact_info: ContactInfo
    items: List[OrderItem] = Field(..., min_length=1, max_length=100)
    shipping_method: str = Field(default="standard", pattern=r"^(standard|express|overnight)$")
    special_instructions: Optional[str] = Field(None, max_length=500)
    metadata: Dict[str, str] = Field(default_factory=dict)
    created_at: datetime = Field(default_factory=datetime.utcnow)

    @model_validator(mode="after")
    def default_shipping_address(self):
        """Fall back to the billing address when no shipping address is given."""
        # Implemented as a validator rather than in ``__init__`` so it also
        # runs for instances created through ``model_validate``.
        if self.shipping_address is None:
            self.shipping_address = self.billing_address
        return self

    @property
    def subtotal(self) -> float:
        """Sum of ``total_price`` over all items."""
        return sum(item.total_price for item in self.items)

    @property
    def total_amount(self) -> float:
        """Subtotal plus shipping: 10.00 for "standard", 20.00 otherwise."""
        shipping_cost = 10.00 if self.shipping_method == "standard" else 20.00
        return self.subtotal + shipping_cost
# Build instances of the nested models.
billing_addr = Address(
    street="123 Main St",
    city="New York",
    state="NY",
    country="USA",
    postal_code="10001"
)
contact = ContactInfo(
    email="customer@example.com",
    phone="+1-555-123-4567"
)
order_items = [
    OrderItem(
        product_id=1,
        product_name="Laptop",
        quantity=1,
        unit_price=999.99
    ),
    OrderItem(
        product_id=2,
        product_name="Mouse",
        quantity=2,
        unit_price=29.99
    )
]
customer_order = CustomerOrder(
    # Must satisfy the model's pattern ^ORD\d{8}$: "ORD" plus exactly eight
    # digits. The previous value carried ten digits and failed validation.
    order_number="ORD20240001",
    customer_name="John Smith",
    customer_email="customer@example.com",
    billing_address=billing_addr,
    contact_info=contact,
    items=order_items,
    metadata={"source": "website", "campaign": "summer_sale"}
)
print("嵌套模型示例:", customer_order.model_dump())
print(f"订单小计: ${customer_order.subtotal:.2f}")
print(f"订单总额: ${customer_order.total_amount:.2f}")
# Section: dynamic model construction
from pydantic import BaseModel, create_model
from typing import Optional, Union, Dict, Any
import inspect
def create_dynamic_model(model_name: str, fields: Dict[str, tuple]):
    """Create a Pydantic model class at runtime.

    Args:
        model_name: Name given to the generated model class.
        fields: Mapping of field name to a ``(type, default)`` pair. The
            second element may be a plain default value, ``...`` for a
            required field, or the object returned by ``Field(...)``.

    Returns:
        The model class produced by ``create_model``.
    """
    # Both kinds of default are passed to ``create_model`` the same way, so no
    # branching is needed. The former ``isinstance(..., Field)`` check raised
    # ``TypeError`` because ``Field`` is a function, not a class.
    field_definitions = {
        field_name: (field_type, field_default)
        for field_name, (field_type, field_default) in fields.items()
    }
    return create_model(model_name, **field_definitions)
# Example: build a user-configuration model dynamically.
UserConfigFields = {
    "username": (str, Field(..., min_length=3, max_length=50)),
    # Pydantic v2 removed the ``regex`` keyword; ``pattern`` replaces it.
    "email": (str, Field(..., pattern=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")),
    "age": (Optional[int], Field(None, ge=13, le=120)),
    "preferences": (Dict[str, Any], Field(default_factory=dict)),
    "is_active": (bool, True)
}
DynamicUserConfig = create_dynamic_model("DynamicUserConfig", UserConfigFields)
# Validate data against the dynamic model.
try:
    user_config = DynamicUserConfig(
        username="johndoe",
        email="john@example.com",
        age=25,
        preferences={"theme": "dark", "language": "en"},
        is_active=True
    )
    print("动态模型创建成功:", user_config.model_dump())
except Exception as e:
    print(f"动态模型创建失败: {e}")
# 更复杂的动态模型示例
def create_api_request_model(
    endpoint: str,
    required_fields: Dict[str, type],
    optional_fields: Optional[Dict[str, tuple]] = None,
):
    """Create a request model for one API endpoint.

    Args:
        endpoint: Endpoint name; the model is named ``f"{endpoint.title()}Request"``.
        required_fields: Mapping of field name to type; each becomes a
            required field.
        optional_fields: Optional mapping of field name to a
            ``(type, default)`` pair. ``None`` means no optional fields.

    Returns:
        The model class returned by ``create_dynamic_model``.
    """
    # ``...`` marks a field as required.
    fields = {
        field_name: (field_type, ...)
        for field_name, field_type in required_fields.items()
    }
    for field_name, (field_type, default_value) in (optional_fields or {}).items():
        fields[field_name] = (field_type, default_value)
    model_name = f"{endpoint.title()}Request"
    return create_dynamic_model(model_name, fields)
# Build a dynamic request model for the user API: two required fields and two
# optional fields with defaults.
UserApiRequest = create_api_request_model(
    endpoint="user",
    required_fields={
        "action": str,
        "user_data": dict
    },
    optional_fields={
        "validate_only": (bool, False),
        "return_fields": (list, [])
    }
)
# ``return_fields`` is omitted here, so its default applies.
api_request = UserApiRequest(
    action="create",
    user_data={"name": "John", "email": "john@example.com"},
    validate_only=True
)
print("API请求模型:", api_request.model_dump())
# Section: recursive models
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
class TreeNode(BaseModel):
    """Tree node whose ``children`` are themselves ``TreeNode`` objects."""

    id: int
    name: str
    value: Optional[str] = None
    children: List['TreeNode'] = Field(default_factory=list)
    parent_id: Optional[int] = None
    level: int = 0
    created_at: datetime = Field(default_factory=datetime.utcnow)

    def add_child(self, child_node: 'TreeNode'):
        """Attach ``child_node`` below this node.

        Sets the child's ``parent_id`` and updates ``level`` for the child
        and every node already beneath it.
        """
        child_node.parent_id = self.id
        # Re-level the whole attached subtree, not only its root, so a child
        # that already has descendants stays consistent.
        pending = [(child_node, self.level + 1)]
        while pending:
            node, depth = pending.pop()
            node.level = depth
            pending.extend((grandchild, depth + 1) for grandchild in node.children)
        self.children.append(child_node)

    def find_node(self, node_id: int) -> Optional['TreeNode']:
        """Return the node with ``id == node_id`` from this subtree (depth-first), or ``None``."""
        if self.id == node_id:
            return self
        for child in self.children:
            found = child.find_node(node_id)
            if found is not None:
                return found
        return None

    def to_dict(self) -> dict:
        """Return this subtree as nested dictionaries."""
        # Dump only this node's own fields; each child is converted exactly
        # once by its own ``to_dict`` call instead of being dumped at every
        # ancestor level as well.
        own = self.model_dump(exclude={'children'})
        nested = [child.to_dict() for child in self.children]
        # Re-insert ``children`` at its declared position so the key order
        # matches the field order.
        return {
            field: nested if field == 'children' else own[field]
            for field in type(self).model_fields
        }
# Resolve the forward reference in ``children``. ``model_rebuild`` is the
# Pydantic v2 replacement for the deprecated ``update_forward_refs``.
TreeNode.model_rebuild()
# Create the nodes.
root = TreeNode(id=1, name="Root", value="root_value")
child1 = TreeNode(id=2, name="Child 1", value="child1_value")
child2 = TreeNode(id=3, name="Child 2", value="child2_value")
grandchild1 = TreeNode(id=4, name="Grandchild 1", value="gc1_value")
grandchild2 = TreeNode(id=5, name="Grandchild 2", value="gc2_value")
# Assemble the tree.
root.add_child(child1)
root.add_child(child2)
child1.add_child(grandchild1)
child1.add_child(grandchild2)
print("树结构:", root.to_dict())
# Look up a node by id.
found_node = root.find_node(4)
if found_node:
    print(f"找到节点: {found_node.name} (ID: {found_node.id}, Level: {found_node.level})")
# Section: response serialization and formatting
#自定义序列化
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
import json
class CustomSerializableModel(BaseModel):
    """Model with customised serialization.

    ``model_dump`` adds derived keys (``timestamp``, ``created_at_formatted``,
    ``metadata_count``) to the standard output, and ``to_api_response``
    wraps that output for API use.
    """

    id: int
    name: str
    sensitive_data: str = Field(repr=False)  # hidden from repr()
    private_info: str = Field(exclude=True)  # left out of serialized output
    created_at: datetime = Field(default_factory=datetime.utcnow)
    metadata: dict = Field(default_factory=dict)

    # Pydantic v2 configuration style (replaces the nested ``class Config``).
    model_config = {
        "json_encoders": {
            datetime: lambda dt: dt.isoformat(),
        }
    }

    def model_dump(
        self,
        *,
        mode='python',
        include=None,
        exclude=None,
        context=None,
        by_alias: bool = False,
        exclude_unset: bool = False,
        exclude_defaults: bool = False,
        exclude_none: bool = False,
        round_trip: bool = False,
        warnings: bool = True,
        **kwargs,
    ):
        """Serialize the model and append derived keys.

        Accepts the same keyword arguments as ``BaseModel.model_dump``; any
        keyword not listed explicitly is forwarded unchanged, so options
        added by newer Pydantic releases keep working.

        Returns:
            The dumped dictionary. When ``created_at`` is present as a
            ``datetime``, ``timestamp`` and ``created_at_formatted`` are
            added; when ``metadata`` is present as a dict,
            ``metadata_count`` is added.
        """
        from datetime import timezone

        data = super().model_dump(
            mode=mode,
            include=include,
            exclude=exclude,
            context=context,
            by_alias=by_alias,
            exclude_unset=exclude_unset,
            exclude_defaults=exclude_defaults,
            exclude_none=exclude_none,
            round_trip=round_trip,
            warnings=warnings,
            **kwargs,
        )
        created = data.get('created_at')
        if isinstance(created, datetime):
            # The field default is ``datetime.utcnow`` (naive, UTC). Calling
            # ``.timestamp()`` on a naive value would interpret it as local
            # time, so naive values are explicitly treated as UTC.
            aware = created if created.tzinfo is not None else created.replace(tzinfo=timezone.utc)
            data['timestamp'] = int(aware.timestamp())
            data['created_at_formatted'] = created.strftime('%Y-%m-%d %H:%M:%S')
        if isinstance(data.get('metadata'), dict):
            data['metadata_count'] = len(data['metadata'])
        return data

    def to_api_response(self, include_sensitive: bool = False):
        """Serialize for an API response.

        Args:
            include_sensitive: When false (the default), ``sensitive_data``
                is excluded from the output.

        Returns:
            The ``model_dump`` output plus ``api_format`` and ``version``.
        """
        exclude_fields = set()
        if not include_sensitive:
            exclude_fields.add('sensitive_data')
        data = self.model_dump(exclude=exclude_fields)
        data['api_format'] = True
        data['version'] = '1.0'
        return data
# Exercise the custom serialization.
custom_model = CustomSerializableModel(
    id=1,
    name="Test Item",
    sensitive_data="secret information",
    private_info="private data not serialized",
    metadata={"category": "test", "priority": "high"}
)
print("自定义序列化:", custom_model.model_dump())
# Same instance, serialized for an API without and then with the sensitive field.
print("API响应格式:", custom_model.to_api_response(include_sensitive=False))
print("API响应格式(含敏感数据):", custom_model.to_api_response(include_sensitive=True))
# Section: JSON serialization optimization
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from decimal import Decimal
import json
class JsonOptimizedModel(BaseModel):
    """Model with a compact, non-ASCII-friendly JSON representation."""

    id: int
    name: str
    price: Decimal = Field(decimal_places=2)  # exact decimal, two places
    tags: List[str] = Field(default_factory=list)
    metadata: dict = Field(default_factory=dict)
    created_at: datetime
    rating: float = Field(ge=0.0, le=5.0)

    # Pydantic v2 configuration style (replaces the nested ``class Config``).
    model_config = {
        "json_encoders": {
            datetime: lambda dt: dt.isoformat(),
            Decimal: lambda d: float(d),  # or keep as a string to preserve precision
        }
    }

    def model_dump_json(self, **kwargs):
        """Serialize the model to a JSON string.

        Args:
            **kwargs: ``indent`` is applied to the JSON output; every other
                keyword (``include``, ``exclude``, ``exclude_none``, ...) is
                forwarded to ``model_dump`` instead of being ignored.

        Returns:
            JSON text with non-ASCII characters kept as-is. Output is
            compact (no spaces after separators) unless ``indent`` is given.
        """
        indent = kwargs.pop('indent', None)
        data = self.model_dump(**kwargs)

        def encode_extra(value):
            # Called only for objects ``json`` cannot encode natively.
            # Datetimes are written in ISO 8601 form; anything else
            # (e.g. ``Decimal``) falls back to ``str`` as before.
            if isinstance(value, datetime):
                return value.isoformat()
            return str(value)

        return json.dumps(
            data,
            ensure_ascii=False,
            indent=indent,
            separators=(',', ':') if indent is None else (',', ': '),
            default=encode_extra,
        )

    @classmethod
    def from_json_string(cls, json_str: str):
        """Build an instance from a JSON string.

        Raises:
            json.JSONDecodeError: If ``json_str`` is not valid JSON.
            pydantic.ValidationError: If the decoded value does not satisfy
                the model (including when it is not a JSON object).
        """
        data = json.loads(json_str)
        # ``model_validate`` reports a non-object payload as a validation
        # error instead of failing on ``**`` unpacking.
        return cls.model_validate(data)
# Exercise the JSON serialization.
json_model = JsonOptimizedModel(
    id=1,
    name="Premium Product",
    price=Decimal("1299.99"),
    tags=["premium", "featured"],
    metadata={"color": "red", "size": "large"},
    created_at=datetime.utcnow(),
    rating=4.5
)
json_string = json_model.model_dump_json()
print("优化JSON:", json_string)
# Rebuild an instance from the JSON string.
rebuilt_model = JsonOptimizedModel.from_json_string(json_string)
print("重建模型:", rebuilt_model.model_dump())
# Section: response formatting utilities
from pydantic import BaseModel
from typing import Any, Dict, Optional, List
from datetime import datetime
import time
class ResponseFormatter:
    """Static helpers that build uniform response envelopes as plain dicts."""

    @staticmethod
    def format_success(data: Any = None, message: str = "Success", **kwargs) -> Dict[str, Any]:
        """Build a success envelope.

        Extra keyword arguments are merged in last, so they can add keys or
        override the defaults (for example ``status_code``).
        """
        response = {
            "success": True,
            "message": message,
            "data": data,
            "timestamp": datetime.utcnow().isoformat(),
            "status_code": 200
        }
        response.update(kwargs)
        return response

    @staticmethod
    def format_error(message: str, error_code: str = "GENERIC_ERROR", status_code: int = 400, **kwargs) -> Dict[str, Any]:
        """Build an error envelope.

        Extra keyword arguments are merged in last, so they can add keys or
        override the defaults.
        """
        response = {
            "success": False,
            "message": message,
            "error_code": error_code,
            "timestamp": datetime.utcnow().isoformat(),
            "status_code": status_code
        }
        response.update(kwargs)
        return response

    @staticmethod
    def format_paged_response(
        data: List[Any],
        total: int,
        page: int = 1,
        size: int = 10,
        message: str = "Success"
    ) -> Dict[str, Any]:
        """Build a paginated envelope.

        Args:
            data: Items of the current page.
            total: Total number of items across all pages.
            page: Current page number; ``has_prev`` is true when it exceeds 1.
            size: Page size; must be at least 1.
            message: Human-readable message.

        Raises:
            ValueError: If ``size`` is smaller than 1.
        """
        # A zero size would divide by zero and a negative one would yield a
        # meaningless page count, so both are rejected up front.
        if size < 1:
            raise ValueError("size must be a positive integer")
        pages = (total + size - 1) // size  # ceiling division
        return {
            "success": True,
            "message": message,
            "data": data,
            "pagination": {
                "total": total,
                "page": page,
                "size": size,
                "pages": pages,
                "has_next": page < pages,
                "has_prev": page > 1
            },
            "timestamp": datetime.utcnow().isoformat()
        }

    @staticmethod
    def benchmark_function(func, *args, **kwargs):
        """Call ``func(*args, **kwargs)`` once and report how long it took.

        Returns:
            A dict with the call's ``result`` and the elapsed time formatted
            in seconds (``execution_time``) and milliseconds
            (``execution_time_ms``).
        """
        # ``perf_counter`` is monotonic, so the measurement cannot be skewed
        # by system clock adjustments.
        start_time = time.perf_counter()
        result = func(*args, **kwargs)
        execution_time = time.perf_counter() - start_time
        return {
            "result": result,
            "execution_time": f"{execution_time:.4f}s",
            "execution_time_ms": f"{execution_time * 1000:.2f}ms"
        }
# Use the response formatting helpers.
formatter = ResponseFormatter()
# Success response; ``request_id`` is an extra key merged into the envelope.
success_resp = formatter.format_success(
    data={"user_id": 123, "username": "john_doe"},
    message="用户创建成功",
    request_id="req_12345"
)
print("成功响应:", success_resp)
# Error response; ``suggested_action`` is an extra key merged into the envelope.
error_resp = formatter.format_error(
    message="用户不存在",
    error_code="USER_NOT_FOUND",
    status_code=404,
    suggested_action="请检查用户ID是否正确"
)
print("错误响应:", error_resp)
# Paginated response: first page of 100 items, 10 per page.
paged_resp = formatter.format_paged_response(
    data=[{"id": 1, "name": "Item 1"}, {"id": 2, "name": "Item 2"}],
    total=100,
    page=1,
    size=10,
    message="项目列表获取成功"
)
print("分页响应:", paged_resp)
# Section: error handling and debugging
#请求体验证错误处理
from fastapi import FastAPI, Request, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, ValidationError
from typing import Optional
import logging
from datetime import datetime
# Application instance that the validation handlers below are registered on.
app = FastAPI()
# Configure logging at INFO level and create a logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ValidatedInput(BaseModel):
    """Request body with length, pattern and range constraints on its fields."""

    name: str = Field(..., min_length=2, max_length=50)
    # Pydantic v2 removed the ``regex`` keyword; ``pattern`` replaces it.
    email: str = Field(..., pattern=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
    age: int = Field(..., ge=0, le=150)
    rating: Optional[float] = Field(None, ge=0.0, le=5.0)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    """Turn a request validation failure into a structured 422 response.

    Each validation error is reported with its field path, message, type,
    offending input and context.
    """
    from fastapi.encoders import jsonable_encoder

    logger.error("验证错误: %s", exc)
    errors = [
        {
            "field": " -> ".join(str(loc) for loc in error['loc']),
            "message": error['msg'],
            "type": error['type'],
            "input": error.get('input'),
            "ctx": error.get('ctx', {})
        }
        for error in exc.errors()
    ]
    # ``input`` and ``ctx`` can hold values that ``JSONResponse`` cannot
    # serialize directly (bytes, exception objects, ...), so the payload is
    # converted to JSON-compatible data first.
    return JSONResponse(
        status_code=422,
        content=jsonable_encoder({
            "success": False,
            "message": "请求数据验证失败",
            "error_code": "VALIDATION_ERROR",
            "details": errors,
            "timestamp": datetime.utcnow().isoformat()
        })
    )
@app.exception_handler(ValidationError)
async def pydantic_validation_handler(request: Request, exc: ValidationError):
    """Report a Pydantic ``ValidationError`` as a structured 422 response."""
    logger.error(f"Pydantic验证错误: {exc}")

    def describe(problem):
        # One entry per failed field: dotted path, message and error type.
        path = " -> ".join(str(part) for part in problem['loc'])
        return {"field": path, "message": problem['msg'], "type": problem['type']}

    payload = {
        "success": False,
        "message": "数据验证失败",
        "error_code": "PYDANTIC_VALIDATION_ERROR",
        "details": [describe(problem) for problem in exc.errors()],
    }
    return JSONResponse(status_code=422, content=payload)
@app.post("/validated-input/")
async def handle_validated_input(input_data: ValidatedInput):
    """Echo the validated request body back inside a success envelope."""
    validated = input_data.model_dump()
    return {"success": True, "data": validated, "message": "输入验证成功"}
# 测试验证错误处理
# 可以发送无效数据来测试错误处理,例如:
# {
# "name": "A", # 太短
# "email": "invalid-email", # 无效邮箱
# "age": 200 # 超出范围
# }#调试工具和中间件
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
import time
import json
from typing import Callable, Awaitable
# Application instance that the debug endpoints below are registered on.
app = FastAPI()
class DebugMiddleware:
    """ASGI middleware that prints each HTTP request and adds an
    ``x-response-time`` header to the response.
    """

    def __init__(self, app):
        """Store the wrapped ASGI application."""
        self.app = app

    async def __call__(self, scope, receive, send):
        """Handle one ASGI call; non-HTTP scopes are passed straight through."""
        if scope["type"] != "http":
            return await self.app(scope, receive, send)

        request = Request(scope, receive)
        # Monotonic clock: the measured duration is immune to system clock
        # adjustments.
        start_time = time.perf_counter()
        # The request body is deliberately not read here; consuming it in the
        # middleware would interfere with later reads of the body.

        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                elapsed = time.perf_counter() - start_time
                # Copy before appending so the header sequence supplied by
                # the application is never mutated in place.
                headers = list(message.get("headers", []))
                headers.append((b"x-response-time", f"{elapsed:.4f}s".encode()))
                message["headers"] = headers
            await send(message)

        try:
            print(f"→ {request.method} {request.url.path}")
            # NOTE: this prints every request header, which may include
            # credentials; intended for debugging only.
            print(f" Headers: {dict(request.headers)}")
            return await self.app(scope, receive, send_wrapper)
        except Exception as e:
            print(f"✗ 请求处理错误: {e}")
            raise
# 添加调试中间件
# app.add_middleware(DebugMiddleware)
def create_debug_endpoint(app: FastAPI):
    """Register the ``/debug/info`` and ``/debug/echo`` routes on ``app``.

    NOTE: both routes expose internals (interpreter path, command line,
    request headers); they are meant for debugging only.
    """
    import platform
    import sys

    import fastapi

    @app.get("/debug/info")
    async def debug_info():
        """Return version, platform and process information."""
        return {
            # Read from the installed package instead of a hard-coded string.
            "fastapi_version": fastapi.__version__,
            "python_version": platform.python_version(),
            "platform": platform.platform(),
            "sys_info": {
                "argv": sys.argv,
                "executable": sys.executable,
            },
            "timestamp": datetime.utcnow().isoformat()
        }

    @app.post("/debug/echo")
    async def debug_echo(request: Request):
        """Echo back everything received with the request."""
        body = await request.body()
        return {
            "method": request.method,
            "url": str(request.url),
            "headers": dict(request.headers),
            "query_params": dict(request.query_params),
            "path_params": request.path_params,
            # Undecodable bytes are replaced rather than raising, so binary
            # bodies do not turn the echo into a server error.
            "body": body.decode('utf-8', errors='replace') if body else None,
            "timestamp": datetime.utcnow().isoformat()
        }
# Register the debug endpoints on the application.
create_debug_endpoint(app)
# Section: advanced error handling
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional
import traceback
from enum import Enum
from datetime import datetime
# Application instance that the exception handlers below are registered on.
app = FastAPI()
class ErrorCode(str, Enum):
    """Error codes used in error responses; each member's value equals its name."""
    VALIDATION_ERROR = "VALIDATION_ERROR"
    BUSINESS_LOGIC_ERROR = "BUSINESS_LOGIC_ERROR"
    RESOURCE_NOT_FOUND = "RESOURCE_NOT_FOUND"
    PERMISSION_DENIED = "PERMISSION_DENIED"
    INTERNAL_ERROR = "INTERNAL_ERROR"
    RATE_LIMIT_EXCEEDED = "RATE_LIMIT_EXCEEDED"
class ErrorResponse(BaseModel):
    """Body of an error response."""
    success: bool = False  # defaults to False
    error_code: ErrorCode
    message: str
    details: Optional[dict] = None
    timestamp: str
    trace_id: Optional[str] = None
def create_error_response(
    error_code: ErrorCode,
    message: str,
    details: Optional[dict] = None,
    trace_id: Optional[str] = None
) -> JSONResponse:
    """Build a ``JSONResponse`` carrying an ``ErrorResponse`` body.

    The HTTP status is looked up from ``error_code``; codes without an entry
    in the table map to 500.
    """
    http_status_for = {
        ErrorCode.VALIDATION_ERROR: 422,
        ErrorCode.RESOURCE_NOT_FOUND: 404,
        ErrorCode.PERMISSION_DENIED: 403,
        ErrorCode.RATE_LIMIT_EXCEEDED: 429,
        ErrorCode.INTERNAL_ERROR: 500,
        ErrorCode.BUSINESS_LOGIC_ERROR: 400
    }
    body = ErrorResponse(
        error_code=error_code,
        message=message,
        details=details,
        timestamp=datetime.utcnow().isoformat(),
        trace_id=trace_id
    ).model_dump()
    http_status = http_status_for.get(error_code, 500)
    return JSONResponse(status_code=http_status, content=body)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
    """Convert an ``HTTPException`` into the structured error format.

    The response keeps the exception's own status code and headers; only the
    body is reshaped. Status codes without an entry in the table are labelled
    ``INTERNAL_ERROR``.
    """
    error_map = {
        400: ErrorCode.BUSINESS_LOGIC_ERROR,
        401: ErrorCode.PERMISSION_DENIED,
        403: ErrorCode.PERMISSION_DENIED,
        404: ErrorCode.RESOURCE_NOT_FOUND,
        422: ErrorCode.VALIDATION_ERROR,
        429: ErrorCode.RATE_LIMIT_EXCEEDED,
        500: ErrorCode.INTERNAL_ERROR
    }
    error_code = error_map.get(exc.status_code, ErrorCode.INTERNAL_ERROR)

    details = {"status_code": exc.status_code}
    if isinstance(exc.detail, str) and exc.detail:
        message = exc.detail
    else:
        # ``message`` must be a string; a structured detail is moved into
        # ``details`` instead of failing validation of the error body.
        message = "HTTP错误"
        if exc.detail:
            details["detail"] = exc.detail

    response = create_error_response(
        error_code=error_code,
        message=message,
        details=details
    )
    # The helper derives the status from the error code, which would turn a
    # 401 into 403 and any unmapped status into 500. Restore the real one.
    response.status_code = exc.status_code
    # Preserve headers attached to the exception (e.g. ``WWW-Authenticate``).
    for header_name, header_value in (getattr(exc, "headers", None) or {}).items():
        response.headers[header_name] = header_value
    return response
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
    """Catch-all handler: print the traceback and return an INTERNAL_ERROR response."""
    # Format the traceback from ``exc`` itself rather than from whatever
    # exception happens to be active in the calling context.
    error_trace = "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__)
    )
    print(f"未处理的异常: {exc}\n{error_trace}")
    # NOTE(review): the exception type and message are returned to the
    # client; consider dropping them outside of development.
    return create_error_response(
        error_code=ErrorCode.INTERNAL_ERROR,
        message="服务器内部错误",
        details={
            "error_type": type(exc).__name__,
            "message": str(exc)
        }
    )
#性能优化技巧
import asyncio
from typing import List
from pydantic import BaseModel, Field
from concurrent.futures import ThreadPoolExecutor
import time
class PerformanceOptimization:
    """Helpers for validating many request payloads, concurrently or via threads."""

    @staticmethod
    async def batch_process_requests(requests: List[dict], model_class) -> List:
        """Process every payload concurrently and return the results in input order."""
        # One task per payload; ``gather`` waits for all of them.
        pending = [
            asyncio.create_task(
                PerformanceOptimization.process_single_request(payload, model_class)
            )
            for payload in requests
        ]
        return await asyncio.gather(*pending, return_exceptions=True)

    @staticmethod
    async def process_single_request(data: dict, model_class):
        """Build ``model_class(**data)``; on failure return an error dict instead."""
        try:
            instance = model_class(**data)
            # Stand-in for an I/O operation.
            await asyncio.sleep(0.01)
        except Exception as e:
            return {"error": str(e), "data": data}
        return instance

    @staticmethod
    def process_requests_sync(requests: List[dict], model_class, max_workers: int = 4):
        """Build the models on a thread pool and return the results in input order."""

        def build(payload):
            try:
                return model_class(**payload)
            except Exception as e:
                return {"error": str(e), "data": payload}

        with ThreadPoolExecutor(max_workers=max_workers) as pool:
            return list(pool.map(build, requests))
# 示例:性能测试模型
class PerformanceTestModel(BaseModel):
    """Model used as the validation target in the performance test."""

    id: int = Field(..., gt=0)
    name: str = Field(..., min_length=1, max_length=100)
    value: float = Field(..., gt=0)
    # Pydantic v2 removed the ``regex`` keyword; ``pattern`` replaces it.
    category: str = Field(..., pattern=r"^[a-zA-Z_][a-zA-Z0-9_]*$")
# 测试性能优化
async def performance_test():
    """Time the asynchronous and thread-pool batch processing of 100 payloads."""

    def count_successes(results):
        # A result counts as failed only when it is an error dictionary.
        return sum(1 for r in results if not isinstance(r, dict) or 'error' not in r)

    # ``value`` must be > 0 for ``PerformanceTestModel``; starting at
    # ``i + 1`` keeps the first item valid (``i * 10`` made it 0.0).
    test_data = [
        {
            "id": i + 1,
            "name": f"Item_{i}",
            "value": float((i + 1) * 10),
            "category": "test_category"
        }
        for i in range(100)
    ]
    print("开始性能测试...")
    # Asynchronous batch processing, timed with a monotonic clock.
    start_time = time.perf_counter()
    async_results = await PerformanceOptimization.batch_process_requests(test_data, PerformanceTestModel)
    async_time = time.perf_counter() - start_time
    print(f"异步批量处理时间: {async_time:.4f}秒")
    print(f"成功处理: {count_successes(async_results)}")
    # Thread-pool batch processing.
    start_time = time.perf_counter()
    sync_results = PerformanceOptimization.process_requests_sync(test_data, PerformanceTestModel)
    sync_time = time.perf_counter() - start_time
    print(f"同步批量处理时间: {sync_time:.4f}秒")
    print(f"成功处理: {count_successes(sync_results)}")
# Section: real-world use cases
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel, Field
from typing import Optional, List, Dict
import uuid
from datetime import datetime
# Application instance that the order, registration and API-spec routes below
# are registered on.
app = FastAPI()
class OrderItem(BaseModel):
    """One line of an order request: product, quantity and unit price."""
    product_id: int
    quantity: int
    unit_price: float
class OrderRequest(BaseModel):
    """Request body for creating an order."""
    customer_id: int
    items: List[OrderItem]
    shipping_address: str
    payment_method: str = "credit_card"  # used when the client sends none
    notes: Optional[str] = None
class OrderResponse(BaseModel):
    """Response body returned after an order is created."""
    order_id: str
    customer_id: int
    total_amount: float
    status: str = "processing"
    # Filled with ``datetime.utcnow()`` when not supplied.
    created_at: datetime = Field(default_factory=datetime.utcnow)
    estimated_delivery: datetime
class NotificationService:
    """Simulated notification sender."""

    @staticmethod
    async def send_order_confirmation(order_id: str, customer_id: int):
        """Print a start message, wait briefly, then print a completion message."""
        start_message = f"发送订单确认: 订单ID {order_id}, 客户ID {customer_id}"
        print(start_message)
        # Stand-in for the asynchronous delivery of the notification.
        await asyncio.sleep(0.1)
        done_message = f"订单确认已发送: {order_id}"
        print(done_message)
@app.post("/orders/", response_model=OrderResponse)
async def create_order(
    order_request: OrderRequest,
    background_tasks: BackgroundTasks
):
    """Create an order and schedule a confirmation notification.

    Generates an order id, totals the items, estimates delivery as midnight
    (UTC) three days from today, and queues the confirmation as a background
    task before returning the response.
    """
    from datetime import timedelta

    order_id = f"ORD-{uuid.uuid4().hex[:8].upper()}"
    total_amount = sum(item.quantity * item.unit_price for item in order_request.items)
    # ``datetime`` has no ``add`` method; date arithmetic needs ``timedelta``.
    midnight_today = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
    estimated_delivery = midnight_today + timedelta(days=3)
    response = OrderResponse(
        order_id=order_id,
        customer_id=order_request.customer_id,
        total_amount=total_amount,
        estimated_delivery=estimated_delivery
    )
    # The notification is queued as a background task rather than awaited here.
    background_tasks.add_task(
        NotificationService.send_order_confirmation,
        order_id,
        order_request.customer_id
    )
    # Further business logic (stock checks, payment, persistence) would go here.
    return response
class UserRegistrationRequest(BaseModel):
    """Request body for user registration."""

    # Pydantic v2 removed the ``regex`` keyword; ``pattern`` replaces it.
    username: str = Field(..., min_length=3, max_length=50, pattern=r"^[a-zA-Z0-9_]+$")
    email: str = Field(..., pattern=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
    password: str = Field(..., min_length=8)
    full_name: str = Field(..., min_length=2, max_length=100)
    age: Optional[int] = Field(None, ge=13, le=120)
    # Required, but the model itself does not check that the value is True.
    terms_accepted: bool = Field(..., description="必须接受条款")
class UserRegistrationResponse(BaseModel):
    """Response body returned after a user registers."""
    user_id: int
    username: str
    email: str
    # Filled with ``datetime.utcnow()`` when not supplied.
    registration_date: datetime = Field(default_factory=datetime.utcnow)
    verification_required: bool = True
    welcome_bonus: Optional[str] = "100积分"
@app.post("/register/", response_model=UserRegistrationResponse)
async def register_user(
    user_request: UserRegistrationRequest,
    background_tasks: BackgroundTasks
):
    """Register a user and schedule the verification e-mail.

    The user id is simulated from the current timestamp; persistence would
    normally happen here. Users with an age below 25 receive a larger
    welcome bonus.
    """
    # Simulated id: millisecond timestamp reduced to six digits.
    millis = int(datetime.utcnow().timestamp() * 1000)
    user_id = millis % 1000000
    response = UserRegistrationResponse(
        user_id=user_id,
        username=user_request.username,
        email=user_request.email,
        verification_required=True
    )
    # The verification e-mail is queued as a background task.
    background_tasks.add_task(send_verification_email, user_request.email, user_id)
    age = user_request.age
    if age and age < 25:
        response.welcome_bonus = "200积分(青年优惠)"
    return response
async def send_verification_email(email: str, user_id: int):
    """Background task: print a start message, wait, then print a completion message."""
    start_message = f"发送验证邮件到: {email}, 用户ID: {user_id}"
    print(start_message)
    # Stand-in for the time it takes to send the e-mail.
    await asyncio.sleep(0.5)
    done_message = f"验证邮件已发送: {email}"
    print(done_message)
# 数据验证的实际应用
class APISpecification(BaseModel):
    """Description of an API endpoint: path, method, documentation and limits."""

    # Pydantic v2 names: ``pattern`` replaces ``regex`` and, for lists,
    # ``max_length`` replaces ``max_items``.
    endpoint: str = Field(..., pattern=r"^/[a-zA-Z0-9/_-]+$")
    method: str = Field(..., pattern=r"^(GET|POST|PUT|DELETE|PATCH)$")
    description: str = Field(..., min_length=10, max_length=500)
    request_model: Optional[Dict] = None
    response_model: Optional[Dict] = None
    auth_required: bool = True
    rate_limit: int = Field(default=100, ge=1, le=10000)
    tags: List[str] = Field(default_factory=list, max_length=10)
@app.post("/api-spec/", response_model=dict)
async def create_api_spec(spec: APISpecification):
    """Accept an API specification and return a creation summary.

    Adds the ``auth`` tag when authentication is required and the tag is
    missing, then returns a generated id together with the endpoint and
    method.
    """
    missing_auth_tag = spec.auth_required and 'auth' not in spec.tags
    if missing_auth_tag:
        spec.tags.append('auth')
    # Simulated persistence: only an identifier is generated.
    spec_id = f"SPEC-{uuid.uuid4().hex[:8]}"
    summary = {"spec_id": spec_id}
    summary["endpoint"] = spec.endpoint
    summary["method"] = spec.method
    summary["status"] = "created"
    summary["validation_passed"] = True
    return summary
# Section: related tutorials
#总结
FastAPI的高级请求体处理功能提供了:
- 嵌套模型:处理复杂的嵌套数据结构
- 动态模型:根据需求动态创建模型
- 递归模型:处理树形结构数据
- 序列化优化:自定义数据序列化格式
- 错误处理:完善的错误处理和调试机制
- 性能优化:高效的处理和验证机制
- 实际应用:真实世界的使用案例
掌握这些高级技巧可以帮助您构建更加健壮、高效的FastAPI应用程序。

