#FastAPI请求体处理与响应模型
📂 所属阶段:第一阶段 — 快速筑基(基础篇)
🔗 相关章节:路径参数与查询参数 · 响应处理与状态码 · Pydantic综合指南
#目录
#请求体处理概述
请求体处理是Web API开发的核心环节,FastAPI提供了强大而灵活的机制来处理HTTP请求体数据。通过Pydantic模型,FastAPI实现了自动的数据验证、转换和序列化。
#请求体处理机制
FastAPI的请求体处理包含以下几个关键组件:
- 数据模型定义:使用Pydantic模型定义数据结构
- 自动验证:基于类型提示的自动数据验证
- 类型转换:自动类型转换和数据清洗
- 错误处理:详细的验证错误信息
- 文档生成:自动生成API文档
#基础请求体处理
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
class Item(BaseModel):
"""基础项目模型"""
name: str
description: Optional[str] = None
price: float
tax: Optional[float] = None
@app.post("/items/")
def create_item(item: Item):
"""
创建新项目
请求体会自动验证和转换
"""
return item
@app.post("/items-with-computation/")
def create_item_with_computation(item: Item):
"""
创建项目并计算相关信息
演示请求体数据处理
"""
item_dict = item.model_dump()
if item.tax:
total = item.price + item.tax
item_dict["total"] = total
return item_dict#HTTP方法与请求体
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
app = FastAPI()
class User(BaseModel):
"""用户模型"""
id: Optional[int] = None
name: str
email: str
age: Optional[int] = None
# POST - 创建资源
@app.post("/users/")
def create_user(user: User):
"""创建新用户"""
user.id = 123 # 模拟生成ID
return user
# PUT - 更新资源(完整替换)
@app.put("/users/{user_id}")
def update_user(user_id: int, user: User):
"""更新用户信息"""
user.id = user_id
return user
# PATCH - 部分更新
@app.patch("/users/{user_id}")
def partial_update_user(user_id: int, user: User):
"""部分更新用户信息"""
# 这里应该实现部分更新逻辑
return {"user_id": user_id, "updated_fields": user.model_dump()}#Pydantic请求模型
#基础模型定义
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
class Product(BaseModel):
"""产品请求模型"""
name: str = Field(
..., # 必填字段
min_length=3,
max_length=100,
description="产品名称,3-100字符"
)
description: Optional[str] = Field(
None,
max_length=500,
description="产品描述,最多500字符"
)
price: float = Field(
...,
gt=0, # 大于0
le=10000, # 小于等于10000
description="产品价格,大于0,小于等于10000"
)
category: str = Field(
...,
regex=r"^[a-zA-Z_][a-zA-Z0-9_]*$", # 字母数字下划线格式
description="产品分类,字母数字下划线格式"
)
tags: List[str] = Field(
default=[],
max_items=10,
description="产品标签,最多10个"
)
created_at: datetime = Field(default_factory=datetime.utcnow)
is_active: bool = True
# 使用示例
product = Product(
name="Laptop",
description="High-performance laptop",
price=1299.99,
category="electronics",
tags=["computer", "hardware"]
)
print(product.model_dump())#高级字段约束
from pydantic import BaseModel, Field, validator
from typing import Optional, List, Dict
from decimal import Decimal
from uuid import UUID
class AdvancedRequestModel(BaseModel):
"""高级请求模型,演示复杂字段约束"""
# 邮箱验证
email: str = Field(
...,
regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
description="有效的邮箱地址"
)
# 电话号码验证
phone: str = Field(
...,
regex=r"^\+?1?[-.\s]?\(?(\d{3})\)?[-.\s]?(\d{3})[-.\s]?(\d{4})$",
description="有效的电话号码"
)
# 密码强度验证
password: str = Field(
...,
min_length=8,
regex=r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[@$!%*?&])[A-Za-z\d@$!%*?&]{8,}$",
description="强密码,至少8位,包含大小写字母、数字和特殊字符"
)
# 数值精度
amount: Decimal = Field(
...,
gt=0,
description="金额,必须大于0,支持高精度计算"
)
# 唯一标识符
external_id: UUID = Field(
description="外部系统唯一标识符"
)
# 嵌套字典
metadata: Dict[str, str] = Field(
default_factory=dict,
description="元数据,键值对形式"
)
# 数值范围验证
rating: float = Field(
default=0.0,
ge=0.0,
le=5.0,
description="评分,0-5分"
)
# 枚举验证
status: str = Field(
default="active",
regex=r"^(active|inactive|pending|suspended)$",
description="状态:active, inactive, pending, suspended"
)
# 验证示例
import uuid
from decimal import Decimal
try:
advanced_request = AdvancedRequestModel(
email="user@example.com",
phone="+1-555-123-4567",
password="StrongPass123!",
amount=Decimal("1234.56"),
external_id=uuid.uuid4(),
metadata={"source": "web", "version": "1.0"},
rating=4.5,
status="active"
)
print("高级请求模型验证成功:", advanced_request.model_dump())
except Exception as e:
print(f"验证失败: {e}")#可选字段与默认值
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
class FlexibleRequestModel(BaseModel):
"""灵活的请求模型,演示可选字段和默认值"""
# 必填字段
name: str = Field(..., description="必填名称")
# 可选字段
description: Optional[str] = Field(None, description="可选描述")
notes: Optional[str] = Field(None, description="可选备注")
# 具有默认值的字段
priority: int = Field(default=1, ge=1, le=5, description="优先级,1-5")
is_public: bool = Field(default=False, description="是否公开")
# 工厂函数默认值
tags: List[str] = Field(default_factory=list, description="标签列表")
created_at: datetime = Field(default_factory=datetime.utcnow, description="创建时间")
# 条件默认值
slug: Optional[str] = Field(None, description="URL友好标识符")
def __init__(self, **data):
super().__init__(**data)
# 如果slug未提供,基于name生成
if not self.slug and self.name:
self.slug = self.name.lower().replace(' ', '-').replace('_', '-')
class Config:
# 配置选项
validate_assignment = True # 赋值时验证
use_enum_values = True # 使用枚举值
extra = "forbid" # 禁止额外字段
# 使用示例
flexible_request = FlexibleRequestModel(
name="Sample Product",
description="This is a sample product",
priority=3,
tags=["sample", "demo"]
)
print("灵活请求模型:", flexible_request.model_dump())#请求体验证与约束
#内置验证器
from fastapi import FastAPI
from pydantic import BaseModel, Field, validator
from typing import Optional, List
import re
app = FastAPI()
class ValidatedRequest(BaseModel):
"""带内置验证器的请求模型"""
# 字符串验证
title: str = Field(
...,
min_length=5,
max_length=100,
description="标题,5-100字符"
)
# 数值验证
quantity: int = Field(
...,
ge=1, # 大于等于1
le=1000, # 小于等于1000
description="数量,1-1000"
)
# 价格验证
unit_price: float = Field(
...,
gt=0, # 大于0
le=100000, # 小于等于100000
description="单价,大于0,小于等于100000"
)
# 邮箱验证
contact_email: str = Field(
...,
regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$",
description="联系邮箱"
)
# 标签验证
categories: List[str] = Field(
...,
min_items=1,
max_items=5,
description="分类列表,1-5个"
)
# URL验证
website: Optional[str] = Field(
None,
regex=r"^https?://(?:[-\w.])+(?:\:[0-9]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:\#(?:[\w.])*)?)?$",
description="网站URL"
)
@validator('title')
def validate_title(cls, v):
"""验证标题格式"""
if not v or len(v.strip()) < 5:
raise ValueError('标题长度不足')
if v.lower().startswith(('spam', 'scam', 'fake')):
raise ValueError('标题不能包含敏感词汇')
return v.strip()
@validator('categories')
def validate_categories(cls, v):
"""验证分类"""
if not v:
raise ValueError('至少需要一个分类')
if len(v) > 5:
raise ValueError('分类不能超过5个')
# 验证每个分类的格式
for cat in v:
if not re.match(r'^[a-zA-Z0-9_-]+$', cat):
raise ValueError(f'分类格式无效: {cat}')
return [cat.lower() for cat in v]
@app.post("/validated-requests/")
def handle_validated_request(request: ValidatedRequest):
"""
处理带验证的请求
演示内置验证器的使用
"""
return request#自定义验证逻辑
from pydantic import BaseModel, Field, validator, root_validator
from typing import Optional
from datetime import date, datetime
class BusinessLogicRequest(BaseModel):
"""业务逻辑请求模型"""
customer_name: str
order_date: date
delivery_date: date
order_value: float
customer_type: str # premium, standard, basic
discount_code: Optional[str] = None
payment_method: str # credit_card, debit_card, bank_transfer, cash
@validator('customer_name')
def validate_customer_name(cls, v):
"""验证客户姓名"""
if not v or len(v.strip()) < 2:
raise ValueError('客户姓名至少需要2个字符')
if not v.replace(' ', '').isalpha():
raise ValueError('客户姓名只能包含字母和空格')
return v.strip().title()
@validator('order_date', 'delivery_date')
def validate_date_not_past(cls, v):
"""验证日期不能是过去"""
if v < date.today():
raise ValueError('日期不能是过去的时间')
return v
@validator('order_value')
def validate_order_value(cls, v):
"""验证订单金额"""
if v <= 0:
raise ValueError('订单金额必须大于0')
if v > 1000000:
raise ValueError('订单金额不能超过1,000,000')
return round(v, 2)
@validator('customer_type')
def validate_customer_type(cls, v):
"""验证客户类型"""
allowed_types = ['premium', 'standard', 'basic']
if v.lower() not in allowed_types:
raise ValueError(f'客户类型必须是 {allowed_types} 之一')
return v.lower()
@validator('payment_method')
def validate_payment_method(cls, v):
"""验证支付方式"""
allowed_methods = ['credit_card', 'debit_card', 'bank_transfer', 'cash']
if v.lower() not in allowed_methods:
raise ValueError(f'支付方式必须是 {allowed_methods} 之一')
return v.lower()
@validator('discount_code')
def validate_discount_code(cls, v):
"""验证折扣码"""
if v and not re.match(r'^[A-Z0-9]{6,10}$', v.upper()):
raise ValueError('折扣码格式无效(6-10位字母数字大写)')
return v.upper() if v else v
@root_validator
def validate_business_rules(cls, values):
"""验证业务规则"""
order_date = values.get('order_date')
delivery_date = values.get('delivery_date')
order_value = values.get('order_value')
customer_type = values.get('customer_type')
discount_code = values.get('discount_code')
payment_method = values.get('payment_method')
# 配送日期不能早于订单日期
if order_date and delivery_date and delivery_date < order_date:
raise ValueError('配送日期不能早于订单日期')
# 预期配送时间
if order_date and delivery_date:
days_diff = (delivery_date - order_date).days
if days_diff > 30:
raise ValueError('配送时间不能超过30天')
# 高价值订单的特殊要求
if order_value and order_value > 10000:
if payment_method == 'cash':
raise ValueError('高价值订单不能使用现金支付')
if customer_type == 'basic':
raise ValueError('基本客户不能下单高价值订单')
# 折扣码使用规则
if discount_code and customer_type == 'basic':
raise ValueError('基本客户不能使用折扣码')
if discount_code and order_value and order_value < 100:
raise ValueError('小额订单不能使用折扣码')
return values
# 测试业务逻辑验证
try:
business_request = BusinessLogicRequest(
customer_name="John Smith",
order_date=date(2024, 6, 1),
delivery_date=date(2024, 6, 5),
order_value=1500.00,
customer_type="standard",
discount_code="SAVE10OFF",
payment_method="credit_card"
)
print("业务逻辑验证成功:", business_request.model_dump())
except Exception as e:
print(f"业务逻辑验证失败: {e}")#复杂验证场景
from pydantic import BaseModel, Field, validator, root_validator
from typing import Optional, List, Dict
from datetime import datetime, timedelta
class ComplexValidationRequest(BaseModel):
"""复杂验证请求模型"""
event_name: str
start_time: datetime
end_time: datetime
participants: List[Dict[str, str]] # [{'name': '...', 'email': '...'}]
venue: str
capacity: int
requirements: Dict[str, bool] # {'catering': True, 'audio': False}
budget: float
organizer: Dict[str, str] # {'name': '...', 'email': '...', 'phone': '...'}
@validator('event_name')
def validate_event_name(cls, v):
"""验证事件名称"""
if not v or len(v.strip()) < 3:
raise ValueError('事件名称至少需要3个字符')
if len(v) > 100:
raise ValueError('事件名称不能超过100个字符')
return v.strip()
@validator('participants')
def validate_participants(cls, v):
"""验证参与者列表"""
if not v:
raise ValueError('至少需要一个参与者')
if len(v) > 1000:
raise ValueError('参与者数量不能超过1000人')
emails = set()
for i, participant in enumerate(v):
if 'name' not in participant or not participant['name']:
raise ValueError(f'参与者 {i+1} 缺少姓名')
if 'email' not in participant or not participant['email']:
raise ValueError(f'参与者 {i+1} 缺少邮箱')
# 验证邮箱格式
email = participant['email']
if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
raise ValueError(f'参与者 {i+1} 邮箱格式无效: {email}')
# 检查邮箱重复
if email in emails:
raise ValueError(f'参与者 {i+1} 邮箱重复: {email}')
emails.add(email)
return v
@validator('capacity')
def validate_capacity(cls, v):
"""验证容量"""
if v <= 0:
raise ValueError('容量必须大于0')
if v > 10000:
raise ValueError('容量不能超过10000人')
return v
@validator('budget')
def validate_budget(cls, v):
"""验证预算"""
if v < 0:
raise ValueError('预算不能为负数')
if v > 10000000:
raise ValueError('预算不能超过10,000,000')
return round(v, 2)
@validator('organizer')
def validate_organizer(cls, v):
"""验证组织者信息"""
required_fields = ['name', 'email', 'phone']
for field in required_fields:
if field not in v or not v[field]:
raise ValueError(f'组织者信息缺少 {field}')
# 验证邮箱格式
email = v['email']
if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', email):
raise ValueError(f'组织者邮箱格式无效: {email}')
# 验证电话格式
phone = v['phone']
if not re.match(r'^\+?1?[-.\s]?\(?(\d{3})\)?[-.\s]?(\d{3})[-.\s]?(\d{4})$', phone):
raise ValueError(f'组织者电话格式无效: {phone}')
return v
@root_validator
def validate_event_business_logic(cls, values):
"""验证事件业务逻辑"""
start_time = values.get('start_time')
end_time = values.get('end_time')
participants = values.get('participants')
capacity = values.get('capacity')
requirements = values.get('requirements', {})
budget = values.get('budget')
# 时间验证
if start_time and end_time:
if start_time >= end_time:
raise ValueError('结束时间必须晚于开始时间')
duration = end_time - start_time
if duration > timedelta(days=7):
raise ValueError('事件持续时间不能超过7天')
if start_time < datetime.now():
raise ValueError('开始时间不能是过去的时间')
# 参与者数量验证
if participants and capacity:
if len(participants) > capacity:
raise ValueError(f'参与者数量({len(participants)})超过容量限制({capacity})')
# 预算分配验证
if budget:
# 假设餐饮费用每人50元
catering_needed = requirements.get('catering', False)
if catering_needed and participants:
estimated_catering_cost = len(participants) * 50
if estimated_catering_cost > budget * 0.8: # 餐饮费用不超过预算的80%
raise ValueError('餐饮费用估计超过预算的80%')
return values
# 测试复杂验证
try:
complex_request = ComplexValidationRequest(
event_name="Annual Conference",
start_time=datetime(2024, 6, 1, 9, 0),
end_time=datetime(2024, 6, 1, 17, 0),
participants=[
{"name": "John Doe", "email": "john@example.com"},
{"name": "Jane Smith", "email": "jane@example.com"}
],
venue="Convention Center",
capacity=100,
requirements={"catering": True, "audio": True},
budget=5000.00,
organizer={
"name": "Event Organizer",
"email": "organizer@example.com",
"phone": "+1-555-123-4567"
}
)
print("复杂验证成功:", complex_request.model_dump())
except Exception as e:
print(f"复杂验证失败: {e}")#响应模型定义
#基础响应模型
from fastapi import FastAPI
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
app = FastAPI()
class UserResponse(BaseModel):
"""用户响应模型"""
id: int
name: str
email: str
created_at: datetime = Field(default_factory=datetime.utcnow)
is_active: bool = True
class ItemResponse(BaseModel):
"""项目响应模型"""
id: int
name: str
description: Optional[str] = None
price: float
owner_id: int
created_at: datetime = Field(default_factory=datetime.utcnow)
@app.post("/users/", response_model=UserResponse)
def create_user_response(user: UserResponse):
"""
创建用户响应
使用response_model指定响应格式
"""
return user
@app.get("/users/{user_id}", response_model=UserResponse)
def get_user_response(user_id: int):
"""
获取用户响应
返回符合UserResponse模型的数据
"""
return UserResponse(
id=user_id,
name="John Doe",
email="john@example.com",
is_active=True
)#嵌套响应模型
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
class AddressResponse(BaseModel):
"""地址响应模型"""
street: str
city: str
state: str
country: str
postal_code: str
class ContactInfoResponse(BaseModel):
"""联系信息响应模型"""
email: str
phone: str
website: Optional[str] = None
class UserProfileResponse(BaseModel):
"""用户资料响应模型"""
id: int
username: str
full_name: str
email: str
avatar_url: Optional[str] = None
bio: Optional[str] = None
address: Optional[AddressResponse] = None
contact_info: ContactInfoResponse
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
is_active: bool = True
class OrderItemResponse(BaseModel):
"""订单项目响应模型"""
id: int
product_name: str
quantity: int
unit_price: float
total_price: float
class OrderResponse(BaseModel):
"""订单响应模型"""
id: int
order_number: str
customer: UserProfileResponse
items: List[OrderItemResponse]
total_amount: float
status: str = "pending"
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
shipping_address: AddressResponse
# 示例数据
def create_sample_response():
"""创建示例响应数据"""
address = AddressResponse(
street="123 Main St",
city="New York",
state="NY",
country="USA",
postal_code="10001"
)
contact_info = ContactInfoResponse(
email="john@example.com",
phone="+1-555-123-4567"
)
user_profile = UserProfileResponse(
id=1,
username="johndoe",
full_name="John Doe",
email="john@example.com",
contact_info=contact_info,
address=address
)
order_items = [
OrderItemResponse(
id=1,
product_name="Laptop",
quantity=1,
unit_price=999.99,
total_price=999.99
),
OrderItemResponse(
id=2,
product_name="Mouse",
quantity=2,
unit_price=29.99,
total_price=59.98
)
]
order = OrderResponse(
id=1001,
order_number="ORD2024010001",
customer=user_profile,
items=order_items,
total_amount=1059.97,
shipping_address=address
)
return order
sample_order = create_sample_response()
print("嵌套响应模型示例:", sample_order.model_dump(mode='json'))#响应模型约束
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
class SecureResponse(BaseModel):
"""安全响应模型,演示响应数据约束"""
# 基本信息
id: int = Field(..., description="唯一标识符")
name: str = Field(..., min_length=1, max_length=100, description="名称")
# 敏感信息脱敏
email: str = Field(..., description="邮箱(已脱敏)")
phone: str = Field(..., description="电话(已脱敏)")
# 业务数据
status: str = Field(default="active", description="状态")
score: float = Field(default=0.0, ge=0.0, le=100.0, description="评分")
# 时间戳
created_at: datetime = Field(default_factory=datetime.utcnow, description="创建时间")
updated_at: datetime = Field(default_factory=datetime.utcnow, description="更新时间")
# 配置选项
settings: dict = Field(default_factory=dict, description="用户设置")
def model_dump(self, **kwargs):
"""自定义序列化方法,脱敏敏感信息"""
data = super().model_dump(**kwargs)
# 脱敏邮箱:user@domain.com -> u***@d***.com
if 'email' in data and data['email']:
email_parts = data['email'].split('@')
if len(email_parts) == 2:
local_part, domain_part = email_parts
if len(local_part) > 2:
masked_local = local_part[0] + '*' * (len(local_part) - 2) + local_part[-1]
else:
masked_local = '*' * len(local_part)
domain_parts = domain_part.split('.')
if len(domain_parts) >= 2:
domain_name, tld = domain_parts[-2], domain_parts[-1]
masked_domain = domain_name[0] + '*' * (len(domain_name) - 1) + '.' + tld
else:
masked_domain = '*' * len(domain_part)
data['email'] = f"{masked_local}@{masked_domain}"
# 脱敏电话:+1-555-123-4567 -> +*-***-***-4567
if 'phone' in data and data['phone']:
import re
phone = data['phone']
# 保留最后四位数字,其余脱敏
digits = re.findall(r'\d', phone)
if len(digits) >= 4:
visible_part = ''.join(digits[-4:]) # 最后4位
masked_part = '*' * (len(digits) - 4) # 其余部分用*代替
masked_phone = masked_part + visible_part
# 重新格式化
formatted_masked = ''
digit_idx = 0
for char in phone:
if char.isdigit():
if digit_idx < len(masked_part):
formatted_masked += '*'
else:
formatted_masked += visible_part[digit_idx - len(masked_part)]
digit_idx += 1
else:
formatted_masked += char
data['phone'] = formatted_masked
return data
# 测试安全响应模型
secure_response = SecureResponse(
id=123,
name="John Doe",
email="john.doe@example.com",
phone="+1-555-123-4567",
status="active",
score=85.5,
settings={"theme": "dark", "language": "en"}
)
print("安全响应模型(脱敏后):", secure_response.model_dump())#泛型响应模型
from pydantic import BaseModel
from typing import TypeVar, Generic, List, Optional, Dict, Any
from datetime import datetime
T = TypeVar('T')
class BaseApiResponse(BaseModel):
"""基础API响应模型"""
success: bool
message: str
timestamp: datetime = Field(default_factory=datetime.utcnow)
class ApiResponse(Generic[T], BaseApiResponse):
"""泛型API响应模型"""
data: Optional[T] = None
error_code: Optional[str] = None
metadata: Dict[str, Any] = Field(default_factory=dict)
class PagedResponse(Generic[T], BaseApiResponse):
"""分页响应模型"""
data: List[T] = Field(default_factory=list)
total: int = 0
page: int = 1
size: int = 10
pages: int = 0
class User(BaseModel):
"""用户模型"""
id: int
name: str
email: str
class Product(BaseModel):
"""产品模型"""
id: int
name: str
price: float
# 使用泛型响应模型
user_response = ApiResponse[User](
success=True,
message="用户获取成功",
data=User(id=1, name="John", email="john@example.com")
)
product_list_response = PagedResponse[Product](
success=True,
message="产品列表获取成功",
data=[
Product(id=1, name="Laptop", price=999.99),
Product(id=2, name="Mouse", price=29.99)
],
total=2,
page=1,
size=10,
pages=1
)
print("用户响应:", user_response.model_dump())
print("产品列表响应:", product_list_response.model_dump())#文件上传处理
#基础文件上传
from fastapi import FastAPI, File, UploadFile
from fastapi.responses import JSONResponse
from typing import Optional
import shutil
import os
app = FastAPI()
@app.post("/uploadfile/")
async def upload_file(file: UploadFile = File(...)):
"""
基础文件上传
接收上传的文件并返回文件信息
"""
# 读取文件内容
contents = await file.read()
# 创建响应数据
file_info = {
"filename": file.filename,
"content_type": file.content_type,
"size": len(contents),
"message": "文件上传成功"
}
return file_info
@app.post("/uploadfile-with-validation/")
async def upload_file_with_validation(
file: UploadFile = File(...),
max_size: int = 10 * 1024 * 1024 # 10MB
):
"""
带验证的文件上传
验证文件类型和大小
"""
# 验证文件类型
allowed_types = [
"image/jpeg", "image/png", "image/gif",
"application/pdf", "text/plain",
"application/msword", "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
]
if file.content_type not in allowed_types:
return JSONResponse(
status_code=400,
content={"error": f"不支持的文件类型: {file.content_type}"}
)
# 读取文件内容以验证大小
contents = await file.read()
if len(contents) > max_size:
return JSONResponse(
status_code=400,
content={"error": f"文件过大,最大允许 {max_size} 字节"}
)
return {
"filename": file.filename,
"content_type": file.content_type,
"size": len(contents),
"message": "文件验证并上传成功"
}#多文件上传
from fastapi import FastAPI, File, UploadFile
from typing import List
import aiofiles
app = FastAPI()
@app.post("/upload-multiple-files/")
async def upload_multiple_files(files: List[UploadFile] = File(...)):
"""
多文件上传
接收多个文件并返回每个文件的信息
"""
file_infos = []
for file in files:
contents = await file.read()
file_info = {
"filename": file.filename,
"content_type": file.content_type,
"size": len(contents),
"message": "文件上传成功"
}
file_infos.append(file_info)
return {
"uploaded_files_count": len(file_infos),
"files": file_infos
}
@app.post("/upload-multiple-files-with-processing/")
async def upload_multiple_files_with_processing(
files: List[UploadFile] = File(...),
process_images: bool = True
):
"""
多文件上传带处理
可选择对图片文件进行处理
"""
results = []
for i, file in enumerate(files):
file_result = {
"original_filename": file.filename,
"content_type": file.content_type
}
contents = await file.read()
file_result["size"] = len(contents)
# 如果是图片且需要处理
if file.content_type.startswith('image/') and process_images:
# 这里可以添加图片处理逻辑
# 例如:调整大小、压缩、格式转换等
file_result["processed"] = True
file_result["format"] = file.content_type.split('/')[-1].upper()
else:
file_result["processed"] = False
results.append(file_result)
return {
"total_uploaded": len(results),
"processed": sum(1 for r in results if r.get("processed", False)),
"files": results
}#文件保存与管理
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.responses import FileResponse
import aiofiles
import os
from pathlib import Path
import uuid
from datetime import datetime
app = FastAPI()
# 创建上传目录
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
class FileManager:
"""文件管理器"""
@staticmethod
def generate_unique_filename(original_filename: str) -> str:
"""生成唯一文件名"""
name, ext = os.path.splitext(original_filename)
unique_name = f"{uuid.uuid4()}_{name}{ext}"
return unique_name
@staticmethod
def validate_file_type(content_type: str, allowed_types: List[str]) -> bool:
"""验证文件类型"""
return content_type in allowed_types
@staticmethod
def validate_file_size(size: int, max_size: int) -> bool:
"""验证文件大小"""
return size <= max_size
@app.post("/save-upload/")
async def save_upload_file(
file: UploadFile = File(...),
description: str = ""
):
"""
保存上传的文件
将文件保存到服务器并返回访问信息
"""
# 生成唯一文件名
unique_filename = FileManager.generate_unique_filename(file.filename)
file_path = UPLOAD_DIR / unique_filename
# 验证文件类型
allowed_types = [
"image/jpeg", "image/png", "image/gif",
"application/pdf", "text/plain"
]
if not FileManager.validate_file_type(file.content_type, allowed_types):
raise HTTPException(
status_code=400,
detail=f"不支持的文件类型: {file.content_type}"
)
# 读取并保存文件
async with aiofiles.open(file_path, 'wb') as buffer:
content = await file.read()
# 验证文件大小 (5MB限制)
max_size = 5 * 1024 * 1024
if not FileManager.validate_file_size(len(content), max_size):
raise HTTPException(
status_code=400,
detail=f"文件过大,最大允许 {max_size} 字节"
)
await buffer.write(content)
# 返回文件信息
return {
"filename": file.filename,
"unique_filename": unique_filename,
"saved_path": str(file_path),
"content_type": file.content_type,
"size": len(content),
"description": description,
"upload_time": datetime.utcnow().isoformat(),
"download_url": f"/download/{unique_filename}"
}
@app.get("/download/{filename}")
async def download_file(filename: str):
"""下载文件"""
file_path = UPLOAD_DIR / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="文件不存在")
return FileResponse(
path=file_path,
filename=file_path.name,
media_type='application/octet-stream'
)
@app.delete("/delete/{filename}")
async def delete_file(filename: str):
"""删除文件"""
file_path = UPLOAD_DIR / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="文件不存在")
try:
file_path.unlink()
return {"message": f"文件 {filename} 已删除"}
except Exception as e:
raise HTTPException(status_code=500, detail=f"删除文件失败: {str(e)}")#表单数据处理
#基础表单处理
from fastapi import FastAPI, Form
from pydantic import BaseModel, Field
from typing import Optional
app = FastAPI()
@app.post("/login/")
async def login(username: str = Form(...), password: str = Form(...)):
"""
基础表单登录
处理用户名和密码表单数据
"""
return {
"username": username,
"message": "登录请求接收成功"
}
class UserRegistrationForm(BaseModel):
"""用户注册表单模型"""
username: str = Field(..., min_length=3, max_length=50)
email: str = Field(..., regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
password: str = Field(..., min_length=8)
full_name: Optional[str] = Field(None, max_length=100)
age: Optional[int] = Field(None, ge=13, le=120)
@app.post("/register-with-model/")
async def register_with_model(
username: str = Form(...),
email: str = Form(...),
password: str = Form(...),
full_name: str = Form(None),
age: int = Form(None)
):
"""
使用模型验证的表单处理
"""
# 创建表单数据字典
form_data = {
"username": username,
"email": email,
"password": password,
"full_name": full_name,
"age": age
}
# 使用模型验证
user_form = UserRegistrationForm(**{k: v for k, v in form_data.items() if v is not None})
return {
"user_data": user_form.model_dump(),
"message": "注册表单处理成功"
}#复杂表单处理
from fastapi import FastAPI, Form
from typing import List
import json
app = FastAPI()
@app.post("/complex-form/")
async def complex_form(
name: str = Form(...),
email: str = Form(...),
age: int = Form(..., ge=0, le=150),
subscribe_newsletter: bool = Form(default=False),
favorite_colors: List[str] = Form(...),
preferences_json: str = Form(...)
):
"""
复杂表单处理
演示多种表单字段类型
"""
try:
# 解析JSON格式的偏好设置
preferences = json.loads(preferences_json)
except json.JSONDecodeError:
preferences = {}
return {
"name": name,
"email": email,
"age": age,
"subscribe_newsletter": subscribe_newsletter,
"favorite_colors": favorite_colors,
"preferences": preferences,
"message": "复杂表单处理成功"
}
@app.post("/survey-form/")
async def survey_form(
q1_satisfaction: int = Form(..., ge=1, le=5),
q2_recommend: bool = Form(...),
q3_improvements: str = Form(...),
q4_features: List[str] = Form(...),
q5_comments: str = Form(None)
):
"""
调查表单处理
演示调查问卷类型的表单
"""
return {
"satisfaction_rating": q1_satisfaction,
"would_recommend": q2_recommend,
"improvement_suggestions": q1_improvements,
"desired_features": q4_features,
"additional_comments": q5_comments,
"message": "调查表单提交成功"
}#嵌套模型与复杂数据
#嵌套请求模型
from pydantic import BaseModel, Field
from typing import Optional, List, Dict
from datetime import datetime
class Address(BaseModel):
"""地址模型"""
street: str = Field(..., min_length=5, max_length=100)
city: str = Field(..., min_length=2, max_length=50)
state: str = Field(..., min_length=2, max_length=50)
country: str = Field(..., min_length=2, max_length=50)
postal_code: str = Field(..., regex=r"^[0-9]{5}(-[0-9]{4})?$")
class ContactInfo(BaseModel):
"""联系信息模型"""
email: str = Field(..., regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
phone: str = Field(..., regex=r"^\+?1?[-.\s]?\(?(\d{3})\)?[-.\s]?(\d{3})[-.\s]?(\d{4})$")
secondary_phone: Optional[str] = Field(None, regex=r"^\+?1?[-.\s]?\(?(\d{3})\)?[-.\s]?(\d{3})[-.\s]?(\d{4})$")
class OrderItem(BaseModel):
"""订单项目模型"""
product_id: int = Field(..., gt=0)
product_name: str = Field(..., min_length=1, max_length=200)
quantity: int = Field(..., gt=0, le=1000)
unit_price: float = Field(..., gt=0)
total_price: float = Field(..., gt=0)
def __init__(self, **data):
super().__init__(**data)
# 自动计算总价(如果未提供)
if 'total_price' not in data or data['total_price'] is None:
self.total_price = self.quantity * self.unit_price
class CustomerOrder(BaseModel):
"""客户订单模型(嵌套模型示例)"""
order_number: str = Field(..., regex=r"^ORD\d{8}$", description="订单号格式:ORD+8位数字")
customer_name: str = Field(..., min_length=2, max_length=100)
customer_email: str = Field(..., regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
billing_address: Address
shipping_address: Optional[Address] = None
contact_info: ContactInfo
items: List[OrderItem] = Field(..., min_items=1, max_items=100)
shipping_method: str = Field(default="standard", regex=r"^(standard|express|overnight)$")
special_instructions: Optional[str] = Field(None, max_length=500)
metadata: Dict[str, str] = Field(default_factory=dict)
created_at: datetime = Field(default_factory=datetime.utcnow)
def __init__(self, **data):
super().__init__(**data)
# 如果未提供收货地址,使用账单地址
if not self.shipping_address:
self.shipping_address = self.billing_address
@property
def subtotal(self) -> float:
"""计算小计"""
return sum(item.total_price for item in self.items)
@property
def total_amount(self) -> float:
"""计算总额(含运费)"""
shipping_cost = 10.00 if self.shipping_method == "standard" else 20.00
return self.subtotal + shipping_cost
# 创建嵌套模型实例
billing_addr = Address(
street="123 Main St",
city="New York",
state="NY",
country="USA",
postal_code="10001"
)
contact = ContactInfo(
email="customer@example.com",
phone="+1-555-123-4567"
)
order_items = [
OrderItem(
product_id=1,
product_name="Laptop",
quantity=1,
unit_price=999.99
),
OrderItem(
product_id=2,
product_name="Mouse",
quantity=2,
unit_price=29.99
)
]
customer_order = CustomerOrder(
order_number="ORD2024010001",
customer_name="John Smith",
customer_email="customer@example.com",
billing_address=billing_addr,
contact_info=contact,
items=order_items,
metadata={"source": "website", "campaign": "summer_sale"}
)
print("嵌套模型示例:", customer_order.model_dump())
print(f"订单小计: ${customer_order.subtotal:.2f}")
print(f"订单总额: ${customer_order.total_amount:.2f}")#动态模型构建
from pydantic import BaseModel, create_model
from typing import Optional, Union, Dict, Any
import inspect
def create_dynamic_model(model_name: str, fields: Dict[str, tuple]):
"""
动态创建Pydantic模型
fields: {字段名: (类型, 默认值或Field对象)}
"""
field_definitions = {}
for field_name, (field_type, field_default) in fields.items():
if isinstance(field_default, Field):
field_definitions[field_name] = (field_type, field_default)
else:
field_definitions[field_name] = (field_type, field_default)
dynamic_model = create_model(model_name, **field_definitions)
return dynamic_model
# 示例:创建用户配置模型
UserConfigFields = {
"username": (str, Field(..., min_length=3, max_length=50)),
"email": (str, Field(..., regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")),
"age": (Optional[int], Field(None, ge=13, le=120)),
"preferences": (Dict[str, Any], Field(default_factory=dict)),
"is_active": (bool, True)
}
DynamicUserConfig = create_dynamic_model("DynamicUserConfig", UserConfigFields)
# 验证动态模型
try:
user_config = DynamicUserConfig(
username="johndoe",
email="john@example.com",
age=25,
preferences={"theme": "dark", "language": "en"},
is_active=True
)
print("动态模型创建成功:", user_config.model_dump())
except Exception as e:
print(f"动态模型创建失败: {e}")
# 更复杂的动态模型示例
def create_api_request_model(endpoint: str, required_fields: Dict[str, type], optional_fields: Dict[str, tuple] = None):
"""
为特定API端点创建请求模型
"""
fields = {}
# 添加必需字段
for field_name, field_type in required_fields.items():
fields[field_name] = (field_type, ...)
# 添加可选字段
if optional_fields:
for field_name, (field_type, default_value) in optional_fields.items():
fields[field_name] = (field_type, default_value)
model_name = f"{endpoint.title()}Request"
return create_dynamic_model(model_name, fields)
# 为用户API创建动态模型
UserApiRequest = create_api_request_model(
endpoint="user",
required_fields={
"action": str,
"user_data": dict
},
optional_fields={
"validate_only": (bool, False),
"return_fields": (list, [])
}
)
api_request = UserApiRequest(
action="create",
user_data={"name": "John", "email": "john@example.com"},
validate_only=True
)
print("API请求模型:", api_request.model_dump())#递归模型
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
class TreeNode(BaseModel):
"""树节点模型(递归模型示例)"""
id: int
name: str
value: Optional[str] = None
children: List['TreeNode'] = Field(default_factory=list)
parent_id: Optional[int] = None
level: int = 0
created_at: datetime = Field(default_factory=datetime.utcnow)
def add_child(self, child_node: 'TreeNode'):
"""添加子节点"""
child_node.parent_id = self.id
child_node.level = self.level + 1
self.children.append(child_node)
def find_node(self, node_id: int) -> Optional['TreeNode']:
"""查找节点"""
if self.id == node_id:
return self
for child in self.children:
found = child.find_node(node_id)
if found:
return found
return None
def to_dict(self) -> dict:
"""转换为字典(带层级信息)"""
result = self.model_dump()
result['children'] = [child.to_dict() for child in self.children]
return result
# 修复向前引用
TreeNode.update_forward_refs()
# 创建树结构
root = TreeNode(id=1, name="Root", value="root_value")
child1 = TreeNode(id=2, name="Child 1", value="child1_value")
child2 = TreeNode(id=3, name="Child 2", value="child2_value")
grandchild1 = TreeNode(id=4, name="Grandchild 1", value="gc1_value")
grandchild2 = TreeNode(id=5, name="Grandchild 2", value="gc2_value")
# 构建树
root.add_child(child1)
root.add_child(child2)
child1.add_child(grandchild1)
child1.add_child(grandchild2)
print("树结构:", root.to_dict())
# 查找节点
found_node = root.find_node(4)
if found_node:
print(f"找到节点: {found_node.name} (ID: {found_node.id}, Level: {found_node.level})")#响应序列化与格式化
#自定义序列化
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
import json
class CustomSerializableModel(BaseModel):
"""自定义序列化的模型"""
id: int
name: str
sensitive_data: str = Field(repr=False) # 不在repr中显示
private_info: str = Field(exclude=True) # 序列化时排除
created_at: datetime = Field(default_factory=datetime.utcnow)
metadata: dict = Field(default_factory=dict)
class Config:
json_encoders = {
datetime: lambda dt: dt.isoformat(),
}
def model_dump(
self,
*,
mode='python',
include=None,
exclude=None,
context=None,
by_alias: bool = False,
exclude_unset: bool = False,
exclude_defaults: bool = False,
exclude_none: bool = False,
round_trip: bool = False,
warnings: bool = True,
):
"""自定义序列化方法"""
# 调用父类方法
data = super().model_dump(
mode=mode,
include=include,
exclude=exclude,
context=context,
by_alias=by_alias,
exclude_unset=exclude_unset,
exclude_defaults=exclude_defaults,
exclude_none=exclude_none,
round_trip=round_trip,
warnings=warnings
)
# 添加自定义处理
if 'created_at' in data and isinstance(data['created_at'], datetime):
# 添加时间戳
data['timestamp'] = int(data['created_at'].timestamp())
# 格式化时间为友好显示
data['created_at_formatted'] = data['created_at'].strftime('%Y-%m-%d %H:%M:%S')
# 处理元数据
if 'metadata' in data and isinstance(data['metadata'], dict):
# 添加元数据统计
data['metadata_count'] = len(data['metadata'])
return data
def to_api_response(self, include_sensitive: bool = False):
"""为API定制的序列化方法"""
exclude_fields = set()
if not include_sensitive:
exclude_fields.add('sensitive_data')
data = self.model_dump(exclude=exclude_fields)
# API特定的格式化
data['api_format'] = True
data['version'] = '1.0'
return data
# 测试自定义序列化
custom_model = CustomSerializableModel(
id=1,
name="Test Item",
sensitive_data="secret information",
private_info="private data not serialized",
metadata={"category": "test", "priority": "high"}
)
print("自定义序列化:", custom_model.model_dump())
print("API响应格式:", custom_model.to_api_response(include_sensitive=False))
print("API响应格式(含敏感数据):", custom_model.to_api_response(include_sensitive=True))#JSON序列化优化
from pydantic import BaseModel, Field
from typing import Optional, List
from datetime import datetime
from decimal import Decimal
import json
class JsonOptimizedModel(BaseModel):
"""JSON优化的模型"""
id: int
name: str
price: Decimal = Field(decimal_places=2) # 精确的小数处理
tags: List[str] = Field(default_factory=list)
metadata: dict = Field(default_factory=dict)
created_at: datetime
rating: float = Field(ge=0.0, le=5.0)
class Config:
json_encoders = {
datetime: lambda dt: dt.isoformat(),
Decimal: lambda d: float(d), # 或者保持为字符串以保持精度
}
def model_dump_json(self, **kwargs):
"""优化的JSON序列化"""
# 先转换为字典
data = self.model_dump()
# 自定义JSON编码
return json.dumps(
data,
ensure_ascii=False, # 支持中文
separators=(',', ':'), # 紧凑格式
default=str # 处理不可序列化对象
)
@classmethod
def from_json_string(cls, json_str: str):
"""从JSON字符串创建模型实例"""
data = json.loads(json_str)
return cls(**data)
# 测试JSON优化
json_model = JsonOptimizedModel(
id=1,
name="Premium Product",
price=Decimal("1299.99"),
tags=["premium", "featured"],
metadata={"color": "red", "size": "large"},
created_at=datetime.utcnow(),
rating=4.5
)
json_string = json_model.model_dump_json()
print("优化JSON:", json_string)
# 从JSON字符串重建
rebuilt_model = JsonOptimizedModel.from_json_string(json_string)
print("重建模型:", rebuilt_model.model_dump())#响应格式化工具
from pydantic import BaseModel
from typing import Any, Dict, Optional, List
from datetime import datetime
import time
class ResponseFormatter:
"""响应格式化工具类"""
@staticmethod
def format_success(data: Any = None, message: str = "Success", **kwargs) -> Dict[str, Any]:
"""格式化成功响应"""
response = {
"success": True,
"message": message,
"data": data,
"timestamp": datetime.utcnow().isoformat(),
"status_code": 200
}
response.update(kwargs)
return response
@staticmethod
def format_error(message: str, error_code: str = "GENERIC_ERROR", status_code: int = 400, **kwargs) -> Dict[str, Any]:
"""格式化错误响应"""
response = {
"success": False,
"message": message,
"error_code": error_code,
"timestamp": datetime.utcnow().isoformat(),
"status_code": status_code
}
response.update(kwargs)
return response
@staticmethod
def format_paged_response(
data: List[Any],
total: int,
page: int = 1,
size: int = 10,
message: str = "Success"
) -> Dict[str, Any]:
"""格式化分页响应"""
pages = (total + size - 1) // size # 向上取整
return {
"success": True,
"message": message,
"data": data,
"pagination": {
"total": total,
"page": page,
"size": size,
"pages": pages,
"has_next": page < pages,
"has_prev": page > 1
},
"timestamp": datetime.utcnow().isoformat()
}
@staticmethod
def benchmark_function(func, *args, **kwargs):
"""函数执行时间基准测试"""
start_time = time.time()
result = func(*args, **kwargs)
end_time = time.time()
return {
"result": result,
"execution_time": f"{execution_time:.4f}s",
"execution_time_ms": f"{execution_time * 1000:.2f}ms"
}
# 使用响应格式化工具
formatter = ResponseFormatter()
# 成功响应示例
success_resp = formatter.format_success(
data={"user_id": 123, "username": "john_doe"},
message="用户创建成功",
request_id="req_12345"
)
print("成功响应:", success_resp)
# 错误响应示例
error_resp = formatter.format_error(
message="用户不存在",
error_code="USER_NOT_FOUND",
status_code=404,
suggested_action="请检查用户ID是否正确"
)
print("错误响应:", error_resp)
# 分页响应示例
paged_resp = formatter.format_paged_response(
data=[{"id": 1, "name": "Item 1"}, {"id": 2, "name": "Item 2"}],
total=100,
page=1,
size=10,
message="项目列表获取成功"
)
print("分页响应:", paged_resp)#错误处理与调试
#请求体验证错误处理
from fastapi import FastAPI, Request, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field, ValidationError
from typing import Optional
import logging
from datetime import datetime
app = FastAPI()
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ValidatedInput(BaseModel):
"""带验证的输入模型"""
name: str = Field(..., min_length=2, max_length=50)
email: str = Field(..., regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
age: int = Field(..., ge=0, le=150)
rating: Optional[float] = Field(None, ge=0.0, le=5.0)
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
"""处理请求验证错误"""
logger.error(f"验证错误: {exc}")
# 格式化错误信息
errors = []
for error in exc.errors():
errors.append({
"field": " -> ".join(str(loc) for loc in error['loc']),
"message": error['msg'],
"type": error['type'],
"input": error.get('input'),
"ctx": error.get('ctx', {})
})
return JSONResponse(
status_code=422,
content={
"success": False,
"message": "请求数据验证失败",
"error_code": "VALIDATION_ERROR",
"details": errors,
"timestamp": datetime.utcnow().isoformat()
}
)
@app.exception_handler(ValidationError)
async def pydantic_validation_handler(request: Request, exc: ValidationError):
"""处理Pydantic验证错误"""
logger.error(f"Pydantic验证错误: {exc}")
errors = []
for error in exc.errors():
errors.append({
"field": " -> ".join(str(loc) for loc in error['loc']),
"message": error['msg'],
"type": error['type']
})
return JSONResponse(
status_code=422,
content={
"success": False,
"message": "数据验证失败",
"error_code": "PYDANTIC_VALIDATION_ERROR",
"details": errors
}
)
@app.post("/validated-input/")
async def handle_validated_input(input_data: ValidatedInput):
"""处理带验证的输入"""
return {
"success": True,
"data": input_data.model_dump(),
"message": "输入验证成功"
}
# 测试验证错误处理
# 可以发送无效数据来测试错误处理,例如:
# {
# "name": "A", # 太短
# "email": "invalid-email", # 无效邮箱
# "age": 200 # 超出范围
# }#调试工具和中间件
from fastapi import FastAPI, Request, Response
from fastapi.responses import JSONResponse
import time
import json
from typing import Callable, Awaitable
app = FastAPI()
class DebugMiddleware:
"""调试中间件"""
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
return await self.app(scope, receive, send)
request = Request(scope, receive)
# 记录请求信息
start_time = time.time()
request_body = b""
# 读取请求体(注意:这会影响后续的请求体读取)
# 在实际应用中,你可能需要更谨慎地处理这个
# 处理请求
async def send_wrapper(message):
if message["type"] == "http.response.start":
# 添加调试头部
headers = message.get("headers", [])
headers.append([b"x-response-time", f"{time.time() - start_time:.4f}s".encode()])
message["headers"] = headers
await send(message)
try:
# 记录请求
print(f"→ {request.method} {request.url.path}")
print(f" Headers: {dict(request.headers)}")
# 处理响应
response = await self.app(scope, receive, send_wrapper)
return response
except Exception as e:
print(f"✗ 请求处理错误: {e}")
raise
# 添加调试中间件
# app.add_middleware(DebugMiddleware)
def create_debug_endpoint(app: FastAPI):
"""创建调试端点"""
@app.get("/debug/info")
async def debug_info():
"""调试信息端点"""
import sys
import platform
return {
"fastapi_version": "2.0.0", # 这里应该动态获取
"python_version": platform.python_version(),
"platform": platform.platform(),
"sys_info": {
"argv": sys.argv,
"executable": sys.executable,
},
"timestamp": datetime.utcnow().isoformat()
}
@app.post("/debug/echo")
async def debug_echo(request: Request):
"""回声端点,返回接收到的所有信息"""
body = await request.body()
return {
"method": request.method,
"url": str(request.url),
"headers": dict(request.headers),
"query_params": dict(request.query_params),
"path_params": request.path_params,
"body": body.decode('utf-8') if body else None,
"timestamp": datetime.utcnow().isoformat()
}
# 创建调试端点
create_debug_endpoint(app)#高级错误处理
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from typing import Optional
import traceback
from enum import Enum
from datetime import datetime
app = FastAPI()
class ErrorCode(str, Enum):
"""错误代码枚举"""
VALIDATION_ERROR = "VALIDATION_ERROR"
BUSINESS_LOGIC_ERROR = "BUSINESS_LOGIC_ERROR"
RESOURCE_NOT_FOUND = "RESOURCE_NOT_FOUND"
PERMISSION_DENIED = "PERMISSION_DENIED"
INTERNAL_ERROR = "INTERNAL_ERROR"
RATE_LIMIT_EXCEEDED = "RATE_LIMIT_EXCEEDED"
class ErrorResponse(BaseModel):
"""错误响应模型"""
success: bool = False
error_code: ErrorCode
message: str
details: Optional[dict] = None
timestamp: str
trace_id: Optional[str] = None
def create_error_response(
error_code: ErrorCode,
message: str,
details: Optional[dict] = None,
trace_id: Optional[str] = None
) -> JSONResponse:
"""创建错误响应"""
error_resp = ErrorResponse(
error_code=error_code,
message=message,
details=details,
timestamp=datetime.utcnow().isoformat(),
trace_id=trace_id
)
status_codes = {
ErrorCode.VALIDATION_ERROR: 422,
ErrorCode.RESOURCE_NOT_FOUND: 404,
ErrorCode.PERMISSION_DENIED: 403,
ErrorCode.RATE_LIMIT_EXCEEDED: 429,
ErrorCode.INTERNAL_ERROR: 500,
ErrorCode.BUSINESS_LOGIC_ERROR: 400
}
return JSONResponse(
status_code=status_codes.get(error_code, 500),
content=error_resp.model_dump()
)
@app.exception_handler(HTTPException)
async def http_exception_handler(request: Request, exc: HTTPException):
"""处理HTTP异常"""
error_map = {
400: ErrorCode.BUSINESS_LOGIC_ERROR,
401: ErrorCode.PERMISSION_DENIED,
403: ErrorCode.PERMISSION_DENIED,
404: ErrorCode.RESOURCE_NOT_FOUND,
422: ErrorCode.VALIDATION_ERROR,
429: ErrorCode.RATE_LIMIT_EXCEEDED,
500: ErrorCode.INTERNAL_ERROR
}
error_code = error_map.get(exc.status_code, ErrorCode.INTERNAL_ERROR)
return create_error_response(
error_code=error_code,
message=exc.detail if exc.detail else "HTTP错误",
details={"status_code": exc.status_code}
)
@app.exception_handler(Exception)
async def general_exception_handler(request: Request, exc: Exception):
"""处理一般异常"""
# 记录错误追踪
error_trace = traceback.format_exc()
print(f"未处理的异常: {exc}\n{error_trace}")
return create_error_response(
error_code=ErrorCode.INTERNAL_ERROR,
message="服务器内部错误",
details={
"error_type": type(exc).__name__,
"message": str(exc)
}
)
## 性能优化技巧 \{#性能优化技巧}
```python
import asyncio
from typing import List
from pydantic import BaseModel, Field
from concurrent.futures import ThreadPoolExecutor
import time
class PerformanceOptimization:
"""性能优化技巧"""
@staticmethod
async def batch_process_requests(requests: List[dict], model_class) -> List:
"""批量处理请求"""
tasks = []
for req_data in requests:
# 使用asyncio.create_task进行并发处理
task = asyncio.create_task(PerformanceOptimization.process_single_request(req_data, model_class))
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
@staticmethod
async def process_single_request(data: dict, model_class):
"""处理单个请求"""
try:
# 验证数据
validated_model = model_class(**data)
# 模拟处理时间
await asyncio.sleep(0.01) # 模拟IO操作
return validated_model
except Exception as e:
return {"error": str(e), "data": data}
@staticmethod
def process_requests_sync(requests: List[dict], model_class, max_workers: int = 4):
"""同步批量处理(使用线程池)"""
def process_request(data):
try:
return model_class(**data)
except Exception as e:
return {"error": str(e), "data": data}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(process_request, requests))
return results
# 示例:性能测试模型
class PerformanceTestModel(BaseModel):
id: int = Field(..., gt=0)
name: str = Field(..., min_length=1, max_length=100)
value: float = Field(..., gt=0)
category: str = Field(..., regex=r"^[a-zA-Z_][a-zA-Z0-9_]*$")
# 测试性能优化
async def performance_test():
"""性能测试"""
# 准备测试数据
test_data = []
for i in range(100):
test_data.append({
"id": i + 1,
"name": f"Item_{i}",
"value": float(i * 10),
"category": "test_category"
})
print("开始性能测试...")
# 测试异步批量处理
start_time = time.time()
async_results = await PerformanceOptimization.batch_process_requests(test_data, PerformanceTestModel)
async_time = time.time() - start_time
print(f"异步批量处理时间: {async_time:.4f}秒")
print(f"成功处理: {sum(1 for r in async_results if not isinstance(r, dict) or 'error' not in r)}")
# 测试同步批量处理
start_time = time.time()
sync_results = PerformanceOptimization.process_requests_sync(test_data, PerformanceTestModel)
sync_time = time.time() - start_time
print(f"同步批量处理时间: {sync_time:.4f}秒")
print(f"成功处理: {sum(1 for r in sync_results if not isinstance(r, dict) or 'error' not in r)}")#实际应用案例
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel, Field
from typing import Optional, List, Dict
import uuid
from datetime import datetime
app = FastAPI()
class OrderItem(BaseModel):
"""订单项目"""
product_id: int
quantity: int
unit_price: float
class OrderRequest(BaseModel):
"""订单请求"""
customer_id: int
items: List[OrderItem]
shipping_address: str
payment_method: str = "credit_card"
notes: Optional[str] = None
class OrderResponse(BaseModel):
"""订单响应"""
order_id: str
customer_id: int
total_amount: float
status: str = "processing"
created_at: datetime = Field(default_factory=datetime.utcnow)
estimated_delivery: datetime
class NotificationService:
"""通知服务"""
@staticmethod
async def send_order_confirmation(order_id: str, customer_id: int):
"""发送订单确认通知"""
print(f"发送订单确认: 订单ID {order_id}, 客户ID {customer_id}")
# 模拟异步通知发送
await asyncio.sleep(0.1)
print(f"订单确认已发送: {order_id}")
@app.post("/orders/", response_model=OrderResponse)
async def create_order(
order_request: OrderRequest,
background_tasks: BackgroundTasks
):
"""创建订单的实际应用案例"""
# 生成订单ID
order_id = f"ORD-{uuid.uuid4().hex[:8].upper()}"
# 计算总金额
total_amount = sum(item.quantity * item.unit_price for item in order_request.items)
# 估算配送时间(2-5个工作日)
estimated_delivery = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
estimated_delivery = estimated_delivery.add(days=3) # 加3天
# 创建响应
response = OrderResponse(
order_id=order_id,
customer_id=order_request.customer_id,
total_amount=total_amount,
estimated_delivery=estimated_delivery
)
# 添加后台任务发送通知
background_tasks.add_task(
NotificationService.send_order_confirmation,
order_id,
order_request.customer_id
)
# 这里可以添加更多的业务逻辑
# 如:库存检查、支付处理、订单保存到数据库等
return response
class UserRegistrationRequest(BaseModel):
"""用户注册请求"""
username: str = Field(..., min_length=3, max_length=50, regex=r"^[a-zA-Z0-9_]+$")
email: str = Field(..., regex=r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$")
password: str = Field(..., min_length=8)
full_name: str = Field(..., min_length=2, max_length=100)
age: Optional[int] = Field(None, ge=13, le=120)
terms_accepted: bool = Field(..., description="必须接受条款")
class UserRegistrationResponse(BaseModel):
"""用户注册响应"""
user_id: int
username: str
email: str
registration_date: datetime = Field(default_factory=datetime.utcnow)
verification_required: bool = True
welcome_bonus: Optional[str] = "100积分"
@app.post("/register/", response_model=UserRegistrationResponse)
async def register_user(
user_request: UserRegistrationRequest,
background_tasks: BackgroundTasks
):
"""用户注册的实际应用案例"""
# 这里通常是保存到数据库的逻辑
# 模拟生成用户ID
user_id = int(datetime.utcnow().timestamp() * 1000) % 1000000
# 创建响应
response = UserRegistrationResponse(
user_id=user_id,
username=user_request.username,
email=user_request.email,
verification_required=True
)
# 添加后台任务发送验证邮件
background_tasks.add_task(send_verification_email, user_request.email, user_id)
# 添加欢迎奖励
if user_request.age and user_request.age < 25:
response.welcome_bonus = "200积分(青年优惠)"
return response
async def send_verification_email(email: str, user_id: int):
"""发送验证邮件的后台任务"""
print(f"发送验证邮件到: {email}, 用户ID: {user_id}")
await asyncio.sleep(0.5) # 模拟邮件发送时间
print(f"验证邮件已发送: {email}")
# 数据验证的实际应用
class APISpecification(BaseModel):
"""API规范模型"""
endpoint: str = Field(..., regex=r"^/[a-zA-Z0-9/_-]+$")
method: str = Field(..., regex=r"^(GET|POST|PUT|DELETE|PATCH)$")
description: str = Field(..., min_length=10, max_length=500)
request_model: Optional[Dict] = None
response_model: Optional[Dict] = None
auth_required: bool = True
rate_limit: int = Field(default=100, ge=1, le=10000)
tags: List[str] = Field(default_factory=list, max_items=10)
@app.post("/api-spec/", response_model=dict)
async def create_api_spec(spec: APISpecification):
"""创建API规范的实际应用"""
# 验证API规范的有效性
if spec.auth_required and 'auth' not in spec.tags:
spec.tags.append('auth')
# 模拟保存API规范
spec_id = f"SPEC-{uuid.uuid4().hex[:8]}"
return {
"spec_id": spec_id,
"endpoint": spec.endpoint,
"method": spec.method,
"status": "created",
"validation_passed": True
}#相关教程
#总结
FastAPI的请求体处理功能提供了:
- 基础处理:简单和复杂的数据模型定义
- 验证机制:内置和自定义验证规则
- 嵌套模型:处理复杂的嵌套数据结构
- 动态模型:根据需求动态创建模型
- 递归模型:处理树形结构数据
- 序列化优化:自定义数据序列化格式
- 错误处理:完善的错误处理和调试机制
- 性能优化:高效的处理和验证机制
- 实际应用:真实世界的使用案例
掌握这些技巧可以帮助您构建更加健壮、高效的FastAPI应用程序。

