#Django文件上传与存储 - 完整的文件处理解决方案
📂 所属阶段:第二部分 — 进阶特性
🎯 难度等级:中级
⏰ 预计学习时间:4-5小时
#目录
#文件上传基础概念
Django提供了完整的文件上传和处理框架,支持本地存储、云存储等多种方式。
#文件上传原理
"""
文件上传工作原理:
1. 客户端使用multipart/form-data编码上传文件
2. Django接收文件并创建UploadedFile对象
3. 文件被存储到临时位置或直接处理
4. 可以将文件保存到指定位置或存储服务
5. 在模型中保存文件路径信息
文件类型:
- UploadedFile: 基础上传文件类
- TemporaryUploadedFile: 临时上传文件
- InMemoryUploadedFile: 内存中的上传文件
"""#文件上传表单
# 文件上传表单基础
from django import forms
from django.core.files.uploadedfile import UploadedFile
class FileUploadForm(forms.Form):
"""基础文件上传表单"""
title = forms.CharField(max_length=100)
file = forms.FileField(
label='选择文件',
help_text='支持的文件类型: PDF, DOC, XLS, JPG, PNG'
)
def clean_file(self):
"""验证上传的文件"""
uploaded_file = self.cleaned_data['file']
# 检查文件大小(例如限制为5MB)
if uploaded_file.size > 5 * 1024 * 1024: # 5MB
raise forms.ValidationError('文件大小不能超过5MB')
# 检查文件类型
allowed_extensions = ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.jpg', '.jpeg', '.png']
import os
ext = os.path.splitext(uploaded_file.name)[1].lower()
if ext not in allowed_extensions:
raise forms.ValidationError(
f'不支持的文件类型: {ext}。支持的类型: {", ".join(allowed_extensions)}'
)
return uploaded_file
# 模型表单中的文件字段
from django.db import models
class Document(models.Model):
"""文档模型"""
title = models.CharField(max_length=200)
file = models.FileField(upload_to='documents/%Y/%m/') # 按年月组织
uploaded_at = models.DateTimeField(auto_now_add=True)
file_size = models.PositiveIntegerField(null=True, blank=True) # 文件大小
content_type = models.CharField(max_length=100, null=True, blank=True) # MIME类型
def save(self, *args, **kwargs):
"""保存时设置文件相关信息"""
if self.file:
self.file_size = self.file.size
if hasattr(self.file, 'content_type'):
self.content_type = self.file.content_type
super().save(*args, **kwargs)
class DocumentForm(forms.ModelForm):
"""文档模型表单"""
class Meta:
model = Document
fields = ['title', 'file']#文件上传视图
# 文件上传视图
from django.shortcuts import render, redirect
from django.contrib import messages
from django.views.decorators.csrf import csrf_protect
from django.http import JsonResponse
import json
@csrf_protect
def upload_file_view(request):
"""文件上传视图"""
if request.method == 'POST':
form = FileUploadForm(request.POST, request.FILES)
if form.is_valid():
# 处理上传的文件
uploaded_file = request.FILES['file']
# 方法1: 直接保存到模型
document = Document.objects.create(
title=form.cleaned_data['title'],
file=uploaded_file
)
messages.success(request, '文件上传成功!')
return redirect('document_list')
else:
messages.error(request, '文件上传失败,请检查文件格式和大小。')
else:
form = FileUploadForm()
return render(request, 'upload.html', {'form': form})
# AJAX文件上传
def ajax_upload_view(request):
"""AJAX文件上传视图"""
if request.method == 'POST' and request.FILES:
try:
uploaded_file = request.FILES['file']
# 验证文件
if uploaded_file.size > 10 * 1024 * 1024: # 10MB限制
return JsonResponse({
'success': False,
'error': '文件太大,最大支持10MB'
})
# 保存文件
document = Document.objects.create(
title=uploaded_file.name,
file=uploaded_file
)
return JsonResponse({
'success': True,
'file_id': document.id,
'file_url': document.file.url,
'file_name': uploaded_file.name
})
except Exception as e:
return JsonResponse({
'success': False,
'error': str(e)
})
return JsonResponse({'success': False, 'error': '无效请求'})
# 进度条上传(使用FormData和JavaScript)
def progress_upload_view(request):
"""支持进度显示的文件上传"""
if request.method == 'POST':
# 这里可以集成进度跟踪逻辑
# 通常需要结合JavaScript和WebSocket来实现实时进度
uploaded_file = request.FILES['file']
# 保存文件
document = Document.objects.create(
title=uploaded_file.name,
file=uploaded_file
)
return JsonResponse({
'success': True,
'file_url': document.file.url
})
return render(request, 'progress_upload.html')#Django文件处理架构
#文件处理核心组件
# Django文件处理核心组件
from django.core.files.base import File, ContentFile
from django.core.files.storage import default_storage, FileSystemStorage
from django.core.files.uploadedfile import (
UploadedFile, TemporaryUploadedFile, InMemoryUploadedFile
)
import tempfile
import os
# 文件存储后端
"""
Django文件存储后端:
1. FileSystemStorage: 本地文件系统存储(默认)
2. S3Boto3Storage: AWS S3存储
3. GoogleCloudStorage: Google Cloud存储
4. AzureStorage: Microsoft Azure存储
"""
# 文件处理工具类
class FileProcessor:
"""文件处理工具类"""
@staticmethod
def get_file_info(uploaded_file):
"""获取文件信息"""
return {
'name': uploaded_file.name,
'size': uploaded_file.size,
'content_type': getattr(uploaded_file, 'content_type', ''),
'charset': getattr(uploaded_file, 'charset', ''),
'temporary_file_path': getattr(uploaded_file, 'temporary_file_path', None),
'multiple_chunks': uploaded_file.multiple_chunks(),
'chunks': list(uploaded_file.chunks()) if uploaded_file.multiple_chunks() else [uploaded_file.read()]
}
@staticmethod
def save_file_to_path(uploaded_file, destination_path):
"""将上传文件保存到指定路径"""
with open(destination_path, 'wb+') as destination:
for chunk in uploaded_file.chunks():
destination.write(chunk)
@staticmethod
def create_content_file(content, name):
"""从内容创建文件对象"""
return ContentFile(content, name=name)
@staticmethod
def temporary_file_handling():
"""临时文件处理示例"""
# 创建临时文件
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(b'Some temporary content')
temp_file_path = temp_file.name
# 将临时文件移动到Django存储
with open(temp_file_path, 'rb') as f:
content = f.read()
django_file = ContentFile(content)
# 保存到模型
# my_model.file.save('temp_file.txt', django_file)
# 清理临时文件
os.unlink(temp_file_path)
# 文件上传处理器
class FileUploadHandler:
"""文件上传处理器"""
def __init__(self, request):
self.request = request
def handle_single_file(self, field_name='file'):
"""处理单个文件上传"""
if field_name in self.request.FILES:
uploaded_file = self.request.FILES[field_name]
return self.process_file(uploaded_file)
return None
def handle_multiple_files(self, field_name='files'):
"""处理多个文件上传"""
files = []
if field_name in self.request.FILES:
for uploaded_file in self.request.FILES.getlist(field_name):
processed_file = self.process_file(uploaded_file)
files.append(processed_file)
return files
def process_file(self, uploaded_file):
"""处理单个上传文件"""
# 验证文件
self.validate_file(uploaded_file)
# 生成安全的文件名
safe_filename = self.generate_safe_filename(uploaded_file.name)
# 保存文件
saved_path = default_storage.save(safe_filename, uploaded_file)
return {
'original_name': uploaded_file.name,
'saved_name': safe_filename,
'saved_path': saved_path,
'size': uploaded_file.size,
'content_type': getattr(uploaded_file, 'content_type', '')
}
def validate_file(self, uploaded_file):
"""验证上传文件"""
# 检查文件大小
max_size = 10 * 1024 * 1024 # 10MB
if uploaded_file.size > max_size:
raise ValueError(f'文件太大,最大支持 {max_size / (1024*1024)}MB')
# 检查文件名安全性
if '..' in uploaded_file.name or '/' in uploaded_file.name:
raise ValueError('文件名包含非法字符')
def generate_safe_filename(self, original_filename):
"""生成安全的文件名"""
import uuid
import os
name, ext = os.path.splitext(original_filename)
safe_name = f"{uuid.uuid4().hex}{ext.lower()}"
return safe_name#文件存储接口
# 文件存储接口和实现
from django.core.files.storage import Storage
from django.conf import settings
import os
from urllib.parse import urljoin
class CustomStorage(Storage):
"""自定义存储后端示例"""
def __init__(self, location=None, base_url=None):
self.location = location or getattr(settings, 'CUSTOM_FILE_STORAGE_LOCATION', '/custom_storage/')
self.base_url = base_url or getattr(settings, 'CUSTOM_FILE_STORAGE_URL', '/custom-files/')
def _save(self, name, content):
"""保存文件"""
full_path = self.path(name)
# 创建目录
directory = os.path.dirname(full_path)
if not os.path.exists(directory):
os.makedirs(directory)
# 写入文件
with open(full_path, 'wb') as f:
for chunk in content.chunks():
f.write(chunk)
return name
def _open(self, name, mode='rb'):
"""打开文件"""
return open(self.path(name), mode)
def delete(self, name):
"""删除文件"""
full_path = self.path(name)
if os.path.exists(full_path):
os.remove(full_path)
def exists(self, name):
"""检查文件是否存在"""
return os.path.exists(self.path(name))
def url(self, name):
"""获取文件URL"""
return urljoin(self.base_url, name)
def size(self, name):
"""获取文件大小"""
return os.path.getsize(self.path(name))
def path(self, name):
"""获取文件系统路径"""
return os.path.join(self.location, name)
# 存储工厂
class StorageFactory:
"""存储工厂类"""
@staticmethod
def get_storage(storage_type='filesystem', **kwargs):
"""根据类型获取存储实例"""
if storage_type == 'filesystem':
from django.core.files.storage import FileSystemStorage
return FileSystemStorage(**kwargs)
elif storage_type == 'custom':
return CustomStorage(**kwargs)
elif storage_type == 's3':
try:
from storages.backends.s3boto3 import S3Boto3Storage
return S3Boto3Storage(**kwargs)
except ImportError:
raise ValueError("S3存储需要安装django-storages[boto3]")
else:
raise ValueError(f"不支持的存储类型: {storage_type}")
# 存储配置管理
class StorageConfig:
"""存储配置管理"""
@staticmethod
def get_media_storage():
"""获取媒体文件存储配置"""
storage_type = getattr(settings, 'MEDIA_STORAGE_TYPE', 'filesystem')
if storage_type == 'filesystem':
return {
'class': 'django.core.files.storage.FileSystemStorage',
'location': settings.MEDIA_ROOT,
'base_url': settings.MEDIA_URL
}
elif storage_type == 's3':
return {
'class': 'storages.backends.s3boto3.S3Boto3Storage',
'bucket_name': settings.AWS_STORAGE_BUCKET_NAME,
'region_name': settings.AWS_S3_REGION_NAME,
'file_overwrite': False
}
else:
return {
'class': 'django.core.files.storage.FileSystemStorage',
'location': settings.MEDIA_ROOT,
'base_url': settings.MEDIA_URL
}#文件字段与表单处理
#文件字段类型
# Django文件字段类型详解
from django.db import models
from django.core.files.storage import default_storage
import os
class FileFieldsExample(models.Model):
"""文件字段示例"""
# 基础文件字段
document = models.FileField(
upload_to='documents/',
max_length=500,
help_text='上传文档文件'
)
# 图像字段(需要Pillow)
image = models.ImageField(
upload_to='images/',
blank=True,
null=True,
help_text='上传图片文件'
)
# 自定义上传路径
custom_file = models.FileField(
upload_to=lambda instance, filename: f'uploads/{instance.user.id}/{filename}',
blank=True,
null=True
)
# 指定存储后端
stored_file = models.FileField(
upload_to='stored/',
storage=default_storage, # 可以指定自定义存储
blank=True,
null=True
)
# 限制文件类型
pdf_only = models.FileField(
upload_to='pdfs/',
validators=[validate_pdf_file], # 自定义验证器
blank=True,
null=True
)
def validate_pdf_file(value):
"""PDF文件验证器"""
import magic # 需要python-magic包
file_mime = magic.from_buffer(value.read(1024), mime=True)
value.seek(0) # 重置文件指针
if file_mime != 'application/pdf':
raise ValidationError('只允许上传PDF文件')
# 高级文件字段配置
class AdvancedFileFields(models.Model):
"""高级文件字段配置"""
# 按日期组织文件
dated_document = models.FileField(
upload_to='documents/%Y/%m/%d/', # 按年/月/日组织
help_text='按日期组织的文档'
)
# 按用户ID组织
user_document = models.FileField(
upload_to=lambda instance, filename: f'user_docs/{instance.owner.id}/{filename}',
help_text='按用户组织的文档'
)
# 按文件哈希组织(防重复)
hashed_file = models.FileField(
upload_to=lambda instance, filename: f'hashed/{hash_file_content(instance.file)}',
help_text='按内容哈希组织的文件'
)
def hash_file_content(uploaded_file):
"""根据文件内容生成哈希值"""
import hashlib
md5_hash = hashlib.md5()
for chunk in uploaded_file.chunks():
md5_hash.update(chunk)
return f"{md5_hash.hexdigest()[:8]}/{uploaded_file.name}"
# 文件字段验证
class ValidatedFileForm(forms.ModelForm):
"""带验证的文件表单"""
class Meta:
model = FileFieldsExample
fields = ['document', 'image']
def clean_document(self):
"""验证文档文件"""
document = self.cleaned_data.get('document')
if document:
# 检查文件大小
if document.size > 10 * 1024 * 1024: # 10MB
raise forms.ValidationError('文档文件不能超过10MB')
# 检查文件类型(基于扩展名)
import os
ext = os.path.splitext(document.name)[1].lower()
allowed_exts = ['.pdf', '.doc', '.docx', '.txt', '.xls', '.xlsx']
if ext not in allowed_exts:
raise forms.ValidationError(
f'不允许的文档类型: {ext}。支持: {", ".join(allowed_exts)}'
)
return document
def clean_image(self):
"""验证图像文件"""
image = self.cleaned_data.get('image')
if image:
# 检查文件大小
if image.size > 5 * 1024 * 1024: # 5MB
raise forms.ValidationError('图像文件不能超过5MB')
# 检查图像尺寸
from PIL import Image
img = Image.open(image)
width, height = img.size
if width > 4000 or height > 4000:
raise forms.ValidationError('图像尺寸过大,最大支持4000x4000像素')
return image#文件表单处理
# 高级文件表单处理
from django import forms
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
import os
import mimetypes
class AdvancedFileForm(forms.Form):
"""高级文件表单"""
title = forms.CharField(max_length=200)
description = forms.CharField(widget=forms.Textarea, required=False)
# 多文件上传
files = forms.FileField(
widget=forms.ClearableFileInput(attrs={
'multiple': True,
'accept': '.pdf,.doc,.docx,.jpg,.jpeg,.png'
}),
help_text='可以选择多个文件',
required=False
)
# 图像文件
image = forms.ImageField(
help_text='上传一张图片',
required=False
)
def clean_files(self):
"""验证多个文件"""
files = self.cleaned_data.get('files')
if files:
cleaned_files = []
for uploaded_file in files if hasattr(files, '__iter__') else [files]:
# 验证单个文件
self.validate_single_file(uploaded_file)
cleaned_files.append(uploaded_file)
return cleaned_files
return []
def validate_single_file(self, uploaded_file):
"""验证单个文件"""
# 检查文件大小
max_size = 10 * 1024 * 1024 # 10MB
if uploaded_file.size > max_size:
raise ValidationError(_(f'文件 {uploaded_file.name} 太大,最大支持 {max_size // (1024*1024)}MB'))
# 检查文件类型
allowed_types = {
'application/pdf': '.pdf',
'application/msword': '.doc',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
'image/jpeg': '.jpg',
'image/png': '.png',
'text/plain': '.txt'
}
# 获取MIME类型
mime_type = mimetypes.guess_type(uploaded_file.name)[0]
if not mime_type:
# 如果无法猜测,读取文件头
import magic
mime_type = magic.from_buffer(uploaded_file.read(1024), mime=True)
uploaded_file.seek(0) # 重置文件指针
if mime_type not in allowed_types:
raise ValidationError(_(f'不支持的文件类型: {mime_type}'))
# 检查扩展名是否匹配
_, ext = os.path.splitext(uploaded_file.name.lower())
expected_ext = allowed_types.get(mime_type)
if expected_ext and ext != expected_ext:
raise ValidationError(_(f'文件扩展名与内容类型不匹配'))
# 文件上传进度处理
class ProgressFileForm(forms.Form):
"""支持进度显示的文件表单"""
title = forms.CharField(max_length=200)
file = forms.FileField()
def save_with_progress(self, progress_callback=None):
"""带进度回调的文件保存"""
uploaded_file = self.cleaned_data['file']
# 创建临时文件来处理大文件
import tempfile
import shutil
temp_file_path = None
try:
# 创建临时文件
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file_path = temp_file.name
# 分块写入并报告进度
total_size = uploaded_file.size
bytes_written = 0
for chunk in uploaded_file.chunks():
temp_file.write(chunk)
bytes_written += len(chunk)
if progress_callback:
progress = (bytes_written / total_size) * 100
progress_callback(progress)
# 将临时文件移动到最终位置
final_path = default_storage.save(uploaded_file.name, open(temp_file_path, 'rb'))
# 创建数据库记录
from .models import Document
document = Document.objects.create(
title=self.cleaned_data['title'],
file=final_path
)
return document
finally:
# 清理临时文件
if temp_file_path and os.path.exists(temp_file_path):
os.unlink(temp_file_path)
# 动态文件表单
class DynamicFileForm(forms.Form):
"""动态文件表单"""
def __init__(self, *args, **kwargs):
# 从kwargs中获取文件字段数量
num_files = kwargs.pop('num_files', 1)
super().__init__(*args, **kwargs)
# 动态添加文件字段
for i in range(num_files):
self.fields[f'file_{i}'] = forms.FileField(
label=f'文件 {i+1}',
required=False
)
def clean(self):
"""验证所有动态文件字段"""
cleaned_data = super().clean()
for field_name, field_value in cleaned_data.items():
if field_name.startswith('file_') and field_value:
# 验证单个文件
self.validate_dynamic_file(field_value)
return cleaned_data
def validate_dynamic_file(self, uploaded_file):
"""验证动态添加的文件"""
# 实现文件验证逻辑
if uploaded_file.size > 5 * 1024 * 1024: # 5MB限制
raise ValidationError('文件太大')#文件存储配置
#基础存储配置
# settings.py - 基础文件存储配置
import os
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent
# 媒体文件配置
MEDIA_URL = '/media/' # 媒体文件URL前缀
MEDIA_ROOT = os.path.join(BASE_DIR, 'media') # 媒体文件存储根目录
# 静态文件配置
STATIC_URL = '/static/'
STATIC_ROOT = os.path.join(BASE_DIR, 'staticfiles')
STATICFILES_DIRS = [
os.path.join(BASE_DIR, 'static'),
]
# 文件上传配置
FILE_UPLOAD_MAX_MEMORY_SIZE = 2621440 # 2.5MB,超过此大小的文件将保存到临时文件
DATA_UPLOAD_MAX_MEMORY_SIZE = 2621440 # 2.5MB,POST数据的最大内存大小
FILE_UPLOAD_TEMP_DIR = os.path.join(BASE_DIR, 'tmp') # 临时文件目录
# 创建必要的目录
os.makedirs(MEDIA_ROOT, exist_ok=True)
os.makedirs(FILE_UPLOAD_TEMP_DIR, exist_ok=True)
# 存储后端配置
DEFAULT_FILE_STORAGE = 'django.core.files.storage.FileSystemStorage'
# DEFAULT_FILE_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage' # 用于云存储
# AWS S3存储配置示例(需要django-storages)
"""
AWS_ACCESS_KEY_ID = 'your-access-key'
AWS_SECRET_ACCESS_KEY = 'your-secret-key'
AWS_STORAGE_BUCKET_NAME = 'your-bucket-name'
AWS_S3_REGION_NAME = 'us-east-1'
AWS_S3_CUSTOM_DOMAIN = f'{AWS_STORAGE_BUCKET_NAME}.s3.amazonaws.com'
# 静态文件S3配置
STATICFILES_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
# 媒体文件S3配置
DEFAULT_FILE_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
"""#高级存储配置
# 高级存储配置类
from django.conf import settings
from django.core.files.storage import get_storage_class
from django.utils.module_loading import import_string
import os
class StorageManager:
"""存储管理器"""
def __init__(self):
self._storages = {}
def get_storage(self, storage_alias='default'):
"""获取存储实例"""
if storage_alias in self._storages:
return self._storages[storage_alias]
# 从设置中获取存储配置
storage_configs = getattr(settings, 'CUSTOM_FILE_STORAGES', {})
if storage_alias in storage_configs:
config = storage_configs[storage_alias]
storage_class = get_storage_class(config['class'])
storage_instance = storage_class(**config.get('options', {}))
else:
# 默认存储
storage_class = get_storage_class(settings.DEFAULT_FILE_STORAGE)
storage_instance = storage_class()
self._storages[storage_alias] = storage_instance
return storage_instance
def save_file(self, file_obj, filename, storage_alias='default'):
"""使用指定存储保存文件"""
storage = self.get_storage(storage_alias)
return storage.save(filename, file_obj)
def get_file_url(self, filename, storage_alias='default'):
"""获取文件URL"""
storage = self.get_storage(storage_alias)
return storage.url(filename)
# 存储配置示例
CUSTOM_FILE_STORAGES = {
'default': {
'class': 'django.core.files.storage.FileSystemStorage',
'options': {
'location': os.path.join(settings.BASE_DIR, 'media'),
'base_url': '/media/',
}
},
'temp': {
'class': 'django.core.files.storage.FileSystemStorage',
'options': {
'location': os.path.join(settings.BASE_DIR, 'temp_uploads'),
'base_url': '/temp-media/',
}
},
'secure': {
'class': 'django.core.files.storage.FileSystemStorage',
'options': {
'location': os.path.join(settings.BASE_DIR, 'secure_files'),
'base_url': '/secure-files/', # 实际URL可能需要保护
}
}
}
# 使用存储管理器
storage_manager = StorageManager()
# 存储策略配置
class StorageStrategy:
"""存储策略"""
@staticmethod
def get_strategy(file_type, file_size):
"""根据文件类型和大小选择存储策略"""
if file_type.startswith('image/'):
if file_size < 1024 * 1024: # 1MB以下
return 'default'
else:
return 'high_capacity'
elif file_type.startswith('video/'):
return 'high_capacity'
elif file_type == 'application/pdf':
if file_size > 10 * 1024 * 1024: # 10MB以上
return 'cloud'
else:
return 'default'
else:
return 'default'
# 文件组织策略
class FileOrganizer:
"""文件组织策略"""
@staticmethod
def organize_by_date(filename, date=None):
"""按日期组织文件"""
from datetime import datetime
if date is None:
date = datetime.now()
year = date.strftime('%Y')
month = date.strftime('%m')
day = date.strftime('%d')
return f"{year}/{month}/{day}/{filename}"
@staticmethod
def organize_by_user(filename, user_id):
"""按用户组织文件"""
return f"user_{user_id}/{filename}"
@staticmethod
def organize_by_content_type(filename, content_type):
"""按内容类型组织文件"""
type_mapping = {
'image/jpeg': 'images',
'image/png': 'images',
'image/gif': 'images',
'application/pdf': 'documents',
'application/msword': 'documents',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'documents',
'video/mp4': 'videos',
'audio/mpeg': 'audio'
}
folder = type_mapping.get(content_type, 'others')
return f"{folder}/{filename}"
@staticmethod
def organize_by_size(filename, file_size):
"""按文件大小组织文件"""
if file_size < 1024 * 1024: # 1MB
size_folder = 'small'
elif file_size < 10 * 1024 * 1024: # 10MB
size_folder = 'medium'
else:
size_folder = 'large'
return f"{size_folder}/{filename}"
# 综合文件路径生成器
class SmartUploadPath:
"""智能上传路径生成器"""
def __init__(self, base_path='', organization_strategies=None):
self.base_path = base_path
self.strategies = organization_strategies or [
FileOrganizer.organize_by_date,
FileOrganizer.organize_by_content_type,
]
def __call__(self, instance, filename):
"""生成上传路径"""
# 获取文件信息
content_type = getattr(instance, 'content_type', '')
file_size = getattr(instance, 'file_size', 0)
# 应用组织策略
organized_path = filename
for strategy in self.strategies:
if callable(strategy):
if strategy.__name__ == 'organize_by_content_type':
organized_path = strategy(organized_path, content_type)
elif strategy.__name__ == 'organize_by_size':
organized_path = strategy(organized_path, file_size)
elif strategy.__name__ == 'organize_by_date':
organized_path = strategy(organized_path)
elif strategy.__name__ == 'organize_by_user':
user_id = getattr(instance, 'user_id', 1)
organized_path = strategy(organized_path, user_id)
return f"{self.base_path}{organized_path}"
# 使用示例
smart_path = SmartUploadPath('uploads/', [
FileOrganizer.organize_by_date,
FileOrganizer.organize_by_content_type,
])
class SmartFileModel(models.Model):
"""使用智能路径的文件模型"""
title = models.CharField(max_length=200)
file = models.FileField(upload_to=smart_path)
content_type = models.CharField(max_length=100, blank=True)
file_size = models.PositiveIntegerField(null=True, blank=True)
uploaded_at = models.DateTimeField(auto_now_add=True)#存储性能优化
# 存储性能优化配置
import threading
from concurrent.futures import ThreadPoolExecutor
import asyncio
from django.core.files.storage import default_storage
class OptimizedStorage:
"""优化的存储类"""
def __init__(self, storage_backend=None):
self.storage = storage_backend or default_storage
self._thread_local = threading.local()
self.executor = ThreadPoolExecutor(max_workers=4)
def save_async(self, name, content):
"""异步保存文件"""
future = self.executor.submit(self.storage.save, name, content)
return future
def save_batch(self, file_pairs):
"""批量保存文件"""
futures = []
for name, content in file_pairs:
future = self.executor.submit(self.storage.save, name, content)
futures.append(future)
results = []
for future in futures:
results.append(future.result())
return results
def get_cached_url(self, name, cache_timeout=3600):
"""获取带缓存的文件URL"""
from django.core.cache import cache
cache_key = f"file_url_{name}"
cached_url = cache.get(cache_key)
if cached_url is None:
cached_url = self.storage.url(name)
cache.set(cache_key, cached_url, timeout=cache_timeout)
return cached_url
# 缓存存储装饰器
def cached_storage_method(method):
"""存储方法缓存装饰器"""
def wrapper(self, name, *args, **kwargs):
from django.core.cache import cache
cache_key = f"storage_{method.__name__}_{name}"
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result
result = method(self, name, *args, **kwargs)
cache.set(cache_key, result, timeout=300) # 缓存5分钟
return result
return wrapper
class CachedStorage:
"""带缓存的存储类"""
def __init__(self, storage_backend=None):
self.storage = storage_backend or default_storage
@cached_storage_method
def url(self, name):
"""获取文件URL(带缓存)"""
return self.storage.url(name)
@cached_storage_method
def size(self, name):
"""获取文件大小(带缓存)"""
return self.storage.size(name)
def exists(self, name):
"""检查文件是否存在(带缓存)"""
from django.core.cache import cache
cache_key = f"storage_exists_{name}"
cached_result = cache.get(cache_key)
if cached_result is not None:
return cached_result
result = self.storage.exists(name)
cache.set(cache_key, result, timeout=300)
return result
# 存储监控
class StorageMonitor:
"""存储监控类"""
def __init__(self):
self.stats = {
'files_saved': 0,
'files_deleted': 0,
'total_size': 0,
'average_size': 0
}
self.lock = threading.Lock()
def record_save(self, file_size):
"""记录文件保存"""
with self.lock:
self.stats['files_saved'] += 1
self.stats['total_size'] += file_size
self.stats['average_size'] = self.stats['total_size'] / self.stats['files_saved']
def record_delete(self, file_size):
"""记录文件删除"""
with self.lock:
self.stats['files_deleted'] += 1
self.stats['total_size'] -= file_size
if self.stats['files_saved'] > self.stats['files_deleted']:
self.stats['average_size'] = (self.stats['total_size'] + file_size) / (self.stats['files_saved'] - self.stats['files_deleted'])
def get_stats(self):
"""获取统计信息"""
return self.stats.copy()
# 全局存储监控器
storage_monitor = StorageMonitor()#安全文件上传
#文件验证安全
# 文件安全验证
import os
import magic # 需要python-magic包
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
class SecureFileValidator:
"""安全文件验证器"""
def __init__(self, allowed_extensions=None, allowed_mimes=None, max_size=None):
self.allowed_extensions = allowed_extensions or [
'.jpg', '.jpeg', '.png', '.gif', # 图像
'.pdf', '.doc', '.docx', '.txt', # 文档
'.mp3', '.wav', '.ogg', # 音频
'.mp4', '.avi', '.mov' # 视频
]
self.allowed_mimes = allowed_mimes or [
'image/jpeg', 'image/png', 'image/gif',
'application/pdf',
'application/msword',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
'text/plain',
'audio/mpeg', 'audio/wav', 'audio/ogg',
'video/mp4', 'video/x-msvideo', 'video/quicktime'
]
self.max_size = max_size or 10 * 1024 * 1024 # 10MB
def validate(self, uploaded_file):
"""执行完整验证"""
self.validate_size(uploaded_file)
self.validate_extension(uploaded_file)
self.validate_mime_type(uploaded_file)
self.validate_content(uploaded_file)
self.validate_filename(uploaded_file)
def validate_size(self, uploaded_file):
"""验证文件大小"""
if uploaded_file.size > self.max_size:
raise ValidationError(
_('文件大小超过限制 (%(size)s MB)'),
params={'size': self.max_size / (1024*1024)},
)
def validate_extension(self, uploaded_file):
"""验证文件扩展名"""
_, ext = os.path.splitext(uploaded_file.name.lower())
if ext not in self.allowed_extensions:
raise ValidationError(
_('不允许的文件扩展名: %(ext)s'),
params={'ext': ext},
)
def validate_mime_type(self, uploaded_file):
"""验证MIME类型(双重检查)"""
# 基于文件内容的真实MIME类型
real_mime = magic.from_buffer(uploaded_file.read(1024), mime=True)
uploaded_file.seek(0) # 重置文件指针
if real_mime not in self.allowed_mimes:
raise ValidationError(
_('不允许的文件类型: %(mime)s'),
params={'mime': real_mime},
)
# 基于扩展名的MIME类型
import mimetypes
ext_mime = mimetypes.guess_type(uploaded_file.name)[0]
if ext_mime and ext_mime not in self.allowed_mimes:
raise ValidationError(
_('扩展名对应的文件类型不允许: %(mime)s'),
params={'mime': ext_mime},
)
def validate_content(self, uploaded_file):
"""验证文件内容(防恶意代码)"""
# 检查文件头
header = uploaded_file.read(1024)
uploaded_file.seek(0) # 重置
# 检查是否包含HTML/JS标签(常见攻击向量)
suspicious_patterns = [
b'<script', b'javascript:', b'vbscript:', b'<iframe',
b'<object', b'<embed', b'<?php', b'<?', b'<%', # 服务端包含
]
header_lower = header.lower()
for pattern in suspicious_patterns:
if pattern in header_lower:
raise ValidationError(_('文件可能包含恶意代码'))
def validate_filename(self, uploaded_file):
"""验证文件名安全性"""
filename = uploaded_file.name
# 检查路径遍历攻击
if '..' in filename or '~' in filename:
raise ValidationError(_('文件名包含非法字符'))
# 检查危险扩展名
dangerous_extensions = [
'.exe', '.bat', '.com', '.scr', '.vbs', '.js', '.jar',
'.pif', '.html', '.htm', '.php', '.asp', '.aspx'
]
_, ext = os.path.splitext(filename.lower())
if ext in dangerous_extensions:
raise ValidationError(_('文件扩展名可能存在安全风险'))
# 使用安全验证器的表单
class SecureFileForm(forms.Form):
"""安全文件表单"""
title = forms.CharField(max_length=200)
file = forms.FileField()
def __init__(self, *args, **kwargs):
self.validator = SecureFileValidator(
max_size=5 * 1024 * 1024, # 5MB
allowed_extensions=['.jpg', '.jpeg', '.png', '.pdf', '.doc', '.docx']
)
super().__init__(*args, **kwargs)
def clean_file(self):
"""安全验证上传文件"""
uploaded_file = self.cleaned_data.get('file')
if uploaded_file:
self.validator.validate(uploaded_file)
return uploaded_file
# 模型级别的安全验证
class SecureDocument(models.Model):
"""安全文档模型"""
title = forms.CharField(max_length=200)
file = models.FileField(upload_to='secure_docs/')
uploaded_at = models.DateTimeField(auto_now_add=True)
verified = models.BooleanField(default=False) # 是否通过安全检查
def save(self, *args, **kwargs):
"""保存前进行安全检查"""
if self.file:
validator = SecureFileValidator()
# 验证上传的文件
if hasattr(self.file, 'file'): # 如果是已上传的文件
validator.validate(self.file.file)
super().save(*args, **kwargs)
# 异步安全扫描
self.async_security_scan()
def async_security_scan(self):
"""异步安全扫描"""
# 这里可以集成病毒扫描、内容审核等
from django.core.mail import mail_admins
import threading
def scan_task():
try:
# 模拟安全扫描
import time
time.sleep(1) # 模拟扫描时间
# 如果发现安全问题,通知管理员
# mail_admins("安全警告", f"发现可疑文件: {self.file.name}")
# 更新验证状态
self.verified = True
self.save(update_fields=['verified'])
except Exception as e:
# 记录错误
import logging
logger = logging.getLogger(__name__)
logger.error(f"安全扫描失败: {e}")
# 在后台线程中执行扫描
thread = threading.Thread(target=scan_task)
thread.daemon = True
thread.start()#访问控制
# 文件访问控制
from django.http import HttpResponse, Http404, HttpResponseForbidden
from django.contrib.auth.decorators import login_required
from django.utils.decorators import method_decorator
from django.views.generic import View
from django.contrib.auth.mixins import LoginRequiredMixin
import os
from django.conf import settings
class SecureFileDownloadView(LoginRequiredMixin, View):
"""安全文件下载视图"""
def get(self, request, file_path):
"""安全下载文件"""
# 验证用户权限
if not self.has_permission(request.user, file_path):
return HttpResponseForbidden("无权访问此文件")
# 构建安全的文件路径
safe_path = self.get_safe_file_path(file_path)
if not safe_path or not os.path.exists(safe_path):
raise Http404("文件不存在")
# 检查文件类型,防止执行脚本
if self.is_dangerous_file(safe_path):
return HttpResponseForbidden("禁止下载此类型文件")
# 读取并返回文件
with open(safe_path, 'rb') as f:
response = HttpResponse(f.read(), content_type='application/octet-stream')
response['Content-Disposition'] = f'attachment; filename="{os.path.basename(safe_path)}"'
return response
def has_permission(self, user, file_path):
"""检查用户是否有权限访问文件"""
# 实现权限检查逻辑
# 例如:检查文件是否属于用户,或用户是否有相应角色
from .models import Document
try:
document = Document.objects.get(file=file_path)
# 检查文档是否属于当前用户或用户有相应权限
return (document.owner == user or
user.is_staff or
user.has_perm('myapp.download_document'))
except Document.DoesNotExist:
return False
def get_safe_file_path(self, file_path):
"""获取安全的文件系统路径"""
# 防止路径遍历攻击
if '..' in file_path or file_path.startswith('/'):
return None
# 构建完整路径
full_path = os.path.join(settings.MEDIA_ROOT, file_path)
# 规范化路径
normalized_path = os.path.normpath(full_path)
# 确保路径在媒体目录内
if not normalized_path.startswith(os.path.normpath(settings.MEDIA_ROOT)):
return None
return normalized_path
def is_dangerous_file(self, file_path):
"""检查是否为危险文件类型"""
dangerous_extensions = [
'.exe', '.bat', '.com', '.scr', '.vbs', '.js', '.jar',
'.pif', '.hta', '.msi', '.cmd', '.ps1', '.sh'
]
_, ext = os.path.splitext(file_path.lower())
return ext in dangerous_extensions
# 带权限检查的模型
class AccessControlledFile(models.Model):
"""访问控制文件模型"""
title = models.CharField(max_length=200)
file = models.FileField(upload_to='controlled/')
owner = models.ForeignKey('auth.User', on_delete=models.CASCADE)
access_level = models.CharField(
max_length=20,
choices=[
('private', '私有'),
('shared', '共享'),
('public', '公开'),
],
default='private'
)
allowed_users = models.ManyToManyField('auth.User', related_name='allowed_files', blank=True)
allowed_groups = models.ManyToManyField('auth.Group', blank=True)
uploaded_at = models.DateTimeField(auto_now_add=True)
def can_access(self, user):
"""检查用户是否有访问权限"""
if self.access_level == 'public':
return True
if user == self.owner:
return True
if self.access_level == 'shared':
if user in self.allowed_users.all():
return True
if user.groups.filter(id__in=self.allowed_groups.values_list('id', flat=True)).exists():
return True
return user.is_staff
# 访问控制装饰器
def require_file_permission(permission_level='read'):
"""文件权限装饰器"""
def decorator(view_func):
def wrapper(request, *args, **kwargs):
file_id = kwargs.get('file_id') or request.GET.get('file_id')
if file_id:
try:
from .models import AccessControlledFile
file_obj = AccessControlledFile.objects.get(id=file_id)
if not file_obj.can_access(request.user):
return HttpResponseForbidden("无权访问此文件")
if permission_level == 'download' and not file_obj.can_download(request.user):
return HttpResponseForbidden("无权下载此文件")
except AccessControlledFile.DoesNotExist:
return Http404("文件不存在")
return view_func(request, *args, **kwargs)
return wrapper
return decorator
# 临时访问链接
import uuid
from datetime import datetime, timedelta
class TemporaryAccessLink(models.Model):
"""临时访问链接"""
file = models.ForeignKey(AccessControlledFile, on_delete=models.CASCADE)
access_key = models.UUIDField(default=uuid.uuid4, unique=True)
created_by = models.ForeignKey('auth.User', on_delete=models.CASCADE)
expires_at = models.DateTimeField()
max_downloads = models.PositiveIntegerField(default=1)
downloads_count = models.PositiveIntegerField(default=0)
is_active = models.BooleanField(default=True)
@classmethod
def create_link(cls, file, creator, hours=24, max_downloads=1):
"""创建临时访问链接"""
return cls.objects.create(
file=file,
created_by=creator,
expires_at=datetime.now() + timedelta(hours=hours),
max_downloads=max_downloads
)
def is_valid(self):
"""检查链接是否有效"""
return (self.is_active and
self.expires_at > datetime.now() and
self.downloads_count < self.max_downloads)
def use_link(self):
"""使用链接(增加下载计数)"""
if self.is_valid():
self.downloads_count += 1
if self.downloads_count >= self.max_downloads:
self.is_active = False
self.save()
return True
return False
# 临时链接视图
class TemporaryDownloadView(View):
"""临时链接下载视图"""
def get(self, request, access_key):
try:
link = TemporaryAccessLink.objects.get(access_key=access_key)
if not link.is_valid():
return HttpResponseForbidden("链接已失效")
# 使用链接
if not link.use_link():
return HttpResponseForbidden("无法使用链接")
# 返回文件
file_path = link.file.file.path
with open(file_path, 'rb') as f:
response = HttpResponse(f.read(), content_type='application/octet-stream')
response['Content-Disposition'] = f'attachment; filename="{os.path.basename(file_path)}"'
return response
except TemporaryAccessLink.DoesNotExist:
return Http404("链接不存在")#云存储集成
#AWS S3集成
# AWS S3存储集成
"""
pip install django-storages[boto3]
pip install boto3
"""
# settings.py 配置
"""
# AWS配置
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
AWS_STORAGE_BUCKET_NAME = os.environ.get('AWS_STORAGE_BUCKET_NAME')
AWS_S3_REGION_NAME = os.environ.get('AWS_S3_REGION_NAME', 'us-east-1')
AWS_S3_CUSTOM_DOMAIN = f'{AWS_STORAGE_BUCKET_NAME}.s3.amazonaws.com'
# S3设置
AWS_S3_OBJECT_PARAMETERS = {
'CacheControl': 'max-age=86400',
}
AWS_DEFAULT_ACL = 'public-read'
AWS_S3_VERIFY = True
AWS_S3_REGION_NAME = AWS_S3_REGION_NAME
# 静态文件S3存储
STATICFILES_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
# 媒体文件S3存储
DEFAULT_FILE_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
"""
# S3存储配置类
class S3StorageConfig:
"""S3存储配置"""
@staticmethod
def get_public_media_settings():
"""公共媒体文件设置"""
return {
'AWS_STORAGE_BUCKET_NAME': os.environ.get('AWS_PUBLIC_BUCKET'),
'AWS_S3_CUSTOM_DOMAIN': f'{os.environ.get("AWS_PUBLIC_BUCKET")}.s3.amazonaws.com',
'AWS_DEFAULT_ACL': 'public-read',
'AWS_S3_OBJECT_PARAMETERS': {
'CacheControl': 'max-age=86400',
},
}
@staticmethod
def get_private_media_settings():
"""私有媒体文件设置"""
return {
'AWS_STORAGE_BUCKET_NAME': os.environ.get('AWS_PRIVATE_BUCKET'),
'AWS_DEFAULT_ACL': 'private',
'AWS_S3_FILE_OVERWRITE': False,
'AWS_S3_OBJECT_PARAMETERS': {
'CacheControl': 'max-age=60',
},
}
# S3文件操作类
import boto3
from botocore.exceptions import ClientError
from django.core.files.storage import default_storage
class S3FileManager:
"""S3文件管理器"""
def __init__(self):
self.s3_client = boto3.client(
's3',
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
region_name=settings.AWS_S3_REGION_NAME
)
self.bucket_name = settings.AWS_STORAGE_BUCKET_NAME
def upload_file(self, file_obj, s3_key, extra_args=None):
"""上传文件到S3"""
try:
self.s3_client.upload_fileobj(
file_obj,
self.bucket_name,
s3_key,
ExtraArgs=extra_args or {
'ContentType': getattr(file_obj, 'content_type', 'binary/octet-stream'),
'ACL': 'public-read'
}
)
return f"https://{self.bucket_name}.s3.{settings.AWS_S3_REGION_NAME}.amazonaws.com/{s3_key}"
except ClientError as e:
raise Exception(f"S3上传失败: {e}")
def download_file(self, s3_key, local_path):
"""从S3下载文件"""
try:
self.s3_client.download_file(self.bucket_name, s3_key, local_path)
except ClientError as e:
raise Exception(f"S3下载失败: {e}")
def generate_presigned_url(self, s3_key, expiration=3600):
"""生成预签名URL"""
try:
return self.s3_client.generate_presigned_url(
'get_object',
Params={'Bucket': self.bucket_name, 'Key': s3_key},
ExpiresIn=expiration
)
except ClientError as e:
raise Exception(f"生成预签名URL失败: {e}")
def delete_file(self, s3_key):
"""删除S3文件"""
try:
self.s3_client.delete_object(Bucket=self.bucket_name, Key=s3_key)
return True
except ClientError as e:
raise Exception(f"S3删除失败: {e}")
def get_file_info(self, s3_key):
"""获取文件信息"""
try:
response = self.s3_client.head_object(Bucket=self.bucket_name, Key=s3_key)
return {
'size': response['ContentLength'],
'last_modified': response['LastModified'],
'content_type': response['ContentType'],
'etag': response['ETag'].strip('"')
}
except ClientError as e:
raise Exception(f"获取文件信息失败: {e}")
# S3优化的模型字段
from storages.backends.s3boto3 import S3Boto3Storage
class PublicMediaStorage(S3Boto3Storage):
"""公共媒体存储"""
location = 'media'
default_acl = 'public-read'
file_overwrite = False
signature_version = 's3v4'
class PrivateMediaStorage(S3Boto3Storage):
"""私有媒体存储"""
location = 'private'
default_acl = 'private'
file_overwrite = False
signature_version = 's3v4'
# 使用S3存储的模型
class S3Document(models.Model):
"""S3文档模型"""
title = models.CharField(max_length=200)
public_file = models.FileField(
upload_to='public_docs/',
storage=PublicMediaStorage(),
blank=True,
null=True
)
private_file = models.FileField(
upload_to='private_docs/',
storage=PrivateMediaStorage(),
blank=True,
null=True
)
uploaded_at = models.DateTimeField(auto_now_add=True)
# S3信号处理器
@receiver(post_save, sender=S3Document)
def optimize_s3_file(sender, instance, **kwargs):
"""优化S3文件处理"""
s3_manager = S3FileManager()
# 为新上传的文件设置优化参数
if instance.public_file:
# 可以在这里进行文件优化,如压缩、转码等
pass
if instance.private_file:
# 设置私有文件的额外安全参数
pass#其他云存储
# Google Cloud Storage集成
"""
pip install django-storages[google]
pip install google-cloud-storage
"""
# settings.py 配置
"""
# Google Cloud配置
GS_BUCKET_NAME = 'your-gcs-bucket'
DEFAULT_FILE_STORAGE = 'storages.backends.gcloud.GoogleCloudStorage'
GS_DEFAULT_ACL = 'publicRead'
"""
# Azure Storage集成
"""
pip install django-storages[azure]
pip install azure-storage-blob
"""
# settings.py 配置
"""
# Azure配置
AZURE_ACCOUNT_NAME = 'your-account-name'
AZURE_ACCOUNT_KEY = 'your-account-key'
AZURE_CONTAINER = 'your-container'
DEFAULT_FILE_STORAGE = 'storages.backends.azure_storage.AzureStorage'
"""
# 多云存储策略
class MultiCloudStorage:
"""多云存储策略"""
def __init__(self):
self.storages = {}
self._initialize_storages()
def _initialize_storages(self):
"""初始化各种云存储"""
# AWS S3
try:
from storages.backends.s3boto3 import S3Boto3Storage
self.storages['aws'] = S3Boto3Storage()
except ImportError:
pass
# Google Cloud
try:
from storages.backends.gcloud import GoogleCloudStorage
self.storages['gcs'] = GoogleCloudStorage()
except ImportError:
pass
# Azure
try:
from storages.backends.azure_storage import AzureStorage
self.storages['azure'] = AzureStorage()
except ImportError:
pass
def select_storage(self, file_info):
"""根据文件信息选择最优存储"""
file_size = file_info.get('size', 0)
content_type = file_info.get('content_type', '')
# 策略:大文件使用AWS,小文件使用GCS,特定类型使用Azure
if file_size > 100 * 1024 * 1024: # 100MB+
return self.storages.get('aws')
elif content_type.startswith('image/'):
return self.storages.get('gcs')
else:
return self.storages.get('azure') or list(self.storages.values())[0] if self.storages else None
def save_file(self, file_obj, filename, **file_info):
"""智能保存文件到最优存储"""
storage = self.select_storage(file_info)
if storage:
return storage.save(filename, file_obj)
else:
# 降级到默认存储
return default_storage.save(filename, file_obj)
# CDN集成
class CDNIntegration:
"""CDN集成"""
def __init__(self, cdn_domain=None):
self.cdn_domain = cdn_domain or getattr(settings, 'CDN_DOMAIN', None)
def get_optimized_url(self, file_url):
"""获取CDN优化的URL"""
if self.cdn_domain:
# 替换为CDN域名
import re
# 将原始存储URL替换为CDN URL
return file_url.replace(settings.MEDIA_URL.lstrip('/'), f'https://{self.cdn_domain}/')
return file_url
def invalidate_cache(self, file_path):
"""使CDN缓存失效"""
if self.cdn_domain:
# 这里集成具体的CDN缓存清除API
# 例如Cloudflare、Akamai、AWS CloudFront等
pass
# 全局CDN管理器
cdn_manager = CDNIntegration()#文件处理与优化
#图像处理优化
# 图像处理和优化
from PIL import Image
from io import BytesIO
import os
from django.core.files.base import ContentFile
class ImageProcessor:
"""图像处理器"""
@staticmethod
def resize_image(image_file, max_width=800, max_height=600, quality=85):
"""调整图像大小"""
# 打开图像
img = Image.open(image_file)
# 保持宽高比
img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)
# 创建输出缓冲区
output = BytesIO()
# 保存图像
img_format = img.format or 'JPEG'
img.save(output, format=img_format, quality=quality, optimize=True)
# 创建ContentFile
output.seek(0)
resized_file = ContentFile(output.read(), name=image_file.name)
return resized_file
@staticmethod
def compress_image(image_file, target_size_kb=500):
"""压缩图像到目标大小"""
img = Image.open(image_file)
# 二分查找合适的质量
low_quality, high_quality = 10, 95
best_file = None
while low_quality <= high_quality:
mid_quality = (low_quality + high_quality) // 2
output = BytesIO()
img.save(output, format='JPEG', quality=mid_quality, optimize=True)
size_kb = len(output.getvalue()) / 1024
if size_kb <= target_size_kb:
best_file = ContentFile(output.getvalue(), name=image_file.name)
low_quality = mid_quality + 1
else:
high_quality = mid_quality - 1
return best_file or image_file
@staticmethod
def crop_image(image_file, box):
"""裁剪图像"""
img = Image.open(image_file)
cropped_img = img.crop(box) # box: (left, top, right, bottom)
output = BytesIO()
cropped_img.save(output, format=img.format)
output.seek(0)
return ContentFile(output.read(), name=image_file.name)
@staticmethod
def add_watermark(image_file, watermark_path, position='bottom-right'):
"""添加水印"""
img = Image.open(image_file)
watermark = Image.open(watermark_path)
# 调整水印大小
watermark_width, watermark_height = watermark.size
img_width, img_height = img.size
# 根据位置放置水印
positions = {
'top-left': (10, 10),
'top-right': (img_width - watermark_width - 10, 10),
'bottom-left': (10, img_height - watermark_height - 10),
'bottom-right': (img_width - watermark_width - 10, img_height - watermark_height - 10),
'center': ((img_width - watermark_width) // 2, (img_height - watermark_height) // 2)
}
position_coords = positions.get(position, positions['bottom-right'])
# 粘贴水印
if watermark.mode != 'RGBA':
watermark = watermark.convert('RGBA')
img.paste(watermark, position_coords, watermark)
output = BytesIO()
img.save(output, format=img.format)
output.seek(0)
return ContentFile(output.read(), name=image_file.name)
# 图像模型和信号
class ProcessedImage(models.Model):
"""处理后图像模型"""
title = models.CharField(max_length=200)
original_image = models.ImageField(upload_to='originals/')
processed_image = models.ImageField(upload_to='processed/', blank=True, null=True)
thumbnail = models.ImageField(upload_to='thumbnails/', blank=True, null=True)
width = models.PositiveIntegerField(null=True, blank=True)
height = models.PositiveIntegerField(null=True, blank=True)
file_size = models.PositiveIntegerField(null=True, blank=True)
def save(self, *args, **kwargs):
"""保存时处理图像"""
super().save(*args, **kwargs)
if self.original_image and not self.processed_image:
self.process_and_save_images()
def process_and_save_images(self):
"""处理并保存图像的各种版本"""
# 生成处理后的图像
processed_file = ImageProcessor.resize_image(
self.original_image.file,
max_width=1200,
max_height=800
)
# 保存处理后的图像
self.processed_image.save(
f"processed_{self.original_image.name}",
processed_file,
save=False
)
# 生成缩略图
thumbnail_file = ImageProcessor.resize_image(
self.original_image.file,
max_width=200,
max_height=200
)
self.thumbnail.save(
f"thumb_{self.original_image.name}",
thumbnail_file,
save=False
)
# 更新尺寸信息
img = Image.open(self.original_image.path)
self.width, self.height = img.size
# 保存所有更改
super().save(update_fields=['processed_image', 'thumbnail', 'width', 'height'])
# 图像处理信号
@receiver(post_save, sender=ProcessedImage)
def process_image_on_save(sender, instance, created, **kwargs):
"""保存时处理图像"""
if created and instance.original_image:
# 在后台线程中处理图像以避免阻塞
import threading
def process_images():
instance.process_and_save_images()
thread = threading.Thread(target=process_images)
thread.daemon = True
thread.start()
# 批量图像处理器
class BulkImageProcessor:
"""批量图像处理器"""
@staticmethod
def process_image_batch(image_files, processing_options):
"""批量处理图像"""
processed_files = []
for image_file in image_files:
processed_file = BulkImageProcessor.process_single_image(
image_file,
processing_options
)
processed_files.append(processed_file)
return processed_files
@staticmethod
def process_single_image(image_file, options):
"""处理单个图像"""
img = Image.open(image_file)
# 应用各种处理选项
if options.get('resize'):
size = options['resize']
img = img.resize(size, Image.Resampling.LANCZOS)
if options.get('crop'):
box = options['crop']
img = img.crop(box)
if options.get('rotate'):
angle = options['rotate']
img = img.rotate(angle, expand=True)
if options.get('format'):
target_format = options['format']
else:
target_format = img.format or 'JPEG'
output = BytesIO()
img.save(output, format=target_format, quality=options.get('quality', 85))
output.seek(0)
return ContentFile(output.read(), name=image_file.name)#文件转换和格式化
# 文件转换和格式化
import subprocess
import fitz # PyMuPDF
from PIL import Image
import pandas as pd
import json
class FileConverter:
"""文件转换器"""
@staticmethod
def image_to_pdf(image_file):
"""图像转PDF"""
img = Image.open(image_file)
# 创建PDF
pdf_output = BytesIO()
img.save(pdf_output, 'PDF', resolution=100.0, save_all=False)
pdf_output.seek(0)
return ContentFile(pdf_output.read(), name=f"{image_file.name.rsplit('.', 1)[0]}.pdf")
@staticmethod
def pdf_to_images(pdf_file, dpi=200):
"""PDF转图像(每页一张图)"""
doc = fitz.open(stream=pdf_file.read(), filetype="pdf")
images = []
for page_num in range(len(doc)):
page = doc[page_num]
mat = fitz.Matrix(dpi/72, dpi/72) # 72是PDF的默认DPI
pix = page.get_pixmap(matrix=mat)
img_data = pix.tobytes("png")
img_file = ContentFile(img_data, name=f"page_{page_num + 1}.png")
images.append(img_file)
return images
@staticmethod
def office_to_pdf(office_file):
"""Office文档转PDF(需要LibreOffice)"""
# 注意:这需要系统安装LibreOffice
input_path = office_file.temporary_file_path() if hasattr(office_file, 'temporary_file_path') else None
if not input_path:
# 如果没有临时路径,先保存到临时文件
import tempfile
with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(office_file.name)[1]) as tmp:
for chunk in office_file.chunks():
tmp.write(chunk)
input_path = tmp.name
try:
output_dir = os.path.dirname(input_path)
subprocess.run([
'libreoffice', '--headless', '--convert-to', 'pdf',
'--outdir', output_dir, input_path
], check=True)
pdf_path = input_path.rsplit('.', 1)[0] + '.pdf'
with open(pdf_path, 'rb') as pdf_file:
result = ContentFile(pdf_file.read(), name=f"{office_file.name.rsplit('.', 1)[0]}.pdf")
# 清理临时文件
os.unlink(input_path)
if os.path.exists(pdf_path):
os.unlink(pdf_path)
return result
except subprocess.CalledProcessError:
raise Exception("文档转换失败,可能需要安装LibreOffice")
except FileNotFoundError:
raise Exception("找不到LibreOffice,请确保已安装")
@staticmethod
def csv_to_excel(csv_file):
"""CSV转Excel"""
df = pd.read_csv(csv_file)
excel_output = BytesIO()
with pd.ExcelWriter(excel_output, engine='openpyxl') as writer:
df.to_excel(writer, index=False, sheet_name='Sheet1')
excel_output.seek(0)
return ContentFile(
excel_output.read(),
name=f"{csv_file.name.rsplit('.', 1)[0]}.xlsx"
)
@staticmethod
def json_to_csv(json_file):
"""JSON转CSV"""
import csv
# 读取JSON数据
content = json_file.read()
if isinstance(content, bytes):
content = content.decode('utf-8')
data = json.loads(content)
# 转换为CSV
csv_output = BytesIO()
if isinstance(data, list) and len(data) > 0:
fieldnames = data[0].keys()
writer = csv.DictWriter(csv_output, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data)
else:
raise ValueError("JSON数据格式不正确,应该是对象数组")
csv_output.seek(0)
return ContentFile(
csv_output.read(),
name=f"{json_file.name.rsplit('.', 1)[0]}.csv"
)
# 文件转换模型
class ConvertedFile(models.Model):
"""转换后文件模型"""
ORIGINAL = 'original'
PROCESSED = 'processed'
CONVERSION_CHOICES = [
(ORIGINAL, '原始文件'),
(PROCESSED, '处理后文件'),
]
original_file = models.FileField(upload_to='originals/')
converted_file = models.FileField(upload_to='converted/', blank=True, null=True)
conversion_type = models.CharField(max_length=50, choices=CONVERSION_CHOICES)
file_format = models.CharField(max_length=10, blank=True)
converted_at = models.DateTimeField(auto_now_add=True)
def convert_file(self, target_format):
"""转换文件格式"""
converter = FileConverter()
if target_format == 'pdf' and self.original_file.name.endswith(('.jpg', '.jpeg', '.png')):
converted_file = converter.image_to_pdf(self.original_file.file)
elif target_format == 'jpg' and self.original_file.name.endswith('.pdf'):
images = converter.pdf_to_images(self.original_file.file)
# 只取第一页作为示例
if images:
converted_file = images[0]
else:
raise ValueError("PDF转换失败")
else:
raise ValueError(f"不支持的转换格式: {target_format}")
# 保存转换后的文件
filename = f"{self.original_file.name.rsplit('.', 1)[0]}.{target_format}"
self.converted_file.save(filename, converted_file)
self.file_format = target_format
self.save(update_fields=['converted_file', 'file_format'])
# 异步文件转换任务
from celery import shared_task
@shared_task
def async_file_conversion(file_id, target_format):
"""异步文件转换任务"""
try:
converted_file = ConvertedFile.objects.get(id=file_id)
converted_file.convert_file(target_format)
# 可以发送通知或更新状态
return f"文件转换成功: {converted_file.original_file.name} -> {target_format}"
except ConvertedFile.DoesNotExist:
return f"文件不存在: {file_id}"
except Exception as e:
return f"转换失败: {str(e)}"
# 文件优化管道
class FileProcessingPipeline:
"""文件处理管道"""
def __init__(self):
self.processors = []
def add_processor(self, processor_func, **kwargs):
"""添加处理器"""
self.processors.append((processor_func, kwargs))
return self
def process(self, file_obj):
"""执行处理管道"""
current_file = file_obj
for processor_func, kwargs in self.processors:
current_file = processor_func(current_file, **kwargs)
return current_file
# 使用示例
pipeline = FileProcessingPipeline()
pipeline.add_processor(ImageProcessor.resize_image, max_width=800, max_height=600)
pipeline.add_processor(ImageProcessor.compress_image, target_size_kb=500)
# processed_file = pipeline.process(original_file)#文件管理与清理
#文件生命周期管理
# 文件生命周期管理
from datetime import datetime, timedelta
from django.utils import timezone
import os
import shutil
class FileManager:
"""文件管理器"""
@staticmethod
def get_file_age(file_path):
"""获取文件年龄"""
timestamp = os.path.getmtime(file_path)
file_time = datetime.fromtimestamp(timestamp)
now = timezone.now()
return now - file_time
@staticmethod
def cleanup_old_files(directory, days_old=30):
"""清理旧文件"""
cutoff_date = timezone.now() - timedelta(days=days_old)
deleted_count = 0
for root, dirs, files in os.walk(directory):
for file in files:
file_path = os.path.join(root, file)
file_mod_time = datetime.fromtimestamp(os.path.getmtime(file_path))
if file_mod_time < cutoff_date:
try:
os.remove(file_path)
deleted_count += 1
except OSError as e:
print(f"无法删除文件 {file_path}: {e}")
return deleted_count
@staticmethod
def get_directory_size(directory):
"""获取目录大小"""
total_size = 0
for dirpath, dirnames, filenames in os.walk(directory):
for filename in filenames:
filepath = os.path.join(dirpath, filename)
if os.path.exists(filepath):
total_size += os.path.getsize(filepath)
return total_size
@staticmethod
def organize_files_by_type(source_dir, target_dir):
"""按类型整理文件"""
type_dirs = {
'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp'],
'documents': ['.pdf', '.doc', '.docx', '.txt', '.xls', '.xlsx'],
'videos': ['.mp4', '.avi', '.mov', '.mkv'],
'audio': ['.mp3', '.wav', '.flac', '.aac']
}
for filename in os.listdir(source_dir):
file_path = os.path.join(source_dir, filename)
if os.path.isfile(file_path):
_, ext = os.path.splitext(filename.lower())
# 确定目标目录
target_subdir = 'others'
for type_name, extensions in type_dirs.items():
if ext in extensions:
target_subdir = type_name
break
# 创建目标子目录
target_path = os.path.join(target_dir, target_subdir, filename)
os.makedirs(os.path.join(target_dir, target_subdir), exist_ok=True)
# 移动文件
shutil.move(file_path, target_path)
# 文件清理任务
class FileCleanupTask:
"""文件清理任务"""
@staticmethod
def cleanup_temporary_files():
"""清理临时文件"""
import tempfile
temp_dir = tempfile.gettempdir()
# 清理7天前的临时文件
old_files_deleted = FileManager.cleanup_old_files(temp_dir, days_old=7)
print(f"删除了 {old_files_deleted} 个临时文件")
@staticmethod
def cleanup_unused_media():
"""清理未使用的媒体文件"""
from django.conf import settings
from .models import Document
media_root = settings.MEDIA_ROOT
# 获取所有文档引用的文件
used_files = set()
for doc in Document.objects.all():
if doc.file:
used_files.add(doc.file.path)
# 遍历媒体目录,删除未使用的文件
deleted_count = 0
for root, dirs, files in os.walk(media_root):
for file in files:
file_path = os.path.join(root, file)
if file_path not in used_files:
try:
os.remove(file_path)
deleted_count += 1
except OSError:
pass # 文件正在使用或其他错误
print(f"删除了 {deleted_count} 个未使用的媒体文件")
@staticmethod
def generate_cleanup_report():
"""生成清理报告"""
from django.conf import settings
import humanize
report = {
'timestamp': timezone.now(),
'media_directory_size': FileManager.get_directory_size(settings.MEDIA_ROOT),
'static_directory_size': FileManager.get_directory_size(settings.STATIC_ROOT) if hasattr(settings, 'STATIC_ROOT') else 0,
}
return report
# 定期清理任务
from celery import shared_task
@shared_task
def scheduled_file_cleanup():
"""定期文件清理任务"""
print("开始执行定期文件清理...")
# 清理临时文件
FileCleanupTask.cleanup_temporary_files()
# 清理未使用的媒体文件
FileCleanupTask.cleanup_unused_media()
# 生成报告
report = FileCleanupTask.generate_cleanup_report()
print(f"清理完成,生成报告: {report}")
return "文件清理任务完成"#存储监控和报告
# 存储监控和报告
from django.core.management.base import BaseCommand
from django.conf import settings
import psutil
import os
from datetime import datetime
class StorageMonitor:
"""存储监控器"""
def __init__(self):
self.media_root = settings.MEDIA_ROOT
self.static_root = getattr(settings, 'STATIC_ROOT', '')
def get_storage_usage(self):
"""获取存储使用情况"""
usage = {}
if os.path.exists(self.media_root):
media_usage = psutil.disk_usage(self.media_root)
usage['media'] = {
'total': media_usage.total,
'used': media_usage.used,
'free': media_usage.free,
'percent_used': (media_usage.used / media_usage.total) * 100
}
if self.static_root and os.path.exists(self.static_root):
static_usage = psutil.disk_usage(self.static_root)
usage['static'] = {
'total': static_usage.total,
'used': static_usage.used,
'free': static_usage.free,
'percent_used': (static_usage.used / static_usage.total) * 100
}
# 获取具体目录大小
usage['media_dir_size'] = FileManager.get_directory_size(self.media_root)
if self.static_root:
usage['static_dir_size'] = FileManager.get_directory_size(self.static_root)
return usage
def get_file_statistics(self):
"""获取文件统计信息"""
stats = {
'total_files': 0,
'total_size': 0,
'file_types': {},
'largest_files': []
}
for root, dirs, files in os.walk(self.media_root):
for file in files:
file_path = os.path.join(root, file)
if os.path.exists(file_path):
file_size = os.path.getsize(file_path)
_, ext = os.path.splitext(file.lower())
stats['total_files'] += 1
stats['total_size'] += file_size
# 统计文件类型
if ext not in stats['file_types']:
stats['file_types'][ext] = {'count': 0, 'size': 0}
stats['file_types'][ext]['count'] += 1
stats['file_types'][ext]['size'] += file_size
# 获取最大的10个文件
large_files = []
for root, dirs, files in os.walk(self.media_root):
for file in files:
file_path = os.path.join(root, file)
if os.path.exists(file_path):
large_files.append((file_path, os.path.getsize(file_path)))
stats['largest_files'] = sorted(large_files, key=lambda x: x[1], reverse=True)[:10]
return stats
def generate_report(self):
"""生成存储报告"""
storage_usage = self.get_storage_usage()
file_stats = self.get_file_statistics()
report = {
'generated_at': datetime.now().isoformat(),
'storage_usage': storage_usage,
'file_statistics': file_stats,
'recommendations': self.get_recommendations(storage_usage, file_stats)
}
return report
def get_recommendations(self, storage_usage, file_stats):
"""获取存储优化建议"""
recommendations = []
# 检查存储空间使用情况
if 'media' in storage_usage:
media_usage = storage_usage['media']
if media_usage['percent_used'] > 80:
recommendations.append("媒体存储使用率超过80%,建议清理旧文件或扩容")
# 检查大文件
if file_stats['largest_files']:
largest_size = file_stats['largest_files'][0][1]
if largest_size > 50 * 1024 * 1024: # 50MB
recommendations.append("发现大于50MB的文件,考虑压缩或优化")
# 检查文件类型分布
total_files = file_stats['total_files']
for ext, info in file_stats['file_types'].items():
if info['count'] / total_files > 0.5 and ext in ['.tmp', '.log', '.bak']:
recommendations.append(f"发现过多的{ext}文件,建议定期清理")
return recommendations
# 存储监控命令
class Command(BaseCommand):
"""存储监控命令"""
def handle(self, *args, **options):
monitor = StorageMonitor()
report = monitor.generate_report()
self.stdout.write("存储使用报告:")
self.stdout.write(f"生成时间: {report['generated_at']}")
if 'media' in report['storage_usage']:
media = report['storage_usage']['media']
self.stdout.write(f"媒体存储 - 总计: {media['total']}, 已用: {media['used']}, 使用率: {media['percent_used']:.2f}%")
self.stdout.write(f"总文件数: {report['file_statistics']['total_files']}")
self.stdout.write(f"总大小: {report['file_statistics']['total_size']}")
self.stdout.write("\n存储优化建议:")
for rec in report['recommendations']:
self.stdout.write(f"- {rec}")#常见问题与解决方案
#问题1:文件上传超时
症状:大文件上传时出现超时错误
解决方案:
# 1. 调整服务器超时设置
# settings.py
DATA_UPLOAD_MAX_NUMBER_FIELDS = 1000 # 增加字段数限制
FILE_UPLOAD_MAX_MEMORY_SIZE = 10 * 1024 * 1024 # 10MB内存限制
DATA_UPLOAD_MAX_MEMORY_SIZE = 10 * 1024 * 1024 # 10MB数据限制
# 2. Nginx配置(如果使用Nginx)
"""
# nginx.conf
client_max_body_size 100M;
client_body_timeout 120s;
client_header_timeout 120s;
"""
# 3. 处理大文件上传的视图
def large_file_upload_view(request):
"""大文件上传视图"""
if request.method == 'POST':
uploaded_file = request.FILES.get('file')
if uploaded_file:
# 对于大文件,直接保存到临时位置
import tempfile
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
for chunk in uploaded_file.chunks():
temp_file.write(chunk)
# 处理临时文件
temp_file_path = temp_file.name
# 可以在这里进行异步处理
# 移动到最终位置
from django.core.files.storage import default_storage
final_path = default_storage.save(uploaded_file.name, open(temp_file_path, 'rb'))
# 清理临时文件
os.unlink(temp_file_path)
return JsonResponse({'success': True, 'path': final_path})
return render(request, 'large_upload.html')#问题2:文件名冲突
症状:上传同名文件时覆盖原有文件
解决方案:
# 1. 使用UUID生成唯一文件名
import uuid
from django.utils import timezone
def unique_upload_path(instance, filename):
"""生成唯一上传路径"""
ext = filename.split('.')[-1]
filename = f"{uuid.uuid4().hex}.{ext}"
return f"uploads/{timezone.now().strftime('%Y/%m/%d')}/{filename}"
class UniqueFileModel(models.Model):
"""使用唯一文件名的模型"""
file = models.FileField(upload_to=unique_upload_path)
# 2. 自定义存储后端避免覆盖
from django.core.files.storage import FileSystemStorage
class NoOverwriteStorage(FileSystemStorage):
"""不覆盖文件的存储后端"""
def get_available_name(self, name, max_length=None):
"""返回可用的文件名,避免覆盖"""
if self.exists(name):
# 分离文件名和扩展名
name_parts = name.rsplit('.', 1)
if len(name_parts) == 2:
name_part, ext = name_parts
counter = 1
while self.exists(f"{name_part}_{counter}.{ext}"):
counter += 1
return f"{name_part}_{counter}.{ext}"
else:
# 没有扩展名的情况
counter = 1
while self.exists(f"{name}_{counter}"):
counter += 1
return f"{name}_{counter}"
return name
class NoOverwriteFileModel(models.Model):
"""使用不覆盖存储的模型"""
file = models.FileField(upload_to='uploads/', storage=NoOverwriteStorage())#问题3:内存不足
症状:上传大文件时消耗过多内存
解决方案:
# 1. 调整文件上传内存限制
# settings.py
FILE_UPLOAD_MAX_MEMORY_SIZE = 5 * 1024 * 1024 # 5MB,超过此大小使用临时文件
# 2. 流式处理大文件
def stream_process_large_file(uploaded_file, chunk_size=8192):
"""流式处理大文件"""
import hashlib
# 使用生成器处理文件,避免一次性加载到内存
def file_chunks():
for chunk in uploaded_file.chunks(chunk_size=chunk_size):
yield chunk
# 计算文件哈希
hasher = hashlib.md5()
for chunk in file_chunks():
hasher.update(chunk)
file_hash = hasher.hexdigest()
# 保存文件
from django.core.files.storage import default_storage
safe_filename = f"{file_hash}_{uploaded_file.name}"
saved_path = default_storage.save(safe_filename, uploaded_file)
return saved_path
# 3. 使用临时文件处理
def process_with_temp_file(uploaded_file):
"""使用临时文件处理大文件"""
import tempfile
import os
# 创建临时文件
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
# 分块写入临时文件
for chunk in uploaded_file.chunks():
temp_file.write(chunk)
temp_path = temp_file.name
try:
# 处理临时文件
# 例如:验证、转换、分析等
# 保存到最终位置
from django.core.files.storage import default_storage
final_path = default_storage.save(uploaded_file.name, open(temp_path, 'rb'))
return final_path
finally:
# 清理临时文件
os.unlink(temp_path)#问题4:安全性问题
症状:上传恶意文件导致安全漏洞
解决方案:
# 1. 完整的安全验证链
class ComprehensiveFileValidator:
"""综合文件验证器"""
def __init__(self):
self.magic = magic.Magic(mime=True) # 需要python-magic
def validate_completely(self, uploaded_file):
"""全面验证文件"""
# 1. 基础验证
self.validate_size(uploaded_file)
self.validate_filename(uploaded_file)
# 2. MIME类型验证
self.validate_mime_type(uploaded_file)
# 3. 内容扫描
self.scan_content(uploaded_file)
# 4. 文件头验证
self.validate_file_headers(uploaded_file)
def validate_size(self, uploaded_file, max_size=10*1024*1024):
"""验证文件大小"""
if uploaded_file.size > max_size:
raise ValidationError(f"文件太大,最大支持{max_size/(1024*1024)}MB")
def validate_filename(self, uploaded_file):
"""验证文件名安全"""
filename = uploaded_file.name
if '..' in filename or '/' in filename or '\\' in filename:
raise ValidationError("文件名包含非法字符")
dangerous_exts = ['.exe', '.bat', '.sh', '.php', '.jsp', '.asp']
ext = os.path.splitext(filename)[1].lower()
if ext in dangerous_exts:
raise ValidationError("不允许的文件扩展名")
def validate_mime_type(self, uploaded_file):
"""验证MIME类型"""
# 获取真实MIME类型
real_mime = self.magic.from_buffer(uploaded_file.read(1024))
uploaded_file.seek(0) # 重置文件指针
allowed_mimes = [
'image/jpeg', 'image/png', 'image/gif',
'application/pdf', 'text/plain',
'application/msword',
'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
]
if real_mime not in allowed_mimes:
raise ValidationError(f"不允许的文件类型: {real_mime}")
def scan_content(self, uploaded_file):
"""扫描文件内容"""
# 读取文件头
header = uploaded_file.read(1024)
uploaded_file.seek(0)
# 检查恶意内容
malicious_patterns = [
b'<?php', b'<script', b'javascript:', b'onerror=',
b'eval(', b'exec(', b'system('
]
for pattern in malicious_patterns:
if pattern in header.lower():
raise ValidationError("文件可能包含恶意代码")
# 2. 使用安全验证器
class SecureUploadForm(forms.Form):
"""安全上传表单"""
file = forms.FileField()
def clean_file(self):
"""安全验证上传文件"""
uploaded_file = self.cleaned_data.get('file')
if uploaded_file:
validator = ComprehensiveFileValidator()
validator.validate_completely(uploaded_file)
return uploaded_file#本章小结
在本章中,我们深入学习了Django文件上传与存储系统:
- 文件上传基础:理解了Django文件上传的工作原理和基本概念
- 文件处理架构:掌握了Django文件处理的核心组件和架构
- 文件字段与表单:学会了如何在模型和表单中处理文件字段
- 存储配置:了解了本地和云存储的各种配置方式
- 安全上传:学习了文件验证和访问控制的安全实践
- 云存储集成:掌握了AWS S3、GCS等云存储的集成方法
- 文件优化:了解了图像处理、文件转换等优化技术
- 文件管理:学习了文件生命周期管理和清理策略
#核心要点回顾
"""
本章核心要点:
1. Django提供了完整的文件上传和存储框架
2. 文件上传需要设置正确的表单编码和CSRF保护
3. 大文件上传需要考虑内存和超时限制
4. 文件安全性是重中之重,需要多重验证
5. 云存储提供了更好的可扩展性和可靠性
6. 图像处理和文件转换可以提升用户体验
7. 定期清理和监控存储使用情况很重要
8. 正确的文件组织策略有利于维护
"""💡 核心要点:文件处理是Web应用的重要功能,既要保证功能完善,又要确保安全可靠。正确使用Django的文件处理框架,结合安全验证和适当的存储策略,可以构建稳定高效的文件管理系统。
#SEO优化策略
- 关键词布局: 在标题、内容中合理布局"Django文件上传", "文件存储", "文件处理", "云存储", "Django媒体文件"等关键词
- 内容结构: 使用清晰的标题层级(H1-H3),便于搜索引擎理解内容结构
- 内部链接: 建立与其他相关教程的内部链接,提升页面权重
- 元数据优化: 在页面头部包含描述性的标题、描述和标签
🏷️ 标签云: Django文件上传 文件存储 文件处理 云存储 Django媒体文件 安全上传 图像处理 文件验证 S3存储

