FastAPI密码哈希与安全实践完全指南

📂 所属阶段:第四阶段 — 安全与认证(安全篇)
🔗 相关章节:FastAPI OAuth2与JWT鉴权 · FastAPI依赖注入系统

目录

密码安全基础

密码存储的核心原则

密码安全是Web应用程序的基石。错误的密码存储方式可能导致严重的安全漏洞,使用户数据暴露在风险之下。以下是密码存储的核心原则:

  1. 绝不存储明文密码:明文存储密码是最严重的安全错误
  2. 使用强哈希算法:如bcrypt、Argon2等慢哈希算法
  3. 每个用户使用唯一随机盐:防止彩虹表攻击
  4. 定期更新哈希算法:随着计算能力提升,及时升级算法

错误的密码存储方式

# ❌ 绝对禁止!明文存储密码
def bad_register(email: str, password: str):
    """危险:明文存储密码"""
    # 这是极其危险的做法
    query = f"INSERT INTO users (email, password) VALUES ('{email}', '{password}')"
    # ... 执行查询
    return "用户注册成功"

# ❌ 禁止!简单哈希(MD5/SHA1 可被彩虹表破解)
import hashlib

def weak_hash_password(password: str) -> str:
    """脆弱的哈希方式"""
    return hashlib.md5(password.encode()).hexdigest()

# ❌ 禁止!固定盐值哈希(所有用户同一个盐)
SALT = "myapp_salt"  # 固定盐值是安全隐患

def fixed_salt_hash(password: str) -> str:
    """使用固定盐值的哈希"""
    return hashlib.sha256((password + SALT).encode()).hexdigest()

# ❌ 禁止!可预测的盐值
def predictable_salt_hash(password: str, user_id: int) -> str:
    """使用可预测盐值的哈希"""
    salt = f"salt_{user_id}"  # 可预测的盐值
    return hashlib.sha256((password + salt).encode()).hexdigest()

正确的密码存储方式

# ✅ 正确:使用bcrypt进行安全哈希
from passlib.context import CryptContext
import secrets

# 配置密码上下文
pwd_context = CryptContext(
    schemes=["bcrypt"], 
    deprecated="auto",
    bcrypt__rounds=12  # 哈希轮数,影响安全性
)

def secure_hash_password(password: str) -> str:
    """安全的密码哈希函数"""
    return pwd_context.hash(password)

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """验证密码"""
    return pwd_context.verify(plain_password, hashed_password)

密码哈希算法原理

哈希算法的基本概念

哈希算法是一种将任意长度的数据转换为固定长度字符串的算法。密码哈希算法具有以下特点:

  1. 单向性:从原数据可以轻松计算哈希值,但从哈希值无法反推出原数据
  2. 确定性:相同输入总是产生相同输出
  3. 雪崩效应:输入的微小变化会导致输出的巨大差异
  4. 抗碰撞性:很难找到两个不同的输入产生相同的输出

慢哈希算法的重要性

传统的哈希算法(如MD5、SHA-1、SHA-256)设计目标是快速计算,这在密码存储场景下是缺点。慢哈希算法故意设计得较慢,目的是:

  • 防止暴力破解:减慢攻击者的破解速度
  • 抵抗彩虹表攻击:彩虹表是预先计算的哈希值对照表
  • 适应硬件发展:随着计算能力提升,调整算法复杂度

盐值的作用

盐值(Salt)是一段随机数据,与密码一起进行哈希运算。盐值的作用:

import hashlib
import secrets

def hash_with_salt(password: str) -> tuple[str, str]:
    """使用随机盐值进行哈希"""
    # 生成随机盐值
    salt = secrets.token_hex(32)  # 32字节的随机盐值
    
    # 将密码和盐值组合后哈希
    combined = password + salt
    hashed = hashlib.sha256(combined.encode()).hexdigest()
    
    # 返回哈希值和盐值(需要存储两者)
    return hashed, salt

def verify_with_salt(password: str, stored_hash: str, stored_salt: str) -> bool:
    """使用盐值验证密码"""
    combined = password + stored_salt
    computed_hash = hashlib.sha256(combined.encode()).hexdigest()
    return computed_hash == stored_hash

# 示例:相同密码,不同盐值产生不同哈希
password = "mypassword123"
hash1, salt1 = hash_with_salt(password)
hash2, salt2 = hash_with_salt(password)

print(f"密码相同: {password}")
print(f"哈希1: {hash1}")
print(f"哈希2: {hash2}")
print(f"盐值1: {salt1}")
print(f"盐值2: {salt2}")
print(f"哈希相同: {hash1 == hash2}")  # False

Passlib密码库详解

Passlib安装与配置

# 安装Passlib及其bcrypt后端
pip install passlib[bcrypt] bcrypt

# 或者分别安装
pip install passlib
pip install bcrypt

Passlib基础用法

from passlib.context import CryptContext
from passlib.exc import UnknownHashError
import logging

# 创建密码上下文
pwd_context = CryptContext(
    schemes=["bcrypt"],           # 支持的哈希算法
    deprecated="auto",            # 自动标记过时算法
    bcrypt__min_rounds=12,        # bcrypt最小轮数
    bcrypt__max_rounds=14,        # bcrypt最大轮数
    bcrypt__default_rounds=12,    # 默认轮数
)

class PasswordManager:
    """密码管理器 - 封装密码哈希和验证逻辑"""
    
    def __init__(self, context: CryptContext = pwd_context):
        self.context = context
        self.logger = logging.getLogger(__name__)
    
    def hash_password(self, password: str) -> str:
        """哈希密码"""
        try:
            return self.context.hash(password)
        except Exception as e:
            self.logger.error(f"密码哈希失败: {e}")
            raise
    
    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """验证密码"""
        try:
            return self.context.verify(plain_password, hashed_password)
        except UnknownHashError:
            self.logger.warning("未知的哈希格式")
            return False
        except Exception as e:
            self.logger.error(f"密码验证失败: {e}")
            return False
    
    def needs_rehash(self, hashed_password: str) -> bool:
        """检查是否需要重新哈希(算法升级)"""
        return self.context.needs_update(hashed_password)
    
    def rehash_if_needed(self, password: str, hashed_password: str) -> str:
        """如果需要则重新哈希"""
        if self.needs_rehash(hashed_password):
            self.logger.info("密码哈希需要更新")
            return self.hash_password(password)
        return hashed_password

# 使用示例
password_manager = PasswordManager()

# 哈希密码
password = "MySecurePassword123!"
hashed = password_manager.hash_password(password)
print(f"哈希后的密码: {hashed}")

# 验证密码
is_valid = password_manager.verify_password(password, hashed)
print(f"密码验证结果: {is_valid}")

# 验证错误密码
wrong_is_valid = password_manager.verify_password("WrongPassword", hashed)
print(f"错误密码验证结果: {wrong_is_valid}")

多算法支持配置

# 配置支持多种哈希算法
multi_algo_context = CryptContext(
    schemes=["bcrypt", "argon2", "pbkdf2_sha256"],
    default="bcrypt",                    # 默认使用bcrypt
    deprecated=["pbkdf2_sha256"],        # pbkdf2_sha256为过时算法
    bcrypt__default_rounds=12,
    argon2__time_cost=3,
    argon2__memory_cost=65536,
    argon2__parallelism=4,
)

class MultiAlgorithmPasswordManager:
    """支持多种算法的密码管理器"""
    
    def __init__(self, context: CryptContext = multi_algo_context):
        self.context = context
    
    def hash_password(self, password: str, algorithm: str = "bcrypt") -> str:
        """使用指定算法哈希密码"""
        if algorithm not in self.context.schemes():
            raise ValueError(f"不支持的算法: {algorithm}")
        
        # 临时创建使用指定算法的上下文
        temp_context = CryptContext(schemes=[algorithm])
        return temp_context.hash(password)
    
    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """验证密码(自动识别算法)"""
        try:
            return self.context.verify(plain_password, hashed_password)
        except Exception:
            return False
    
    def get_algorithm_info(self, hashed_password: str) -> dict:
        """获取哈希值的算法信息"""
        try:
            info = self.context.identify(hashed_password)
            return {
                "algorithm": info,
                "is_deprecated": info in self.context.deprecated_schemes(),
                "needs_update": self.context.needs_update(hashed_password)
            }
        except Exception:
            return {"algorithm": "unknown", "error": "无法识别算法"}

# 使用示例
multi_manager = MultiAlgorithmPasswordManager()

# 使用不同算法哈希同一密码
password = "TestPassword123!"

bcrypt_hash = multi_manager.hash_password(password, "bcrypt")
argon2_hash = multi_manager.hash_password(password, "argon2")

print(f"bcrypt哈希: {bcrypt_hash}")
print(f"argon2哈希: {argon2_hash}")

# 验证不同算法的哈希
print(f"bcrypt验证: {multi_manager.verify_password(password, bcrypt_hash)}")
print(f"argon2验证: {multi_manager.verify_password(password, argon2_hash)}")

# 获取算法信息
info = multi_manager.get_algorithm_info(bcrypt_hash)
print(f"算法信息: {info}")

bcrypt算法深度解析

bcrypt算法原理

bcrypt是一种基于Blowfish加密算法的密码哈希函数,具有以下特点:

  1. 内置盐值:bcrypt自动生成并存储盐值
  2. 可调节的成本因子:控制哈希计算的复杂度
  3. 慢速计算:故意设计得较慢以抵抗暴力破解

bcrypt哈希格式解析

bcrypt生成的哈希值遵循特定格式:$2b$cost$salt+hash

import re

def parse_bcrypt_hash(hash_string: str) -> dict:
    """解析bcrypt哈希值的各个部分"""
    # bcrypt哈希格式: $2b$12$..................$..................
    #                |  ||  |<-- 22 chars salt -->|<-- 31 chars hash -->|
    #                |  |<-- cost factor -->
    #                |<-- algorithm identifier -->
    
    pattern = r'^\$2[aby]\$(\d+)\$([A-Za-z0-9./]{22})([A-Za-z0-9./]{31})$'
    match = re.match(pattern, hash_string)
    
    if not match:
        raise ValueError("无效的bcrypt哈希格式")
    
    cost_factor = int(match.group(1))
    salt = match.group(2)
    hash_part = match.group(3)
    
    return {
        "algorithm": "bcrypt",
        "cost_factor": cost_factor,
        "salt": salt,
        "hash": hash_part,
        "full_hash": hash_string
    }

# 示例解析
example_hash = "$2b$12$LQv3c1yqBWVHxkd0LHAkCOYz6TtxMQJqhN8/X4.Qjj8PsGqxjPyWa"
parsed = parse_bcrypt_hash(example_hash)
print(f"Parsed bcrypt hash: {parsed}")

bcrypt成本因子调整

from passlib.context import CryptContext
import time

class CostFactorOptimizer:
    """成本因子优化器 - 根据系统性能调整bcrypt成本因子"""
    
    def __init__(self, target_time: float = 0.1):  # 目标时间0.1秒
        self.target_time = target_time
    
    def benchmark_cost_factor(self, cost: int, password: str = "testpassword") -> float:
        """测试特定成本因子的哈希时间"""
        test_context = CryptContext(schemes=["bcrypt"], bcrypt__default_rounds=cost)
        
        start_time = time.time()
        test_context.hash(password)
        end_time = time.time()
        
        return end_time - start_time
    
    def find_optimal_cost(self, min_cost: int = 4, max_cost: int = 16) -> int:
        """找到最优成本因子"""
        for cost in range(max_cost, min_cost - 1, -1):
            avg_time = self.benchmark_cost_factor(cost)
            if avg_time <= self.target_time:
                return cost
        
        # 如果找不到合适成本因子,返回最小值
        return min_cost

# 使用示例
optimizer = CostFactorOptimizer(target_time=0.2)  # 目标0.2秒
optimal_cost = optimizer.find_optimal_cost()
print(f"推荐的bcrypt成本因子: {optimal_cost}")

# 配置使用推荐成本因子
optimized_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",
    bcrypt__default_rounds=optimal_cost
)

bcrypt性能优化

import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple

class AsyncPasswordManager:
    """异步密码管理器 - 优化大量密码操作的性能"""
    
    def __init__(self, context: CryptContext, max_workers: int = 4):
        self.context = context
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
    
    async def hash_password_async(self, password: str) -> str:
        """异步哈希密码"""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor, 
            self.context.hash, 
            password
        )
    
    async def verify_password_async(self, plain_password: str, hashed_password: str) -> bool:
        """异步验证密码"""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(
            self.executor,
            self.context.verify,
            plain_password,
            hashed_password
        )
    
    async def batch_verify_passwords(
        self, 
        credentials: List[Tuple[str, str]]
    ) -> List[bool]:
        """批量验证密码"""
        tasks = [
            self.verify_password_async(plain, hashed)
            for plain, hashed in credentials
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

# 使用示例
async def demo_async_password_operations():
    async_manager = AsyncPasswordManager(pwd_context)
    
    # 异步哈希密码
    password = "MyAsyncPassword123!"
    hashed = await async_manager.hash_password_async(password)
    print(f"异步哈希结果: {hashed}")
    
    # 异步验证密码
    is_valid = await async_manager.verify_password_async(password, hashed)
    print(f"异步验证结果: {is_valid}")
    
    # 批量验证示例
    test_credentials = [
        ("correct_password", await async_manager.hash_password_async("correct_password")),
        ("wrong_password", await async_manager.hash_password_async("some_other_password")),
        ("another_password", await async_manager.hash_password_async("another_password")),
    ]
    
    results = await async_manager.batch_verify_passwords(test_credentials)
    print(f"批量验证结果: {results}")

# 运行异步示例
# asyncio.run(demo_async_password_operations())

密码强度验证与策略

密码强度验证器

import re
from typing import List, Optional
from pydantic import BaseModel, field_validator, ValidationError
from dataclasses import dataclass

@dataclass
class PasswordStrengthResult:
    """密码强度验证结果"""
    is_strong: bool
    score: int  # 0-100
    feedback: List[str]
    requirements_met: List[str]
    requirements_missing: List[str]

class PasswordValidator:
    """密码强度验证器"""
    
    def __init__(self):
        self.min_length = 8
        self.max_length = 128
        self.require_uppercase = True
        self.require_lowercase = True
        self.require_digits = True
        self.require_special_chars = True
        self.special_chars = "!@#$%^&*(),.?\":{}|<>"
        self.min_special_chars = 1
        self.max_consecutive_chars = 2  # 最大连续相同字符数
        self.banned_patterns = [
            r'(.)\1{2,}',  # 连续3个或更多相同字符
            r'(012|123|234|345|456|567|678|789|890)',  # 连续数字
            r'(abc|bcd|cde|def|efg|fgh|ghi|hij|ijk|jkl|klm|lmn|mno|nop|opq|pqr|qrs|rst|stu|tuv|uvw|vwx|wxy|xyz)',  # 连续字母
        ]
    
    def calculate_entropy(self, password: str) -> float:
        """计算密码熵值"""
        charset_size = 0
        if re.search(r'[a-z]', password):
            charset_size += 26
        if re.search(r'[A-Z]', password):
            charset_size += 26
        if re.search(r'\d', password):
            charset_size += 10
        if re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
            charset_size += len(self.special_chars)
        
        if charset_size == 0:
            return 0.0
        
        import math
        entropy = len(password) * math.log2(charset_size)
        return entropy
    
    def validate_password(self, password: str) -> PasswordStrengthResult:
        """验证密码强度"""
        feedback = []
        requirements_met = []
        requirements_missing = []
        
        # 长度检查
        if self.min_length <= len(password) <= self.max_length:
            requirements_met.append(f"长度符合要求 ({self.min_length}-{self.max_length}字符)")
        else:
            feedback.append(f"密码长度应在{self.min_length}-{self.max_length}字符之间")
            requirements_missing.append(f"长度不符合要求 ({len(password)}字符)")
        
        # 大写字母检查
        if self.require_uppercase:
            if re.search(r'[A-Z]', password):
                requirements_met.append("包含大写字母")
            else:
                feedback.append("密码必须包含至少一个大写字母")
                requirements_missing.append("缺少大写字母")
        
        # 小写字母检查
        if self.require_lowercase:
            if re.search(r'[a-z]', password):
                requirements_met.append("包含小写字母")
            else:
                feedback.append("密码必须包含至少一个小写字母")
                requirements_missing.append("缺少小写字母")
        
        # 数字检查
        if self.require_digits:
            if re.search(r'\d', password):
                requirements_met.append("包含数字")
            else:
                feedback.append("密码必须包含至少一个数字")
                requirements_missing.append("缺少数字")
        
        # 特殊字符检查
        if self.require_special_chars:
            special_count = sum(1 for c in password if c in self.special_chars)
            if special_count >= self.min_special_chars:
                requirements_met.append(f"包含特殊字符 (≥{self.min_special_chars}个)")
            else:
                feedback.append(f"密码必须包含至少{self.min_special_chars}个特殊字符")
                requirements_missing.append(f"缺少特殊字符 (仅{special_count}个)")
        
        # 重复字符检查
        consecutive_pattern = rf'(.)\1{{{self.max_consecutive_chars},}}'
        if re.search(consecutive_pattern, password):
            feedback.append(f"密码不应包含超过{self.max_consecutive_chars}个连续相同字符")
            requirements_missing.append("包含过多连续字符")
        
        # 禁用模式检查
        for pattern in self.banned_patterns:
            if re.search(pattern, password.lower()):
                feedback.append(f"密码不应包含常见模式")
                requirements_missing.append("包含禁用模式")
                break
        
        # 计算分数
        total_requirements = len(requirements_met) + len(requirements_missing)
        if total_requirements > 0:
            score = int((len(requirements_met) / total_requirements) * 100)
        else:
            score = 100  # 如果没有要求,则为满分
        
        # 考虑熵值
        entropy = self.calculate_entropy(password)
        if entropy < 30:
            score = max(0, score - 20)  # 低熵值扣分
        elif entropy > 60:
            score = min(100, score + 10)  # 高熵值加分
        
        is_strong = score >= 80  # 80分以上为强密码
        
        return PasswordStrengthResult(
            is_strong=is_strong,
            score=score,
            feedback=feedback,
            requirements_met=requirements_met,
            requirements_missing=requirements_missing
        )

# 使用示例
validator = PasswordValidator()

test_passwords = [
    "weak",           # 弱密码
    "StrongPass1!",   # 强密码
    "Password123",    # 中等强度
    "MyVerySecurePassword123!",  # 很强的密码
]

for pwd in test_passwords:
    result = validator.validate_password(pwd)
    print(f"密码: {pwd}")
    print(f"  强度: {'强' if result.is_strong else '弱'}")
    print(f"  分数: {result.score}/100")
    print(f"  满足要求: {len(result.requirements_met)}")
    print(f"  缺失要求: {len(result.requirements_missing)}")
    if result.feedback:
        print(f"  反馈: {', '.join(result.feedback)}")
    print()

Pydantic模型中的密码验证

from pydantic import BaseModel, field_validator, model_validator
from typing import Optional
from pydantic import EmailStr

class RegistrationRequest(BaseModel):
    """用户注册请求模型"""
    email: EmailStr
    password: str
    confirm_password: str
    first_name: str
    last_name: str
    phone: Optional[str] = None
    
    @field_validator("password")
    @classmethod
    def validate_password_strength(cls, v: str) -> str:
        """验证密码强度"""
        validator = PasswordValidator()
        result = validator.validate_password(v)
        
        if not result.is_strong:
            raise ValueError(
                f"密码强度不足 (得分: {result.score}/100). "
                f"建议: {', '.join(result.feedback) if result.feedback else '请使用更复杂的密码'}"
            )
        
        return v
    
    @field_validator("confirm_password")
    @classmethod
    def passwords_match(cls, v: str, values: dict) -> str:
        """验证确认密码是否匹配"""
        if "password" in values and v != values["password"]:
            raise ValueError("两次输入的密码不一致")
        return v
    
    @field_validator("first_name", "last_name")
    @classmethod
    def validate_name(cls, v: str) -> str:
        """验证姓名格式"""
        if not v or len(v.strip()) < 2:
            raise ValueError("姓名至少需要2个字符")
        if not v.replace(" ", "").isalpha():
            raise ValueError("姓名只能包含字母和空格")
        return v.strip().title()
    
    @model_validator(mode='after')
    def validate_phone_format(self):
        """验证电话号码格式"""
        if self.phone:
            # 简单的电话号码格式验证
            import re
            phone_pattern = r'^[\+]?[1-9][\d]{0,15}$'
            if not re.match(phone_pattern, self.phone.replace(" ", "").replace("-", "")):
                raise ValueError("电话号码格式不正确")
        return self

class PasswordChangeRequest(BaseModel):
    """密码修改请求模型"""
    old_password: str
    new_password: str
    confirm_new_password: str
    
    @field_validator("new_password")
    @classmethod
    def validate_new_password_strength(cls, v: str) -> str:
        """验证新密码强度"""
        validator = PasswordValidator()
        result = validator.validate_password(v)
        
        if not result.is_strong:
            raise ValueError(
                f"新密码强度不足 (得分: {result.score}/100). "
                f"建议: {', '.join(result.feedback) if result.feedback else '请使用更复杂的密码'}"
            )
        
        return v
    
    @model_validator(mode='after')
    def passwords_do_not_match_old(self):
        """验证新密码不能与旧密码相同"""
        if self.old_password == self.new_password:
            raise ValueError("新密码不能与旧密码相同")
        if self.new_password != self.confirm_new_password:
            raise ValueError("两次输入的新密码不一致")
        return self

# 使用示例
try:
    # 测试强密码注册
    strong_registration = RegistrationRequest(
        email="user@example.com",
        password="MySecurePassword123!",
        confirm_password="MySecurePassword123!",
        first_name="John",
        last_name="Doe",
        phone="+1234567890"
    )
    print("强密码注册验证通过")
except ValidationError as e:
    print(f"注册验证失败: {e}")

try:
    # 测试弱密码注册
    weak_registration = RegistrationRequest(
        email="user@example.com",
        password="weak",
        confirm_password="weak",
        first_name="Jane",
        last_name="Smith"
    )
except ValidationError as e:
    print(f"弱密码注册验证失败: {e}")

安全的用户注册流程

用户模型定义

# models/user.py
from sqlalchemy import Column, Integer, String, Boolean, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.sql import func
from datetime import datetime
from typing import Optional
import secrets

Base = declarative_base()

class User(Base):
    """用户模型"""
    __tablename__ = "users"
    
    id = Column(Integer, primary_key=True, index=True)
    email = Column(String(255), unique=True, index=True, nullable=False)
    username = Column(String(255), unique=True, index=True, nullable=True)
    hashed_password = Column(String(255), nullable=False)
    first_name = Column(String(100), nullable=False)
    last_name = Column(String(100), nullable=False)
    phone = Column(String(20), nullable=True)
    is_active = Column(Boolean, default=True)
    is_verified = Column(Boolean, default=False)
    is_locked = Column(Boolean, default=False)
    failed_login_attempts = Column(Integer, default=0)
    last_failed_login = Column(DateTime, nullable=True)
    password_changed_at = Column(DateTime, default=datetime.utcnow)
    created_at = Column(DateTime, default=func.now())
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
    
    def get_full_name(self) -> str:
        """获取用户全名"""
        return f"{self.first_name} {self.last_name}".strip()
    
    def unlock_account(self):
        """解锁账户"""
        self.is_locked = False
        self.failed_login_attempts = 0
        self.last_failed_login = None

# 验证令牌模型
class VerificationToken(Base):
    """邮箱验证令牌"""
    __tablename__ = "verification_tokens"
    
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, nullable=False)
    token = Column(String(255), unique=True, index=True, nullable=False)
    expires_at = Column(DateTime, nullable=False)
    used = Column(Boolean, default=False)
    created_at = Column(DateTime, default=func.now())

class PasswordResetToken(Base):
    """密码重置令牌"""
    __tablename__ = "password_reset_tokens"
    
    id = Column(Integer, primary_key=True, index=True)
    user_id = Column(Integer, nullable=False)
    token = Column(String(255), unique=True, index=True, nullable=False)
    expires_at = Column(DateTime, nullable=False)
    used = Column(Boolean, default=False)
    created_at = Column(DateTime, default=func.now())

安全用户服务

# services/user_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update
from typing import Optional, Tuple
from datetime import datetime, timedelta
import secrets
import string

from models.user import User, VerificationToken, PasswordResetToken
from services.password_service import PasswordManager

class UserService:
    """安全用户服务"""
    
    def __init__(self, db: AsyncSession, password_manager: PasswordManager):
        self.db = db
        self.password_manager = password_manager
        self.account_lockout_threshold = 5  # 登录失败次数阈值
        self.account_lockout_duration = 30  # 账户锁定时长(分钟)
        self.verification_token_duration = 24  # 验证令牌有效期(小时)
        self.password_reset_duration = 1  # 密码重置令牌有效期(小时)
    
    async def register_user(
        self, 
        email: str, 
        password: str, 
        first_name: str, 
        last_name: str,
        username: Optional[str] = None,
        phone: Optional[str] = None
    ) -> Tuple[User, str]:
        """注册新用户"""
        # 检查邮箱是否已存在
        existing_user = await self.get_user_by_email(email)
        if existing_user:
            raise ValueError("该邮箱已被注册")
        
        # 检查用户名是否已存在
        if username:
            existing_username = await self.get_user_by_username(username)
            if existing_username:
                raise ValueError("该用户名已被使用")
        
        # 哈希密码
        hashed_password = self.password_manager.hash_password(password)
        
        # 创建用户
        user = User(
            email=email,
            username=username,
            hashed_password=hashed_password,
            first_name=first_name,
            last_name=last_name,
            phone=phone,
            is_active=True,
            is_verified=False,
            is_locked=False,
            failed_login_attempts=0
        )
        
        self.db.add(user)
        await self.db.commit()
        await self.db.refresh(user)
        
        # 生成验证令牌
        verification_token = await self.generate_verification_token(user.id)
        
        return user, verification_token
    
    async def login_user(self, email: str, password: str) -> Optional[User]:
        """用户登录"""
        user = await self.get_user_by_email(email)
        if not user:
            # 即使用户不存在也记录失败尝试,防止用户枚举
            await self.record_failed_login_attempt(email)
            return None
        
        # 检查账户是否被锁定
        if user.is_locked:
            if self._is_lockout_expired(user):
                # 自动解锁账户
                user.unlock_account()
                await self.db.commit()
            else:
                raise ValueError("账户已被锁定,请稍后再试")
        
        # 验证密码
        if self.password_manager.verify_password(password, user.hashed_password):
            # 登录成功,重置失败计数
            await self.reset_failed_login_attempts(user.id)
            return user
        else:
            # 登录失败,记录失败尝试
            await self.record_failed_login_attempt(user.email)
            return None
    
    async def change_password(
        self, 
        user_id: int, 
        old_password: str, 
        new_password: str
    ) -> bool:
        """更改密码"""
        user = await self.get_user_by_id(user_id)
        if not user:
            raise ValueError("用户不存在")
        
        # 验证旧密码
        if not self.password_manager.verify_password(old_password, user.hashed_password):
            raise ValueError("旧密码错误")
        
        # 检查新密码是否与旧密码相同
        if self.password_manager.verify_password(new_password, user.hashed_password):
            raise ValueError("新密码不能与旧密码相同")
        
        # 哈希新密码
        new_hashed_password = self.password_manager.hash_password(new_password)
        
        # 更新密码
        stmt = update(User).where(User.id == user_id).values(
            hashed_password=new_hashed_password,
            password_changed_at=datetime.utcnow()
        )
        await self.db.execute(stmt)
        await self.db.commit()
        
        return True
    
    async def reset_password(self, token: str, new_password: str) -> bool:
        """重置密码"""
        # 查找有效的密码重置令牌
        stmt = select(PasswordResetToken).where(
            PasswordResetToken.token == token,
            PasswordResetToken.used == False,
            PasswordResetToken.expires_at > datetime.utcnow()
        )
        result = await self.db.execute(stmt)
        reset_token = result.scalar_one_or_none()
        
        if not reset_token:
            raise ValueError("无效或已过期的重置令牌")
        
        # 哈希新密码
        new_hashed_password = self.password_manager.hash_password(new_password)
        
        # 更新用户密码
        stmt = update(User).where(User.id == reset_token.user_id).values(
            hashed_password=new_hashed_password,
            password_changed_at=datetime.utcnow()
        )
        await self.db.execute(stmt)
        
        # 标记令牌为已使用
        stmt = update(PasswordResetToken).where(
            PasswordResetToken.id == reset_token.id
        ).values(used=True)
        await self.db.execute(stmt)
        
        await self.db.commit()
        return True
    
    async def verify_email(self, token: str) -> bool:
        """验证邮箱"""
        # 查找有效的验证令牌
        stmt = select(VerificationToken).where(
            VerificationToken.token == token,
            VerificationToken.used == False,
            VerificationToken.expires_at > datetime.utcnow()
        )
        result = await self.db.execute(stmt)
        verification_token = result.scalar_one_or_none()
        
        if not verification_token:
            raise ValueError("无效或已过期的验证令牌")
        
        # 验证用户邮箱
        stmt = update(User).where(User.id == verification_token.user_id).values(
            is_verified=True
        )
        await self.db.execute(stmt)
        
        # 标记令牌为已使用
        stmt = update(VerificationToken).where(
            VerificationToken.id == verification_token.id
        ).values(used=True)
        await self.db.execute(stmt)
        
        await self.db.commit()
        return True
    
    async def generate_verification_token(self, user_id: int) -> str:
        """生成邮箱验证令牌"""
        token = self._generate_secure_token()
        expires_at = datetime.utcnow() + timedelta(hours=self.verification_token_duration)
        
        verification_token = VerificationToken(
            user_id=user_id,
            token=token,
            expires_at=expires_at
        )
        
        self.db.add(verification_token)
        await self.db.commit()
        
        return token
    
    async def generate_password_reset_token(self, user_id: int) -> str:
        """生成密码重置令牌"""
        token = self._generate_secure_token()
        expires_at = datetime.utcnow() + timedelta(hours=self.password_reset_duration)
        
        reset_token = PasswordResetToken(
            user_id=user_id,
            token=token,
            expires_at=expires_at
        )
        
        self.db.add(reset_token)
        await self.db.commit()
        
        return token
    
    async def get_user_by_email(self, email: str) -> Optional[User]:
        """根据邮箱获取用户"""
        stmt = select(User).where(User.email == email)
        result = await self.db.execute(stmt)
        return result.scalar_one_or_none()
    
    async def get_user_by_username(self, username: str) -> Optional[User]:
        """根据用户名获取用户"""
        stmt = select(User).where(User.username == username)
        result = await self.db.execute(stmt)
        return result.scalar_one_or_none()
    
    async def get_user_by_id(self, user_id: int) -> Optional[User]:
        """根据ID获取用户"""
        stmt = select(User).where(User.id == user_id)
        result = await self.db.execute(stmt)
        return result.scalar_one_or_none()
    
    async def record_failed_login_attempt(self, email: str):
        """记录登录失败尝试"""
        user = await self.get_user_by_email(email)
        if not user:
            # 如果用户不存在,仍然记录失败尝试以防止用户枚举
            return
        
        user.failed_login_attempts += 1
        user.last_failed_login = datetime.utcnow()
        
        # 检查是否达到锁定阈值
        if user.failed_login_attempts >= self.account_lockout_threshold:
            user.is_locked = True
        
        await self.db.commit()
    
    async def reset_failed_login_attempts(self, user_id: int):
        """重置登录失败计数"""
        stmt = update(User).where(User.id == user_id).values(
            failed_login_attempts=0,
            last_failed_login=None,
            is_locked=False
        )
        await self.db.execute(stmt)
        await self.db.commit()
    
    def _is_lockout_expired(self, user: User) -> bool:
        """检查账户锁定是否已过期"""
        if not user.last_failed_login:
            return True
        
        lockout_end = user.last_failed_login + timedelta(minutes=self.account_lockout_duration)
        return datetime.utcnow() > lockout_end
    
    def _generate_secure_token(self, length: int = 32) -> str:
        """生成安全的令牌"""
        alphabet = string.ascii_letters + string.digits
        return ''.join(secrets.choice(alphabet) for _ in range(length))

# 使用示例
async def demo_user_service():
    # 这里需要数据库连接
    # db_session = get_db_session()  # 假设有一个获取数据库会话的函数
    # password_manager = PasswordManager()
    # user_service = UserService(db_session, password_manager)
    
    # 示例操作
    """
    # 用户注册
    user, verification_token = await user_service.register_user(
        email="user@example.com",
        password="MySecurePassword123!",
        first_name="John",
        last_name="Doe"
    )
    
    # 用户登录
    logged_in_user = await user_service.login_user(
        email="user@example.com",
        password="MySecurePassword123!"
    )
    
    # 更改密码
    await user_service.change_password(
        user_id=logged_in_user.id,
        old_password="MySecurePassword123!",
        new_password="MyNewSecurePassword456@"
    )
    """
    pass

密码安全最佳实践

安全配置清单

"""
密码安全最佳实践清单:

✅ 技术层面
- [ ] 使用bcrypt或Argon2进行密码哈希
- [ ] 每个用户使用唯一随机盐值
- [ ] 设置合理的bcrypt成本因子(通常12)
- [ ] 实施密码强度验证
- [ ] 定期更新哈希算法
- [ ] 实施账户锁定策略
- [ ] 使用HTTPS传输密码数据
- [ ] 不在日志中记录密码
- [ ] 实施密码历史记录检查
- [ ] 定期强制密码更换

✅ 业务层面
- [ ] 提供密码重置功能
- [ ] 实施邮箱验证机制
- [ ] 提供两因素认证
- [ ] 记录安全事件日志
- [ ] 实施密码泄露检测
- [ ] 提供安全通知功能
- [ ] 定期安全审计

✅ 用户教育
- [ ] 提供密码强度指示器
- [ ] 教育用户密码安全知识
- [ ] 推荐使用密码管理器
- [ ] 提供安全最佳实践指南
"""

# 安全配置类
class SecurityConfig:
    """安全配置类"""
    
    def __init__(self):
        # 密码安全配置
        self.PASSWORD_MIN_LENGTH = 8
        self.PASSWORD_MAX_LENGTH = 128
        self.PASSWORD_REQUIRE_UPPERCASE = True
        self.PASSWORD_REQUIRE_LOWERCASE = True
        self.PASSWORD_REQUIRE_DIGITS = True
        self.PASSWORD_REQUIRE_SPECIAL_CHARS = True
        
        # 账户安全配置
        self.LOGIN_ATTEMPTS_THRESHOLD = 5
        self.ACCOUNT_LOCKOUT_DURATION = 30  # 分钟
        self.PASSWORD_RESET_TOKEN_EXPIRY = 1  # 小时
        self.EMAIL_VERIFICATION_TOKEN_EXPIRY = 24  # 小时
        
        # 会话安全配置
        self.SESSION_TIMEOUT = 30  # 分钟
        self.REMEMBER_ME_DURATION = 30  # 天
        self.TOKEN_EXPIRY = 15  # 分钟(access token)
        
        # 加密配置
        self.BCRYPT_ROUNDS = 12
        self.ENCRYPTION_KEY_LENGTH = 32  # 字节
        self.RANDOM_TOKEN_LENGTH = 32  # 字节

security_config = SecurityConfig()

安全中间件

from fastapi import Request, HTTPException, status
from fastapi.security import HTTPBearer
import time
from typing import Dict, List
from collections import defaultdict

class SecurityMiddleware:
    """安全中间件 - 实施多种安全检查"""
    
    def __init__(self):
        self.rate_limit_storage: Dict[str, List[float]] = defaultdict(list)
        self.brute_force_protection: Dict[str, Dict] = defaultdict(dict)
        self.rate_limit_window = 60  # 60秒窗口
        self.max_requests_per_window = 100  # 每窗口最大请求数
        self.max_login_attempts = 5  # 最大登录尝试次数
        self.block_duration = 300  # 封锁时长(秒)
    
    async def check_rate_limit(self, request: Request) -> bool:
        """检查速率限制"""
        client_ip = request.client.host
        now = time.time()
        
        # 清理过期的请求记录
        self.rate_limit_storage[client_ip] = [
            timestamp for timestamp in self.rate_limit_storage[client_ip]
            if now - timestamp < self.rate_limit_window
        ]
        
        # 检查是否超过限制
        if len(self.rate_limit_storage[client_ip]) >= self.max_requests_per_window:
            return False
        
        # 记录当前请求
        self.rate_limit_storage[client_ip].append(now)
        return True
    
    async def check_brute_force(self, ip_address: str, endpoint: str) -> bool:
        """检查暴力破解尝试"""
        key = f"{ip_address}:{endpoint}"
        now = time.time()
        
        if key in self.brute_force_protection:
            attempt_info = self.brute_force_protection[key]
            
            # 检查是否在封禁期间
            if now < attempt_info.get('blocked_until', 0):
                return False
            
            # 检查尝试次数
            recent_attempts = [
                t for t in attempt_info.get('attempts', [])
                if now - t < 300  # 5分钟内
            ]
            
            if len(recent_attempts) >= self.max_login_attempts:
                # 触发封禁
                attempt_info['blocked_until'] = now + self.block_duration
                return False
        else:
            self.brute_force_protection[key] = {'attempts': []}
        
        # 记录尝试
        self.brute_force_protection[key]['attempts'].append(now)
        return True
    
    async def cleanup_expired_blocks(self):
        """清理过期的封禁"""
        now = time.time()
        expired_keys = [
            key for key, info in self.brute_force_protection.items()
            if info.get('blocked_until', 0) < now
        ]
        
        for key in expired_keys:
            del self.brute_force_protection[key]

# 全局安全中间件实例
security_middleware = SecurityMiddleware()

# 在FastAPI应用中使用
"""
@app.middleware("http")
async def security_middleware_handler(request: Request, call_next):
    # 检查速率限制
    if not await security_middleware.check_rate_limit(request):
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="请求过于频繁,请稍后再试"
        )
    
    # 检查暴力破解(针对登录端点)
    if request.url.path in ["/auth/login", "/auth/register"]:
        if not await security_middleware.check_brute_force(
            request.client.host, 
            request.url.path
        ):
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="尝试次数过多,请稍后再试"
            )
    
    response = await call_next(request)
    return response
"""

密码泄露检测

import hashlib
import requests
from typing import Optional

class PasswordBreachDetector:
    """密码泄露检测器 - 使用Have I Been Pwned API"""
    
    def __init__(self):
        self.api_url = "https://api.pwnedpasswords.com/range/"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'FastAPI-Password-Security-Checker/1.0'
        })
    
    def _hash_and_truncate(self, password: str) -> tuple[str, str]:
        """哈希密码并截取前5位和其余部分"""
        sha1_hash = hashlib.sha1(password.encode('utf-8')).hexdigest().upper()
        prefix = sha1_hash[:5]
        suffix = sha1_hash[5:]
        return prefix, suffix
    
    def check_password_breach(self, password: str) -> tuple[bool, int]:
        """检查密码是否在泄露事件中"""
        try:
            prefix, suffix = self._hash_and_truncate(password)
            
            # 查询API
            response = self.session.get(f"{self.api_url}{prefix}")
            response.raise_for_status()
            
            # 解析响应
            hashes = response.text.splitlines()
            for hash_line in hashes:
                hash_suffix, count = hash_line.split(':')
                if hash_suffix == suffix:
                    return True, int(count)
            
            return False, 0
            
        except requests.RequestException:
            # API请求失败时不阻止用户(降级处理)
            return False, 0
        except Exception:
            return False, 0
    
    async def check_password_breach_async(self, password: str) -> tuple[bool, int]:
        """异步检查密码泄露"""
        import asyncio
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self.check_password_breach, password)

# 使用示例
breach_detector = PasswordBreachDetector()

async def validate_password_not_breached(password: str) -> bool:
    """验证密码未在泄露事件中"""
    is_breached, count = await breach_detector.check_password_breach_async(password)
    
    if is_breached:
        print(f"警告:密码已在{count}次泄露事件中出现,请选择更安全的密码")
        return False
    
    return True

# 在密码验证中使用
"""
validator = PasswordValidator()
strength_result = validator.validate_password(password)

if strength_result.is_strong:
    # 额外检查密码是否泄露
    is_safe = await validate_password_not_breached(password)
    if not is_safe:
        raise ValueError("密码已在数据泄露事件中出现,请选择更安全的密码")
"""

高级安全技术

Argon2替代方案

# 安装: pip install argon2-cffi
try:
    from argon2 import PasswordHasher
    from argon2.exceptions import VerifyMismatchError, HashError
except ImportError:
    print("argon2-cffi未安装,跳过Argon2示例")
    PasswordHasher = None

if PasswordHasher:
    class Argon2PasswordManager:
        """Argon2密码管理器"""
        
        def __init__(
            self,
            time_cost: int = 3,
            memory_cost: int = 65536,  # 64 MiB
            parallelism: int = 4,
            hash_len: int = 32,
            salt_len: int = 16
        ):
            self.ph = PasswordHasher(
                time_cost=time_cost,
                memory_cost=memory_cost,
                parallelism=parallelism,
                hash_len=hash_len,
                salt_len=salt_len
            )
        
        def hash_password(self, password: str) -> str:
            """哈希密码"""
            try:
                return self.ph.hash(password)
            except HashError as e:
                raise ValueError(f"密码哈希失败: {e}")
        
        def verify_password(self, plain_password: str, hashed_password: str) -> bool:
            """验证密码"""
            try:
                self.ph.verify(hashed_password, plain_password)
                return True
            except VerifyMismatchError:
                return False
            except Exception:
                return False
        
        def check_needs_rehash(self, hashed_password: str) -> bool:
            """检查是否需要重新哈希"""
            return self.ph.check_needs_rehash(hashed_password)

    # 配置支持bcrypt和argon2的密码管理器
    hybrid_context = CryptContext(
        schemes=["bcrypt", "argon2"],
        deprecated=["auto"],
        bcrypt__default_rounds=12,
        argon2__time_cost=3,
        argon2__memory_cost=65536,
        argon2__parallelism=4,
    )

    class HybridPasswordManager:
        """混合密码管理器 - 支持多种算法"""
        
        def __init__(self):
            self.argon2_manager = Argon2PasswordManager() if PasswordHasher else None
            self.bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
        
        def hash_password(self, password: str, algorithm: str = "bcrypt") -> str:
            """使用指定算法哈希密码"""
            if algorithm == "argon2" and self.argon2_manager:
                return self.argon2_manager.hash_password(password)
            else:
                return self.bcrypt_context.hash(password)
        
        def verify_password(self, plain_password: str, hashed_password: str) -> bool:
            """验证密码(自动识别算法)"""
            if hashed_password.startswith('$argon2'):
                if self.argon2_manager:
                    return self.argon2_manager.verify_password(plain_password, hashed_password)
            elif hashed_password.startswith('$2'):
                return self.bcrypt_context.verify(plain_password, hashed_password)
            
            return False

# 使用示例
"""
hybrid_manager = HybridPasswordManager()

# 使用bcrypt哈希
bcrypt_hash = hybrid_manager.hash_password("MyPassword123!", "bcrypt")
print(f"bcrypt hash: {bcrypt_hash}")

# 使用argon2哈希(如果可用)
if PasswordHasher:
    argon2_hash = hybrid_manager.hash_password("MyPassword123!", "argon2")
    print(f"argon2 hash: {argon2_hash}")

# 验证两种哈希
print(f"bcrypt验证: {hybrid_manager.verify_password('MyPassword123!', bcrypt_hash)}")
if PasswordHasher:
    print(f"argon2验证: {hybrid_manager.verify_password('MyPassword123!', argon2_hash)}")
"""

多因素认证集成

import pyotp
import qrcode
from io import BytesIO
import base64

class TwoFactorAuthService:
    """双因素认证服务"""
    
    def __init__(self):
        self.totp_interval = 30  # TOTP间隔时间(秒)
    
    def generate_secret(self) -> str:
        """生成TOTP密钥"""
        return pyotp.random_base32()
    
    def generate_qr_code(self, secret: str, email: str, issuer: str = "YourApp") -> str:
        """生成QR码URL"""
        totp_uri = pyotp.totp.TOTP(secret).provisioning_uri(
            name=email,
            issuer_name=issuer
        )
        
        # 生成二维码
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(totp_uri)
        qr.make(fit=True)
        
        img = qr.make_image(fill_color="black", back_color="white")
        buffer = BytesIO()
        img.save(buffer, format="PNG")
        img_str = base64.b64encode(buffer.getvalue()).decode()
        
        return f"data:image/png;base64,{img_str}"
    
    def verify_totp(self, secret: str, token: str, window: int = 1) -> bool:
        """验证TOTP令牌"""
        totp = pyotp.TOTP(secret, interval=self.totp_interval)
        return totp.verify(token, valid_window=window)
    
    def generate_backup_codes(self, count: int = 10) -> list:
        """生成备用恢复码"""
        import secrets
        backup_codes = []
        for _ in range(count):
            code = secrets.token_urlsafe(8)[:8].upper()  # 8位大写字母数字
            backup_codes.append(code)
        return backup_codes

# 使用示例
"""
two_fa_service = TwoFactorAuthService()

# 为用户生成2FA设置
secret = two_fa_service.generate_secret()
qr_code_url = two_fa_service.generate_qr_code(secret, "user@example.com")

print(f"Secret: {secret}")
print(f"QR Code URL: {qr_code_url}")

# 验证用户输入的令牌
user_token = input("请输入TOTP令牌: ")
if two_fa_service.verify_totp(secret, user_token):
    print("2FA验证成功!")
else:
    print("2FA验证失败!")
"""

安全漏洞防范

常见安全漏洞防护

from fastapi import Request
import re
from typing import Optional

class SecurityVulnerabilityProtection:
    """安全漏洞防护类"""
    
    def __init__(self):
        # SQL注入防护模式
        self.sql_injection_patterns = [
            r"(?i)(union\s+select|drop\s+\w+|exec\s*\(|insert\s+into|delete\s+from|update\s+\w+\s+set)",
            r"(?i)(';|--|#|/\*|\*/|xp_|sp_|sysobjects|syscolumns)",
        ]
        
        # XSS防护模式
        self.xss_patterns = [
            r"(?i)(<script|javascript:|on\w+\s*=|<iframe|<object|<embed)",
            r"(?i)(document\.cookie|window\.alert|eval\s*\(|expression\s*\()",
        ]
        
        # 命令注入防护模式
        self.command_injection_patterns = [
            r"(?i)(;\s*rm\s|&&\s*rm\s|\|\s*rm\s|`.*`|\$\(.*\)|>\s*/|>>\s*/)",
            r"(?i)(\|\||&&|;|`|\$|\(|\)|\||>|<)",
        ]
    
    def sanitize_input(self, input_str: str) -> str:
        """输入净化"""
        if not input_str:
            return input_str
        
        # 移除潜在危险字符
        sanitized = input_str.strip()
        
        # URL解码(防止编码绕过)
        import urllib.parse
        try:
            sanitized = urllib.parse.unquote_plus(sanitized)
        except:
            pass
        
        return sanitized
    
    def detect_sql_injection(self, input_str: str) -> bool:
        """检测SQL注入"""
        if not input_str:
            return False
        
        sanitized = self.sanitize_input(input_str)
        for pattern in self.sql_injection_patterns:
            if re.search(pattern, sanitized):
                return True
        return False
    
    def detect_xss(self, input_str: str) -> bool:
        """检测XSS"""
        if not input_str:
            return False
        
        sanitized = self.sanitize_input(input_str)
        for pattern in self.xss_patterns:
            if re.search(pattern, sanitized):
                return True
        return False
    
    def detect_command_injection(self, input_str: str) -> bool:
        """检测命令注入"""
        if not input_str:
            return False
        
        sanitized = self.sanitize_input(input_str)
        for pattern in self.command_injection_patterns:
            if re.search(pattern, sanitized):
                return True
        return False
    
    def validate_email_format(self, email: str) -> bool:
        """验证邮箱格式"""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        return bool(re.match(pattern, email))
    
    def validate_password_against_common_list(self, password: str, common_passwords: list = None) -> bool:
        """检查密码是否在常见密码列表中"""
        if common_passwords is None:
            # 常见弱密码列表
            common_passwords = [
                'password', '123456', 'qwerty', 'admin', 'letmein',
                'welcome', 'monkey', '1234567890', 'abc123', 'password1'
            ]
        
        return password.lower() not in [pwd.lower() for pwd in common_passwords]

# 全局安全防护实例
security_protection = SecurityVulnerabilityProtection()

# 在FastAPI路由中使用
"""
@app.post("/register")
async def register_user(registration_data: RegistrationRequest):
    # 输入验证
    if security_protection.detect_sql_injection(registration_data.email):
        raise HTTPException(status_code=400, detail="无效的邮箱格式")
    
    if security_protection.detect_sql_injection(registration_data.password):
        raise HTTPException(status_code=400, detail="密码包含非法字符")
    
    # 密码安全检查
    if not security_protection.validate_password_against_common_list(registration_data.password):
        raise HTTPException(status_code=400, detail="密码过于常见,请选择更安全的密码")
    
    # 继续处理注册逻辑...
"""

安全日志记录

import logging
from datetime import datetime
from typing import Dict, Any
import json

class SecurityLogger:
    """安全日志记录器"""
    
    def __init__(self, logger_name: str = "security"):
        self.logger = logging.getLogger(logger_name)
        self.logger.setLevel(logging.INFO)
        
        # 创建安全日志处理器
        if not self.logger.handlers:
            handler = logging.FileHandler("security.log")
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
    
    def log_login_attempt(self, email: str, success: bool, ip_address: str, user_agent: str = None):
        """记录登录尝试"""
        event_data = {
            "event_type": "login_attempt",
            "email": email,
            "success": success,
            "ip_address": ip_address,
            "user_agent": user_agent,
            "timestamp": datetime.utcnow().isoformat()
        }
        
        level = logging.INFO if success else logging.WARNING
        self.logger.log(level, f"LOGIN_ATTEMPT: {json.dumps(event_data)}")
    
    def log_password_change(self, user_id: int, ip_address: str, success: bool):
        """记录密码更改"""
        event_data = {
            "event_type": "password_change",
            "user_id": user_id,
            "success": success,
            "ip_address": ip_address,
            "timestamp": datetime.utcnow().isoformat()
        }
        
        level = logging.INFO if success else logging.ERROR
        self.logger.log(level, f"PASSWORD_CHANGE: {json.dumps(event_data)}")
    
    def log_security_violation(self, violation_type: str, details: Dict[str, Any], ip_address: str):
        """记录安全违规"""
        event_data = {
            "event_type": "security_violation",
            "violation_type": violation_type,
            "details": details,
            "ip_address": ip_address,
            "timestamp": datetime.utcnow().isoformat()
        }
        
        self.logger.warning(f"SECURITY_VIOLATION: {json.dumps(event_data)}")
    
    def log_account_lockout(self, user_id: int, email: str, reason: str, ip_address: str):
        """记录账户锁定"""
        event_data = {
            "event_type": "account_lockout",
            "user_id": user_id,
            "email": email,
            "reason": reason,
            "ip_address": ip_address,
            "timestamp": datetime.utcnow().isoformat()
        }
        
        self.logger.warning(f"ACCOUNT_LOCKOUT: {json.dumps(event_data)}")

# 全局安全日志记录器
security_logger = SecurityLogger()

# 在用户服务中使用
"""
class UserService:
    # ... 其他代码 ...
    
    async def login_user(self, email: str, password: str, request: Request) -> Optional[User]:
        user = await self.get_user_by_email(email)
        
        if not user:
            security_logger.log_login_attempt(
                email=email, 
                success=False, 
                ip_address=request.client.host,
                user_agent=request.headers.get("user-agent")
            )
            # 即使用户不存在也记录失败尝试
            await self.record_failed_login_attempt(email)
            return None
        
        if self.password_manager.verify_password(password, user.hashed_password):
            security_logger.log_login_attempt(
                email=email, 
                success=True, 
                ip_address=request.client.host,
                user_agent=request.headers.get("user-agent")
            )
            await self.reset_failed_login_attempts(user.id)
            return user
        else:
            security_logger.log_login_attempt(
                email=email, 
                success=False, 
                ip_address=request.client.host,
                user_agent=request.headers.get("user-agent")
            )
            await self.record_failed_login_attempt(user.email)
            return None
"""

性能优化与监控

密码哈希性能监控

import time
import statistics
from collections import deque
from typing import List, Dict
import asyncio

class PasswordHashingMonitor:
    """密码哈希性能监控器"""
    
    def __init__(self, max_samples: int = 1000):
        self.hash_times: deque = deque(maxlen=max_samples)
        self.verify_times: deque = deque(maxlen=max_samples)
        self.max_samples = max_samples
        self.total_hashes = 0
        self.total_verifies = 0
    
    def record_hash_time(self, duration: float):
        """记录哈希操作耗时"""
        self.hash_times.append(duration)
        self.total_hashes += 1
    
    def record_verify_time(self, duration: float):
        """记录验证操作耗时"""
        self.verify_times.append(duration)
        self.total_verifies += 1
    
    def get_hash_statistics(self) -> Dict[str, float]:
        """获取哈希操作统计信息"""
        if not self.hash_times:
            return {"count": 0, "avg_time": 0, "min_time": 0, "max_time": 0, "p95_time": 0}
        
        times_list = list(self.hash_times)
        return {
            "count": len(times_list),
            "avg_time": statistics.mean(times_list),
            "min_time": min(times_list),
            "max_time": max(times_list),
            "p95_time": sorted(times_list)[int(0.95 * len(times_list))] if times_list else 0,
            "total_operations": self.total_hashes
        }
    
    def get_verify_statistics(self) -> Dict[str, float]:
        """获取验证操作统计信息"""
        if not self.verify_times:
            return {"count": 0, "avg_time": 0, "min_time": 0, "max_time": 0, "p95_time": 0}
        
        times_list = list(self.verify_times)
        return {
            "count": len(times_list),
            "avg_time": statistics.mean(times_list),
            "min_time": min(times_list),
            "max_time": max(times_list),
            "p95_time": sorted(times_list)[int(0.95 * len(times_list))] if times_list else 0,
            "total_operations": self.total_verifies
        }
    
    def get_overall_performance(self) -> Dict[str, any]:
        """获取整体性能信息"""
        return {
            "hash_stats": self.get_hash_statistics(),
            "verify_stats": self.get_verify_statistics(),
            "system_load": len(self.hash_times) + len(self.verify_times),
            "cache_hit_ratio": getattr(self, 'cache_hits', 0) / max(getattr(self, 'total_cache_ops', 1), 1)
        }

# 全局监控实例
hash_monitor = PasswordHashingMonitor()

class MonitoredPasswordManager(PasswordManager):
    """带监控的密码管理器"""
    
    def __init__(self, context: CryptContext = pwd_context):
        super().__init__(context)
        self.monitor = hash_monitor
    
    def hash_password(self, password: str) -> str:
        start_time = time.time()
        result = super().hash_password(password)
        duration = time.time() - start_time
        self.monitor.record_hash_time(duration)
        return result
    
    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        start_time = time.time()
        result = super().verify_password(plain_password, hashed_password)
        duration = time.time() - start_time
        self.monitor.record_verify_time(duration)
        return result

# 使用示例
"""
monitored_manager = MonitoredPasswordManager()

# 执行一些哈希和验证操作
for i in range(10):
    password = f"TestPassword{i}!"
    hashed = monitored_manager.hash_password(password)
    verified = monitored_manager.verify_password(password, hashed)

# 获取性能统计
stats = hash_monitor.get_overall_performance()
print(f"哈希统计: {stats['hash_stats']}")
print(f"验证统计: {stats['verify_stats']}")
"""

缓存优化

import hashlib
from typing import Optional
import asyncio
from functools import wraps

class PasswordCache:
    """密码验证缓存 - 提升频繁验证的性能"""
    
    def __init__(self, max_size: int = 1000, ttl: int = 300):  # 5分钟TTL
        self.cache = {}
        self.max_size = max_size
        self.ttl = ttl  # 秒
    
    def _make_key(self, email: str, password: str) -> str:
        """创建缓存键"""
        combined = f"{email}:{password}"
        return hashlib.sha256(combined.encode()).hexdigest()[:16]
    
    def _is_expired(self, timestamp: float) -> bool:
        """检查是否过期"""
        return time.time() - timestamp > self.ttl
    
    def get(self, email: str, password: str) -> Optional[bool]:
        """获取缓存结果"""
        key = self._make_key(email, password)
        if key in self.cache:
            result, timestamp = self.cache[key]
            if not self._is_expired(timestamp):
                return result
            else:
                # 删除过期项
                del self.cache[key]
        return None
    
    def set(self, email: str, password: str, result: bool):
        """设置缓存结果"""
        if len(self.cache) >= self.max_size:
            # 简单的LRU:删除最早的项
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
        
        key = self._make_key(email, password)
        self.cache[key] = (result, time.time())

class CachedPasswordManager(MonitoredPasswordManager):
    """带缓存的密码管理器"""
    
    def __init__(self, context: CryptContext = pwd_context, use_cache: bool = True):
        super().__init__(context)
        self.use_cache = use_cache
        self.cache = PasswordCache() if use_cache else None
        self.cache_hits = 0
        self.total_cache_ops = 0
    
    def verify_password(self, plain_password: str, hashed_password: str, email: str = None) -> bool:
        if self.use_cache and email:
            self.total_cache_ops += 1
            cached_result = self.cache.get(email, plain_password)
            if cached_result is not None:
                self.cache_hits += 1
                return cached_result
        
        # 执行实际验证
        start_time = time.time()
        result = super().verify_password(plain_password, hashed_password)
        duration = time.time() - start_time
        
        # 只缓存成功的验证结果(防止暴力破解)
        if self.use_cache and email and result:
            self.cache.set(email, plain_password, result)
        
        self.monitor.record_verify_time(duration)
        return result

# 更新监控统计
def update_cache_stats():
    """更新缓存统计信息到监控器"""
    if hasattr(monitored_manager, 'cache_hits'):
        hash_monitor.cache_hits = monitored_manager.cache_hits
        hash_monitor.total_cache_ops = monitored_manager.total_cache_ops

与其他安全框架对比

特性FastAPI + PasslibDjango AuthFlask-SecurityAuthlib
集成难度中等低(Django内置)中等
性能中等中等
灵活性中等极高
学习曲线中等低(对Django用户)中等
功能完整性完整完整完整完整
异步支持原生有限有限原生
OAuth2支持需要额外库有限集成专业
密码安全优秀良好良好良好

框架选择建议

"""
选择密码安全框架的决策树:

1. 如果使用Django → Django内置认证系统
2. 如果需要完整的身份管理 → Flask-Security
3. 如果需要专业的OAuth/OIDC → Authlib
4. 如果使用FastAPI → 本教程的Passlib实现
5. 如果需要最大灵活性 → 自定义 + Passlib

FastAPI + Passlib优势:
- 与FastAPI深度集成
- 异步支持
- 类型安全
- 灵活的哈希算法支持
- 企业级安全特性
- 易于测试和维护
"""

总结

FastAPI中的密码哈希与安全实践提供了完整的安全认证解决方案:

  1. 密码哈希:使用bcrypt/Argon2等安全算法
  2. 强度验证:实施密码复杂度要求
  3. 安全存储:防彩虹表攻击的盐值机制
  4. 用户管理:完整的注册登录流程
  5. 安全防护:防暴力破解、速率限制
  6. 监控审计:安全事件日志记录
  7. 性能优化:缓存和异步处理

正确实施密码安全能够:

  • 保护用户数据安全
  • 满足合规要求
  • 防止数据泄露
  • 提升用户信任度

💡 关键要点:永远不要自己实现加密算法,使用经过安全审计的库(Passlib + bcrypt/argon2)。密码安全是Web应用的基石,必须给予足够重视。


🔗 扩展阅读