#Django综合实战项目 - 电商平台开发完整指南
📂 所属阶段:第四部分 — 实战项目
🎯 难度等级:专家级
⏰ 预计学习时间:12-15小时
🎒 前置知识:完成前三部分所有章节
#目录
#项目概述
#项目目标
"""
电商平台项目目标:
1. 核心业务功能
- 用户注册/登录/认证
- 商品浏览/搜索/分类
- 购物车管理
- 订单处理
- 支付集成
- 物流跟踪
2. 技术架构目标
- 微服务架构
- 高可用性设计
- 可扩展性保证
- 安全性保障
- 性能优化
3. 运营支撑功能
- 后台管理系统
- 数据分析
- 监控告警
- 日志审计
"""#项目范围
"""
项目功能模块:
1. 用户管理模块
- 用户注册/登录/认证
- 个人信息管理
- 地址管理
- 安全设置
2. 商品管理模块
- 商品分类管理
- 商品信息管理
- 库存管理
- 价格管理
3. 购物车模块
- 添加/删除商品
- 数量修改
- 优惠券应用
- 价格计算
4. 订单管理模块
- 订单创建
- 订单状态跟踪
- 退款处理
- 发票管理
5. 支付模块
- 支付网关集成
- 订单支付
- 退款处理
- 支付状态管理
6. 搜索模块
- 商品搜索
- 智能推荐
- 搜索建议
- 搜索分析
7. 通知模块
- 邮件通知
- 短信通知
- 站内信
- 推送通知
8. 管理后台
- 商品管理
- 订单管理
- 用户管理
- 数据统计
"""#技术栈选择
"""
技术栈选择:
1. 后端技术栈
- Django 4.2+ (Web框架)
- Django REST Framework (API开发)
- PostgreSQL (主数据库)
- Redis (缓存/会话)
- Celery (异步任务)
- RabbitMQ/Kafka (消息队列)
2. 前端技术栈
- Vue.js/React (前端框架)
- Axios (HTTP客户端)
- Element UI/Ant Design (UI组件)
- Webpack (打包工具)
3. 基础设施
- Docker (容器化)
- Nginx (反向代理)
- Gunicorn (WSGI服务器)
- Supervisor (进程管理)
4. 监控运维
- Prometheus (监控)
- Grafana (可视化)
- ELK Stack (日志分析)
- Sentry (错误追踪)
"""#项目架构设计
#微服务架构设计
"""
电商平台微服务架构:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ API Gateway │────│ Load Balancer │────│ Frontend │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Auth Service │ │ User Service │ │ Product Service │
│ │ │ │ │ │
│ - JWT Auth │ │ - User Profile │ │ - Product CRUD │
│ - OAuth2 │ │ - Address Mgmt │ │ - Category │
│ - Permission │ │ - Security │ │ - Inventory │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Cart Service │ │ Order Service │ │ Payment Service │
│ │ │ │ │ │
│ - Cart CRUD │ │ - Order Mgmt │ │ - Payment │
│ - Coupon │ │ - Status Track │ │ - Refund │
│ - Calculation │ │ - Invoice │ │ - Gateway │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Search Service │ │ Notify Service │ │ Logistics Ser. │
│ │ │ │ │ │
│ - Elasticsearch │ │ - Email/SMS │ │ - Tracking │
│ - Recommendation│ │ - Push │ │ - Shipping │
│ - Analytics │ │ - Notification │ │ - Delivery │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
└───────────────────────┼───────────────────────┘
│
┌─────────────────┐
│ Shared Services │
│ │
│ - PostgreSQL │
│ - Redis │
│ - RabbitMQ │
│ - File Storage │
└─────────────────┘
架构特点:
1. 服务独立部署
2. 数据隔离
3. 异步通信
4. 高可用设计
5. 水平扩展能力
"""#项目目录结构
ecommerce-platform/
├── docker-compose.yml
├── docker/
│ ├── nginx/
│ │ ├── nginx.conf
│ │ └── sites-available/
│ ├── gateway/
│ │ ├── Dockerfile
│ │ └── docker-compose.gateway.yml
│ └── services/
│ ├── user/
│ ├── product/
│ ├── order/
│ ├── payment/
│ ├── cart/
│ ├── search/
│ └── notification/
├── services/
│ ├── gateway/
│ │ ├── manage.py
│ │ ├── requirements.txt
│ │ ├── gateway/
│ │ │ ├── settings.py
│ │ │ ├── urls.py
│ │ │ ├── middleware/
│ │ │ └── apps/
│ │ │ └── gateway_app/
│ ├── user/
│ │ ├── manage.py
│ │ ├── requirements.txt
│ │ └── user_service/
│ │ ├── settings.py
│ │ ├── apps/
│ │ │ ├── user_auth/
│ │ │ ├── user_profile/
│ │ │ └── user_security/
│ │ └── api/
│ ├── product/
│ │ ├── manage.py
│ │ ├── product_service/
│ │ │ ├── models/
│ │ │ ├── serializers/
│ │ │ ├── views/
│ │ │ └── api/
│ ├── order/
│ ├── payment/
│ ├── cart/
│ ├── search/
│ └── notification/
├── frontend/
│ ├── package.json
│ ├── src/
│ │ ├── components/
│ │ ├── views/
│ │ ├── api/
│ │ └── store/
│ └── public/
├── shared/
│ ├── common/
│ ├── utils/
│ └── constants/
├── docs/
├── scripts/
│ ├── deploy.sh
│ ├── backup.sh
│ └── monitoring/
├── tests/
│ ├── integration/
│ ├── unit/
│ └── e2e/
└── README.md#配置管理
# shared/common/config.py
import os
from typing import Dict, Any
from decouple import config
import yaml
class ConfigManager:
"""配置管理器"""
def __init__(self):
self.env = config('ENVIRONMENT', default='development')
self.config = self._load_config()
def _load_config(self) -> Dict[str, Any]:
"""加载配置"""
config_path = f'config/{self.env}.yaml'
if os.path.exists(config_path):
with open(config_path, 'r', encoding='utf-8') as f:
return yaml.safe_load(f)
else:
# 默认配置
return self._get_default_config()
def _get_default_config(self) -> Dict[str, Any]:
"""获取默认配置"""
return {
'database': {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': config('DB_NAME', default='ecommerce'),
'USER': config('DB_USER', default='ecommerce_user'),
'PASSWORD': config('DB_PASSWORD', default=''),
'HOST': config('DB_HOST', default='localhost'),
'PORT': config('DB_PORT', default='5432'),
}
},
'redis': {
'LOCATION': config('REDIS_URL', default='redis://localhost:6379/0'),
},
'rabbitmq': {
'URL': config('RABBITMQ_URL', default='amqp://guest:guest@localhost:5672//'),
},
'services': {
'user': {
'URL': config('USER_SERVICE_URL', default='http://user-service:8000'),
'API_VERSION': 'v1',
},
'product': {
'URL': config('PRODUCT_SERVICE_URL', default='http://product-service:8000'),
'API_VERSION': 'v1',
},
'order': {
'URL': config('ORDER_SERVICE_URL', default='http://order-service:8000'),
'API_VERSION': 'v1',
},
'payment': {
'URL': config('PAYMENT_SERVICE_URL', default='http://payment-service:8000'),
'API_VERSION': 'v1',
}
}
}
def get(self, key: str, default=None):
"""获取配置值"""
keys = key.split('.')
value = self.config
for k in keys:
value = value.get(k, {})
return value if value != {} else default
# 环境配置示例
"""
# config/development.yaml
database:
default:
ENGINE: django.db.backends.postgresql
NAME: ecommerce_dev
USER: ecommerce_dev_user
PASSWORD: dev_password
HOST: localhost
PORT: 5432
redis:
LOCATION: redis://localhost:6379/0
services:
user:
URL: http://localhost:8001
product:
URL: http://localhost:8002
order:
URL: http://localhost:8003
payment:
URL: http://localhost:8004
# config/production.yaml
database:
default:
ENGINE: django.db.backends.postgresql
NAME: ecommerce_prod
USER: ecommerce_prod_user
PASSWORD: ${DB_PASSWORD}
HOST: ${DB_HOST}
PORT: 5432
redis:
LOCATION: redis://${REDIS_HOST}:6379/0
services:
user:
URL: http://user-service:8000
product:
URL: http://product-service:8000
order:
URL: http://order-service:8000
payment:
URL: http://payment-service:8000
"""#数据库设计
#用户服务数据库设计
# services/user/user_service/apps/user_auth/models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
from django.utils import timezone
import uuid
class User(AbstractUser):
"""用户模型"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
phone = models.CharField(max_length=20, blank=True, unique=True)
avatar = models.URLField(blank=True)
email_verified = models.BooleanField(default=False)
phone_verified = models.BooleanField(default=False)
is_premium = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'users'
indexes = [
models.Index(fields=['email']),
models.Index(fields=['phone']),
models.Index(fields=['created_at']),
]
class UserProfile(models.Model):
"""用户资料"""
GENDER_CHOICES = [
('M', 'Male'),
('F', 'Female'),
('O', 'Other'),
]
user = models.OneToOneField(User, on_delete=models.CASCADE, related_name='profile')
nickname = models.CharField(max_length=50, blank=True)
gender = models.CharField(max_length=1, choices=GENDER_CHOICES, blank=True)
birth_date = models.DateField(null=True, blank=True)
bio = models.TextField(max_length=500, blank=True)
location = models.CharField(max_length=100, blank=True)
website = models.URLField(blank=True)
social_links = models.JSONField(default=dict, blank=True)
class Meta:
db_table = 'user_profiles'
class UserAddress(models.Model):
"""用户地址"""
ADDRESS_TYPE_CHOICES = [
('home', 'Home'),
('office', 'Office'),
('other', 'Other'),
]
user = models.ForeignKey(User, on_delete=models.CASCADE, related_name='addresses')
recipient_name = models.CharField(max_length=100)
phone = models.CharField(max_length=20)
province = models.CharField(max_length=50)
city = models.CharField(max_length=50)
district = models.CharField(max_length=50)
detail_address = models.TextField()
postal_code = models.CharField(max_length=10)
address_type = models.CharField(max_length=20, choices=ADDRESS_TYPE_CHOICES, default='home')
is_default = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'user_addresses'
indexes = [
models.Index(fields=['user', 'is_default']),
]
class UserSecurity(models.Model):
"""用户安全"""
user = models.OneToOneField(User, on_delete=models.CASCADE, related_name='security')
failed_login_attempts = models.IntegerField(default=0)
locked_until = models.DateTimeField(null=True, blank=True)
last_login_ip = models.GenericIPAddressField(null=True, blank=True)
login_history = models.JSONField(default=list, blank=True)
two_factor_enabled = models.BooleanField(default=False)
two_factor_secret = models.CharField(max_length=32, blank=True)
class Meta:
db_table = 'user_security'
# services/user/user_service/apps/user_auth/migrations/0001_initial.py
"""
from django.db import migrations, models
import django.utils.timezone
import uuid
class Migration(migrations.Migration):
initial = True
dependencies = [
('auth', '0012_alter_user_first_name_max_length'),
]
operations = [
migrations.CreateModel(
name='User',
fields=[
('id', models.UUIDField(default=uuid.uuid4, editable=False, primary_key=True, serialize=False)),
('password', models.CharField(max_length=128, verbose_name='password')),
('last_login', models.DateTimeField(blank=True, null=True, verbose_name='last login')),
('is_superuser', models.BooleanField(default=False, help_text='Designates that this user has all permissions without explicitly assigning them.', verbose_name='superuser status')),
('username', models.CharField(error_messages={'unique': 'A user with that username already exists.'}, help_text='Required. 150 characters or fewer. Letters, digits and @/./+/-/_ only.', max_length=150, unique=True, validators=[django.contrib.auth.validators.UnicodeUsernameValidator()], verbose_name='username')),
('first_name', models.CharField(blank=True, max_length=150, verbose_name='first name')),
('last_name', models.CharField(blank=True, max_length=150, verbose_name='last name')),
('email', models.EmailField(blank=True, max_length=254, verbose_name='email address')),
('is_staff', models.BooleanField(default=False, help_text='Designates whether the user can log into this admin site.', verbose_name='staff status')),
('is_active', models.BooleanField(default=True, help_text='Designates whether this user should be treated as active. Unselect this instead of deleting accounts.', verbose_name='active')),
('date_joined', models.DateTimeField(default=django.utils.timezone.now, verbose_name='date joined')),
('phone', models.CharField(blank=True, max_length=20, unique=True)),
('avatar', models.URLField(blank=True)),
('email_verified', models.BooleanField(default=False)),
('phone_verified', models.BooleanField(default=False)),
('is_premium', models.BooleanField(default=False)),
('created_at', models.DateTimeField(auto_now_add=True)),
('updated_at', models.DateTimeField(auto_now=True)),
],
options={
'db_table': 'users',
},
),
# 其他模型的迁移...
]
"""#商品服务数据库设计
# services/product/product_service/models.py
from django.db import models
from django.core.validators import MinValueValidator, MaxValueValidator
import uuid
class Category(models.Model):
"""商品分类"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=100, unique=True)
slug = models.SlugField(unique=True, max_length=120)
description = models.TextField(blank=True)
parent = models.ForeignKey('self', null=True, blank=True, on_delete=models.CASCADE, related_name='children')
level = models.PositiveIntegerField(default=0)
lft = models.PositiveIntegerField(db_index=True) # Left value for nested set
rght = models.PositiveIntegerField(db_index=True) # Right value for nested set
tree_id = models.PositiveIntegerField(db_index=True) # Tree identifier
position = models.PositiveIntegerField(default=0)
is_active = models.BooleanField(default=True)
icon = models.URLField(blank=True)
banner_image = models.URLField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'categories'
ordering = ['tree_id', 'lft']
indexes = [
models.Index(fields=['slug']),
models.Index(fields=['parent']),
models.Index(fields=['is_active', 'level']),
]
def __str__(self):
return self.name
class Brand(models.Model):
"""品牌"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=100, unique=True)
slug = models.SlugField(unique=True)
description = models.TextField(blank=True)
logo = models.URLField(blank=True)
website = models.URLField(blank=True)
country = models.CharField(max_length=50, blank=True)
established_year = models.PositiveIntegerField(null=True, blank=True)
is_active = models.BooleanField(default=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'brands'
indexes = [
models.Index(fields=['slug']),
models.Index(fields=['is_active']),
]
class Product(models.Model):
"""商品"""
PRODUCT_STATUS_CHOICES = [
('draft', 'Draft'),
('pending_review', 'Pending Review'),
('approved', 'Approved'),
('rejected', 'Rejected'),
('active', 'Active'),
('inactive', 'Inactive'),
('discontinued', 'Discontinued'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=200)
slug = models.SlugField(unique=True, max_length=250)
description = models.TextField()
short_description = models.CharField(max_length=500, blank=True)
category = models.ForeignKey(Category, on_delete=models.SET_NULL, null=True, related_name='products')
brand = models.ForeignKey(Brand, on_delete=models.SET_NULL, null=True, related_name='products')
sku = models.CharField(max_length=100, unique=True) # Stock Keeping Unit
barcode = models.CharField(max_length=50, blank=True, db_index=True) # 条形码
price = models.DecimalField(max_digits=10, decimal_places=2, validators=[MinValueValidator(0)])
cost_price = models.DecimalField(max_digits=10, decimal_places=2, validators=[MinValueValidator(0)], null=True, blank=True)
market_price = models.DecimalField(max_digits=10, decimal_places=2, validators=[MinValueValidator(0)], null=True, blank=True) # 市场价/划线价
tax_rate = models.DecimalField(max_digits=5, decimal_places=2, default=0, validators=[MinValueValidator(0), MaxValueValidator(100)])
weight = models.DecimalField(max_digits=8, decimal_places=3, null=True, blank=True, help_text="重量(kg)")
dimensions = models.JSONField(default=dict, blank=True, help_text="尺寸: {length: 0, width: 0, height: 0}") # 长宽高
color_options = models.JSONField(default=list, blank=True, help_text="颜色选项")
size_options = models.JSONField(default=list, blank=True, help_text="尺码选项")
material = models.CharField(max_length=100, blank=True)
origin_country = models.CharField(max_length=50, blank=True)
warranty_period = models.PositiveIntegerField(null=True, blank=True, help_text="保修期(月)")
status = models.CharField(max_length=20, choices=PRODUCT_STATUS_CHOICES, default='draft')
is_featured = models.BooleanField(default=False, help_text="是否推荐商品")
is_new = models.BooleanField(default=False, help_text="是否新品")
rating = models.DecimalField(max_digits=3, decimal_places=2, default=0, validators=[MinValueValidator(0), MaxValueValidator(5.0)])
review_count = models.PositiveIntegerField(default=0)
view_count = models.PositiveIntegerField(default=0)
sales_count = models.PositiveIntegerField(default=0)
favorite_count = models.PositiveIntegerField(default=0)
seo_title = models.CharField(max_length=200, blank=True)
seo_description = models.CharField(max_length=500, blank=True)
seo_keywords = models.TextField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'products'
indexes = [
models.Index(fields=['slug']),
models.Index(fields=['sku']),
models.Index(fields=['barcode']),
models.Index(fields=['category']),
models.Index(fields=['brand']),
models.Index(fields=['status']),
models.Index(fields=['price']),
models.Index(fields=['rating']),
models.Index(fields=['created_at']),
models.Index(fields=['sales_count']),
]
def __str__(self):
return self.name
class ProductImage(models.Model):
"""商品图片"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
product = models.ForeignKey(Product, on_delete=models.CASCADE, related_name='images')
image_url = models.URLField()
alt_text = models.CharField(max_length=200, blank=True)
is_primary = models.BooleanField(default=False)
sort_order = models.PositiveIntegerField(default=0)
width = models.PositiveIntegerField(null=True, blank=True)
height = models.PositiveIntegerField(null=True, blank=True)
file_size = models.PositiveIntegerField(null=True, blank=True) # 文件大小(bytes)
upload_time = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'product_images'
ordering = ['sort_order', '-is_primary']
indexes = [
models.Index(fields=['product', 'is_primary']),
models.Index(fields=['sort_order']),
]
class ProductSpecification(models.Model):
"""商品规格"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
product = models.ForeignKey(Product, on_delete=models.CASCADE, related_name='specifications')
specification_name = models.CharField(max_length=100)
specification_value = models.CharField(max_length=500)
unit = models.CharField(max_length=20, blank=True) # 单位
sort_order = models.PositiveIntegerField(default=0)
class Meta:
db_table = 'product_specifications'
unique_together = ['product', 'specification_name']
ordering = ['sort_order']
class ProductInventory(models.Model):
"""商品库存"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
product = models.OneToOneField(Product, on_delete=models.CASCADE, related_name='inventory')
stock_quantity = models.PositiveIntegerField(default=0, help_text="实际库存数量")
reserved_quantity = models.PositiveIntegerField(default=0, help_text="预留库存数量")
available_quantity = models.PositiveIntegerField(default=0, help_text="可用库存数量")
safety_stock = models.PositiveIntegerField(default=10, help_text="安全库存数量")
reorder_point = models.PositiveIntegerField(default=20, help_text="补货点")
reorder_quantity = models.PositiveIntegerField(default=100, help_text="补货数量")
warehouse_location = models.CharField(max_length=100, blank=True, help_text="仓库位置")
last_stock_update = models.DateTimeField(auto_now=True)
stock_status = models.CharField(max_length=20, default='in_stock') # in_stock, low_stock, out_of_stock
class Meta:
db_table = 'product_inventory'
indexes = [
models.Index(fields=['available_quantity']),
models.Index(fields=['stock_status']),
]
class ProductReview(models.Model):
"""商品评价"""
RATING_CHOICES = [(i, i) for i in range(1, 6)]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
product = models.ForeignKey(Product, on_delete=models.CASCADE, related_name='reviews')
user = models.ForeignKey('user.User', on_delete=models.CASCADE, related_name='product_reviews') # 假设用户服务已同步
order_item = models.ForeignKey('order.OrderItem', on_delete=models.SET_NULL, null=True, blank=True) # 关联订单项
rating = models.PositiveSmallIntegerField(choices=RATING_CHOICES)
title = models.CharField(max_length=200, blank=True)
content = models.TextField()
is_verified_purchase = models.BooleanField(default=False) # 是否已购买验证
is_approved = models.BooleanField(default=False) # 是否已审核
helpful_count = models.PositiveIntegerField(default=0) # 有用投票数
reported_count = models.PositiveIntegerField(default=0) # 举报数
ip_address = models.GenericIPAddressField(null=True, blank=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'product_reviews'
unique_together = ['product', 'user']
indexes = [
models.Index(fields=['product', 'rating']),
models.Index(fields=['user']),
models.Index(fields=['is_approved']),
models.Index(fields=['created_at']),
]
class ProductQuestion(models.Model):
"""商品问答"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
product = models.ForeignKey(Product, on_delete=models.CASCADE, related_name='questions')
user = models.ForeignKey('user.User', on_delete=models.CASCADE, related_name='product_questions') # 假设用户服务已同步
question = models.TextField()
answer = models.TextField(blank=True)
answered_by = models.ForeignKey('user.User', on_delete=models.SET_NULL, null=True, blank=True, related_name='answered_questions') # 回答者
is_answered = models.BooleanField(default=False)
helpful_count = models.PositiveIntegerField(default=0)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'product_questions'
indexes = [
models.Index(fields=['product']),
models.Index(fields=['is_answered']),
models.Index(fields=['created_at']),
]#订单服务数据库设计
# services/order/order_service/models.py
from django.db import models
from django.core.validators import MinValueValidator, MaxValueValidator
import uuid
from decimal import Decimal
class Order(models.Model):
"""订单"""
ORDER_STATUS_CHOICES = [
('pending', 'Pending Payment'),
('paid', 'Paid'),
('confirmed', 'Confirmed'),
('preparing', 'Preparing'),
('shipped', 'Shipped'),
('delivered', 'Delivered'),
('cancelled', 'Cancelled'),
('refunded', 'Refunded'),
('disputed', 'Disputed'),
]
PAYMENT_STATUS_CHOICES = [
('pending', 'Pending'),
('processing', 'Processing'),
('completed', 'Completed'),
('failed', 'Failed'),
('refunded', 'Refunded'),
('partially_refunded', 'Partially Refunded'),
]
SHIPPING_STATUS_CHOICES = [
('not_shipped', 'Not Shipped'),
('shipped', 'Shipped'),
('in_transit', 'In Transit'),
('out_for_delivery', 'Out for Delivery'),
('delivered', 'Delivered'),
('returned', 'Returned'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
order_no = models.CharField(max_length=50, unique=True, db_index=True) # 订单号
user_id = models.UUIDField(db_index=True) # 外部用户服务关联
shipping_address = models.JSONField() # 收货地址(JSON格式)
billing_address = models.JSONField() # 账单地址(JSON格式)
items = models.JSONField() # 订单项(JSON格式,包含商品ID、数量、价格等)
# 价格相关
subtotal = models.DecimalField(max_digits=12, decimal_places=2, validators=[MinValueValidator(0)]) # 小计
discount_amount = models.DecimalField(max_digits=12, decimal_places=2, default=0, validators=[MinValueValidator(0)]) # 优惠金额
shipping_fee = models.DecimalField(max_digits=12, decimal_places=2, default=0, validators=[MinValueValidator(0)]) # 运费
tax_amount = models.DecimalField(max_digits=12, decimal_places=2, default=0, validators=[MinValueValidator(0)]) # 税费
total_amount = models.DecimalField(max_digits=12, decimal_places=2, validators=[MinValueValidator(0)]) # 总金额
paid_amount = models.DecimalField(max_digits=12, decimal_places=2, default=0, validators=[MinValueValidator(0)]) # 已付金额
# 状态
status = models.CharField(max_length=20, choices=ORDER_STATUS_CHOICES, default='pending')
payment_status = models.CharField(max_length=20, choices=PAYMENT_STATUS_CHOICES, default='pending')
shipping_status = models.CharField(max_length=20, choices=SHIPPING_STATUS_CHOICES, default='not_shipped')
# 时间戳
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
paid_at = models.DateTimeField(null=True, blank=True)
shipped_at = models.DateTimeField(null=True, blank=True)
delivered_at = models.DateTimeField(null=True, blank=True)
cancelled_at = models.DateTimeField(null=True, blank=True)
# 其他信息
notes = models.TextField(blank=True) # 用户备注
internal_notes = models.TextField(blank=True) # 内部备注
tracking_number = models.CharField(max_length=100, blank=True, db_index=True) # 物流单号
shipping_company = models.CharField(max_length=100, blank=True) # 快递公司
coupon_code = models.CharField(max_length=50, blank=True) # 优惠券代码
gift_message = models.TextField(blank=True) # 礼品留言
class Meta:
db_table = 'orders'
indexes = [
models.Index(fields=['order_no']),
models.Index(fields=['user_id']),
models.Index(fields=['status']),
models.Index(fields=['payment_status']),
models.Index(fields=['shipping_status']),
models.Index(fields=['tracking_number']),
models.Index(fields=['created_at']),
models.Index(fields=['total_amount']),
]
def __str__(self):
return f"Order {self.order_no}"
class OrderItem(models.Model):
"""订单项"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
order = models.ForeignKey(Order, on_delete=models.CASCADE, related_name='order_items')
product_id = models.UUIDField(db_index=True) # 外部商品服务关联
product_name = models.CharField(max_length=200) # 商品名称(快照)
product_sku = models.CharField(max_length=100) # 商品SKU(快照)
quantity = models.PositiveIntegerField(validators=[MinValueValidator(1)])
unit_price = models.DecimalField(max_digits=10, decimal_places=2, validators=[MinValueValidator(0)]) # 单价
total_price = models.DecimalField(max_digits=12, decimal_places=2, validators=[MinValueValidator(0)]) # 总价
discount_amount = models.DecimalField(max_digits=12, decimal_places=2, default=0, validators=[MinValueValidator(0)]) # 优惠金额
attributes = models.JSONField(default=dict, blank=True) # 商品属性(JSON格式)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'order_items'
indexes = [
models.Index(fields=['order']),
models.Index(fields=['product_id']),
]
class OrderPayment(models.Model):
"""订单支付"""
PAYMENT_METHOD_CHOICES = [
('credit_card', 'Credit Card'),
('debit_card', 'Debit Card'),
('paypal', 'PayPal'),
('alipay', 'Alipay'),
('wechat_pay', 'WeChat Pay'),
('bank_transfer', 'Bank Transfer'),
('cod', 'Cash on Delivery'),
]
PAYMENT_GATEWAY_CHOICES = [
('stripe', 'Stripe'),
('paypal', 'PayPal'),
('alipay', 'Alipay'),
('wechat', 'WeChat Pay'),
('razorpay', 'Razorpay'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
order = models.ForeignKey(Order, on_delete=models.CASCADE, related_name='payments')
payment_method = models.CharField(max_length=20, choices=PAYMENT_METHOD_CHOICES)
payment_gateway = models.CharField(max_length=20, choices=PAYMENT_GATEWAY_CHOICES)
transaction_id = models.CharField(max_length=100, unique=True) # 交易ID
amount = models.DecimalField(max_digits=12, decimal_places=2, validators=[MinValueValidator(0)])
currency = models.CharField(max_length=3, default='USD')
status = models.CharField(max_length=20, choices=Order.PAYMENT_STATUS_CHOICES, default='pending')
gateway_response = models.JSONField(default=dict, blank=True) # 支付网关响应
gateway_fee = models.DecimalField(max_digits=10, decimal_places=2, default=0, validators=[MinValueValidator(0)]) # 支付网关费用
processed_at = models.DateTimeField(null=True, blank=True)
refunded_at = models.DateTimeField(null=True, blank=True)
refund_transaction_id = models.CharField(max_length=100, blank=True) # 退款交易ID
refund_amount = models.DecimalField(max_digits=12, decimal_places=2, default=0, validators=[MinValueValidator(0)]) # 退款金额
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'order_payments'
indexes = [
models.Index(fields=['order']),
models.Index(fields=['transaction_id']),
models.Index(fields=['status']),
models.Index(fields=['created_at']),
]
class OrderShipping(models.Model):
"""订单配送"""
SHIPPING_METHOD_CHOICES = [
('standard', 'Standard Shipping'),
('express', 'Express Shipping'),
('overnight', 'Overnight Shipping'),
('pickup', 'Pickup from Store'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
order = models.OneToOneField(Order, on_delete=models.CASCADE, related_name='shipping_info')
shipping_method = models.CharField(max_length=20, choices=SHIPPING_METHOD_CHOICES)
shipping_company = models.CharField(max_length=100)
tracking_number = models.CharField(max_length=100, unique=True)
tracking_url = models.URLField(blank=True)
estimated_delivery = models.DateTimeField(null=True, blank=True) # 预计送达时间
actual_delivery = models.DateTimeField(null=True, blank=True) # 实际送达时间
shipping_cost = models.DecimalField(max_digits=10, decimal_places=2, validators=[MinValueValidator(0)])
shipping_address = models.JSONField() # 配送地址
insurance_cost = models.DecimalField(max_digits=10, decimal_places=2, default=0, validators=[MinValueValidator(0)]) # 保险费用
signature_required = models.BooleanField(default=False) # 是否需要签收
gift_wrapping = models.BooleanField(default=False) # 礼品包装
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'order_shipping'
indexes = [
models.Index(fields=['tracking_number']),
models.Index(fields=['shipping_company']),
]
class OrderRefund(models.Model):
"""订单退款"""
REFUND_REASON_CHOICES = [
('quality_issue', 'Quality Issue'),
('wrong_item', 'Wrong Item Sent'),
('not_as_described', 'Not as Described'),
('no_longer_needed', 'No Longer Needed'),
('delivery_issue', 'Delivery Issue'),
('duplicate_order', 'Duplicate Order'),
('other', 'Other'),
]
REFUND_STATUS_CHOICES = [
('requested', 'Requested'),
('approved', 'Approved'),
('processing', 'Processing'),
('completed', 'Completed'),
('rejected', 'Rejected'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
order = models.ForeignKey(Order, on_delete=models.CASCADE, related_name='refunds')
order_item = models.ForeignKey(OrderItem, on_delete=models.SET_NULL, null=True, blank=True, related_name='refunds') # 退款商品项
reason = models.CharField(max_length=20, choices=REFUND_REASON_CHOICES)
reason_detail = models.TextField(blank=True) # 退款原因详情
requested_by = models.UUIDField() # 申请人ID
amount = models.DecimalField(max_digits=12, decimal_places=2, validators=[MinValueValidator(0)]) # 退款金额
refund_method = models.CharField(max_length=20, choices=OrderPayment.PAYMENT_METHOD_CHOICES) # 退款方式
status = models.CharField(max_length=20, choices=REFUND_STATUS_CHOICES, default='requested')
processed_by = models.UUIDField(null=True, blank=True) # 处理人ID
processed_at = models.DateTimeField(null=True, blank=True)
refund_transaction_id = models.CharField(max_length=100, blank=True) # 退款交易ID
notes = models.TextField(blank=True) # 处理备注
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'order_refunds'
indexes = [
models.Index(fields=['order']),
models.Index(fields=['status']),
models.Index(fields=['created_at']),
]
class ShoppingCart(models.Model):
"""购物车"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
user_id = models.UUIDField(db_index=True) # 用户ID
session_id = models.CharField(max_length=128, null=True, blank=True, db_index=True) # 会话ID(游客)
items = models.JSONField(default=list) # 购物车项(JSON格式)
total_items = models.PositiveIntegerField(default=0) # 总商品数
total_amount = models.DecimalField(max_digits=12, decimal_places=2, default=0, validators=[MinValueValidator(0)]) # 总金额
coupon_code = models.CharField(max_length=50, blank=True) # 优惠券
shipping_estimate = models.DecimalField(max_digits=10, decimal_places=2, default=0, validators=[MinValueValidator(0)]) # 预估运费
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'shopping_cart'
indexes = [
models.Index(fields=['user_id']),
models.Index(fields=['session_id']),
models.Index(fields=['created_at']),
]#用户服务模块
#用户认证服务
# services/user/user_service/apps/user_auth/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, generics
from rest_framework.permissions import IsAuthenticated, AllowAny
from rest_framework_simplejwt.tokens import RefreshToken
from django.contrib.auth import authenticate
from django.contrib.auth.password_validation import validate_password
from django.core.exceptions import ValidationError
from .models import User, UserProfile, UserSecurity
from .serializers import (
UserRegistrationSerializer,
UserLoginSerializer,
UserProfileSerializer,
ChangePasswordSerializer,
ResetPasswordSerializer
)
from .utils import send_verification_email, send_sms_code
from shared.common.responses import SuccessResponse, ErrorResponse
import logging
logger = logging.getLogger(__name__)
class UserRegistrationView(APIView):
"""用户注册视图"""
permission_classes = [AllowAny]
def post(self, request):
"""用户注册"""
serializer = UserRegistrationSerializer(data=request.data)
if serializer.is_valid():
try:
user = serializer.save()
# 发送验证邮件
send_verification_email(user.email, user.id)
# 创建用户资料
UserProfile.objects.create(user=user)
UserSecurity.objects.create(user=user)
# 生成JWT token
refresh = RefreshToken.for_user(user)
return SuccessResponse({
'user': {
'id': str(user.id),
'username': user.username,
'email': user.email,
'phone': user.phone,
},
'tokens': {
'refresh': str(refresh),
'access': str(refresh.access_token),
},
'message': '用户注册成功,请查收验证邮件'
}, status=status.HTTP_201_CREATED)
except ValidationError as e:
return ErrorResponse(str(e), status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
logger.error(f"User registration failed: {str(e)}")
return ErrorResponse('注册失败,请稍后重试', status=status.HTTP_500_INTERNAL_SERVER_ERROR)
return ErrorResponse(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class UserLoginView(APIView):
"""用户登录视图"""
permission_classes = [AllowAny]
def post(self, request):
"""用户登录"""
serializer = UserLoginSerializer(data=request.data)
if serializer.is_valid():
user = serializer.validated_data('user')
login(request, user)
# 更新安全信息
security, created = UserSecurity.objects.get_or_create(user=user)
security.failed_login_attempts = 0
security.last_login_ip = self.get_client_ip(request)
security.login_history.append({
'timestamp': timezone.now().isoformat(),
'ip': security.last_login_ip,
'user_agent': request.META.get('HTTP_USER_AGENT', '')
})
# 只保留最近100次登录记录
security.login_history = security.login_history[-100:]
security.save()
# 生成JWT token
refresh = RefreshToken.for_user(user)
return SuccessResponse({
'user': {
'id': str(user.id),
'username': user.username,
'email': user.email,
'phone': user.phone,
'email_verified': user.email_verified,
'phone_verified': user.phone_verified,
},
'tokens': {
'refresh': str(refresh),
'access': str(refresh.access_token),
},
'message': '登录成功'
})
return ErrorResponse(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def get_client_ip(self, request):
"""获取客户端IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
class UserProfileView(APIView):
"""用户资料视图"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""获取用户资料"""
profile, created = UserProfile.objects.get_or_create(user=request.user)
serializer = UserProfileSerializer(profile)
return SuccessResponse({
'user': {
'id': str(request.user.id),
'username': request.user.username,
'email': request.user.email,
'phone': request.user.phone,
'is_premium': request.user.is_premium,
'date_joined': request.user.date_joined,
},
'profile': serializer.data
})
def put(self, request):
"""更新用户资料"""
profile, created = UserProfile.objects.get_or_create(user=request.user)
serializer = UserProfileSerializer(profile, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
# 如果更新了邮箱,发送验证邮件
if 'email' in request.data and request.user.email != request.data['email']:
request.user.email = request.data['email']
request.user.email_verified = False
request.user.save()
send_verification_email(request.user.email, request.user.id)
return SuccessResponse({
'profile': serializer.data,
'message': '资料更新成功'
})
return ErrorResponse(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class ChangePasswordView(APIView):
"""修改密码视图"""
permission_classes = [IsAuthenticated]
def post(self, request):
"""修改密码"""
serializer = ChangePasswordSerializer(data=request.data, context={'user': request.user})
if serializer.is_valid():
request.user.set_password(serializer.validated_data['new_password'])
request.user.save()
# 登出所有会话
from django.contrib.auth import logout
logout(request)
return SuccessResponse({
'message': '密码修改成功,请重新登录'
})
return ErrorResponse(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class SendVerificationCodeView(APIView):
"""发送验证代码视图"""
permission_classes = [AllowAny]
def post(self, request):
"""发送邮箱或手机验证代码"""
email = request.data.get('email')
phone = request.data.get('phone')
if email:
# 发送邮箱验证
from django.contrib.auth.tokens import default_token_generator
from django.utils.http import urlsafe_base64_encode
from django.utils.encoding import force_bytes
uid = urlsafe_base64_encode(force_bytes(request.user.id))
token = default_token_generator.make_token(request.user)
send_verification_email(email, uid, token)
return SuccessResponse({'message': '验证邮件已发送'})
elif phone:
# 发送手机验证
send_sms_code(phone)
return SuccessResponse({'message': '验证码已发送'})
return ErrorResponse({'error': '请提供邮箱或手机号'})
class VerifyEmailView(APIView):
"""邮箱验证视图"""
permission_classes = [AllowAny]
def post(self, request):
"""验证邮箱"""
uid = request.data.get('uid')
token = request.data.get('token')
from django.contrib.auth.tokens import default_token_generator
from django.utils.http import urlsafe_base64_decode
from django.utils.encoding import force_text
try:
uid = force_text(urlsafe_base64_decode(uid))
user = User.objects.get(pk=uid)
except (TypeError, ValueError, OverflowError, User.DoesNotExist):
user = None
if user is not None and default_token_generator.check_token(user, token):
user.email_verified = True
user.save()
return SuccessResponse({'message': '邮箱验证成功'})
else:
return ErrorResponse({'error': '验证链接无效或已过期'}, status=status.HTTP_400_BAD_REQUEST)
# serializers.py
from rest_framework import serializers
from django.contrib.auth import authenticate
from django.contrib.auth.password_validation import validate_password
from .models import User, UserProfile
class UserRegistrationSerializer(serializers.ModelSerializer):
password = serializers.CharField(write_only=True, validators=[validate_password])
password_confirm = serializers.CharField(write_only=True)
class Meta:
model = User
fields = ['username', 'email', 'phone', 'password', 'password_confirm']
def validate(self, attrs):
if attrs['password'] != attrs['password_confirm']:
raise serializers.ValidationError('两次密码输入不一致')
return attrs
def create(self, validated_data):
validated_data.pop('password_confirm')
password = validated_data.pop('password')
user = User.objects.create_user(**validated_data)
user.set_password(password)
user.save()
return user
class UserLoginSerializer(serializers.Serializer):
username = serializers.CharField()
password = serializers.CharField()
remember_me = serializers.BooleanField(required=False, default=False)
def validate(self, attrs):
username = attrs.get('username')
password = attrs.get('password')
if username and password:
user = authenticate(username=username, password=password)
if user:
if not user.is_active:
raise serializers.ValidationError('账户已被禁用')
attrs['user'] = user
return attrs
else:
raise serializers.ValidationError('用户名或密码错误')
else:
raise serializers.ValidationError('必须提供用户名和密码')
class UserProfileSerializer(serializers.ModelSerializer):
class Meta:
model = UserProfile
fields = '__all__'
read_only_fields = ['user']
class ChangePasswordSerializer(serializers.Serializer):
old_password = serializers.CharField(required=True)
new_password = serializers.CharField(required=True, validators=[validate_password])
new_password_confirm = serializers.CharField(required=True)
def validate(self, attrs):
if attrs['new_password'] != attrs['new_password_confirm']:
raise serializers.ValidationError('两次新密码输入不一致')
old_password = attrs['old_password']
user = self.context['user']
if not user.check_password(old_password):
raise serializers.ValidationError('原密码错误')
if old_password == attrs['new_password']:
raise serializers.ValidationError('新密码不能与原密码相同')
return attrs#用户安全管理
# services/user/user_service/apps/user_security/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework import status
from django.core.cache import cache
from django.contrib.auth.hashers import check_password
from django.utils import timezone
from datetime import timedelta
from .models import UserSecurity
from shared.common.responses import SuccessResponse, ErrorResponse
import logging
logger = logging.getLogger(__name__)
class SecuritySettingsView(APIView):
"""安全设置视图"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""获取安全设置"""
security, created = UserSecurity.objects.get_or_create(user=request.user)
return SuccessResponse({
'two_factor_enabled': security.two_factor_enabled,
'failed_login_attempts': security.failed_login_attempts,
'locked_until': security.locked_until,
'login_history': security.login_history[-10:], # 最近10次登录
})
def put(self, request):
"""更新安全设置"""
security, created = UserSecurity.objects.get_or_create(user=request.user)
two_factor_enabled = request.data.get('two_factor_enabled')
if two_factor_enabled is not None:
security.two_factor_enabled = two_factor_enabled
if two_factor_enabled and not security.two_factor_secret:
# 生成两步验证密钥
import pyotp
secret = pyotp.random_base32()
security.two_factor_secret = secret
security.save()
return SuccessResponse({
'message': '安全设置已更新',
'two_factor_enabled': security.two_factor_enabled
})
class LoginHistoryView(APIView):
"""登录历史视图"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""获取登录历史"""
security, created = UserSecurity.objects.get_or_create(user=request.user)
# 返回最近50次登录记录
recent_logins = security.login_history[-50:]
return SuccessResponse({
'login_history': recent_logins,
'total_logins': len(security.login_history)
})
class DeviceManagementView(APIView):
"""设备管理视图"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""获取设备列表"""
# 这里可以从登录历史中提取设备信息
security, created = UserSecurity.objects.get_or_create(user=request.user)
devices = []
user_agents = {}
for login in security.login_history[-50:]:
ua = login.get('user_agent', '')
if ua in user_agents:
user_agents[ua] += 1
else:
user_agents[ua] = 1
for ua, count in user_agents.items():
devices.append({
'user_agent': ua,
'login_count': count,
'is_current': ua == request.META.get('HTTP_USER_AGENT', '')
})
return SuccessResponse({'devices': devices})
class PasswordResetRequestView(APIView):
"""密码重置请求视图"""
def post(self, request):
"""请求密码重置"""
email = request.data.get('email')
if not email:
return ErrorResponse({'error': '请提供邮箱地址'}, status=status.HTTP_400_BAD_REQUEST)
try:
user = User.objects.get(email=email)
except User.DoesNotExist:
# 为了安全,不透露用户是否存在
from django.core.mail import send_mail
# 发送一封通用的成功消息邮件
send_mail(
'密码重置说明',
'如果您的邮箱存在于我们的系统中,您将很快收到密码重置邮件。',
'noreply@yourdomain.com',
[email],
fail_silently=True,
)
return SuccessResponse({'message': '如果邮箱存在,重置链接将发送到您的邮箱'})
# 生成重置token并发送邮件
from django.contrib.auth.tokens import default_token_generator
from django.utils.http import urlsafe_base64_encode
from django.utils.encoding import force_bytes
from django.template.loader import render_to_string
from django.core.mail import EmailMultiAlternatives
uid = urlsafe_base64_encode(force_bytes(user.pk))
token = default_token_generator.make_token(user)
reset_link = f"https://yoursite.com/reset-password/{uid}/{token}/"
subject = '重置您的密码'
text_content = render_to_string('emails/password_reset.txt', {
'user': user,
'reset_link': reset_link,
})
html_content = render_to_string('emails/password_reset.html', {
'user': user,
'reset_link': reset_link,
})
msg = EmailMultiAlternatives(subject, text_content, 'noreply@yourdomain.com', [user.email])
msg.attach_alternative(html_content, "text/html")
msg.send()
return SuccessResponse({'message': '重置链接已发送到您的邮箱'})
class AccountLockingMiddleware:
"""账户锁定中间件"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# 检查登录尝试
if request.path == '/api/auth/login/' and request.method == 'POST':
username = request.data.get('username')
if username:
security = UserSecurity.objects.filter(user__username=username).first()
if security and security.locked_until:
if timezone.now() < security.locked_until:
return ErrorResponse({
'error': '账户已被锁定,请稍后再试'
}, status=status.HTTP_423_LOCKED)
response = self.get_response(request)
return response
# 信号处理器
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.core.cache import cache
@receiver(post_save, sender=User)
def user_post_save_handler(sender, instance, created, **kwargs):
"""用户保存后的处理"""
if created:
# 新用户创建时的处理
from .tasks import send_welcome_email_task
send_welcome_email_task.delay(str(instance.id))
# 清除相关缓存
cache.delete(f'user_profile_{instance.id}')
cache.delete(f'user_permissions_{instance.id}')#商品服务模块
#商品管理服务
# services/product/product_service/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, generics, filters
from rest_framework.permissions import IsAuthenticated, AllowAny
from django_filters.rest_framework import DjangoFilterBackend
from django.core.cache import cache
from django.db import transaction
from django.db.models import Q, Count, Avg, Sum
from .models import Product, Category, Brand, ProductImage, ProductInventory, ProductReview
from .serializers import (
ProductSerializer, CategorySerializer, BrandSerializer,
ProductDetailSerializer, ProductInventorySerializer, ProductReviewSerializer
)
from shared.common.responses import SuccessResponse, ErrorResponse
from shared.common.pagination import CustomPageNumberPagination
import logging
logger = logging.getLogger(__name__)
class ProductListView(generics.ListAPIView):
"""商品列表视图"""
serializer_class = ProductSerializer
pagination_class = CustomPageNumberPagination
filter_backends = [DjangoFilterBackend, filters.SearchFilter, filters.OrderingFilter]
search_fields = ['name', 'description', 'sku', 'brand__name', 'category__name']
ordering_fields = ['price', 'rating', 'created_at', 'sales_count']
ordering = ['-created_at']
def get_queryset(self):
"""获取商品查询集"""
queryset = Product.objects.filter(status='active').select_related(
'category', 'brand', 'inventory'
).prefetch_related('images')
# 分类筛选
category_slug = self.request.query_params.get('category')
if category_slug:
queryset = queryset.filter(category__slug=category_slug)
# 品牌筛选
brand_slug = self.request.query_params.get('brand')
if brand_slug:
queryset = queryset.filter(brand__slug=brand_slug)
# 价格区间筛选
min_price = self.request.query_params.get('min_price')
max_price = self.request.query_params.get('max_price')
if min_price:
queryset = queryset.filter(price__gte=min_price)
if max_price:
queryset = queryset.filter(price__lte=max_price)
# 库存状态筛选
in_stock = self.request.query_params.get('in_stock')
if in_stock:
if in_stock.lower() in ['true', '1']:
queryset = queryset.filter(inventory__available_quantity__gt=0)
else:
queryset = queryset.filter(inventory__available_quantity=0)
# 评分筛选
min_rating = self.request.query_params.get('min_rating')
if min_rating:
queryset = queryset.filter(rating__gte=min_rating)
return queryset
def list(self, request, *args, **kwargs):
"""商品列表"""
# 使用缓存
cache_key = f"product_list_{request.GET.urlencode()}"
cached_data = cache.get(cache_key)
if cached_data:
return Response(cached_data)
response = super().list(request, *args, **kwargs)
cache.set(cache_key, response.data, 300) # 缓存5分钟
return response
class ProductDetailView(APIView):
"""商品详情视图"""
permission_classes = [AllowAny]
def get(self, request, pk):
"""获取商品详情"""
cache_key = f"product_detail_{pk}"
cached_data = cache.get(cache_key)
if cached_data:
# 增加浏览量
Product.objects.filter(id=pk).update(view_count=models.F('view_count') + 1)
return SuccessResponse(cached_data)
try:
product = Product.objects.select_related(
'category', 'brand', 'inventory'
).prefetch_related(
'images', 'specifications', 'reviews'
).get(id=pk, status='active')
# 增加浏览量
Product.objects.filter(id=pk).update(view_count=models.F('view_count') + 1)
serializer = ProductDetailSerializer(product)
data = serializer.data
# 缓存详情
cache.set(cache_key, data, 600) # 缓存10分钟
return SuccessResponse(data)
except Product.DoesNotExist:
return ErrorResponse({'error': '商品不存在或已下架'}, status=status.HTTP_404_NOT_FOUND)
class ProductCreateView(APIView):
"""商品创建视图"""
permission_classes = [IsAuthenticated]
def post(self, request):
"""创建商品"""
# 验证用户权限(比如商家角色)
if not self.has_seller_permission(request.user):
return ErrorResponse({'error': '权限不足'}, status=status.HTTP_403_FORBIDDEN)
with transaction.atomic():
# 创建商品基本信息
product_data = request.data.copy()
images_data = product_data.pop('images', [])
specifications_data = product_data.pop('specifications', [])
inventory_data = product_data.pop('inventory', {})
serializer = ProductSerializer(data=product_data)
if serializer.is_valid():
product = serializer.save()
# 处理商品图片
for img_data in images_data:
ProductImage.objects.create(product=product, **img_data)
# 处理商品规格
for spec_data in specifications_data:
ProductSpecification.objects.create(product=product, **spec_data)
# 处理商品库存
if inventory_data:
ProductInventory.objects.create(product=product, **inventory_data)
# 清除相关缓存
self.clear_product_cache(product.id)
return SuccessResponse({
'product': ProductSerializer(product).data,
'message': '商品创建成功'
}, status=status.HTTP_201_CREATED)
else:
return ErrorResponse(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def has_seller_permission(self, user):
"""检查用户是否有商家权限"""
# 这里可以根据实际业务逻辑判断
return user.groups.filter(name='Seller').exists()
class ProductUpdateView(APIView):
"""商品更新视图"""
permission_classes = [IsAuthenticated]
def put(self, request, pk):
"""更新商品"""
try:
product = Product.objects.get(id=pk)
# 验证权限
if not self.has_edit_permission(request.user, product):
return ErrorResponse({'error': '权限不足'}, status=status.HTTP_403_FORBIDDEN)
with transaction.atomic():
# 更新商品基本信息
product_data = request.data.copy()
images_data = product_data.pop('images', [])
specifications_data = product_data.pop('specifications', [])
inventory_data = product_data.pop('inventory', {})
serializer = ProductSerializer(product, data=product_data, partial=True)
if serializer.is_valid():
product = serializer.save()
# 更新商品图片
if images_data:
product.images.all().delete()
for img_data in images_data:
ProductImage.objects.create(product=product, **img_data)
# 更新商品规格
if specifications_data:
product.specifications.all().delete()
for spec_data in specifications_data:
ProductSpecification.objects.create(product=product, **spec_data)
# 更新商品库存
if inventory_data:
inventory, created = ProductInventory.objects.get_or_create(
product=product,
defaults=inventory_data
)
if not created:
for key, value in inventory_data.items():
setattr(inventory, key, value)
inventory.save()
# 清除相关缓存
self.clear_product_cache(product.id)
return SuccessResponse({
'product': ProductDetailSerializer(product).data,
'message': '商品更新成功'
})
else:
return ErrorResponse(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Product.DoesNotExist:
return ErrorResponse({'error': '商品不存在'}, status=status.HTTP_404_NOT_FOUND)
def has_edit_permission(self, user, product):
"""检查编辑权限"""
# 根据实际业务逻辑判断权限
return user.is_staff or user.groups.filter(name='Admin').exists()
class ProductInventoryView(APIView):
"""商品库存视图"""
permission_classes = [IsAuthenticated]
def get(self, request, pk):
"""获取商品库存信息"""
try:
inventory = ProductInventory.objects.get(product_id=pk)
serializer = ProductInventorySerializer(inventory)
return SuccessResponse(serializer.data)
except ProductInventory.DoesNotExist:
return ErrorResponse({'error': '库存信息不存在'}, status=status.HTTP_404_NOT_FOUND)
def put(self, request, pk):
"""更新商品库存"""
try:
inventory = ProductInventory.objects.get(product_id=pk)
# 检查权限
if not self.has_inventory_permission(request.user):
return ErrorResponse({'error': '权限不足'}, status=status.HTTP_403_FORBIDDEN)
serializer = ProductInventorySerializer(inventory, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
# 更新库存状态
self.update_stock_status(inventory)
# 清除相关缓存
self.clear_product_cache(pk)
return SuccessResponse({
'inventory': serializer.data,
'message': '库存更新成功'
})
else:
return ErrorResponse(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except ProductInventory.DoesNotExist:
return ErrorResponse({'error': '库存信息不存在'}, status=status.HTTP_404_NOT_FOUND)
def has_inventory_permission(self, user):
"""检查库存管理权限"""
return (user.is_staff or
user.groups.filter(name__in=['Admin', 'InventoryManager']).exists())
def update_stock_status(self, inventory):
"""更新库存状态"""
if inventory.available_quantity <= 0:
inventory.stock_status = 'out_of_stock'
elif inventory.available_quantity <= inventory.safety_stock:
inventory.stock_status = 'low_stock'
else:
inventory.stock_status = 'in_stock'
inventory.save()
def clear_product_cache(self, product_id):
"""清除商品相关缓存"""
cache.delete(f"product_detail_{product_id}")
cache.delete(f"product_inventory_{product_id}")
class CategoryListView(generics.ListAPIView):
"""分类列表视图"""
serializer_class = CategorySerializer
def get_queryset(self):
"""获取分类查询集"""
queryset = Category.objects.filter(is_active=True)
# 筛选顶级分类
parent = self.request.query_params.get('parent')
if parent:
queryset = queryset.filter(parent_id=parent)
else:
queryset = queryset.filter(parent__isnull=True)
return queryset
class BrandListView(generics.ListAPIView):
"""品牌列表视图"""
serializer_class = BrandSerializer
filter_backends = [filters.SearchFilter]
search_fields = ['name', 'description']
class ProductReviewView(APIView):
"""商品评价视图"""
permission_classes = [IsAuthenticated]
def get(self, request, product_pk):
"""获取商品评价"""
reviews = ProductReview.objects.filter(
product_id=product_pk,
is_approved=True
).select_related('user').order_by('-created_at')
# 分页处理
paginator = CustomPageNumberPagination()
paginated_reviews = paginator.paginate_queryset(reviews, request)
serializer = ProductReviewSerializer(paginated_reviews, many=True)
return paginator.get_paginated_response(serializer.data)
def post(self, request, product_pk):
"""添加商品评价"""
# 检查用户是否购买过该商品
if not self.has_purchased(request.user, product_pk):
return ErrorResponse(
{'error': '只有购买过的用户才能评价'},
status=status.HTTP_403_FORBIDDEN
)
data = request.data.copy()
data['product_id'] = product_pk
data['user_id'] = request.user.id
serializer = ProductReviewSerializer(data=data)
if serializer.is_valid():
review = serializer.save()
# 更新商品评分
self.update_product_rating(product_pk)
return SuccessResponse({
'review': serializer.data,
'message': '评价提交成功,等待审核'
}, status=status.HTTP_201_CREATED)
else:
return ErrorResponse(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
def has_purchased(self, user, product_pk):
"""检查用户是否购买过该商品"""
# 这里需要检查订单信息
# 简化实现,实际应查询订单服务
return True # 暂时允许所有用户评价
def update_product_rating(self, product_pk):
"""更新商品评分"""
from django.db.models import Avg
avg_rating = ProductReview.objects.filter(
product_id=product_pk,
is_approved=True
).aggregate(avg_rating=Avg('rating'))['avg_rating']
if avg_rating:
Product.objects.filter(id=product_pk).update(
rating=round(avg_rating, 2),
review_count=ProductReview.objects.filter(
product_id=product_pk,
is_approved=True
).count()
)
# serializers.py for product service
from rest_framework import serializers
from .models import Product, Category, Brand, ProductImage, ProductSpecification, ProductInventory
class ProductImageSerializer(serializers.ModelSerializer):
class Meta:
model = ProductImage
fields = '__all__'
class ProductSpecificationSerializer(serializers.ModelSerializer):
class Meta:
model = ProductSpecification
fields = '__all__'
class ProductInventorySerializer(serializers.ModelSerializer):
class Meta:
model = ProductInventory
fields = '__all__'
class ProductSerializer(serializers.ModelSerializer):
class Meta:
model = Product
fields = '__all__'
read_only_fields = ['id', 'created_at', 'updated_at', 'rating', 'review_count',
'view_count', 'sales_count', 'favorite_count']
class ProductDetailSerializer(serializers.ModelSerializer):
images = ProductImageSerializer(many=True, read_only=True)
specifications = ProductSpecificationSerializer(many=True, read_only=True)
inventory = ProductInventorySerializer(read_only=True)
category_name = serializers.CharField(source='category.name', read_only=True)
brand_name = serializers.CharField(source='brand.name', read_only=True)
class Meta:
model = Product
fields = '__all__'
depth = 1
class CategorySerializer(serializers.ModelSerializer):
children = serializers.SerializerMethodField()
class Meta:
model = Category
fields = '__all__'
def get_children(self, obj):
"""获取子分类"""
children = Category.objects.filter(parent=obj, is_active=True)
return CategorySerializer(children, many=True).data
class BrandSerializer(serializers.ModelSerializer):
class Meta:
model = Brand
fields = '__all__'
class ProductReviewSerializer(serializers.ModelSerializer):
user_name = serializers.CharField(source='user.username', read_only=True)
class Meta:
model = ProductReview
fields = '__all__'
read_only_fields = ['id', 'user', 'is_approved', 'helpful_count',
'reported_count', 'created_at', 'updated_at']#商品搜索服务
# services/search/search_service/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django_elasticsearch_dsl_drf.constants import LOOKUP_QUERY_CONTAINS
from django_elasticsearch_dsl_drf.filter_backends import (
FilteringFilterBackend,
IdsFilterBackend,
OrderingFilterBackend,
DefaultOrderingFilterBackend,
)
from django_elasticsearch_dsl_drf.viewsets import DocumentViewSet
from django_elasticsearch_dsl_drf.pagination import PageNumberPagination
from .documents import ProductDocument
from shared.common.responses import SuccessResponse, ErrorResponse
class ProductSearchView(APIView):
"""商品搜索视图"""
def get(self, request):
"""商品搜索"""
query = request.query_params.get('q', '')
category = request.query_params.get('category', '')
brand = request.query_params.get('brand', '')
min_price = request.query_params.get('min_price', '')
max_price = request.query_params.get('max_price', '')
if not query:
return ErrorResponse({'error': '搜索关键词不能为空'}, status=status.HTTP_400_BAD_REQUEST)
# 构建搜索查询
search = ProductDocument.search()
# 基本搜索
search = search.query("multi_match", query=query, fields=['name', 'description', 'brand.name', 'category.name'])
# 过滤条件
if category:
search = search.filter('term', category__slug=category)
if brand:
search = search.filter('term', brand__slug=brand)
if min_price:
search = search.filter('range', price={'gte': min_price})
if max_price:
search = search.filter('range', price={'lte': max_price})
# 排序
sort_by = request.query_params.get('sort', '-created_at')
search = search.sort(sort_by)
# 分页
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 20))
start = (page - 1) * page_size
end = start + page_size
search = search[start:end]
# 执行搜索
response = search.execute()
results = []
for hit in response:
results.append({
'id': hit.meta.id,
'name': hit.name,
'description': hit.description[:200] + '...' if len(hit.description) > 200 else hit.description,
'price': hit.price,
'rating': hit.rating,
'category': {
'name': hit.category.name,
'slug': hit.category.slug
} if hasattr(hit, 'category') else {},
'brand': {
'name': hit.brand.name,
'slug': hit.brand.slug
} if hasattr(hit, 'brand') else {},
'image_url': hit.images[0]['image_url'] if hit.images else '',
})
return SuccessResponse({
'results': results,
'count': response.hits.total.value,
'page': page,
'page_size': page_size,
'total_pages': (response.hits.total.value + page_size - 1) // page_size,
'query': query,
})
class ProductRecommendationView(APIView):
"""商品推荐视图"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""获取商品推荐"""
user_id = request.user.id
# 基于用户历史行为的推荐
recommendations = self.get_user_based_recommendations(user_id)
# 基于商品相似度的推荐
also_bought = self.get_also_bought_recommendations(user_id)
# 热门商品推荐
popular_products = self.get_popular_products()
return SuccessResponse({
'user_based': recommendations,
'also_bought': also_bought,
'popular': popular_products,
})
def get_user_based_recommendations(self, user_id):
"""基于用户的推荐"""
# 这里应该实现推荐算法
# 简化实现:返回相似用户喜欢的商品
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("""
SELECT p.id, p.name, p.price, p.rating, pi.image_url
FROM products p
LEFT JOIN product_images pi ON p.id = pi.product_id AND pi.is_primary = true
WHERE p.status = 'active'
ORDER BY RANDOM()
LIMIT 10
""")
columns = [col[0] for col in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return results
def get_also_bought_recommendations(self, user_id):
"""购买此商品的用户还购买了"""
# 简化实现:返回随机商品
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("""
SELECT p.id, p.name, p.price, p.rating, pi.image_url
FROM products p
LEFT JOIN product_images pi ON p.id = pi.product_id AND pi.is_primary = true
WHERE p.status = 'active'
ORDER BY RANDOM()
LIMIT 10
""")
columns = [col[0] for col in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return results
def get_popular_products(self):
"""热门商品推荐"""
from django.db import connection
with connection.cursor() as cursor:
cursor.execute("""
SELECT p.id, p.name, p.price, p.rating, p.sales_count, pi.image_url
FROM products p
LEFT JOIN product_images pi ON p.id = pi.product_id AND pi.is_primary = true
WHERE p.status = 'active'
ORDER BY p.sales_count DESC
LIMIT 10
""")
columns = [col[0] for col in cursor.description]
results = [dict(zip(columns, row)) for row in cursor.fetchall()]
return results
# documents.py for elasticsearch
from django_elasticsearch_dsl import Document, Index, fields
from django_elasticsearch_dsl.registries import registry
from .models import Product, Category, Brand, ProductImage
# 商品搜索文档定义
product_index = Index('products')
product_index.settings(
number_of_shards=1,
number_of_replicas=0
)
@registry.register_document
class ProductDocument(Document):
"""商品搜索文档"""
id = fields.KeywordField()
name = fields.TextField(
analyzer='ik_max_word',
search_analyzer='ik_smart'
)
description = fields.TextField(
analyzer='ik_max_word',
search_analyzer='ik_smart'
)
price = fields.FloatField()
rating = fields.FloatField()
sales_count = fields.IntegerField()
view_count = fields.IntegerField()
category = fields.ObjectField(properties={
'id': fields.KeywordField(),
'name': fields.TextField(analyzer='ik_max_word'),
'slug': fields.KeywordField(),
})
brand = fields.ObjectField(properties={
'id': fields.KeywordField(),
'name': fields.TextField(analyzer='ik_max_word'),
'slug': fields.KeywordField(),
})
images = fields.NestedField(properties={
'image_url': fields.TextField(),
'is_primary': fields.BooleanField(),
})
class Index:
name = 'products'
class Django:
model = Product
fields = [
'sku',
'barcode',
'short_description',
'is_featured',
'is_new',
'created_at',
'updated_at',
]
related_models = [Category, Brand, ProductImage]
def get_instances_from_related(self, related_instance):
"""当相关模型更新时,更新索引"""
if isinstance(related_instance, Category):
return related_instance.products.all()
elif isinstance(related_instance, Brand):
return related_instance.products.all()
elif isinstance(related_instance, ProductImage):
return related_instance.product
return None#订单服务模块
#订单管理服务
# services/order/order_service/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, generics
from rest_framework.permissions import IsAuthenticated
from django.db import transaction
from django.core.cache import cache
from django.utils import timezone
from decimal import Decimal
from .models import Order, OrderItem, OrderPayment, OrderShipping, OrderRefund, ShoppingCart
from .serializers import (
OrderSerializer, OrderItemSerializer, OrderPaymentSerializer,
OrderShippingSerializer, OrderRefundSerializer, ShoppingCartSerializer
)
from shared.common.responses import SuccessResponse, ErrorResponse
from shared.common.pagination import CustomPageNumberPagination
import logging
logger = logging.getLogger(__name__)
class ShoppingCartView(APIView):
"""购物车视图"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""获取购物车"""
try:
cart, created = ShoppingCart.objects.get_or_create(
user_id=request.user.id,
defaults={
'items': [],
'total_items': 0,
'total_amount': Decimal('0.00')
}
)
# 更新总计
self.update_cart_totals(cart)
serializer = ShoppingCartSerializer(cart)
return SuccessResponse(serializer.data)
except Exception as e:
logger.error(f"Error getting shopping cart: {str(e)}")
return ErrorResponse({'error': '获取购物车失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def post(self, request):
"""添加商品到购物车"""
try:
cart, created = ShoppingCart.objects.get_or_create(
user_id=request.user.id,
defaults={
'items': [],
'total_items': 0,
'total_amount': Decimal('0.00')
}
)
product_id = request.data.get('product_id')
quantity = int(request.data.get('quantity', 1))
attributes = request.data.get('attributes', {})
if not product_id:
return ErrorResponse({'error': '商品ID不能为空'}, status=status.HTTP_400_BAD_REQUEST)
# 检查商品是否存在和库存
product = self.check_product_availability(product_id, quantity)
if not product:
return ErrorResponse({'error': '商品不存在或库存不足'}, status=status.HTTP_400_BAD_REQUEST)
# 添加到购物车
cart.items.append({
'product_id': str(product_id),
'quantity': quantity,
'unit_price': float(product.price),
'total_price': float(product.price * quantity),
'attributes': attributes,
'added_at': timezone.now().isoformat()
})
cart.save()
self.update_cart_totals(cart)
serializer = ShoppingCartSerializer(cart)
return SuccessResponse({
'cart': serializer.data,
'message': '商品已添加到购物车'
})
except Exception as e:
logger.error(f"Error adding to cart: {str(e)}")
return ErrorResponse({'error': '添加到购物车失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def put(self, request):
"""更新购物车商品"""
try:
cart = ShoppingCart.objects.get(user_id=request.user.id)
product_id = request.data.get('product_id')
quantity = int(request.data.get('quantity', 1))
for item in cart.items:
if item['product_id'] == str(product_id):
item['quantity'] = quantity
item['total_price'] = float(item['unit_price'] * quantity)
item['updated_at'] = timezone.now().isoformat()
break
else:
return ErrorResponse({'error': '购物车中不存在该商品'}, status=status.HTTP_404_NOT_FOUND)
cart.save()
self.update_cart_totals(cart)
serializer = ShoppingCartSerializer(cart)
return SuccessResponse({
'cart': serializer.data,
'message': '购物车商品已更新'
})
except ShoppingCart.DoesNotExist:
return ErrorResponse({'error': '购物车不存在'}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"Error updating cart: {str(e)}")
return ErrorResponse({'error': '更新购物车失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def delete(self, request):
"""从购物车删除商品"""
try:
cart = ShoppingCart.objects.get(user_id=request.user.id)
product_id = request.query_params.get('product_id')
if not product_id:
return ErrorResponse({'error': '商品ID不能为空'}, status=status.HTTP_400_BAD_REQUEST)
cart.items = [item for item in cart.items if item['product_id'] != str(product_id)]
cart.save()
self.update_cart_totals(cart)
serializer = ShoppingCartSerializer(cart)
return SuccessResponse({
'cart': serializer.data,
'message': '商品已从购物车删除'
})
except ShoppingCart.DoesNotExist:
return ErrorResponse({'error': '购物车不存在'}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"Error removing from cart: {str(e)}")
return ErrorResponse({'error': '删除商品失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def check_product_availability(self, product_id, quantity):
"""检查商品可用性"""
# 这里应该调用商品服务检查库存
# 简化实现
from django.apps import apps
Product = apps.get_model('product.Product')
ProductInventory = apps.get_model('product.ProductInventory')
try:
product = Product.objects.get(id=product_id, status='active')
inventory = ProductInventory.objects.get(product=product)
if inventory.available_quantity < quantity:
return None
return product
except:
return None
def update_cart_totals(self, cart):
"""更新购物车总计"""
total_items = sum(item['quantity'] for item in cart.items)
total_amount = sum(Decimal(str(item['total_price'])) for item in cart.items)
cart.total_items = total_items
cart.total_amount = total_amount
cart.save()
class OrderListView(generics.ListAPIView):
"""订单列表视图"""
serializer_class = OrderSerializer
pagination_class = CustomPageNumberPagination
permission_classes = [IsAuthenticated]
def get_queryset(self):
"""获取用户订单查询集"""
return Order.objects.filter(user_id=self.request.user.id).order_by('-created_at')
def list(self, request, *args, **kwargs):
"""订单列表"""
queryset = self.filter_queryset(self.get_queryset())
page = self.paginate_queryset(queryset)
if page is not None:
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
return Response(serializer.data)
class OrderDetailView(APIView):
"""订单详情视图"""
permission_classes = [IsAuthenticated]
def get(self, request, pk):
"""获取订单详情"""
try:
order = Order.objects.get(id=pk, user_id=request.user.id)
serializer = OrderSerializer(order)
return SuccessResponse(serializer.data)
except Order.DoesNotExist:
return ErrorResponse({'error': '订单不存在'}, status=status.HTTP_404_NOT_FOUND)
class OrderCreateView(APIView):
"""订单创建视图"""
permission_classes = [IsAuthenticated]
def post(self, request):
"""创建订单"""
try:
with transaction.atomic():
# 从购物车创建订单
cart = ShoppingCart.objects.get(user_id=request.user.id)
if not cart.items:
return ErrorResponse({'error': '购物车为空'}, status=status.HTTP_400_BAD_REQUEST)
# 验证库存
validation_result = self.validate_inventory(cart.items)
if not validation_result['valid']:
return ErrorResponse(
{'error': f'库存不足: {validation_result["message"]}'},
status=status.HTTP_400_BAD_REQUEST
)
# 创建订单
order_data = request.data.copy()
order_data['user_id'] = request.user.id
order_data['items'] = cart.items
order_data['subtotal'] = cart.total_amount
# 计算总价(包含运费、税费等)
shipping_fee = self.calculate_shipping_fee(cart.items, order_data.get('shipping_address'))
tax_amount = self.calculate_tax(cart.total_amount)
order_data['shipping_fee'] = shipping_fee
order_data['tax_amount'] = tax_amount
order_data['total_amount'] = cart.total_amount + shipping_fee + tax_amount
order_data['order_no'] = self.generate_order_no()
serializer = OrderSerializer(data=order_data)
if serializer.is_valid():
order = serializer.save()
# 清空购物车
cart.items = []
cart.total_items = 0
cart.total_amount = Decimal('0.00')
cart.save()
# 预留库存
self.reserve_inventory(cart.items)
# 发送订单创建通知
self.send_order_notification(order)
return SuccessResponse({
'order': OrderSerializer(order).data,
'message': '订单创建成功'
}, status=status.HTTP_201_CREATED)
else:
return ErrorResponse(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except ShoppingCart.DoesNotExist:
return ErrorResponse({'error': '购物车不存在'}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"Error creating order: {str(e)}")
return ErrorResponse({'error': '订单创建失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def validate_inventory(self, cart_items):
"""验证库存"""
# 这里应该调用商品服务验证库存
# 简化实现
for item in cart_items:
product_id = item['product_id']
quantity = item['quantity']
# 模拟库存检查
if quantity > 100: # 假设最大库存为100
return {
'valid': False,
'message': f'商品 {product_id} 库存不足'
}
return {'valid': True, 'message': ''}
def calculate_shipping_fee(self, items, shipping_address):
"""计算运费"""
# 简化运费计算
# 实际应用中应根据重量、距离、快递公司等因素计算
total_weight = sum(item.get('weight', 0.5) * item['quantity'] for item in items)
base_fee = Decimal('10.00')
weight_fee = Decimal(str(total_weight * 2))
return base_fee + weight_fee
def calculate_tax(self, amount):
"""计算税费"""
tax_rate = Decimal('0.1') # 10%税率
return amount * tax_rate
def generate_order_no(self):
"""生成订单号"""
import uuid
from datetime import datetime
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
unique_id = str(uuid.uuid4()).split('-')[0][:6].upper()
return f'ORD{timestamp}{unique_id}'
def reserve_inventory(self, cart_items):
"""预留库存"""
# 这里应该调用商品服务预留库存
# 简化实现
for item in cart_items:
product_id = item['product_id']
quantity = item['quantity']
# 调用库存服务预留库存
pass
def send_order_notification(self, order):
"""发送订单通知"""
# 发送消息到通知服务
from django.apps import apps
try:
NotificationService = apps.get_app_config('notification').get_model('NotificationService')
NotificationService.send_order_notification(order)
except:
# 如果通知服务不可用,记录日志
logger.warning(f"Failed to send order notification for order {order.id}")
class OrderUpdateView(APIView):
"""订单更新视图"""
permission_classes = [IsAuthenticated]
def put(self, request, pk):
"""更新订单"""
try:
order = Order.objects.get(id=pk, user_id=request.user.id)
# 检查订单状态是否允许更新
if order.status not in ['pending', 'confirmed']:
return ErrorResponse(
{'error': '订单状态不允许更新'},
status=status.HTTP_400_BAD_REQUEST
)
serializer = OrderSerializer(order, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
return SuccessResponse({
'order': OrderSerializer(order).data,
'message': '订单更新成功'
})
else:
return ErrorResponse(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Order.DoesNotExist:
return ErrorResponse({'error': '订单不存在'}, status=status.HTTP_404_NOT_FOUND)
class OrderCancelView(APIView):
"""订单取消视图"""
permission_classes = [IsAuthenticated]
def post(self, request, pk):
"""取消订单"""
try:
order = Order.objects.get(id=pk, user_id=request.user.id)
# 检查订单状态是否允许取消
if order.status not in ['pending', 'confirmed', 'preparing']:
return ErrorResponse(
{'error': '订单状态不允许取消'},
status=status.HTTP_400_BAD_REQUEST
)
# 更新订单状态
order.status = 'cancelled'
order.cancelled_at = timezone.now()
order.save()
# 释放库存
self.release_inventory(order)
# 发送取消通知
self.send_cancel_notification(order)
return SuccessResponse({
'order': OrderSerializer(order).data,
'message': '订单已取消'
})
except Order.DoesNotExist:
return ErrorResponse({'error': '订单不存在'}, status=status.HTTP_404_NOT_FOUND)
def release_inventory(self, order):
"""释放库存"""
# 调用商品服务释放库存
pass
def send_cancel_notification(self, order):
"""发送取消通知"""
pass
class OrderPaymentView(APIView):
"""订单支付视图"""
permission_classes = [IsAuthenticated]
def post(self, request, pk):
"""处理订单支付"""
try:
order = Order.objects.get(id=pk, user_id=request.user.id)
if order.status != 'pending':
return ErrorResponse(
{'error': '订单状态不允许支付'},
status=status.HTTP_400_BAD_REQUEST
)
payment_data = request.data.copy()
payment_data['order_id'] = order.id
payment_data['amount'] = order.total_amount
serializer = OrderPaymentSerializer(data=payment_data)
if serializer.is_valid():
payment = serializer.save()
# 处理支付结果
if payment.status == 'completed':
order.status = 'paid'
order.payment_status = 'completed'
order.paid_at = timezone.now()
order.save()
# 发送支付成功通知
self.send_payment_success_notification(order, payment)
return SuccessResponse({
'payment': OrderPaymentSerializer(payment).data,
'order': OrderSerializer(order).data,
'message': '支付处理成功'
})
else:
return ErrorResponse(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
except Order.DoesNotExist:
return ErrorResponse({'error': '订单不存在'}, status=status.HTTP_404_NOT_FOUND)
def send_payment_success_notification(self, order, payment):
"""发送支付成功通知"""
pass
# serializers.py for order service
from rest_framework import serializers
from .models import Order, OrderItem, OrderPayment, OrderShipping, OrderRefund, ShoppingCart
class OrderItemSerializer(serializers.ModelSerializer):
class Meta:
model = OrderItem
fields = '__all__'
class OrderPaymentSerializer(serializers.ModelSerializer):
class Meta:
model = OrderPayment
fields = '__all__'
read_only_fields = ['id', 'transaction_id', 'processed_at', 'created_at', 'updated_at']
class OrderShippingSerializer(serializers.ModelSerializer):
class Meta:
model = OrderShipping
fields = '__all__'
class OrderRefundSerializer(serializers.ModelSerializer):
class Meta:
model = OrderRefund
fields = '__all__'
read_only_fields = ['id', 'processed_at', 'created_at', 'updated_at']
class OrderSerializer(serializers.ModelSerializer):
order_items = OrderItemSerializer(many=True, read_only=True)
payments = OrderPaymentSerializer(many=True, read_only=True)
shipping_info = OrderShippingSerializer(read_only=True)
refunds = OrderRefundSerializer(many=True, read_only=True)
class Meta:
model = Order
fields = '__all__'
read_only_fields = ['id', 'order_no', 'user_id', 'created_at', 'updated_at', 'paid_at', 'shipped_at', 'delivered_at', 'cancelled_at']
class ShoppingCartSerializer(serializers.ModelSerializer):
class Meta:
model = ShoppingCart
fields = '__all__'
read_only_fields = ['id', 'user_id', 'created_at', 'updated_at']#订单状态管理
# services/order/order_service/status_manager.py
from django.utils import timezone
from .models import Order
class OrderStatusManager:
"""订单状态管理器"""
# 状态转换规则
STATUS_TRANSITIONS = {
'pending': ['paid', 'cancelled'],
'paid': ['confirmed', 'cancelled'],
'confirmed': ['preparing', 'cancelled'],
'preparing': ['shipped', 'cancelled'],
'shipped': ['delivered', 'cancelled'],
'delivered': ['refunded'],
'cancelled': [],
'refunded': [],
'disputed': ['refunded', 'cancelled'],
}
@classmethod
def can_transition(cls, from_status, to_status):
"""检查状态转换是否合法"""
return to_status in cls.STATUS_TRANSITIONS.get(from_status, [])
@classmethod
def update_status(cls, order_id, new_status, user_id=None, notes=''):
"""更新订单状态"""
try:
order = Order.objects.get(id=order_id)
if not cls.can_transition(order.status, new_status):
return {
'success': False,
'error': f'不能从 {order.status} 状态转换到 {new_status} 状态'
}
old_status = order.status
order.status = new_status
# 设置时间戳
if new_status == 'paid':
order.paid_at = timezone.now()
elif new_status == 'shipped':
order.shipped_at = timezone.now()
elif new_status == 'delivered':
order.delivered_at = timezone.now()
elif new_status == 'cancelled':
order.cancelled_at = timezone.now()
order.save()
# 记录状态变更日志
cls.log_status_change(order_id, old_status, new_status, user_id, notes)
return {
'success': True,
'order': order,
'message': f'订单状态已从 {old_status} 更新为 {new_status}'
}
except Order.DoesNotExist:
return {
'success': False,
'error': '订单不存在'
}
@classmethod
def log_status_change(cls, order_id, old_status, new_status, user_id, notes):
"""记录状态变更日志"""
# 这里应该记录到专门的状态变更日志表
# 简化实现
from .models import OrderStatusChange
OrderStatusChange.objects.create(
order_id=order_id,
old_status=old_status,
new_status=new_status,
changed_by=user_id,
notes=notes
)
class OrderStatusChange(models.Model):
"""订单状态变更记录"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
order = models.ForeignKey(Order, on_delete=models.CASCADE, related_name='status_changes')
old_status = models.CharField(max_length=20)
new_status = models.CharField(max_length=20)
changed_by = models.UUIDField(null=True, blank=True) # 操作员ID
notes = models.TextField(blank=True)
created_at = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'order_status_changes'
ordering = ['-created_at']
# 任务调度 - 自动状态更新
from celery import shared_task
from datetime import timedelta
@shared_task
def auto_update_order_status():
"""自动更新订单状态"""
now = timezone.now()
# 自动取消未支付的订单(超过30分钟)
pending_orders = Order.objects.filter(
status='pending',
created_at__lt=now - timedelta(minutes=30)
)
for order in pending_orders:
OrderStatusManager.update_status(
order.id,
'cancelled',
notes='自动取消:超时未支付'
)
# 自动确认已支付的订单
paid_orders = Order.objects.filter(
status='paid',
updated_at__lt=now - timedelta(minutes=5) # 支付后5分钟自动确认
)
for order in paid_orders:
OrderStatusManager.update_status(
order.id,
'confirmed',
notes='自动确认:支付成功'
)#支付服务模块
#支付网关集成
# services/payment/payment_service/gateways/base.py
from abc import ABC, abstractmethod
from decimal import Decimal
from typing import Dict, Any
class BasePaymentGateway(ABC):
"""支付网关基类"""
def __init__(self, config: Dict[str, Any]):
self.config = config
@abstractmethod
def create_payment(self, amount: Decimal, currency: str, order_info: Dict[str, Any]) -> Dict[str, Any]:
"""创建支付"""
pass
@abstractmethod
def verify_payment(self, transaction_id: str) -> Dict[str, Any]:
"""验证支付"""
pass
@abstractmethod
def refund_payment(self, transaction_id: str, amount: Decimal = None) -> Dict[str, Any]:
"""退款"""
pass
# services/payment/payment_service/gateways/stripe_gateway.py
import stripe
from decimal import Decimal
from .base import BasePaymentGateway
class StripeGateway(BasePaymentGateway):
"""Stripe支付网关"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
stripe.api_key = config.get('secret_key')
self.public_key = config.get('public_key')
def create_payment(self, amount: Decimal, currency: str, order_info: Dict[str, Any]) -> Dict[str, Any]:
"""创建Stripe支付"""
try:
# 将金额转换为分(Stripe以分为单位)
amount_cents = int(amount * 100)
intent = stripe.PaymentIntent.create(
amount=amount_cents,
currency=currency.lower(),
metadata={
'order_id': order_info.get('order_id'),
'user_id': order_info.get('user_id'),
'description': order_info.get('description', ''),
},
automatic_payment_methods={
'enabled': True,
},
)
return {
'success': True,
'client_secret': intent.client_secret,
'transaction_id': intent.id,
'amount': str(amount),
'currency': currency,
'status': intent.status,
}
except stripe.error.StripeError as e:
return {
'success': False,
'error': str(e),
'error_code': e.code if hasattr(e, 'code') else 'unknown_error',
}
def verify_payment(self, transaction_id: str) -> Dict[str, Any]:
"""验证Stripe支付"""
try:
intent = stripe.PaymentIntent.retrieve(transaction_id)
return {
'success': True,
'transaction_id': intent.id,
'amount': str(Decimal(intent.amount) / 100), # 转换回元
'currency': intent.currency,
'status': intent.status,
'payment_method': intent.payment_method_types[0] if intent.payment_method_types else None,
'created': intent.created,
}
except stripe.error.StripeError as e:
return {
'success': False,
'error': str(e),
'error_code': e.code if hasattr(e, 'code') else 'unknown_error',
}
def refund_payment(self, transaction_id: str, amount: Decimal = None) -> Dict[str, Any]:
"""Stripe退款"""
try:
refund_kwargs = {'payment_intent': transaction_id}
if amount:
refund_kwargs['amount'] = int(amount * 100) # 转换为分
refund = stripe.Refund.create(**refund_kwargs)
return {
'success': True,
'refund_id': refund.id,
'transaction_id': transaction_id,
'amount': str(Decimal(refund.amount) / 100),
'currency': refund.currency,
'status': refund.status,
'created': refund.created,
}
except stripe.error.StripeError as e:
return {
'success': False,
'error': str(e),
'error_code': e.code if hasattr(e, 'code') else 'unknown_error',
}
# services/payment/payment_service/gateways/alipay_gateway.py
from alipay import AliPay
from decimal import Decimal
from .base import BasePaymentGateway
import json
class AlipayGateway(BasePaymentGateway):
"""支付宝支付网关"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.alipay = AliPay(
appid=config.get('app_id'),
app_notify_url=config.get('notify_url'), # 默认回调url
app_private_key_string=config.get('app_private_key'),
alipay_public_key_string=config.get('alipay_public_key'),
debug=config.get('debug', False), # 默认False
)
def create_payment(self, amount: Decimal, currency: str, order_info: Dict[str, Any]) -> Dict[str, Any]:
"""创建支付宝支付"""
try:
# 支付宝只支持人民币
if currency.upper() != 'CNY':
return {
'success': False,
'error': '支付宝只支持人民币支付',
}
out_trade_no = order_info.get('order_id')
subject = order_info.get('subject', f'订单支付-{out_trade_no}')
body = order_info.get('description', '')
# 创建电脑网站支付请求
order_string = self.alipay.api_alipay_trade_page_pay(
out_trade_no=out_trade_no,
total_amount=str(amount),
subject=subject,
body=body,
return_url=order_info.get('return_url'),
notify_url=order_info.get('notify_url'),
)
pay_url = f"https://openapi.alipay.com/gateway.do?{order_string}"
return {
'success': True,
'pay_url': pay_url,
'transaction_id': out_trade_no,
'amount': str(amount),
'currency': currency,
'status': 'created',
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
def verify_payment(self, transaction_id: str) -> Dict[str, Any]:
"""验证支付宝支付"""
try:
# 查询交易状态
response = self.alipay.api_alipay_trade_query(out_trade_no=transaction_id)
if response.get('code') == '10000': # 成功
trade_status = response.get('trade_status')
return {
'success': True,
'transaction_id': transaction_id,
'amount': response.get('total_amount'),
'currency': 'CNY',
'status': self._map_alipay_status(trade_status),
'trade_no': response.get('trade_no'), # 支付宝交易号
'gmt_payment': response.get('gmt_payment'),
}
else:
return {
'success': False,
'error': response.get('sub_msg', '验证失败'),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
def refund_payment(self, transaction_id: str, amount: Decimal = None) -> Dict[str, Any]:
"""支付宝退款"""
try:
refund_amount = str(amount) if amount else None
response = self.alipay.api_alipay_trade_refund(
out_trade_no=transaction_id,
refund_amount=refund_amount,
)
if response.get('code') == '10000': # 成功
return {
'success': True,
'refund_id': response.get('trade_no'),
'transaction_id': transaction_id,
'amount': response.get('refund_fee'),
'currency': 'CNY',
'status': 'success',
'gmt_refund': response.get('gmt_refund_pay'),
}
else:
return {
'success': False,
'error': response.get('sub_msg', '退款失败'),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
def _map_alipay_status(self, alipay_status: str) -> str:
"""映射支付宝状态到系统状态"""
status_mapping = {
'WAIT_BUYER_PAY': 'pending',
'TRADE_CLOSED': 'failed',
'TRADE_SUCCESS': 'completed',
'TRADE_FINISHED': 'completed',
}
return status_mapping.get(alipay_status, 'unknown')
# services/payment/payment_service/gateways/wechat_gateway.py
import requests
import json
from decimal import Decimal
from .base import BasePaymentGateway
from Crypto.Cipher import AES
import hashlib
import time
class WechatPayGateway(BasePaymentGateway):
"""微信支付网关"""
def __init__(self, config: Dict[str, Any]):
super().__init__(config)
self.app_id = config.get('app_id')
self.mch_id = config.get('mch_id')
self.api_key = config.get('api_key')
self.cert_path = config.get('cert_path')
self.key_path = config.get('key_path')
def create_payment(self, amount: Decimal, currency: str, order_info: Dict[str, Any]) -> Dict[str, Any]:
"""创建微信支付"""
try:
# 微信支付只支持人民币
if currency.upper() != 'CNY':
return {
'success': False,
'error': '微信支付只支持人民币支付',
}
# 金额转换为分
total_fee = int(amount * 100)
# 统一下单参数
params = {
'appid': self.app_id,
'mch_id': self.mch_id,
'nonce_str': self._generate_nonce_str(),
'body': order_info.get('subject', f'订单支付-{order_info.get("order_id")}'),
'out_trade_no': order_info.get('order_id'),
'total_fee': total_fee,
'spbill_create_ip': order_info.get('client_ip', '127.0.0.1'),
'notify_url': order_info.get('notify_url'),
'trade_type': 'JSAPI', # 小程序支付
'openid': order_info.get('openid'), # 用户openid
}
# 签名
params['sign'] = self._generate_sign(params)
# 发送请求
xml_data = self._dict_to_xml(params)
response = requests.post(
'https://api.mch.weixin.qq.com/pay/unifiedorder',
data=xml_data.encode('utf-8'),
headers={'Content-Type': 'application/xml'}
)
result = self._xml_to_dict(response.content.decode('utf-8'))
if result.get('return_code') == 'SUCCESS' and result.get('result_code') == 'SUCCESS':
# 生成小程序支付参数
pay_params = {
'appId': self.app_id,
'timeStamp': str(int(time.time())),
'nonceStr': self._generate_nonce_str(),
'package': f'prepay_id={result["prepay_id"]}',
'signType': 'MD5',
}
pay_params['paySign'] = self._generate_sign(pay_params)
return {
'success': True,
'pay_params': pay_params,
'transaction_id': result['prepay_id'],
'amount': str(amount),
'currency': currency,
'status': 'created',
}
else:
return {
'success': False,
'error': result.get('return_msg', '创建支付失败'),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
def verify_payment(self, transaction_id: str) -> Dict[str, Any]:
"""验证微信支付"""
# 微信支付的验证通常通过异步通知完成
# 这里简化实现,实际应该查询微信订单状态
return {
'success': True,
'transaction_id': transaction_id,
'status': 'checking', # 需要通过异步通知确认
}
def refund_payment(self, transaction_id: str, amount: Decimal = None) -> Dict[str, Any]:
"""微信支付退款"""
try:
if not amount:
return {
'success': False,
'error': '退款金额不能为空',
}
# 退款金额转换为分
refund_fee = int(amount * 100)
# 退款参数
params = {
'appid': self.app_id,
'mch_id': self.mch_id,
'nonce_str': self._generate_nonce_str(),
'out_trade_no': transaction_id, # 原订单号
'out_refund_no': f'refund_{transaction_id}', # 退款单号
'total_fee': refund_fee, # 原订单金额
'refund_fee': refund_fee, # 退款金额
'op_user_id': self.mch_id, # 操作员账号
}
# 签名
params['sign'] = self._generate_sign(params)
# 发送退款请求
xml_data = self._dict_to_xml(params)
response = requests.post(
'https://api.mch.weixin.qq.com/secapi/pay/refund',
data=xml_data.encode('utf-8'),
headers={'Content-Type': 'application/xml'},
cert=(self.cert_path, self.key_path) # 需要证书
)
result = self._xml_to_dict(response.content.decode('utf-8'))
if result.get('return_code') == 'SUCCESS' and result.get('result_code') == 'SUCCESS':
return {
'success': True,
'refund_id': result['refund_id'],
'transaction_id': transaction_id,
'amount': str(amount),
'currency': 'CNY',
'status': 'processing', # 退款处理中
}
else:
return {
'success': False,
'error': result.get('return_msg', '退款失败'),
}
except Exception as e:
return {
'success': False,
'error': str(e),
}
def _generate_nonce_str(self) -> str:
"""生成随机字符串"""
import random
import string
return ''.join(random.choices(string.ascii_letters + string.digits, k=32))
def _generate_sign(self, params: Dict[str, Any]) -> str:
"""生成签名"""
# 按参数名ASCII码从小到大排序
sorted_params = sorted(params.items())
# 拼接参数字符串
query_string = '&'.join([f'{k}={v}' for k, v in sorted_params if v != ''])
query_string += f'&key={self.api_key}'
# MD5加密
md5_hash = hashlib.md5(query_string.encode('utf-8'))
return md5_hash.hexdigest().upper()
def _dict_to_xml(self, params: Dict[str, Any]) -> str:
"""字典转XML"""
xml = '<xml>'
for key, value in params.items():
xml += f'<{key}><![CDATA[{value}]]></{key}>'
xml += '</xml>'
return xml
def _xml_to_dict(self, xml_str: str) -> Dict[str, Any]:
"""XML转字典"""
# 简化实现,实际应该使用xml解析库
import xml.etree.ElementTree as ET
root = ET.fromstring(xml_str)
return {child.tag: child.text for child in root}
# 支付服务管理器
class PaymentServiceManager:
"""支付服务管理器"""
def __init__(self):
self.gateways = {}
def register_gateway(self, name: str, gateway_class, config: Dict[str, Any]):
"""注册支付网关"""
self.gateways[name] = gateway_class(config)
def get_gateway(self, name: str):
"""获取支付网关"""
return self.gateways.get(name)
def create_payment(self, gateway_name: str, amount: Decimal, currency: str, order_info: Dict[str, Any]):
"""创建支付"""
gateway = self.get_gateway(gateway_name)
if not gateway:
return {'success': False, 'error': f'支付网关 {gateway_name} 不存在'}
return gateway.create_payment(amount, currency, order_info)
def verify_payment(self, gateway_name: str, transaction_id: str):
"""验证支付"""
gateway = self.get_gateway(gateway_name)
if not gateway:
return {'success': False, 'error': f'支付网关 {gateway_name} 不存在'}
return gateway.verify_payment(transaction_id)
def refund_payment(self, gateway_name: str, transaction_id: str, amount: Decimal = None):
"""退款"""
gateway = self.get_gateway(gateway_name)
if not gateway:
return {'success': False, 'error': f'支付网关 {gateway_name} 不存在'}
return gateway.refund_payment(transaction_id, amount)
# 初始化支付服务管理器
payment_manager = PaymentServiceManager()
# 注册支付网关
"""
# 在settings或初始化代码中注册
payment_manager.register_gateway('stripe', StripeGateway, {
'public_key': 'pk_test_...',
'secret_key': 'sk_test_...',
})
payment_manager.register_gateway('alipay', AlipayGateway, {
'app_id': '2021000123456789',
'app_private_key': 'your_private_key',
'alipay_public_key': 'alipay_public_key',
'debug': True,
})
payment_manager.register_gateway('wechat', WechatPayGateway, {
'app_id': 'wxd678efh567hg6787',
'mch_id': '1234567890',
'api_key': 'your_api_key',
'cert_path': '/path/to/cert.pem',
'key_path': '/path/to/key.pem',
})
"""#支付回调处理
# services/payment/payment_service/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.http import HttpResponse
from .models import OrderPayment
from .gateways.base import BasePaymentGateway
from shared.common.responses import SuccessResponse, ErrorResponse
import json
import logging
logger = logging.getLogger(__name__)
@method_decorator(csrf_exempt, name='dispatch')
class PaymentCallbackView(APIView):
"""支付回调视图"""
def post(self, request, gateway_name):
"""处理支付回调"""
try:
if gateway_name == 'alipay':
return self.handle_alipay_callback(request)
elif gateway_name == 'wechat':
return self.handle_wechat_callback(request)
else:
return ErrorResponse({'error': '不支持的支付网关'}, status=status.HTTP_400_BAD_REQUEST)
except Exception as e:
logger.error(f"Payment callback error: {str(e)}")
return HttpResponse('ERROR', status=200)
def handle_alipay_callback(self, request):
"""处理支付宝回调"""
from alipay import AliPay
# 初始化支付宝
alipay = AliPay(
appid='your_app_id',
app_notify_url=None, # 回调URL在创建订单时指定
app_private_key_string='your_private_key',
alipay_public_key_string='alipay_public_key',
debug=True, # 开发环境
)
# 获取POST数据
data = request.POST.dict()
# 验证签名
success = alipay.verify(data, data.get('sign'))
if success and data.get('trade_status') in ('TRADE_SUCCESS', 'TRADE_FINISHED'):
# 处理支付成功
out_trade_no = data.get('out_trade_no')
trade_no = data.get('trade_no')
total_amount = data.get('total_amount')
# 更新支付记录
try:
payment = OrderPayment.objects.get(transaction_id=out_trade_no)
payment.status = 'completed'
payment.gateway_response = data
payment.processed_at = timezone.now()
payment.save()
# 更新订单状态
order = payment.order
order.status = 'paid'
order.payment_status = 'completed'
order.paid_at = timezone.now()
order.save()
logger.info(f"Alipay payment completed: {out_trade_no}")
except OrderPayment.DoesNotExist:
logger.error(f"Payment record not found: {out_trade_no}")
return HttpResponse('success')
def handle_wechat_callback(self, request):
"""处理微信回调"""
import xml.etree.ElementTree as ET
# 解析XML数据
root = ET.fromstring(request.body.decode('utf-8'))
data = {child.tag: child.text for child in root}
# 验证签名(简化验证)
if data.get('return_code') == 'SUCCESS' and data.get('result_code') == 'SUCCESS':
# 处理支付成功
out_trade_no = data.get('out_trade_no')
transaction_id = data.get('transaction_id')
total_fee = data.get('total_fee')
# 更新支付记录
try:
payment = OrderPayment.objects.get(transaction_id=out_trade_no)
payment.status = 'completed'
payment.gateway_response = data
payment.processed_at = timezone.now()
payment.save()
# 更新订单状态
order = payment.order
order.status = 'paid'
order.payment_status = 'completed'
order.paid_at = timezone.now()
order.save()
logger.info(f"WeChat payment completed: {out_trade_no}")
except OrderPayment.DoesNotExist:
logger.error(f"Payment record not found: {out_trade_no}")
return HttpResponse('<xml><return_code><![CDATA[SUCCESS]]></return_code><return_msg><![CDATA[OK]]></return_msg></xml>')
class PaymentProcessView(APIView):
"""支付处理视图"""
permission_classes = [IsAuthenticated]
def post(self, request):
"""处理支付"""
order_id = request.data.get('order_id')
payment_method = request.data.get('payment_method')
gateway = request.data.get('gateway')
if not all([order_id, payment_method, gateway]):
return ErrorResponse({'error': '缺少必要参数'}, status=status.HTTP_400_BAD_REQUEST)
try:
from ..order.models import Order
order = Order.objects.get(id=order_id, user_id=request.user.id)
if order.status != 'pending':
return ErrorResponse({'error': '订单状态不允许支付'}, status=status.HTTP_400_BAD_REQUEST)
# 创建支付记录
payment_data = {
'order': order,
'payment_method': payment_method,
'payment_gateway': gateway,
'amount': order.total_amount,
'currency': 'CNY', # 默认人民币
}
payment = OrderPayment.objects.create(**payment_data)
# 通过支付网关处理支付
order_info = {
'order_id': str(order.id),
'user_id': str(request.user.id),
'subject': f'订单支付-{order.order_no}',
'description': f'支付订单 {order.order_no}',
'return_url': request.data.get('return_url'),
'notify_url': request.build_absolute_uri(f'/api/payment/callback/{gateway}/'),
'client_ip': self.get_client_ip(request),
}
result = payment_manager.create_payment(
gateway,
order.total_amount,
'CNY',
order_info
)
if result['success']:
# 更新支付记录
payment.transaction_id = result['transaction_id']
payment.status = result.get('status', 'pending')
payment.gateway_response = result
payment.save()
return SuccessResponse({
'payment': {
'id': str(payment.id),
'transaction_id': payment.transaction_id,
'amount': str(payment.amount),
'status': payment.status,
},
'payment_details': result,
'message': '支付创建成功'
})
else:
# 支付创建失败,更新状态
payment.status = 'failed'
payment.gateway_response = result
payment.save()
return ErrorResponse({'error': result['error']}, status=status.HTTP_400_BAD_REQUEST)
except Order.DoesNotExist:
return ErrorResponse({'error': '订单不存在'}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"Payment process error: {str(e)}")
return ErrorResponse({'error': '支付处理失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def get_client_ip(self, request):
"""获取客户端IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
class PaymentRefundView(APIView):
"""支付退款视图"""
permission_classes = [IsAuthenticated]
def post(self, request, payment_id):
"""处理退款"""
try:
payment = OrderPayment.objects.get(id=payment_id)
# 检查权限和状态
if payment.order.user_id != request.user.id:
return ErrorResponse({'error': '无权操作'}, status=status.HTTP_403_FORBIDDEN)
if payment.status != 'completed':
return ErrorResponse({'error': '支付状态不允许退款'}, status=status.HTTP_400_BAD_REQUEST)
# 获取退款金额
refund_amount = request.data.get('amount')
if refund_amount:
refund_amount = Decimal(str(refund_amount))
else:
refund_amount = payment.amount # 全额退款
# 执行退款
result = payment_manager.refund_payment(
payment.payment_gateway,
payment.transaction_id,
refund_amount
)
if result['success']:
# 创建退款记录
from ..order.models import OrderRefund
refund = OrderRefund.objects.create(
order=payment.order,
reason=request.data.get('reason', 'other'),
reason_detail=request.data.get('reason_detail', ''),
requested_by=request.user.id,
amount=refund_amount,
refund_method=payment.payment_method,
status='processing'
)
# 更新支付状态
payment.status = 'refunded' if refund_amount == payment.amount else 'partially_refunded'
payment.save()
return SuccessResponse({
'refund': {
'id': str(refund.id),
'amount': str(refund.amount),
'status': refund.status,
},
'message': '退款申请已提交'
})
else:
return ErrorResponse({'error': result['error']}, status=status.HTTP_400_BAD_REQUEST)
except OrderPayment.DoesNotExist:
return ErrorResponse({'error': '支付记录不存在'}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"Payment refund error: {str(e)}")
return ErrorResponse({'error': '退款处理失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)#通知服务模块
#通知系统设计
# services/notification/notification_service/models.py
from django.db import models
from django.utils import timezone
import uuid
class NotificationTemplate(models.Model):
"""通知模板"""
CHANNEL_CHOICES = [
('email', 'Email'),
('sms', 'SMS'),
('push', 'Push Notification'),
('in_app', 'In-App Notification'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=100, unique=True) # 模板名称
code = models.CharField(max_length=50, unique=True) # 模板代码
channel = models.CharField(max_length=20, choices=CHANNEL_CHOICES) # 通知渠道
subject_template = models.TextField() # 标题模板
content_template = models.TextField() # 内容模板
variables = models.JSONField(default=list) # 模板变量
is_active = models.BooleanField(default=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'notification_templates'
class Notification(models.Model):
"""通知"""
STATUS_CHOICES = [
('pending', 'Pending'),
('sending', 'Sending'),
('sent', 'Sent'),
('failed', 'Failed'),
('delivered', 'Delivered'),
('read', 'Read'),
]
PRIORITY_CHOICES = [
('low', 'Low'),
('normal', 'Normal'),
('high', 'High'),
('urgent', 'Urgent'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
template = models.ForeignKey(NotificationTemplate, on_delete=models.CASCADE)
recipient_id = models.UUIDField(db_index=True) # 接收者ID
recipient_email = models.EmailField(blank=True) # 接收者邮箱
recipient_phone = models.CharField(max_length=20, blank=True) # 接收者手机号
recipient_device_token = models.TextField(blank=True) # 设备令牌
subject = models.CharField(max_length=200) # 通知标题
content = models.TextField() # 通知内容
channel = models.CharField(max_length=20, choices=NotificationTemplate.CHANNEL_CHOICES) # 通知渠道
priority = models.CharField(max_length=20, choices=PRIORITY_CHOICES, default='normal')
status = models.CharField(max_length=20, choices=STATUS_CHOICES, default='pending')
send_time = models.DateTimeField(null=True, blank=True) # 发送时间
delivery_time = models.DateTimeField(null=True, blank=True) # 送达时间
read_time = models.DateTimeField(null=True, blank=True) # 阅读时间
retry_count = models.PositiveIntegerField(default=0) # 重试次数
max_retries = models.PositiveIntegerField(default=3) # 最大重试次数
error_message = models.TextField(blank=True) # 错误信息
extra_data = models.JSONField(default=dict, blank=True) # 额外数据
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'notifications'
indexes = [
models.Index(fields=['recipient_id']),
models.Index(fields=['status']),
models.Index(fields=['channel']),
models.Index(fields=['created_at']),
models.Index(fields=['recipient_id', 'status']),
]
class NotificationPreference(models.Model):
"""通知偏好设置"""
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
user_id = models.UUIDField(unique=True) # 用户ID
email_notifications = models.BooleanField(default=True) # 邮件通知
sms_notifications = models.BooleanField(default=True) # 短信通知
push_notifications = models.BooleanField(default=True) # 推送通知
in_app_notifications = models.BooleanField(default=True) # 应用内通知
marketing_emails = models.BooleanField(default=True) # 营销邮件
promotional_sms = models.BooleanField(default=True) # 推广短信
notification_schedule = models.JSONField(default=dict, blank=True) # 通知时间安排
blocked_channels = models.JSONField(default=list, blank=True) # 阻止的渠道
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'notification_preferences'
class NotificationChannel(models.Model):
"""通知渠道配置"""
CHANNEL_CHOICES = [
('email_smtp', 'SMTP Email'),
('email_sendgrid', 'SendGrid'),
('email_mailgun', 'Mailgun'),
('sms_twilio', 'Twilio SMS'),
('sms_aliyun', '阿里云短信'),
('push_firebase', 'Firebase Push'),
('push_apns', 'APNs Push'),
]
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
name = models.CharField(max_length=100, unique=True)
channel_type = models.CharField(max_length=20, choices=CHANNEL_CHOICES)
is_active = models.BooleanField(default=True)
config = models.JSONField() # 渠道配置
rate_limit = models.PositiveIntegerField(default=100) # 速率限制
retry_on_failure = models.BooleanField(default=True) # 失败时重试
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'notification_channels'
# services/notification/notification_service/services.py
from django.core.mail import send_mail
from django.conf import settings
from django.template import Template, Context
from django.utils import timezone
from twilio.rest import Client
import firebase_admin
from firebase_admin import messaging
import requests
import logging
logger = logging.getLogger(__name__)
class NotificationService:
"""通知服务"""
def __init__(self):
self.channels = {}
self.load_channels()
def load_channels(self):
"""加载通知渠道"""
from .models import NotificationChannel
channels = NotificationChannel.objects.filter(is_active=True)
for channel in channels:
self.channels[channel.channel_type] = channel.config
def send_notification(self, notification):
"""发送通知"""
try:
if notification.channel == 'email':
return self.send_email(notification)
elif notification.channel == 'sms':
return self.send_sms(notification)
elif notification.channel == 'push':
return self.send_push(notification)
elif notification.channel == 'in_app':
return self.send_in_app(notification)
else:
raise ValueError(f"Unsupported notification channel: {notification.channel}")
except Exception as e:
logger.error(f"Failed to send notification {notification.id}: {str(e)}")
notification.status = 'failed'
notification.error_message = str(e)
notification.save()
return False
def send_email(self, notification):
"""发送邮件"""
try:
send_mail(
subject=notification.subject,
message=notification.content,
from_email=settings.DEFAULT_FROM_EMAIL,
recipient_list=[notification.recipient_email],
fail_silently=False,
)
notification.status = 'sent'
notification.send_time = timezone.now()
notification.save()
return True
except Exception as e:
notification.status = 'failed'
notification.error_message = str(e)
notification.save()
return False
def send_sms(self, notification):
"""发送短信"""
try:
# 使用Twilio发送短信
account_sid = self.channels.get('sms_twilio', {}).get('account_sid')
auth_token = self.channels.get('sms_twilio', {}).get('auth_token')
if account_sid and auth_token:
client = Client(account_sid, auth_token)
message = client.messages.create(
body=notification.content,
from_=self.channels.get('sms_twilio', {}).get('from_number'),
to=notification.recipient_phone
)
notification.status = 'sent'
notification.send_time = timezone.now()
notification.extra_data['message_sid'] = message.sid
notification.save()
return True
else:
raise Exception("Twilio credentials not configured")
except Exception as e:
notification.status = 'failed'
notification.error_message = str(e)
notification.save()
return False
def send_push(self, notification):
"""发送推送通知"""
try:
# 使用Firebase发送推送
if not firebase_admin._apps:
cred = firebase_admin.credentials.Certificate(
self.channels.get('push_firebase', {}).get('credentials_path')
)
firebase_admin.initialize_app(cred)
message = messaging.Message(
notification=messaging.Notification(
title=notification.subject,
body=notification.content,
),
token=notification.recipient_device_token,
)
response = messaging.send(message)
notification.status = 'sent'
notification.send_time = timezone.now()
notification.extra_data['message_id'] = response
notification.save()
return True
except Exception as e:
notification.status = 'failed'
notification.error_message = str(e)
notification.save()
return False
def send_in_app(self, notification):
"""发送应用内通知"""
# 应用内通知通常存储在数据库中,由前端轮询获取
notification.status = 'sent'
notification.send_time = timezone.now()
notification.save()
return True
def process_pending_notifications(self):
"""处理待发送的通知"""
from .models import Notification
pending_notifications = Notification.objects.filter(
status='pending'
).select_related('template').order_by('priority', 'created_at')
for notification in pending_notifications:
if self.check_recipient_preferences(notification):
self.send_notification(notification)
def check_recipient_preferences(self, notification):
"""检查接收者偏好设置"""
from .models import NotificationPreference
try:
preference = NotificationPreference.objects.get(user_id=notification.recipient_id)
# 检查是否阻止了该渠道
if notification.channel in preference.blocked_channels:
return False
# 检查是否启用了该类型的通知
channel_setting = f"{notification.channel}_notifications"
if hasattr(preference, channel_setting):
if not getattr(preference, channel_setting):
return False
return True
except NotificationPreference.DoesNotExist:
# 如果没有偏好设置,默认允许所有通知
return True
# services/notification/notification_service/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from rest_framework.permissions import IsAuthenticated
from django.utils import timezone
from .models import Notification, NotificationPreference
from .services import NotificationService
from shared.common.responses import SuccessResponse, ErrorResponse
import logging
logger = logging.getLogger(__name__)
class NotificationTemplateView(APIView):
"""通知模板视图"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""获取通知模板列表"""
from .models import NotificationTemplate
templates = NotificationTemplate.objects.filter(is_active=True)
data = []
for template in templates:
data.append({
'id': str(template.id),
'name': template.name,
'code': template.code,
'channel': template.channel,
'subject_template': template.subject_template,
'content_template': template.content_template,
'variables': template.variables,
})
return SuccessResponse({'templates': data})
class NotificationSendView(APIView):
"""发送通知视图"""
permission_classes = [IsAuthenticated]
def post(self, request):
"""发送通知"""
try:
template_code = request.data.get('template_code')
recipient_id = request.data.get('recipient_id')
context = request.data.get('context', {})
if not all([template_code, recipient_id]):
return ErrorResponse({'error': '缺少必要参数'}, status=status.HTTP_400_BAD_REQUEST)
from .models import NotificationTemplate
try:
template = NotificationTemplate.objects.get(code=template_code, is_active=True)
except NotificationTemplate.DoesNotExist:
return ErrorResponse({'error': '模板不存在或已停用'}, status=status.HTTP_404_NOT_FOUND)
# 获取用户信息
from django.apps import apps
User = apps.get_model('user.User')
try:
user = User.objects.get(id=recipient_id)
except User.DoesNotExist:
return ErrorResponse({'error': '接收者不存在'}, status=status.HTTP_404_NOT_FOUND)
# 构建通知内容
subject = self.render_template(template.subject_template, context)
content = self.render_template(template.content_template, context)
# 创建通知记录
notification_data = {
'template': template,
'recipient_id': recipient_id,
'recipient_email': user.email,
'recipient_phone': getattr(user, 'phone', ''),
'subject': subject,
'content': content,
'channel': template.channel,
'priority': request.data.get('priority', 'normal'),
}
notification = Notification.objects.create(**notification_data)
# 发送通知
service = NotificationService()
success = service.send_notification(notification)
if success:
return SuccessResponse({
'notification_id': str(notification.id),
'status': notification.status,
'message': '通知发送成功'
})
else:
return ErrorResponse({
'error': f'通知发送失败: {notification.error_message}'
}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
except Exception as e:
logger.error(f"Send notification error: {str(e)}")
return ErrorResponse({'error': '发送通知失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def render_template(self, template_str, context):
"""渲染模板"""
from django.template import Template, Context
template = Template(template_str)
ctx = Context(context)
return template.render(ctx)
class UserNotificationView(APIView):
"""用户通知视图"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""获取用户通知列表"""
notifications = Notification.objects.filter(
recipient_id=request.user.id
).order_by('-created_at')
# 分页
page = int(request.query_params.get('page', 1))
page_size = int(request.query_params.get('page_size', 20))
start = (page - 1) * page_size
end = start + page_size
paginated_notifications = notifications[start:end]
data = []
for notification in paginated_notifications:
data.append({
'id': str(notification.id),
'subject': notification.subject,
'content': notification.content,
'channel': notification.channel,
'status': notification.status,
'priority': notification.priority,
'created_at': notification.created_at,
'read_at': notification.read_time,
})
return SuccessResponse({
'notifications': data,
'total': notifications.count(),
'page': page,
'page_size': page_size,
'total_pages': (notifications.count() + page_size - 1) // page_size,
})
def put(self, request, notification_id):
"""标记通知为已读"""
try:
notification = Notification.objects.get(
id=notification_id,
recipient_id=request.user.id
)
if not notification.read_time:
notification.read_time = timezone.now()
notification.status = 'read'
notification.save()
return SuccessResponse({'message': '通知已标记为已读'})
except Notification.DoesNotExist:
return ErrorResponse({'error': '通知不存在'}, status=status.HTTP_404_NOT_FOUND)
class NotificationPreferenceView(APIView):
"""通知偏好设置视图"""
permission_classes = [IsAuthenticated]
def get(self, request):
"""获取通知偏好设置"""
try:
preference, created = NotificationPreference.objects.get_or_create(
user_id=request.user.id,
defaults={
'email_notifications': True,
'sms_notifications': True,
'push_notifications': True,
'in_app_notifications': True,
'marketing_emails': True,
'promotional_sms': True,
}
)
return SuccessResponse({
'email_notifications': preference.email_notifications,
'sms_notifications': preference.sms_notifications,
'push_notifications': preference.push_notifications,
'in_app_notifications': preference.in_app_notifications,
'marketing_emails': preference.marketing_emails,
'promotional_sms': preference.promotional_sms,
'notification_schedule': preference.notification_schedule,
'blocked_channels': preference.blocked_channels,
})
except Exception as e:
logger.error(f"Get notification preference error: {str(e)}")
return ErrorResponse({'error': '获取偏好设置失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def put(self, request):
"""更新通知偏好设置"""
try:
preference, created = NotificationPreference.objects.get_or_create(
user_id=request.user.id
)
# 更新偏好设置
for field in [
'email_notifications', 'sms_notifications', 'push_notifications',
'in_app_notifications', 'marketing_emails', 'promotional_sms'
]:
if field in request.data:
setattr(preference, field, request.data[field])
if 'notification_schedule' in request.data:
preference.notification_schedule = request.data['notification_schedule']
if 'blocked_channels' in request.data:
preference.blocked_channels = request.data['blocked_channels']
preference.save()
return SuccessResponse({
'message': '偏好设置已更新',
'preferences': {
'email_notifications': preference.email_notifications,
'sms_notifications': preference.sms_notifications,
'push_notifications': preference.push_notifications,
'in_app_notifications': preference.in_app_notifications,
'marketing_emails': preference.marketing_emails,
'promotional_sms': preference.promotional_sms,
'notification_schedule': preference.notification_schedule,
'blocked_channels': preference.blocked_channels,
}
})
except Exception as e:
logger.error(f"Update notification preference error: {str(e)}")
return ErrorResponse({'error': '更新偏好设置失败'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
# 通知服务的Celery任务
from celery import shared_task
@shared_task
def send_notification_async(notification_id):
"""异步发送通知"""
try:
notification = Notification.objects.get(id=notification_id)
service = NotificationService()
return service.send_notification(notification)
except Notification.DoesNotExist:
logger.error(f"Notification {notification_id} not found")
return False
@shared_task
def process_pending_notifications():
"""处理待发送的通知"""
service = NotificationService()
service.process_pending_notifications()
# serializers.py for notification service
from rest_framework import serializers
from .models import Notification, NotificationTemplate, NotificationPreference
class NotificationSerializer(serializers.ModelSerializer):
class Meta:
model = Notification
fields = '__all__'
read_only_fields = ['id', 'status', 'send_time', 'delivery_time', 'read_time', 'created_at', 'updated_at']
class NotificationTemplateSerializer(serializers.ModelSerializer):
class Meta:
model = NotificationTemplate
fields = '__all__'
read_only_fields = ['id', 'created_at', 'updated_at']
class NotificationPreferenceSerializer(serializers.ModelSerializer):
class Meta:
model = NotificationPreference
fields = '__all__'
read_only_fields = ['id', 'user_id', 'created_at', 'updated_at']#API网关实现
#网关配置与路由
# services/gateway/gateway/settings.py
"""
# API网关配置示例
import os
from corsheaders.defaults import default_headers
# 基础配置
DEBUG = os.environ.get('DEBUG', 'False').lower() == 'true'
SECRET_KEY = os.environ.get('SECRET_KEY', 'your-secret-key')
# 允许的主机
ALLOWED_HOSTS = ['gateway.yourdomain.com', 'localhost', '127.0.0.1']
# 已安装的应用
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'corsheaders',
'drf_yasg', # API文档
'gateway_app', # 网关应用
]
# 中间件配置
MIDDLEWARE = [
'corsheaders.middleware.CorsMiddleware',
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
# 自定义中间件
'gateway_app.middleware.AuthenticationMiddleware',
'gateway_app.middleware.RateLimitMiddleware',
'gateway_app.middleware.LoggingMiddleware',
'gateway_app.middleware.CircuitBreakerMiddleware',
'gateway_app.middleware.RequestValidationMiddleware',
'gateway_app.middleware.ResponseTransformationMiddleware',
]
# REST Framework配置
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': [
'rest_framework_simplejwt.authentication.JWTAuthentication',
'rest_framework.authentication.SessionAuthentication',
],
'DEFAULT_PERMISSION_CLASSES': [
'rest_framework.permissions.IsAuthenticated',
],
'DEFAULT_THROTTLE_CLASSES': [
'rest_framework.throttling.AnonRateThrottle',
'rest_framework.throttling.UserRateThrottle'
],
'DEFAULT_THROTTLE_RATES': {
'anon': '100/hour',
'user': '1000/hour'
},
'DEFAULT_SCHEMA_CLASS': 'rest_framework.schemas.coreapi.AutoSchema',
}
# 服务注册配置
SERVICE_REGISTRY = {
'consul': {
'host': os.environ.get('CONSUL_HOST', 'localhost'),
'port': int(os.environ.get('CONSUL_PORT', 8500)),
}
}
# 缓存配置
CACHES = {
'default': {
'BACKEND': 'django_redis.cache.RedisCache',
'LOCATION': os.environ.get('REDIS_URL', 'redis://localhost:6379/1'),
'OPTIONS': {
'CLIENT_CLASS': 'django_redis.client.DefaultClient',
}
}
}
# 数据库配置
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': os.environ.get('DB_NAME', 'gateway_db'),
'USER': os.environ.get('DB_USER', 'gateway_user'),
'PASSWORD': os.environ.get('DB_PASSWORD', ''),
'HOST': os.environ.get('DB_HOST', 'localhost'),
'PORT': os.environ.get('DB_PORT', '5432'),
}
}
# 日志配置
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'verbose': {
'format': '{levelname} {asctime} {module} {process:d} {thread:d} {message}',
'style': '{',
},
'simple': {
'format': '{levelname} {message}',
'style': '{',
},
},
'handlers': {
'console': {
'class': 'logging.StreamHandler',
'formatter': 'verbose',
},
'file': {
'class': 'logging.FileHandler',
'filename': '/var/log/gateway.log',
'formatter': 'verbose',
},
},
'root': {
'handlers': ['console', 'file'],
'level': 'INFO',
},
'loggers': {
'gateway_app': {
'handlers': ['console', 'file'],
'level': 'INFO',
'propagate': False,
},
},
}
"""
# services/gateway/gateway_app/models.py
from django.db import models
import json
class Service(models.Model):
"""服务注册模型"""
name = models.CharField(max_length=100, unique=True)
url = models.URLField()
description = models.TextField(blank=True)
is_active = models.BooleanField(default=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'services'
def __str__(self):
return self.name
class ServiceInstance(models.Model):
"""服务实例模型"""
service = models.ForeignKey(Service, on_delete=models.CASCADE, related_name='instances')
host = models.CharField(max_length=100)
port = models.IntegerField()
health_status = models.CharField(max_length=20, default='unknown')
weight = models.IntegerField(default=1)
metadata = models.JSONField(default=dict, blank=True)
last_heartbeat = models.DateTimeField(auto_now=True)
is_active = models.BooleanField(default=True)
class Meta:
db_table = 'service_instances'
class APIRoute(models.Model):
"""API路由模型"""
METHOD_CHOICES = [
('GET', 'GET'),
('POST', 'POST'),
('PUT', 'PUT'),
('DELETE', 'DELETE'),
('PATCH', 'PATCH'),
('OPTIONS', 'OPTIONS'),
('HEAD', 'HEAD'),
]
path = models.CharField(max_length=200)
target_service = models.ForeignKey(Service, on_delete=models.CASCADE)
methods = models.JSONField(default=list) # ['GET', 'POST']
rate_limit = models.IntegerField(default=100) # 每分钟请求数
timeout = models.IntegerField(default=30) # 超时时间(秒)
retry_count = models.IntegerField(default=3) # 重试次数
circuit_breaker_enabled = models.BooleanField(default=True)
is_active = models.BooleanField(default=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'api_routes'
unique_together = ['path', 'target_service']
class RequestLog(models.Model):
"""请求日志模型"""
route = models.ForeignKey(APIRoute, on_delete=models.CASCADE)
method = models.CharField(max_length=10)
path = models.CharField(max_length=500)
status_code = models.IntegerField()
response_time = models.FloatField() # 毫秒
client_ip = models.GenericIPAddressField()
user_agent = models.TextField(blank=True)
request_headers = models.JSONField(default=dict)
response_headers = models.JSONField(default=dict)
timestamp = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = 'request_logs'
indexes = [
models.Index(fields=['timestamp']),
models.Index(fields=['route', 'timestamp']),
]
class RateLimitRule(models.Model):
"""限流规则模型"""
SCOPE_CHOICES = [
('global', 'Global'),
('user', 'User'),
('ip', 'IP Address'),
('endpoint', 'Endpoint'),
]
scope = models.CharField(max_length=20, choices=SCOPE_CHOICES)
scope_value = models.CharField(max_length=100) # 用户ID、IP地址、端点等
limit = models.IntegerField() # 限制次数
window_size = models.IntegerField() # 时间窗口(秒)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'rate_limit_rules'
unique_together = ['scope', 'scope_value']
class CircuitBreakerRule(models.Model):
"""熔断器规则模型"""
service = models.ForeignKey(Service, on_delete=models.CASCADE)
failure_threshold = models.IntegerField(default=5) # 失败阈值
timeout = models.IntegerField(default=60) # 熔断超时(秒)
recovery_timeout = models.IntegerField(default=30) # 恢复时间(秒)
is_active = models.BooleanField(default=True)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
class Meta:
db_table = 'circuit_breaker_rules'
# services/gateway/gateway_app/services.py
import requests
import json
import time
import logging
from typing import Dict, Any, Optional
from django.core.cache import cache
from .models import Service, ServiceInstance, APIRoute
logger = logging.getLogger(__name__)
class ServiceDiscovery:
"""服务发现服务"""
def __init__(self):
self.cache_ttl = 30 # 30秒缓存
def get_service_instance(self, service_name: str) -> Optional[ServiceInstance]:
"""获取服务实例"""
cache_key = f"service_instance_{service_name}"
instance = cache.get(cache_key)
if instance is None:
try:
service = Service.objects.get(name=service_name, is_active=True)
instance = ServiceInstance.objects.filter(
service=service,
is_active=True,
health_status='healthy'
).first()
if instance:
cache.set(cache_key, instance, self.cache_ttl)
except Service.DoesNotExist:
logger.error(f"Service {service_name} not found")
return None
return instance
def get_all_instances(self, service_name: str) -> list:
"""获取所有服务实例"""
instances = ServiceInstance.objects.filter(
service__name=service_name,
is_active=True,
health_status='healthy'
)
return list(instances)
def register_service(self, service_data: Dict[str, Any]) -> bool:
"""注册服务"""
try:
service, created = Service.objects.get_or_create(
name=service_data['name'],
defaults={
'url': service_data['url'],
'description': service_data.get('description', '')
}
)
if 'instances' in service_data:
for instance_data in service_data['instances']:
ServiceInstance.objects.update_or_create(
service=service,
host=instance_data['host'],
port=instance_data['port'],
defaults={
'weight': instance_data.get('weight', 1),
'metadata': instance_data.get('metadata', {}),
'is_active': True
}
)
# 清除相关缓存
cache.delete_pattern(f"service_instance_{service.name}*")
return True
except Exception as e:
logger.error(f"Failed to register service: {e}")
return False
def deregister_service(self, service_name: str) -> bool:
"""注销服务"""
try:
service = Service.objects.get(name=service_name)
service.is_active = False
service.save()
# 清除缓存
cache.delete_pattern(f"service_instance_{service_name}*")
return True
except Service.DoesNotExist:
return False
class LoadBalancer:
"""负载均衡器"""
def __init__(self):
self.algorithm = 'round_robin'
self.current_index = {}
def select_instance(self, instances: list) -> Optional[ServiceInstance]:
"""选择服务实例"""
if not instances:
return None
active_instances = [inst for inst in instances if inst.is_active and inst.health_status == 'healthy']
if not active_instances:
return None
if self.algorithm == 'round_robin':
return self._round_robin_selection(active_instances)
elif self.algorithm == 'weighted_round_robin':
return self._weighted_round_robin_selection(active_instances)
elif self.algorithm == 'least_connections':
return self._least_connections_selection(active_instances)
else:
return active_instances[0] # 默认轮询
def _round_robin_selection(self, instances: list) -> ServiceInstance:
"""轮询选择"""
service_name = instances[0].service.name
current_idx = self.current_index.get(service_name, 0)
selected = instances[current_idx % len(instances)]
self.current_index[service_name] = (current_idx + 1) % len(instances)
return selected
def _weighted_round_robin_selection(self, instances: list) -> ServiceInstance:
"""加权轮询选择"""
# 简化实现:按权重比例分配
total_weight = sum(inst.weight for inst in instances)
if total_weight == 0:
return instances[0]
# 这里可以实现更复杂的加权算法
return self._round_robin_selection(instances)
def _least_connections_selection(self, instances: list) -> ServiceInstance:
"""最少连接选择"""
# 简化实现:返回第一个实例
# 实际应用中需要维护连接数统计
return instances[0]
class APIClient:
"""API客户端"""
def __init__(self):
self.session = requests.Session()
self.timeout = 30
def forward_request(self,
instance: ServiceInstance,
path: str,
method: str,
headers: dict,
body: Optional[str] = None,
query_params: Optional[dict] = None) -> dict:
"""转发请求到后端服务"""
url = f"http://{instance.host}:{instance.port}{path}"
start_time = time.time()
try:
response = self.session.request(
method=method.upper(),
url=url,
headers=headers,
data=body,
params=query_params,
timeout=self.timeout
)
response_time = (time.time() - start_time) * 1000 # 转换为毫秒
return {
'status_code': response.status_code,
'headers': dict(response.headers),
'body': response.text,
'response_time': response_time
}
except requests.exceptions.Timeout:
logger.error(f"Request timeout to {url}")
return {
'status_code': 504,
'headers': {},
'body': '{"error": "Gateway Timeout"}',
'response_time': (time.time() - start_time) * 1000
}
except requests.exceptions.ConnectionError:
logger.error(f"Connection error to {url}")
return {
'status_code': 502,
'headers': {},
'body': '{"error": "Bad Gateway"}',
'response_time': (time.time() - start_time) * 1000
}
except Exception as e:
logger.error(f"Unexpected error forwarding request: {e}")
return {
'status_code': 500,
'headers': {},
'body': '{"error": "Internal Server Error"}',
'response_time': (time.time() - start_time) * 1000
}
class CircuitBreaker:
"""熔断器"""
def __init__(self, failure_threshold=5, timeout=60):
self.failure_threshold = failure_threshold
self.timeout = timeout
self.state = {} # {service_name: {'state': 'closed/open/half-open', 'failure_count': 0, 'last_failure_time': timestamp}}
def call(self, service_name: str, func, *args, **kwargs):
"""执行带熔断的调用"""
service_state = self.state.get(service_name, {
'state': 'closed', # closed, open, half-open
'failure_count': 0,
'last_failure_time': 0
})
if service_state['state'] == 'open':
# 检查是否应该进入半开状态
if time.time() - service_state['last_failure_time'] > self.timeout:
service_state['state'] = 'half_open'
self.state[service_name] = service_state
else:
# 熔断状态,直接返回错误
raise Exception(f"Service {service_name} is currently unavailable")
try:
result = func(*args, **kwargs)
# 调用成功,重置状态
if service_state['state'] != 'closed':
service_state['state'] = 'closed'
service_state['failure_count'] = 0
self.state[service_name] = service_state
return result
except Exception as e:
# 失败调用,更新状态
service_state['failure_count'] += 1
service_state['last_failure_time'] = time.time()
if service_state['failure_count'] >= self.failure_threshold:
service_state['state'] = 'open'
self.state[service_name] = service_state
raise e
class RateLimiter:
"""限流器"""
def __init__(self, redis_client):
self.redis_client = redis_client
def is_allowed(self, key: str, limit: int, window: int) -> bool:
"""检查是否允许请求"""
pipe = self.redis_client.pipeline()
# 获取当前时间戳
now = time.time()
window_start = now - window
# 移除窗口外的请求记录
pipe.zremrangebyscore(key, 0, window_start)
# 获取当前窗口内的请求数
pipe.zcard(key)
# 添加当前请求
pipe.zadd(key, {str(now): now})
# 设置过期时间
pipe.expire(key, window)
results = pipe.execute()
current_requests = results[1]
return current_requests < limit
# services/gateway/gateway_app/middleware.py
from django.http import JsonResponse
import time
import logging
from django.core.cache import cache
from .models import Service, ServiceInstance, RateLimitRule
from .services import RateLimiter
import redis
logger = logging.getLogger(__name__)
redis_client = redis.from_url('redis://localhost:6379/0')
rate_limiter = RateLimiter(redis_client)
class RequestValidationMiddleware:
"""请求验证中间件"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
# 验证请求格式和内容
if request.method in ['POST', 'PUT', 'PATCH']:
content_type = request.META.get('CONTENT_TYPE', '')
if 'application/json' in content_type:
try:
# 验证JSON格式
import json
if request.body:
json.loads(request.body.decode('utf-8'))
except json.JSONDecodeError:
return JsonResponse(
{'error': 'Invalid JSON format'},
status=400
)
response = self.get_response(request)
return response
class ResponseTransformationMiddleware:
"""响应转换中间件"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
response = self.get_response(request)
# 统一响应格式
if response.get('Content-Type', '').startswith('application/json'):
try:
import json
data = json.loads(response.content.decode('utf-8'))
# 如果不是统一格式,则包装
if not isinstance(data, dict) or 'data' not in data:
transformed_data = {
'success': True,
'data': data,
'message': 'Success',
'timestamp': time.time()
}
response.content = json.dumps(transformed_data)
except:
pass
# 添加安全头
response['X-Content-Type-Options'] = 'nosniff'
response['X-Frame-Options'] = 'DENY'
response['X-XSS-Protection'] = '1; mode=block'
return response
class RateLimitMiddleware:
"""限流中间件"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
client_ip = self._get_client_ip(request)
endpoint = request.path
user_id = getattr(request.user, 'id', None) if hasattr(request, 'user') else None
# 检查全局限流
global_key = f"rate_limit:global:{endpoint}"
global_rule = RateLimitRule.objects.filter(scope='global', scope_value=endpoint).first()
if global_rule:
if not rate_limiter.is_allowed(global_key, global_rule.limit, global_rule.window_size):
return JsonResponse({
'error': 'Rate limit exceeded',
'retry_after': global_rule.window_size
}, status=429)
# 检查用户限流
if user_id:
user_key = f"rate_limit:user:{user_id}:{endpoint}"
user_rule = RateLimitRule.objects.filter(scope='user', scope_value=str(user_id)).first()
if user_rule:
if not rate_limiter.is_allowed(user_key, user_rule.limit, user_rule.window_size):
return JsonResponse({
'error': 'User rate limit exceeded',
'retry_after': user_rule.window_size
}, status=429)
# 检查IP限流
ip_key = f"rate_limit:ip:{client_ip}:{endpoint}"
ip_rule = RateLimitRule.objects.filter(scope='ip', scope_value=client_ip).first()
if ip_rule:
if not rate_limiter.is_allowed(ip_key, ip_rule.limit, ip_rule.window_size):
return JsonResponse({
'error': 'IP rate limit exceeded',
'retry_after': ip_rule.window_size
}, status=429)
response = self.get_response(request)
return response
def _get_client_ip(self, request):
"""获取客户端IP"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
class CircuitBreakerMiddleware:
"""熔断中间件"""
def __init__(self, get_response):
self.get_response = get_response
self.circuit_breakers = {}
def __call__(self, request):
# 这里可以实现基于服务的熔断逻辑
response = self.get_response(request)
return response
class LoggingMiddleware:
"""日志中间件"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
start_time = time.time()
start_queries = len(connection.queries)
response = self.get_response(request)
duration = (time.time() - start_time) * 1000 # 毫秒
query_count = len(connection.queries) - start_queries
logger.info(f"REQUEST: {request.method} {request.path} "
f"STATUS: {response.status_code} "
f"DURATION: {duration:.2f}ms "
f"QUERIES: {query_count} "
f"IP: {self._get_client_ip(request)}")
return response
def _get_client_ip(self, request):
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip#网关核心功能实现
# services/gateway/gateway_app/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status
from django.http import JsonResponse, HttpResponse
from django.views import View
import json
import time
import logging
from .services import ServiceDiscovery, LoadBalancer, APIClient, CircuitBreaker
from .models import APIRoute, RequestLog
from shared.common.responses import SuccessResponse, ErrorResponse
logger = logging.getLogger(__name__)
class GatewayView(View):
"""API网关视图"""
def __init__(self):
super().__init__()
self.service_discovery = ServiceDiscovery()
self.load_balancer = LoadBalancer()
self.api_client = APIClient()
self.circuit_breaker = CircuitBreaker()
def dispatch(self, request, *args, **kwargs):
"""请求分发"""
path = request.path
method = request.method
# 查找匹配的路由
try:
route = APIRoute.objects.get(
path=path,
methods__contains=[method],
is_active=True
)
except APIRoute.DoesNotExist:
return JsonResponse({'error': 'Route not found'}, status=404)
# 检查服务实例
instance = self.service_discovery.get_service_instance(route.target_service.name)
if not instance:
return JsonResponse({'error': 'Service unavailable'}, status=503)
# 构建请求头
headers = self._build_headers(request)
# 构建请求体
body = request.body.decode('utf-8') if request.body else None
# 构建查询参数
query_params = dict(request.GET)
# 执行请求
try:
response_data = self.circuit_breaker.call(
route.target_service.name,
self.api_client.forward_request,
instance,
path,
method,
headers,
body,
query_params
)
# 记录请求日志
self._log_request(route, request, response_data)
# 返回响应
response = HttpResponse(
content=response_data['body'],
status=response_data['status_code'],
content_type='application/json'
)
# 设置响应头
for key, value in response_data['headers'].items():
response[key] = value
return response
except Exception as e:
logger.error(f"Gateway error: {e}")
return JsonResponse({'error': 'Internal Server Error'}, status=500)
def _build_headers(self, request):
"""构建请求头"""
headers = {}
# 复制原始请求头
for key, value in request.META.items():
if key.startswith('HTTP_'):
header_name = key[5:].replace('_', '-').title()
headers[header_name] = value
# 添加认证信息
auth_header = request.META.get('HTTP_AUTHORIZATION')
if auth_header:
headers['Authorization'] = auth_header
# 添加网关标识
headers['X-Gateway-Forwarded'] = 'true'
headers['X-Forwarded-For'] = request.META.get('REMOTE_ADDR', '')
headers['X-Forwarded-Proto'] = request.scheme
return headers
def _log_request(self, route, request, response_data):
"""记录请求日志"""
try:
RequestLog.objects.create(
route=route,
method=request.method,
path=request.path,
status_code=response_data['status_code'],
response_time=response_data['response_time'],
client_ip=request.META.get('REMOTE_ADDR', ''),
user_agent=request.META.get('HTTP_USER_AGENT', ''),
request_headers=dict(request.headers),
response_headers=response_data['headers']
)
except Exception as e:
logger.error(f"Failed to log request: {e}")
class ServiceRegistryView(APIView):
"""服务注册视图"""
def post(self, request):
"""注册服务"""
service_data = request.data
discovery = ServiceDiscovery()
success = discovery.register_service(service_data)
if success:
return Response({'message': 'Service registered successfully'}, status=status.HTTP_201_CREATED)
else:
return Response({'error': 'Failed to register service'}, status=status.HTTP_400_BAD_REQUEST)
def delete(self, request, service_name):
"""注销服务"""
discovery = ServiceDiscovery()
success = discovery.deregister_service(service_name)
if success:
return Response({'message': 'Service deregistered successfully'})
else:
return Response({'error': 'Service not found'}, status=status.HTTP_404_NOT_FOUND}
class HealthCheckView(APIView):
"""健康检查视图"""
def get(self, request):
"""网关健康检查"""
return Response({
'status': 'healthy',
'timestamp': time.time(),
'version': '1.0.0',
'services': self._check_backend_services()
})
def _check_backend_services(self):
"""检查后端服务健康状态"""
services = Service.objects.filter(is_active=True)
health_status = {}
for service in services:
instance = self.service_discovery.get_service_instance(service.name)
health_status[service.name] = {
'status': 'healthy' if instance else 'unhealthy',
'instance': instance.host if instance else None,
'port': instance.port if instance else None
}
return health_status
class RouteManagementView(APIView):
"""路由管理视图"""
def get(self, request):
"""获取路由列表"""
routes = APIRoute.objects.filter(is_active=True)
route_data = []
for route in routes:
route_data.append({
'id': str(route.id),
'path': route.path,
'service': route.target_service.name,
'methods': route.methods,
'rate_limit': route.rate_limit,
'timeout': route.timeout,
'created_at': route.created_at,
})
return SuccessResponse({'routes': route_data})
def post(self, request):
"""创建路由"""
try:
service_name = request.data.get('service_name')
path = request.data.get('path')
methods = request.data.get('methods', [])
try:
service = Service.objects.get(name=service_name, is_active=True)
except Service.DoesNotExist:
return ErrorResponse({'error': 'Service not found'}, status=status.HTTP_404_NOT_FOUND)
route = APIRoute.objects.create(
path=path,
target_service=service,
methods=methods,
rate_limit=request.data.get('rate_limit', 100),
timeout=request.data.get('timeout', 30),
retry_count=request.data.get('retry_count', 3),
)
return SuccessResponse({
'route': {
'id': str(route.id),
'path': route.path,
'service': route.target_service.name,
'methods': route.methods,
},
'message': 'Route created successfully'
})
except Exception as e:
logger.error(f"Create route error: {str(e)}")
return ErrorResponse({'error': 'Failed to create route'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def put(self, request, route_id):
"""更新路由"""
try:
route = APIRoute.objects.get(id=route_id)
for field in ['path', 'methods', 'rate_limit', 'timeout', 'retry_count']:
if field in request.data:
setattr(route, field, request.data[field])
route.save()
return SuccessResponse({
'route': {
'id': str(route.id),
'path': route.path,
'service': route.target_service.name,
'methods': route.methods,
},
'message': 'Route updated successfully'
})
except APIRoute.DoesNotExist:
return ErrorResponse({'error': 'Route not found'}, status=status.HTTP_404_NOT_FOUND)
except Exception as e:
logger.error(f"Update route error: {str(e)}")
return ErrorResponse({'error': 'Failed to update route'}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
def delete(self, request, route_id):
"""删除路由"""
try:
route = APIRoute.objects.get(id=route_id)
route.is_active = False # 软删除
route.save()
return SuccessResponse({'message': 'Route deleted successfully'})
except APIRoute.DoesNotExist:
return ErrorResponse({'error': 'Route not found'}, status=status.HTTP_404_NOT_FOUND)
class AnalyticsView(APIView):
"""分析视图"""
def get(self, request):
"""获取网关分析数据"""
from datetime import datetime, timedelta
from django.db.models import Count, Avg
# 时间范围
days = int(request.query_params.get('days', 7))
start_date = datetime.now() - timedelta(days=days)
# 请求统计
request_stats = RequestLog.objects.filter(
timestamp__gte=start_date
).aggregate(
total_requests=Count('id'),
avg_response_time=Avg('response_time'),
status_2xx=Count('id', filter=models.Q(status_code__range=(200, 299))),
status_4xx=Count('id', filter=models.Q(status_code__range=(400, 499))),
status_5xx=Count('id', filter=models.Q(status_code__range=(500, 599))),
)
# 按路径统计
path_stats = RequestLog.objects.filter(
timestamp__gte=start_date
).values('route__path').annotate(
count=Count('id'),
avg_response_time=Avg('response_time')
).order_by('-count')[:10]
# 按状态码统计
status_stats = RequestLog.objects.filter(
timestamp__gte=start_date
).values('status_code').annotate(
count=Count('id')
).order_by('-count')
return SuccessResponse({
'request_stats': request_stats,
'top_paths': list(path_stats),
'status_codes': list(status_stats),
'period_days': days,
})
# services/gateway/gateway_app/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('health/', views.HealthCheckView.as_view(), name='gateway-health'),
path('services/register/', views.ServiceRegistryView.as_view(), name='service-register'),
path('services/<str:service_name>/deregister/', views.ServiceRegistryView.as_view(), name='service-deregister'),
path('routes/', views.RouteManagementView.as_view(), name='routes-list'),
path('routes/create/', views.RouteManagementView.as_view(), name='route-create'),
path('routes/<uuid:route_id>/', views.RouteManagementView.as_view(), name='route-detail'),
path('analytics/', views.AnalyticsView.as_view(), name='analytics'),
]#前端集成
#Vue.js前端应用
// frontend/package.json
{
"name": "ecommerce-frontend",
"version": "1.0.0",
"description": "电商平台前端应用",
"main": "src/main.js",
"scripts": {
"dev": "vite",
"build": "vite build",
"preview": "vite preview",
"lint": "eslint src --ext .vue,.js,.jsx,.cjs,.mjs --fix",
"format": "prettier --write src/"
},
"dependencies": {
"vue": "^3.3.0",
"vue-router": "^4.2.0",
"vuex": "^4.1.0",
"axios": "^1.4.0",
"element-plus": "^2.3.0",
"sass": "^1.62.0",
"normalize.css": "^8.0.1",
"@vueuse/core": "^10.1.0"
},
"devDependencies": {
"@vitejs/plugin-vue": "^4.2.0",
"vite": "^4.3.0",
"eslint": "^8.40.0",
"prettier": "^2.8.8",
"eslint-plugin-vue": "^9.11.0"
}
}
// frontend/src/main.js
import { createApp } from 'vue'
import { createPinia } from 'pinia'
import ElementPlus from 'element-plus'
import 'element-plus/dist/index.css'
import * as ElementPlusIconsVue from '@element-plus/icons-vue'
import App from './App.vue'
import router from './router'
import store from './store'
import './assets/styles/index.scss'
const app = createApp(App)
// 注册所有图标
for (const [key, component] of Object.entries(ElementPlusIconsVue)) {
app.component(key, component)
}
app.use(createPinia())
app.use(router)
app.use(store)
app.use(ElementPlus)
app.mount('#app')
// frontend/src/api/index.js
import axios from 'axios'
// 创建axios实例
const api = axios.create({
baseURL: process.env.VUE_APP_API_BASE_URL || 'http://localhost:8000/api/v1/',
timeout: 10000,
headers: {
'Content-Type': 'application/json'
}
})
// 请求拦截器
api.interceptors.request.use(
config => {
// 添加认证token
const token = localStorage.getItem('access_token')
if (token) {
config.headers.Authorization = `Bearer ${token}`
}
return config
},
error => {
return Promise.reject(error)
}
)
// 响应拦截器
api.interceptors.response.use(
response => {
return response.data
},
error => {
// 统一错误处理
if (error.response?.status === 401) {
// 重定向到登录页
localStorage.removeItem('access_token')
localStorage.removeItem('refresh_token')
window.location.href = '/login'
}
return Promise.reject(error)
}
)
export default api
// frontend/src/api/auth.js
import api from './index'
export const authAPI = {
// 用户注册
register(userData) {
return api.post('/auth/register/', userData)
},
// 用户登录
login(credentials) {
return api.post('/auth/login/', credentials)
},
// 用户登出
logout() {
return api.post('/auth/logout/')
},
// 获取用户信息
getUserProfile() {
return api.get('/auth/profile/')
},
// 更新用户信息
updateUserProfile(profileData) {
return api.put('/auth/profile/', profileData)
},
// 修改密码
changePassword(passwordData) {
return api.post('/auth/change-password/', passwordData)
}
}
// frontend/src/api/product.js
import api from './index'
export const productAPI = {
// 获取商品列表
getProducts(params = {}) {
return api.get('/products/', params)
},
// 获取商品详情
getProduct(productId) {
return api.get(`/products/${productId}/`)
},
// 搜索商品
searchProducts(query, params = {}) {
return api.get(`/products/search/?q=${query}`, params)
},
// 获取商品分类
getCategories() {
return api.get('/categories/')
},
// 获取商品品牌
getBrands() {
return api.get('/brands/')
},
// 添加商品评价
addReview(productId, reviewData) {
return api.post(`/products/${productId}/reviews/`, reviewData)
}
}
// frontend/src/api/cart.js
import api from './index'
export const cartAPI = {
// 获取购物车
getCart() {
return api.get('/cart/')
},
// 添加商品到购物车
addToCart(itemData) {
return api.post('/cart/', itemData)
},
// 更新购物车商品
updateCartItem(productId, quantity) {
return api.put('/cart/', { product_id: productId, quantity })
},
// 从购物车删除商品
removeFromCart(productId) {
return api.delete(`/cart/?product_id=${productId}`)
}
}
// frontend/src/api/order.js
import api from './index'
export const orderAPI = {
// 创建订单
createOrder(orderData) {
return api.post('/orders/', orderData)
},
// 获取订单列表
getOrders(params = {}) {
return api.get('/orders/', params)
},
// 获取订单详情
getOrder(orderId) {
return api.get(`/orders/${orderId}/`)
},
// 取消订单
cancelOrder(orderId) {
return api.post(`/orders/${orderId}/cancel/`)
},
// 支付订单
payOrder(orderId, paymentData) {
return api.post(`/orders/${orderId}/pay/`, paymentData)
}
}
// frontend/src/store/modules/auth.js
import { defineStore } from 'pinia'
import { authAPI } from '@/api/auth'
export const useAuthStore = defineStore('auth', {
state: () => ({
user: null,
accessToken: localStorage.getItem('access_token'),
refreshToken: localStorage.getItem('refresh_token'),
isAuthenticated: false
}),
getters: {
isLoggedIn: (state) => !!state.accessToken,
currentUser: (state) => state.user
},
actions: {
async login(credentials) {
try {
const response = await authAPI.login(credentials)
const { user, tokens } = response.data
this.user = user
this.accessToken = tokens.access
this.refreshToken = tokens.refresh
this.isAuthenticated = true
localStorage.setItem('access_token', tokens.access)
localStorage.setItem('refresh_token', tokens.refresh)
return { success: true, user }
} catch (error) {
return { success: false, error: error.response?.data }
}
},
async register(userData) {
try {
const response = await authAPI.register(userData)
const { user, tokens } = response.data
this.user = user
this.accessToken = tokens.access
this.refreshToken = tokens.refresh
this.isAuthenticated = true
localStorage.setItem('access_token', tokens.access)
localStorage.setItem('refresh_token', tokens.refresh)
return { success: true, user }
} catch (error) {
return { success: false, error: error.response?.data }
}
},
async logout() {
try {
await authAPI.logout()
} catch (error) {
console.error('Logout error:', error)
} finally {
this.user = null
this.accessToken = null
this.refreshToken = null
this.isAuthenticated = false
localStorage.removeItem('access_token')
localStorage.removeItem('refresh_token')
}
},
async fetchUserProfile() {
if (!this.accessToken) return
try {
const response = await authAPI.getUserProfile()
this.user = response.data.user
} catch (error) {
console.error('Fetch user profile error:', error)
if (error.response?.status === 401) {
this.logout()
}
}
}
}
})
// frontend/src/store/modules/cart.js
import { defineStore } from 'pinia'
import { cartAPI } from '@/api/cart'
export const useCartStore = defineStore('cart', {
state: () => ({
items: [],
totalItems: 0,
totalAmount: 0
}),
getters: {
cartItemCount: (state) => state.items.length,
cartTotalPrice: (state) => state.totalAmount
},
actions: {
async fetchCart() {
try {
const response = await cartAPI.getCart()
const cart = response.data
this.items = cart.items || []
this.totalItems = cart.total_items || 0
this.totalAmount = cart.total_amount || 0
} catch (error) {
console.error('Fetch cart error:', error)
}
},
async addToCart(product, quantity = 1) {
try {
const response = await cartAPI.addToCart({
product_id: product.id,
quantity: quantity,
attributes: product.attributes || {}
})
await this.fetchCart()
return { success: true }
} catch (error) {
return { success: false, error: error.response?.data }
}
},
async updateCartItem(productId, quantity) {
try {
await cartAPI.updateCartItem(productId, quantity)
await this.fetchCart()
} catch (error) {
console.error('Update cart item error:', error)
}
},
async removeFromCart(productId) {
try {
await cartAPI.removeFromCart(productId)
await this.fetchCart()
} catch (error) {
console.error('Remove from cart error:', error)
}
}
}
})
// frontend/src/components/Header.vue
<template>
<header class="header">
<div class="container">
<div class="header-content">
<div class="logo">
<router-link to="/">
<h1>电商平台</h1>
</router-link>
</div>
<nav class="nav">
<router-link to="/" class="nav-item">首页</router-link>
<router-link to="/products" class="nav-item">商品</router-link>
<router-link to="/categories" class="nav-item">分类</router-link>
<router-link to="/about" class="nav-item">关于我们</router-link>
</nav>
<div class="header-actions">
<div class="search-box">
<el-input
v-model="searchQuery"
placeholder="搜索商品..."
@keyup.enter="handleSearch"
class="search-input"
>
<template #suffix>
<el-icon @click="handleSearch"><Search /></el-icon>
</template>
</el-input>
</div>
<div class="user-actions">
<router-link to="/cart" class="cart-link">
<el-badge :value="cartItemCount" class="item" type="danger" :hidden="cartItemCount === 0">
<el-icon><ShoppingCart /></el-icon>
</el-badge>
</router-link>
<div v-if="isLoggedIn" class="user-menu">
<el-dropdown>
<span class="el-dropdown-link">
{{ currentUser.username }}
<el-icon class="el-icon--right"><ArrowDown /></el-icon>
</span>
<template #dropdown>
<el-dropdown-menu>
<el-dropdown-item @click="$router.push('/profile')">个人资料</el-dropdown-item>
<el-dropdown-item @click="$router.push('/orders')">我的订单</el-dropdown-item>
<el-dropdown-item @click="handleLogout">退出登录</el-dropdown-item>
</el-dropdown-menu>
</template>
</el-dropdown>
</div>
<div v-else class="auth-buttons">
<el-button type="primary" @click="$router.push('/login')">登录</el-button>
<el-button @click="$router.push('/register')">注册</el-button>
</div>
</div>
</div>
</div>
</div>
</header>
</template>
<script setup>
import { ref, computed } from 'vue'
import { useRouter } from 'vue-router'
import { useAuthStore } from '@/store/modules/auth'
import { useCartStore } from '@/store/modules/cart'
import { Search, ShoppingCart, ArrowDown } from '@element-plus/icons-vue'
const router = useRouter()
const authStore = useAuthStore()
const cartStore = useCartStore()
const searchQuery = ref('')
const isLoggedIn = computed(() => authStore.isLoggedIn)
const currentUser = computed(() => authStore.currentUser)
const cartItemCount = computed(() => cartStore.cartItemCount)
const handleSearch = () => {
if (searchQuery.value.trim()) {
router.push(`/search?q=${encodeURIComponent(searchQuery.value)}`)
searchQuery.value = ''
}
}
const handleLogout = async () => {
await authStore.logout()
router.push('/login')
}
</script>
<style scoped>
.header {
background: #fff;
box-shadow: 0 2px 4px rgba(0,0,0,.1);
position: sticky;
top: 0;
z-index: 1000;
}
.container {
max-width: 1200px;
margin: 0 auto;
padding: 0 20px;
}
.header-content {
display: flex;
align-items: center;
justify-content: space-between;
height: 60px;
}
.logo h1 {
margin: 0;
font-size: 1.5rem;
color: #409eff;
}
.nav {
display: flex;
gap: 2rem;
}
.nav-item {
text-decoration: none;
color: #333;
font-weight: 500;
transition: color 0.3s;
}
.nav-item.router-link-active,
.nav-item:hover {
color: #409eff;
}
.header-actions {
display: flex;
align-items: center;
gap: 1rem;
}
.search-box {
width: 300px;
}
.user-actions {
display: flex;
align-items: center;
gap: 1rem;
}
.cart-link {
color: #333;
font-size: 1.2rem;
text-decoration: none;
}
.auth-buttons {
display: flex;
gap: 0.5rem;
}
</style>
// frontend/src/views/Home.vue
<template>
<div class="home">
<!-- 轮播图 -->
<el-carousel :interval="4000" type="card" height="200px">
<el-carousel-item v-for="item in banners" :key="item.id">
<h3>{{ item.title }}</h3>
</el-carousel-item>
</el-carousel>
<!-- 热门商品 -->
<section class="featured-products">
<h2>热门推荐</h2>
<div class="products-grid">
<ProductCard
v-for="product in featuredProducts"
:key="product.id"
:product="product"
/>
</div>
</section>
<!-- 商品分类 -->
<section class="categories">
<h2>商品分类</h2>
<div class="categories-grid">
<CategoryCard
v-for="category in categories"
:key="category.id"
:category="category"
/>
</div>
</section>
</div>
</template>
<script setup>
import { ref, onMounted } from 'vue'
import ProductCard from '@/components/ProductCard.vue'
import CategoryCard from '@/components/CategoryCard.vue'
import { productAPI } from '@/api/product'
const banners = ref([
{ id: 1, title: '夏季大促', subtitle: '全场五折起' },
{ id: 2, title: '新品上市', subtitle: '抢先体验' },
{ id: 3, title: '会员专享', subtitle: '尊享特权' }
])
const featuredProducts = ref([])
const categories = ref([])
onMounted(async () => {
try {
// 获取热门商品
const productsResponse = await productAPI.getProducts({
ordering: '-sales_count',
limit: 8
})
featuredProducts.value = productsResponse.data.results || []
// 获取商品分类
const categoriesResponse = await productAPI.getCategories()
categories.value = categoriesResponse.data.results || []
} catch (error) {
console.error('Failed to load home data:', error)
}
})
</script>
<style scoped>
.home {
padding: 20px 0;
}
.el-carousel__item h3 {
display: flex;
color: #475669;
opacity: 0.75;
line-height: 200px;
margin: 0;
font-size: 18px;
font-weight: bold;
}
.el-carousel__item:nth-child(2n) {
background-color: #99a9bf;
}
.el-carousel__item:nth-child(2n + 1) {
background-color: #d3dce6;
}
.featured-products,
.categories {
margin: 40px 0;
padding: 0 20px;
}
.featured-products h2,
.categories h2 {
font-size: 1.5rem;
margin-bottom: 20px;
text-align: center;
}
.products-grid,
.categories-grid {
display: grid;
grid-template-columns: repeat(auto-fill, minmax(250px, 1fr));
gap: 20px;
}
</style>#部署上线
#Docker容器化部署
# services/gateway/Dockerfile
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# 复制应用代码
COPY . .
# 收集静态文件
RUN python manage.py collectstatic --noinput
# 创建非root用户
RUN useradd --create-home --shell /bin/bash app
USER app
EXPOSE 8000
CMD ["gunicorn", "--bind", "0.0.0.0:8000", "--workers", "4", "gateway.wsgi:application"]
# services/user/Dockerfile
FROM python:3.11-slim
WORKDIR /app
RUN apt-get update && apt-get install -y \
gcc \
postgresql-client \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN python manage.py collectstatic --noinput
RUN useradd --create-home --shell /bin/bash app
USER app
EXPOSE 8001
CMD ["gunicorn", "--bind", "0.0.0.0:8001", "--workers", "4", "user_service.wsgi:application"]
# docker-compose.yml
version: '3.8'
services:
# 数据库
postgres:
image: postgres:15
container_name: ecommerce_postgres
environment:
POSTGRES_DB: ecommerce
POSTGRES_USER: ecommerce_user
POSTGRES_PASSWORD: ${DB_PASSWORD:-password}
volumes:
- postgres_data:/var/lib/postgresql/data
- ./init.sql:/docker-entrypoint-initdb.d/init.sql
ports:
- "5432:5432"
networks:
- ecommerce_network
# Redis
redis:
image: redis:7-alpine
container_name: ecommerce_redis
ports:
- "6379:6379"
volumes:
- redis_data:/data
networks:
- ecommerce_network
# RabbitMQ
rabbitmq:
image: rabbitmq:3-management
container_name: ecommerce_rabbitmq
environment:
RABBITMQ_DEFAULT_USER: ${RABBITMQ_USER:-guest}
RABBITMQ_DEFAULT_PASS: ${RABBITMQ_PASS:-guest}
ports:
- "5672:5672"
- "15672:15672"
volumes:
- rabbitmq_data:/var/lib/rabbitmq
networks:
- ecommerce_network
# Elasticsearch (用于搜索服务)
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.8.0
container_name: ecommerce_elasticsearch
environment:
- discovery.type=single-node
- xpack.security.enabled=false
- "ES_JAVA_OPTS=-Xms512m -Xmx512m"
ulimits:
memlock:
soft: -1
hard: -1
ports:
- "9200:9200"
volumes:
- es_data:/usr/share/elasticsearch/data
networks:
- ecommerce_network
# API网关
gateway:
build: ./services/gateway
container_name: ecommerce_gateway
depends_on:
- postgres
- redis
environment:
- DEBUG=${DEBUG:-False}
- DB_HOST=postgres
- REDIS_URL=redis://redis:6379/0
- SECRET_KEY=${SECRET_KEY:-your-secret-key}
ports:
- "8000:8000"
networks:
- ecommerce_network
restart: unless-stopped
# 用户服务
user-service:
build: ./services/user
container_name: ecommerce_user_service
depends_on:
- postgres
- redis
environment:
- DEBUG=${DEBUG:-False}
- DB_HOST=postgres
- REDIS_URL=redis://redis:6379/0
- SECRET_KEY=${SECRET_KEY:-your-secret-key}
ports:
- "8001:8001"
networks:
- ecommerce_network
restart: unless-stopped
# 商品服务
product-service:
build: ./services/product
container_name: ecommerce_product_service
depends_on:
- postgres
- redis
- elasticsearch
environment:
- DEBUG=${DEBUG:-False}
- DB_HOST=postgres
- REDIS_URL=redis://redis:6379/0
- ELASTICSEARCH_HOST=elasticsearch
- SECRET_KEY=${SECRET_KEY:-your-secret-key}
ports:
- "8002:8002"
networks:
- ecommerce_network
restart: unless-stopped
# 订单服务
order-service:
build: ./services/order
container_name: ecommerce_order_service
depends_on:
- postgres
- redis
- rabbitmq
environment:
- DEBUG=${DEBUG:-False}
- DB_HOST=postgres
- REDIS_URL=redis://redis:6379/0
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672//
- SECRET_KEY=${SECRET_KEY:-your-secret-key}
ports:
- "8003:8003"
networks:
- ecommerce_network
restart: unless-stopped
# 支付服务
payment-service:
build: ./services/payment
container_name: ecommerce_payment_service
depends_on:
- postgres
- redis
environment:
- DEBUG=${DEBUG:-False}
- DB_HOST=postgres
- REDIS_URL=redis://redis:6379/0
- STRIPE_SECRET_KEY=${STRIPE_SECRET_KEY}
- ALIPAY_APP_ID=${ALIPAY_APP_ID}
- SECRET_KEY=${SECRET_KEY:-your-secret-key}
ports:
- "8004:8004"
networks:
- ecommerce_network
restart: unless-stopped
# 通知服务
notification-service:
build: ./services/notification
container_name: ecommerce_notification_service
depends_on:
- postgres
- redis
- rabbitmq
environment:
- DEBUG=${DEBUG:-False}
- DB_HOST=postgres
- REDIS_URL=redis://redis:6379/0
- RABBITMQ_URL=amqp://guest:guest@rabbitmq:5672//
- EMAIL_HOST=${EMAIL_HOST}
- EMAIL_HOST_USER=${EMAIL_HOST_USER}
- EMAIL_HOST_PASSWORD=${EMAIL_HOST_PASSWORD}
- SECRET_KEY=${SECRET_KEY:-your-secret-key}
ports:
- "8005:8005"
networks:
- ecommerce_network
restart: unless-stopped
# Nginx反向代理
nginx:
image: nginx:alpine
container_name: ecommerce_nginx
depends_on:
- gateway
ports:
- "80:80"
- "443:443"
volumes:
- ./docker/nginx/nginx.conf:/etc/nginx/nginx.conf
- ./docker/nginx/conf.d:/etc/nginx/conf.d
- ./ssl:/etc/nginx/ssl
networks:
- ecommerce_network
restart: unless-stopped
# Celery Worker (用于异步任务)
celery-worker:
build: ./services/shared
container_name: ecommerce_celery_worker
depends_on:
- redis
- rabbitmq
environment:
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
- CELERY_RESULT_BACKEND=redis://redis:6379/0
command: celery -A worker worker --loglevel=info
networks:
- ecommerce_network
restart: unless-stopped
# Celery Beat (用于定时任务)
celery-beat:
build: ./services/shared
container_name: ecommerce_celery_beat
depends_on:
- redis
- rabbitmq
environment:
- CELERY_BROKER_URL=amqp://guest:guest@rabbitmq:5672//
- CELERY_RESULT_BACKEND=redis://redis:6379/0
command: celery -A worker beat --loglevel=info
networks:
- ecommerce_network
restart: unless-stopped
volumes:
postgres_data:
redis_data:
rabbitmq_data:
es_data:
networks:
ecommerce_network:
driver: bridge#Kubernetes部署配置
# k8s/namespace.yaml
apiVersion: v1
kind: Namespace
metadata:
name: ecommerce-platform
---
# k8s/postgres-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres
namespace: ecommerce-platform
labels:
app: postgres
spec:
replicas: 1
selector:
matchLabels:
app: postgres
template:
metadata:
labels:
app: postgres
spec:
containers:
- name: postgres
image: postgres:15
env:
- name: POSTGRES_DB
value: ecommerce
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: db-secret
key: username
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: db-secret
key: password
ports:
- containerPort: 5432
volumeMounts:
- name: postgres-storage
mountPath: /var/lib/postgresql/data
volumes:
- name: postgres-storage
persistentVolumeClaim:
claimName: postgres-pvc
---
apiVersion: v1
kind: Service
metadata:
name: postgres-service
namespace: ecommerce-platform
spec:
selector:
app: postgres
ports:
- protocol: TCP
port: 5432
targetPort: 5432
type: ClusterIP
---
# k8s/redis-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: redis
namespace: ecommerce-platform
labels:
app: redis
spec:
replicas: 1
selector:
matchLabels:
app: redis
template:
metadata:
labels:
app: redis
spec:
containers:
- name: redis
image: redis:7-alpine
ports:
- containerPort: 6379
---
apiVersion: v1
kind: Service
metadata:
name: redis-service
namespace: ecommerce-platform
spec:
selector:
app: redis
ports:
- protocol: TCP
port: 6379
targetPort: 6379
type: ClusterIP
---
# k8s/gateway-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: gateway
namespace: ecommerce-platform
labels:
app: gateway
spec:
replicas: 3
selector:
matchLabels:
app: gateway
template:
metadata:
labels:
app: gateway
spec:
containers:
- name: gateway
image: ecommerce/gateway:latest
ports:
- containerPort: 8000
env:
- name: DB_HOST
value: postgres-service
- name: REDIS_URL
value: redis://redis-service:6379/0
- name: SECRET_KEY
valueFrom:
secretKeyRef:
name: app-secret
key: secret-key
resources:
requests:
memory: "256Mi"
cpu: "250m"
limits:
memory: "512Mi"
cpu: "500m"
livenessProbe:
httpGet:
path: /health/
port: 8000
initialDelaySeconds:
