Django文件上传与存储 - 完整的文件处理解决方案

📂 所属阶段:第二部分 — 进阶特性
🎯 难度等级:中级
⏰ 预计学习时间:4-5小时

目录

文件上传基础概念

Django提供了完整的文件上传和处理框架,支持本地存储、云存储等多种方式。

文件上传原理

"""
文件上传工作原理:

1. 客户端使用multipart/form-data编码上传文件
2. Django接收文件并创建UploadedFile对象
3. 文件被存储到临时位置或直接处理
4. 可以将文件保存到指定位置或存储服务
5. 在模型中保存文件路径信息

文件类型:
- UploadedFile: 基础上传文件类
- TemporaryUploadedFile: 临时上传文件
- InMemoryUploadedFile: 内存中的上传文件
"""

文件上传表单

# 文件上传表单基础
from django import forms
from django.core.files.uploadedfile import UploadedFile

class FileUploadForm(forms.Form):
    """Basic upload form that validates file size and extension."""

    title = forms.CharField(max_length=100)
    file = forms.FileField(
        label='选择文件',
        help_text='支持的文件类型: PDF, DOC, XLS, JPG, PNG'
    )

    def clean_file(self):
        """Reject uploads over 5 MB or with a disallowed extension."""
        import os

        uploaded = self.cleaned_data['file']

        # Upper bound of 5 MB on the upload size.
        if uploaded.size > 5 * 1024 * 1024:
            raise forms.ValidationError('文件大小不能超过5MB')

        # Extension whitelist check (case-insensitive).
        allowed_extensions = ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.jpg', '.jpeg', '.png']
        ext = os.path.splitext(uploaded.name)[1].lower()
        if ext not in allowed_extensions:
            raise forms.ValidationError(
                f'不支持的文件类型: {ext}。支持的类型: {", ".join(allowed_extensions)}'
            )

        return uploaded

# 模型表单中的文件字段
from django.db import models

class Document(models.Model):
    """Uploaded document with denormalized size and MIME-type metadata."""
    title = models.CharField(max_length=200)
    file = models.FileField(upload_to='documents/%Y/%m/')  # organized into year/month folders
    uploaded_at = models.DateTimeField(auto_now_add=True)
    file_size = models.PositiveIntegerField(null=True, blank=True)  # size in bytes
    content_type = models.CharField(max_length=100, null=True, blank=True)  # MIME type
    
    def save(self, *args, **kwargs):
        """Capture size/MIME metadata from the upload before persisting."""
        if self.file:
            self.file_size = self.file.size
            # content_type exists only on a freshly uploaded file object,
            # not on a stored FieldFile — hence the hasattr guard.
            if hasattr(self.file, 'content_type'):
                self.content_type = self.file.content_type
        super().save(*args, **kwargs)

class DocumentForm(forms.ModelForm):
    """ModelForm exposing Document's title and file fields."""
    class Meta:
        model = Document
        fields = ['title', 'file']

文件上传视图

# 文件上传视图
from django.shortcuts import render, redirect
from django.contrib import messages
from django.views.decorators.csrf import csrf_protect
from django.http import JsonResponse
import json

@csrf_protect
def upload_file_view(request):
    """Render the upload form; on a valid POST, persist a Document."""
    if request.method != 'POST':
        return render(request, 'upload.html', {'form': FileUploadForm()})

    form = FileUploadForm(request.POST, request.FILES)
    if not form.is_valid():
        # Re-render the bound form so field errors are shown.
        messages.error(request, '文件上传失败,请检查文件格式和大小。')
        return render(request, 'upload.html', {'form': form})

    # Persist the upload directly through the model.
    Document.objects.create(
        title=form.cleaned_data['title'],
        file=request.FILES['file'],
    )
    messages.success(request, '文件上传成功!')
    return redirect('document_list')

# AJAX文件上传
def ajax_upload_view(request):
    """Accept a single file via AJAX and reply with a JSON payload."""
    if request.method != 'POST' or not request.FILES:
        return JsonResponse({'success': False, 'error': '无效请求'})

    try:
        incoming = request.FILES['file']

        # Hard 10 MB cap for AJAX uploads.
        if incoming.size > 10 * 1024 * 1024:
            return JsonResponse({
                'success': False,
                'error': '文件太大,最大支持10MB'
            })

        doc = Document.objects.create(title=incoming.name, file=incoming)

        payload = {
            'success': True,
            'file_id': doc.id,
            'file_url': doc.file.url,
            'file_name': incoming.name,
        }
        return JsonResponse(payload)
    except Exception as e:
        # Report the failure to the client instead of a 500 page.
        return JsonResponse({'success': False, 'error': str(e)})

# 进度条上传(使用FormData和JavaScript)
def progress_upload_view(request):
    """Handle an upload intended for a client-side progress display."""
    if request.method != 'POST':
        return render(request, 'progress_upload.html')

    # Real-time progress reporting would normally be wired in here,
    # typically via JavaScript plus WebSocket/polling on the client.
    incoming = request.FILES['file']

    doc = Document.objects.create(title=incoming.name, file=incoming)

    return JsonResponse({
        'success': True,
        'file_url': doc.file.url
    })

Django文件处理架构

文件处理核心组件

# Django文件处理核心组件
from django.core.files.base import File, ContentFile
from django.core.files.storage import default_storage, FileSystemStorage
from django.core.files.uploadedfile import (
    UploadedFile, TemporaryUploadedFile, InMemoryUploadedFile
)
import tempfile
import os

# 文件存储后端
"""
Django文件存储后端:
1. FileSystemStorage: 本地文件系统存储(默认)
2. S3Boto3Storage: AWS S3存储
3. GoogleCloudStorage: Google Cloud存储
4. AzureStorage: Microsoft Azure存储
"""

# 文件处理工具类
class FileProcessor:
    """Utility helpers for inspecting and persisting uploaded files."""
    
    @staticmethod
    def get_file_info(uploaded_file):
        """Return a metadata dict (name, size, type, content chunks).

        Reading the chunks consumes the underlying stream, so the file
        position is rewound afterwards — the original version left the
        pointer at EOF, breaking any later consumer of the same file.
        """
        if uploaded_file.multiple_chunks():
            content_chunks = list(uploaded_file.chunks())
        else:
            content_chunks = [uploaded_file.read()]
        uploaded_file.seek(0)  # fix: rewind so the file stays re-readable
        return {
            'name': uploaded_file.name,
            'size': uploaded_file.size,
            'content_type': getattr(uploaded_file, 'content_type', ''),
            'charset': getattr(uploaded_file, 'charset', ''),
            'temporary_file_path': getattr(uploaded_file, 'temporary_file_path', None),
            'multiple_chunks': uploaded_file.multiple_chunks(),
            'chunks': content_chunks
        }
    
    @staticmethod
    def save_file_to_path(uploaded_file, destination_path):
        """Stream *uploaded_file* to *destination_path* chunk by chunk."""
        with open(destination_path, 'wb+') as destination:
            for chunk in uploaded_file.chunks():
                destination.write(chunk)
    
    @staticmethod
    def create_content_file(content, name):
        """Wrap raw *content* bytes/str in a named Django ContentFile."""
        return ContentFile(content, name=name)
    
    @staticmethod
    def temporary_file_handling():
        """Demonstrate moving a NamedTemporaryFile into Django storage."""
        # Create a temporary file on disk.
        with tempfile.NamedTemporaryFile(delete=False) as temp_file:
            temp_file.write(b'Some temporary content')
            temp_file_path = temp_file.name
        
        # Re-read it and wrap the bytes in a Django ContentFile.
        with open(temp_file_path, 'rb') as f:
            content = f.read()
            django_file = ContentFile(content)
            # Save onto a model field, e.g.:
            # my_model.file.save('temp_file.txt', django_file)
        
        # Clean up the temporary file.
        os.unlink(temp_file_path)

# 文件上传处理器
class FileUploadHandler:
    """Validates and persists files taken from a request's FILES payload."""
    
    def __init__(self, request):
        self.request = request
    
    def handle_single_file(self, field_name='file'):
        """Process one uploaded file, or return None if absent."""
        if field_name in self.request.FILES:
            return self.process_file(self.request.FILES[field_name])
        return None
    
    def handle_multiple_files(self, field_name='files'):
        """Process every file uploaded under *field_name*; returns a list."""
        files = []
        if field_name in self.request.FILES:
            for uploaded_file in self.request.FILES.getlist(field_name):
                files.append(self.process_file(uploaded_file))
        return files
    
    def process_file(self, uploaded_file):
        """Validate, rename and store one file; returns a metadata dict."""
        self.validate_file(uploaded_file)
        
        # A random name prevents collisions and hides the original filename.
        safe_filename = self.generate_safe_filename(uploaded_file.name)
        
        saved_path = default_storage.save(safe_filename, uploaded_file)
        
        return {
            'original_name': uploaded_file.name,
            'saved_name': safe_filename,
            'saved_path': saved_path,
            'size': uploaded_file.size,
            'content_type': getattr(uploaded_file, 'content_type', '')
        }
    
    def validate_file(self, uploaded_file):
        """Raise ValueError if the file is too large or its name is unsafe."""
        max_size = 10 * 1024 * 1024  # 10MB
        if uploaded_file.size > max_size:
            raise ValueError(f'文件太大,最大支持 {max_size / (1024*1024)}MB')
        
        # Fix: also reject backslashes so Windows-style path components
        # (e.g. 'sub\\evil.txt') cannot slip through the traversal check.
        if '..' in uploaded_file.name or '/' in uploaded_file.name or '\\' in uploaded_file.name:
            raise ValueError('文件名包含非法字符')
    
    def generate_safe_filename(self, original_filename):
        """Return '<uuid4 hex><lowercased original extension>'."""
        import uuid
        import os
        _, ext = os.path.splitext(original_filename)
        return f"{uuid.uuid4().hex}{ext.lower()}"

文件存储接口

# 文件存储接口和实现
from django.core.files.storage import Storage
from django.conf import settings
import os
from urllib.parse import urljoin

class CustomStorage(Storage):
    """Example custom storage backend backed by the local filesystem."""
    
    def __init__(self, location=None, base_url=None):
        # Fall back to settings, then to hard-coded defaults.
        self.location = location or getattr(settings, 'CUSTOM_FILE_STORAGE_LOCATION', '/custom_storage/')
        self.base_url = base_url or getattr(settings, 'CUSTOM_FILE_STORAGE_URL', '/custom-files/')
    
    def _save(self, name, content):
        """Write *content* to disk under *name*; return the stored name."""
        full_path = self.path(name)
        
        # exist_ok=True avoids the check-then-create race of the original
        # (two concurrent saves could both see the directory as missing
        # and one makedirs call would then fail).
        os.makedirs(os.path.dirname(full_path), exist_ok=True)
        
        # Stream the content out chunk by chunk.
        with open(full_path, 'wb') as f:
            for chunk in content.chunks():
                f.write(chunk)
        
        return name
    
    def _open(self, name, mode='rb'):
        """Open the stored file and return the file object."""
        return open(self.path(name), mode)
    
    def delete(self, name):
        """Delete the file if it exists; missing files are ignored."""
        full_path = self.path(name)
        if os.path.exists(full_path):
            os.remove(full_path)
    
    def exists(self, name):
        """Return whether a file named *name* exists on disk."""
        return os.path.exists(self.path(name))
    
    def url(self, name):
        """Return the public URL for *name* under base_url."""
        return urljoin(self.base_url, name)
    
    def size(self, name):
        """Return the size of the stored file in bytes."""
        return os.path.getsize(self.path(name))
    
    def path(self, name):
        """Return the absolute filesystem path for *name*."""
        return os.path.join(self.location, name)

# 存储工厂
class StorageFactory:
    """Factory mapping a storage-type keyword to a storage instance."""

    @staticmethod
    def get_storage(storage_type='filesystem', **kwargs):
        """Instantiate and return the backend for *storage_type*.

        Raises ValueError for unknown types or a missing S3 dependency.
        """
        if storage_type == 'filesystem':
            from django.core.files.storage import FileSystemStorage
            return FileSystemStorage(**kwargs)

        if storage_type == 'custom':
            return CustomStorage(**kwargs)

        if storage_type == 's3':
            # django-storages is an optional extra.
            try:
                from storages.backends.s3boto3 import S3Boto3Storage
                return S3Boto3Storage(**kwargs)
            except ImportError:
                raise ValueError("S3存储需要安装django-storages[boto3]")

        raise ValueError(f"不支持的存储类型: {storage_type}")

# 存储配置管理
class StorageConfig:
    """Builds storage-configuration dictionaries from Django settings."""

    @staticmethod
    def get_media_storage():
        """Return the media-storage config for the configured backend type."""
        storage_type = getattr(settings, 'MEDIA_STORAGE_TYPE', 'filesystem')

        if storage_type == 's3':
            return {
                'class': 'storages.backends.s3boto3.S3Boto3Storage',
                'bucket_name': settings.AWS_STORAGE_BUCKET_NAME,
                'region_name': settings.AWS_S3_REGION_NAME,
                'file_overwrite': False
            }

        # 'filesystem' and any unknown type both fall back to local storage.
        return {
            'class': 'django.core.files.storage.FileSystemStorage',
            'location': settings.MEDIA_ROOT,
            'base_url': settings.MEDIA_URL
        }

文件字段与表单处理

文件字段类型

# Django文件字段类型详解
from django.db import models
from django.core.files.storage import default_storage
import os

class FileFieldsExample(models.Model):
    """Showcase of FileField/ImageField configuration options."""
    
    # Plain document upload.
    document = models.FileField(
        upload_to='documents/',
        max_length=500,
        help_text='上传文档文件'
    )
    
    # Image upload (requires Pillow).
    image = models.ImageField(
        upload_to='images/',
        blank=True,
        null=True,
        help_text='上传图片文件'
    )
    
    # Per-user upload path. Fix: the original dropped the filename from
    # the generated path. NOTE(review): lambdas are not serializable by
    # Django migrations — a module-level function would be preferred.
    custom_file = models.FileField(
        upload_to=lambda instance, filename: f'uploads/{instance.user.id}/{filename}',
        blank=True,
        null=True
    )
    
    # Explicit storage backend (any Storage instance can be supplied).
    stored_file = models.FileField(
        upload_to='stored/',
        storage=default_storage,
        blank=True,
        null=True
    )
    
    # PDF-only field. NOTE(review): validate_pdf_file is defined below
    # this class in the file; it must be defined (or imported) before the
    # class body executes, otherwise this line raises NameError.
    pdf_only = models.FileField(
        upload_to='pdfs/',
        validators=[validate_pdf_file],
        blank=True,
        null=True
    )

def validate_pdf_file(value):
    """Validator: reject any upload whose content is not a real PDF.

    Sniffs the leading bytes with libmagic instead of trusting the
    file extension.
    """
    import magic  # requires the python-magic package
    from django.core.exceptions import ValidationError  # fix: was not in scope here

    file_mime = magic.from_buffer(value.read(1024), mime=True)
    value.seek(0)  # rewind so later consumers can re-read the file
    
    if file_mime != 'application/pdf':
        raise ValidationError('只允许上传PDF文件')

# 高级文件字段配置
class AdvancedFileFields(models.Model):
    """FileField upload_to strategies: by date, by user, by content hash."""
    
    # Organised into year/month/day sub-directories.
    dated_document = models.FileField(
        upload_to='documents/%Y/%m/%d/',
        help_text='按日期组织的文档'
    )
    
    # Grouped per owning user. Fix: the original dropped the filename
    # from the generated path.
    user_document = models.FileField(
        upload_to=lambda instance, filename: f'user_docs/{instance.owner.id}/{filename}',
        help_text='按用户组织的文档'
    )
    
    # De-duplicated by content hash. NOTE(review): this reads
    # instance.file while the upload path is being computed — confirm the
    # file is populated at that point; lambdas also break migrations.
    hashed_file = models.FileField(
        upload_to=lambda instance, filename: f'hashed/{hash_file_content(instance.file)}',
        help_text='按内容哈希组织的文件'
    )

def hash_file_content(uploaded_file):
    """Return '<first-8-hex-of-md5>/<original name>' for *uploaded_file*.

    MD5 is used purely as a fast content fingerprint for de-duplication,
    not for any security purpose.
    """
    import hashlib
    digest = hashlib.md5()
    for piece in uploaded_file.chunks():
        digest.update(piece)
    prefix = digest.hexdigest()[:8]
    return f"{prefix}/{uploaded_file.name}"

# 文件字段验证
class ValidatedFileForm(forms.ModelForm):
    """Model form that validates document and image uploads."""
    
    class Meta:
        model = FileFieldsExample
        fields = ['document', 'image']
    
    def clean_document(self):
        """Enforce a 10MB cap and a document-extension whitelist."""
        document = self.cleaned_data.get('document')
        if document:
            if document.size > 10 * 1024 * 1024:  # 10MB
                raise forms.ValidationError('文档文件不能超过10MB')
            
            # Extension check (case-insensitive).
            import os
            ext = os.path.splitext(document.name)[1].lower()
            allowed_exts = ['.pdf', '.doc', '.docx', '.txt', '.xls', '.xlsx']
            if ext not in allowed_exts:
                raise forms.ValidationError(
                    f'不允许的文档类型: {ext}。支持: {", ".join(allowed_exts)}'
                )
        
        return document
    
    def clean_image(self):
        """Enforce a 5MB cap and a 4000x4000 pixel ceiling."""
        image = self.cleaned_data.get('image')
        if image:
            if image.size > 5 * 1024 * 1024:  # 5MB
                raise forms.ValidationError('图像文件不能超过5MB')
            
            from PIL import Image
            img = Image.open(image)
            width, height = img.size
            if width > 4000 or height > 4000:
                raise forms.ValidationError('图像尺寸过大,最大支持4000x4000像素')
            # Fix: rewind the upload after Pillow has read from it so the
            # subsequent save does not start mid-stream.
            image.seek(0)
        
        return image

文件表单处理

# 高级文件表单处理
from django import forms
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _
import os
import mimetypes

class AdvancedFileForm(forms.Form):
    """Form with multi-file upload and MIME-sniffing validation."""
    
    title = forms.CharField(max_length=200)
    description = forms.CharField(widget=forms.Textarea, required=False)
    
    # NOTE(review): Django >= 3.2.19 raises ValueError for
    # ClearableFileInput(attrs={'multiple': True}); newer projects need
    # forms.FileInput with allow_multiple_selected — confirm the Django
    # version in use.
    files = forms.FileField(
        widget=forms.ClearableFileInput(attrs={
            'multiple': True,
            'accept': '.pdf,.doc,.docx,.jpg,.jpeg,.png'
        }),
        help_text='可以选择多个文件',
        required=False
    )
    
    # Single image upload.
    image = forms.ImageField(
        help_text='上传一张图片',
        required=False
    )
    
    def clean_files(self):
        """Validate every uploaded file and return them as a list."""
        files = self.cleaned_data.get('files')
        if files:
            cleaned_files = []
            
            for uploaded_file in files if hasattr(files, '__iter__') else [files]:
                self.validate_single_file(uploaded_file)
                cleaned_files.append(uploaded_file)
            
            return cleaned_files
        
        return []
    
    def validate_single_file(self, uploaded_file):
        """Check size, MIME type, and extension/MIME consistency.

        Raises ValidationError on any mismatch.
        """
        # Size limit: 10MB.
        max_size = 10 * 1024 * 1024
        if uploaded_file.size > max_size:
            raise ValidationError(_(f'文件 {uploaded_file.name} 太大,最大支持 {max_size // (1024*1024)}MB'))
        
        allowed_types = {
            'application/pdf': '.pdf',
            'application/msword': '.doc',
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document': '.docx',
            'image/jpeg': '.jpg',
            'image/png': '.png',
            'text/plain': '.txt'
        }
        
        # Guess from the filename first; fall back to content sniffing.
        mime_type = mimetypes.guess_type(uploaded_file.name)[0]
        if not mime_type:
            import magic
            mime_type = magic.from_buffer(uploaded_file.read(1024), mime=True)
            uploaded_file.seek(0)  # rewind after sniffing
        
        if mime_type not in allowed_types:
            raise ValidationError(_(f'不支持的文件类型: {mime_type}'))
        
        # Fix: the original unpacked into `_`, shadowing the gettext alias
        # for the whole function scope, which made the earlier `_()` calls
        # raise UnboundLocalError.
        ext = os.path.splitext(uploaded_file.name.lower())[1]
        expected_ext = allowed_types.get(mime_type)
        if expected_ext and ext != expected_ext:
            raise ValidationError(_(f'文件扩展名与内容类型不匹配'))

# 文件上传进度处理
class ProgressFileForm(forms.Form):
    """Upload form that reports save progress through a callback."""
    
    title = forms.CharField(max_length=200)
    file = forms.FileField()
    
    def save_with_progress(self, progress_callback=None):
        """Save the upload, calling progress_callback(percent) per chunk.

        The file is first spooled to a temporary file so progress can be
        reported, then handed to the default storage backend, and finally
        a Document row is created. Returns the Document instance.
        """
        uploaded_file = self.cleaned_data['file']
        
        import tempfile
        import shutil
        
        temp_file_path = None
        try:
            # Spool the upload to disk, reporting progress per chunk.
            with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                temp_file_path = temp_file.name
                
                total_size = uploaded_file.size
                bytes_written = 0
                
                for chunk in uploaded_file.chunks():
                    temp_file.write(chunk)
                    bytes_written += len(chunk)
                    
                    if progress_callback:
                        progress_callback((bytes_written / total_size) * 100)
            
            # Fix: the original leaked this file handle (open() without a
            # matching close), which also breaks the unlink below on Windows.
            with open(temp_file_path, 'rb') as spooled:
                final_path = default_storage.save(uploaded_file.name, spooled)
            
            # Record the stored path in the database.
            from .models import Document
            return Document.objects.create(
                title=self.cleaned_data['title'],
                file=final_path
            )
            
        finally:
            # Always remove the temporary spool file.
            if temp_file_path and os.path.exists(temp_file_path):
                os.unlink(temp_file_path)

# 动态文件表单
class DynamicFileForm(forms.Form):
    """Form whose number of file fields is chosen at construction time."""

    def __init__(self, *args, **kwargs):
        # Pop our custom kwarg before delegating to forms.Form.
        num_files = kwargs.pop('num_files', 1)
        super().__init__(*args, **kwargs)

        # One optional FileField per requested slot: file_0, file_1, ...
        for index in range(num_files):
            field = forms.FileField(label=f'文件 {index+1}', required=False)
            self.fields[f'file_{index}'] = field

    def clean(self):
        """Run per-file validation over every populated file_<n> field."""
        cleaned_data = super().clean()

        for name, value in cleaned_data.items():
            if name.startswith('file_') and value:
                self.validate_dynamic_file(value)

        return cleaned_data

    def validate_dynamic_file(self, uploaded_file):
        """Reject files above the 5MB per-file ceiling."""
        if uploaded_file.size > 5 * 1024 * 1024:
            raise ValidationError('文件太大')

文件存储配置

基础存储配置

# settings.py - 基础文件存储配置
import os
from pathlib import Path

# Project base directory (two levels above this settings file).
BASE_DIR = Path(__file__).resolve().parent.parent

# Media (user-uploaded) files configuration.
MEDIA_URL = '/media/'  # URL prefix for serving media files
MEDIA_ROOT = os.path.join(BASE_DIR, 'media')  # root directory for stored media

# Static files configuration.
STATIC_URL = '/static/'
STATIC_ROOT = os.path.join(BASE_DIR, 'staticfiles')
STATICFILES_DIRS = [
    os.path.join(BASE_DIR, 'static'),
]

# Upload handling limits.
FILE_UPLOAD_MAX_MEMORY_SIZE = 2621440  # 2.5MB; larger uploads spill to a temp file
DATA_UPLOAD_MAX_MEMORY_SIZE = 2621440  # 2.5MB; max in-memory size of POST data
FILE_UPLOAD_TEMP_DIR = os.path.join(BASE_DIR, 'tmp')  # directory for spilled temp files

# Ensure the storage directories exist at import time.
os.makedirs(MEDIA_ROOT, exist_ok=True)
os.makedirs(FILE_UPLOAD_TEMP_DIR, exist_ok=True)

# Default storage backend for FileField/ImageField.
DEFAULT_FILE_STORAGE = 'django.core.files.storage.FileSystemStorage'
# DEFAULT_FILE_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'  # cloud storage

# Example AWS S3 configuration (requires django-storages):
"""
AWS_ACCESS_KEY_ID = 'your-access-key'
AWS_SECRET_ACCESS_KEY = 'your-secret-key'
AWS_STORAGE_BUCKET_NAME = 'your-bucket-name'
AWS_S3_REGION_NAME = 'us-east-1'
AWS_S3_CUSTOM_DOMAIN = f'{AWS_STORAGE_BUCKET_NAME}.s3.amazonaws.com'

# 静态文件S3配置
STATICFILES_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'

# 媒体文件S3配置
DEFAULT_FILE_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
"""

高级存储配置

# 高级存储配置类
from django.conf import settings
from django.core.files.storage import get_storage_class
from django.utils.module_loading import import_string
import os

class StorageManager:
    """Lazily instantiates and caches named storage backends."""

    def __init__(self):
        # Maps storage alias -> instantiated backend.
        self._storages = {}

    def get_storage(self, storage_alias='default'):
        """Return (and cache) the storage instance for *storage_alias*.

        Unknown aliases fall back to settings.DEFAULT_FILE_STORAGE.
        """
        try:
            return self._storages[storage_alias]
        except KeyError:
            pass

        storage_configs = getattr(settings, 'CUSTOM_FILE_STORAGES', {})
        config = storage_configs.get(storage_alias)

        if config is not None:
            storage_class = get_storage_class(config['class'])
            instance = storage_class(**config.get('options', {}))
        else:
            # Fall back to the project-wide default backend.
            instance = get_storage_class(settings.DEFAULT_FILE_STORAGE)()

        self._storages[storage_alias] = instance
        return instance

    def save_file(self, file_obj, filename, storage_alias='default'):
        """Persist *file_obj* under *filename* on the aliased backend."""
        return self.get_storage(storage_alias).save(filename, file_obj)

    def get_file_url(self, filename, storage_alias='default'):
        """Return the public URL for *filename* on the aliased backend."""
        return self.get_storage(storage_alias).url(filename)

# Example multi-backend configuration, consumed by StorageManager via
# settings.CUSTOM_FILE_STORAGES.
CUSTOM_FILE_STORAGES = {
    'default': {
        'class': 'django.core.files.storage.FileSystemStorage',
        'options': {
            'location': os.path.join(settings.BASE_DIR, 'media'),
            'base_url': '/media/',
        }
    },
    'temp': {
        'class': 'django.core.files.storage.FileSystemStorage',
        'options': {
            'location': os.path.join(settings.BASE_DIR, 'temp_uploads'),
            'base_url': '/temp-media/',
        }
    },
    'secure': {
        'class': 'django.core.files.storage.FileSystemStorage',
        'options': {
            'location': os.path.join(settings.BASE_DIR, 'secure_files'),
            'base_url': '/secure-files/',  # the real URL may need access control
        }
    }
}

# Shared module-level manager instance.
storage_manager = StorageManager()

# 存储策略配置
class StorageStrategy:
    """Chooses a storage alias from a file's MIME type and size."""

    @staticmethod
    def get_strategy(file_type, file_size):
        """Return the storage alias to use for (file_type, file_size)."""
        one_mb = 1024 * 1024

        if file_type.startswith('video/'):
            # Video always goes to high-capacity storage.
            return 'high_capacity'

        if file_type.startswith('image/'):
            # Small images stay local; large ones go to high-capacity.
            return 'default' if file_size < one_mb else 'high_capacity'

        if file_type == 'application/pdf' and file_size > 10 * one_mb:
            # Only oversized PDFs are pushed to the cloud backend.
            return 'cloud'

        return 'default'

# 文件组织策略
class FileOrganizer:
    """Builds relative storage paths that group files by various criteria.

    Fix: each method previously returned a path with the literal text
    '(unknown)' instead of the filename (the *filename* parameter was
    never used); the filename is now appended to every path.
    """
    
    @staticmethod
    def organize_by_date(filename, date=None):
        """Return 'YYYY/MM/DD/<filename>'; *date* defaults to now."""
        from datetime import datetime
        if date is None:
            date = datetime.now()
        
        year = date.strftime('%Y')
        month = date.strftime('%m')
        day = date.strftime('%d')
        
        return f"{year}/{month}/{day}/{filename}"
    
    @staticmethod
    def organize_by_user(filename, user_id):
        """Return 'user_<id>/<filename>'."""
        return f"user_{user_id}/{filename}"
    
    @staticmethod
    def organize_by_content_type(filename, content_type):
        """Return '<category>/<filename>' based on the MIME type."""
        type_mapping = {
            'image/jpeg': 'images',
            'image/png': 'images', 
            'image/gif': 'images',
            'application/pdf': 'documents',
            'application/msword': 'documents',
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document': 'documents',
            'video/mp4': 'videos',
            'audio/mpeg': 'audio'
        }
        
        # Unknown MIME types land in a catch-all folder.
        folder = type_mapping.get(content_type, 'others')
        return f"{folder}/{filename}"
    
    @staticmethod
    def organize_by_size(filename, file_size):
        """Return 'small|medium|large/<filename>' by size thresholds."""
        if file_size < 1024 * 1024:  # under 1MB
            size_folder = 'small'
        elif file_size < 10 * 1024 * 1024:  # under 10MB
            size_folder = 'medium'
        else:
            size_folder = 'large'
        
        return f"{size_folder}/{filename}"

# 综合文件路径生成器
class SmartUploadPath:
    """Callable upload_to generator that chains organisation strategies."""

    def __init__(self, base_path='', organization_strategies=None):
        self.base_path = base_path
        # Default chain: date folders first, then content-type folders.
        self.strategies = organization_strategies or [
            FileOrganizer.organize_by_date,
            FileOrganizer.organize_by_content_type,
        ]

    def __call__(self, instance, filename):
        """Build the final upload path for *instance* / *filename*.

        Each recognised strategy (matched by function name) transforms
        the path in turn; unrecognised callables are skipped.
        """
        content_type = getattr(instance, 'content_type', '')
        file_size = getattr(instance, 'file_size', 0)

        path = filename
        for strategy in self.strategies:
            if not callable(strategy):
                continue
            strategy_name = strategy.__name__
            if strategy_name == 'organize_by_content_type':
                path = strategy(path, content_type)
            elif strategy_name == 'organize_by_size':
                path = strategy(path, file_size)
            elif strategy_name == 'organize_by_date':
                path = strategy(path)
            elif strategy_name == 'organize_by_user':
                path = strategy(path, getattr(instance, 'user_id', 1))

        return f"{self.base_path}{path}"

# Example: date folders first, then content-type folders.
smart_path = SmartUploadPath('uploads/', [
    FileOrganizer.organize_by_date,
    FileOrganizer.organize_by_content_type,
])

class SmartFileModel(models.Model):
    """File model whose upload path is produced by SmartUploadPath."""
    title = models.CharField(max_length=200)
    file = models.FileField(upload_to=smart_path)
    content_type = models.CharField(max_length=100, blank=True)  # MIME type, if known
    file_size = models.PositiveIntegerField(null=True, blank=True)  # size in bytes
    uploaded_at = models.DateTimeField(auto_now_add=True)

存储性能优化

# 存储性能优化配置
import threading
from concurrent.futures import ThreadPoolExecutor
import asyncio
from django.core.files.storage import default_storage

class OptimizedStorage:
    """Wraps a storage backend with async/batch saves and URL caching.

    Fix: removed the unused `_thread_local` attribute (dead state that
    was never read or written anywhere).
    """
    
    def __init__(self, storage_backend=None):
        self.storage = storage_backend or default_storage
        # Shared pool for background save operations.
        self.executor = ThreadPoolExecutor(max_workers=4)
    
    def save_async(self, name, content):
        """Submit a save to the pool; returns a Future of the stored name."""
        return self.executor.submit(self.storage.save, name, content)
    
    def save_batch(self, file_pairs):
        """Save (name, content) pairs concurrently.

        Returns the stored names in the same order as *file_pairs*.
        """
        futures = [
            self.executor.submit(self.storage.save, name, content)
            for name, content in file_pairs
        ]
        # Collect in submission order so results align with the input.
        return [future.result() for future in futures]
    
    def get_cached_url(self, name, cache_timeout=3600):
        """Return the file URL, caching it for *cache_timeout* seconds."""
        from django.core.cache import cache
        
        cache_key = f"file_url_{name}"
        cached_url = cache.get(cache_key)
        
        if cached_url is None:
            cached_url = self.storage.url(name)
            cache.set(cache_key, cached_url, timeout=cache_timeout)
        
        return cached_url

# 缓存存储装饰器
def cached_storage_method(method):
    """Decorator: cache a storage method's result by (method, name).

    Results are cached for 5 minutes; only non-None results are served
    from cache. Suitable for methods whose first positional argument is
    the file name and whose result is stable over the cache window.
    """
    import functools

    @functools.wraps(method)  # fix: preserve the wrapped method's metadata
    def wrapper(self, name, *args, **kwargs):
        from django.core.cache import cache
        
        cache_key = f"storage_{method.__name__}_{name}"
        cached_result = cache.get(cache_key)
        
        if cached_result is not None:
            return cached_result
        
        result = method(self, name, *args, **kwargs)
        cache.set(cache_key, result, timeout=300)  # cache for 5 minutes
        
        return result
    return wrapper

class CachedStorage:
    """Storage wrapper that memoizes url/size/exists lookups in the cache."""
    
    def __init__(self, storage_backend=None):
        self.storage = storage_backend or default_storage
    
    @cached_storage_method
    def url(self, name):
        """Return the file URL (cached)."""
        return self.storage.url(name)
    
    @cached_storage_method 
    def size(self, name):
        """Return the file size in bytes (cached)."""
        return self.storage.size(name)
    
    @cached_storage_method
    def exists(self, name):
        """Return whether the file exists (cached).

        Consistency fix: uses the shared decorator instead of re-implementing
        the identical caching logic inline — the cache key
        ('storage_exists_<name>') and the 300s timeout are unchanged.
        """
        return self.storage.exists(name)

# 存储监控
class StorageMonitor:
    """Thread-safe in-memory counters for storage save/delete activity."""
    
    def __init__(self):
        self.stats = {
            'files_saved': 0,      # total saves recorded
            'files_deleted': 0,    # total deletes recorded
            'total_size': 0,       # bytes currently accounted for
            'average_size': 0      # mean size of the files still present
        }
        self.lock = threading.Lock()
    
    def record_save(self, file_size):
        """Record one saved file of *file_size* bytes."""
        with self.lock:
            self.stats['files_saved'] += 1
            self.stats['total_size'] += file_size
            self._recompute_average()
    
    def record_delete(self, file_size):
        """Record one deleted file of *file_size* bytes."""
        with self.lock:
            self.stats['files_deleted'] += 1
            self.stats['total_size'] -= file_size
            # Fix: the original added the deleted file's size back into the
            # numerator, overstating the average of the remaining files.
            self._recompute_average()
    
    def _recompute_average(self):
        """Recompute average_size over the files still present (lock held)."""
        remaining = self.stats['files_saved'] - self.stats['files_deleted']
        if remaining > 0:
            self.stats['average_size'] = self.stats['total_size'] / remaining
        else:
            self.stats['average_size'] = 0
    
    def get_stats(self):
        """Return a snapshot copy of the counters."""
        with self.lock:
            return self.stats.copy()

# Module-level singleton monitor.
storage_monitor = StorageMonitor()

安全文件上传

文件验证安全

# 文件安全验证
import os
import magic  # 需要python-magic包
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _

class SecureFileValidator:
    """Whitelist-based validator for uploaded files.

    Runs five checks in order: size limit, extension whitelist, MIME type
    (both content-sniffed and extension-derived), suspicious content
    patterns, and filename safety.
    """

    _DEFAULT_EXTENSIONS = [
        '.jpg', '.jpeg', '.png', '.gif',  # images
        '.pdf', '.doc', '.docx', '.txt',  # documents
        '.mp3', '.wav', '.ogg',          # audio
        '.mp4', '.avi', '.mov'           # video
    ]
    _DEFAULT_MIMES = [
        'image/jpeg', 'image/png', 'image/gif',
        'application/pdf',
        'application/msword',
        'application/vnd.openxmlformats-officedocument.wordprocessingml.document',
        'text/plain',
        'audio/mpeg', 'audio/wav', 'audio/ogg',
        'video/mp4', 'video/x-msvideo', 'video/quicktime'
    ]
    _DEFAULT_MAX_SIZE = 10 * 1024 * 1024  # 10MB

    def __init__(self, allowed_extensions=None, allowed_mimes=None, max_size=None):
        self.allowed_extensions = allowed_extensions or list(self._DEFAULT_EXTENSIONS)
        self.allowed_mimes = allowed_mimes or list(self._DEFAULT_MIMES)
        self.max_size = max_size or self._DEFAULT_MAX_SIZE

    def validate(self, uploaded_file):
        """Run every check; raises ValidationError on the first failure."""
        for check in (
            self.validate_size,
            self.validate_extension,
            self.validate_mime_type,
            self.validate_content,
            self.validate_filename,
        ):
            check(uploaded_file)

    def validate_size(self, uploaded_file):
        """Reject files larger than self.max_size bytes."""
        if uploaded_file.size <= self.max_size:
            return
        raise ValidationError(
            _('文件大小超过限制 (%(size)s MB)'),
            params={'size': self.max_size / (1024*1024)},
        )

    def validate_extension(self, uploaded_file):
        """Reject extensions outside the whitelist (case-insensitive)."""
        ext = os.path.splitext(uploaded_file.name.lower())[1]
        if ext in self.allowed_extensions:
            return
        raise ValidationError(
            _('不允许的文件扩展名: %(ext)s'),
            params={'ext': ext},
        )

    def validate_mime_type(self, uploaded_file):
        """Cross-check the content-sniffed MIME type and the extension-derived one."""
        # Sniff the true type from the first 1KB of content (python-magic).
        sniffed = magic.from_buffer(uploaded_file.read(1024), mime=True)
        uploaded_file.seek(0)  # rewind so later readers see the whole file
        if sniffed not in self.allowed_mimes:
            raise ValidationError(
                _('不允许的文件类型: %(mime)s'),
                params={'mime': sniffed},
            )

        # Second opinion: type implied by the file extension.
        import mimetypes
        guessed = mimetypes.guess_type(uploaded_file.name)[0]
        if guessed and guessed not in self.allowed_mimes:
            raise ValidationError(
                _('扩展名对应的文件类型不允许: %(mime)s'),
                params={'mime': guessed},
            )

    def validate_content(self, uploaded_file):
        """Scan the file header for script / server-side-include markers."""
        head = uploaded_file.read(1024)
        uploaded_file.seek(0)  # rewind

        markers = (
            b'<script', b'javascript:', b'vbscript:', b'<iframe',
            b'<object', b'<embed', b'<?php', b'<?', b'<%',  # server-side includes
        )
        lowered = head.lower()
        if any(marker in lowered for marker in markers):
            raise ValidationError(_('文件可能包含恶意代码'))

    def validate_filename(self, uploaded_file):
        """Reject traversal characters and executable/script extensions."""
        name = uploaded_file.name

        # Path traversal / home-dir expansion attempts.
        if '..' in name or '~' in name:
            raise ValidationError(_('文件名包含非法字符'))

        blocked = (
            '.exe', '.bat', '.com', '.scr', '.vbs', '.js', '.jar',
            '.pif', '.html', '.htm', '.php', '.asp', '.aspx'
        )
        if os.path.splitext(name.lower())[1] in blocked:
            raise ValidationError(_('文件扩展名可能存在安全风险'))

# 使用安全验证器的表单
class SecureFileForm(forms.Form):
    """Upload form whose `file` field is run through SecureFileValidator."""

    title = forms.CharField(max_length=200)
    file = forms.FileField()

    def __init__(self, *args, **kwargs):
        # Per-form limits: 5MB, images plus PDF/Word only.
        self.validator = SecureFileValidator(
            max_size=5 * 1024 * 1024,  # 5MB
            allowed_extensions=['.jpg', '.jpeg', '.png', '.pdf', '.doc', '.docx']
        )
        super().__init__(*args, **kwargs)

    def clean_file(self):
        """Run the security validator over the uploaded file, if any."""
        uploaded = self.cleaned_data.get('file')
        if uploaded:
            self.validator.validate(uploaded)
        return uploaded

# 模型级别的安全验证
class SecureDocument(models.Model):
    """Document model that security-checks its file before and after saving."""
    # BUG FIX: was `forms.CharField` — model fields must come from django.db.models.
    title = models.CharField(max_length=200)
    file = models.FileField(upload_to='secure_docs/')
    uploaded_at = models.DateTimeField(auto_now_add=True)
    verified = models.BooleanField(default=False)  # set True once the scan passes

    def save(self, *args, **kwargs):
        """Validate the attached file, persist, then kick off a background scan."""
        if self.file:
            validator = SecureFileValidator()
            # Validate the underlying uploaded file when one is attached.
            if hasattr(self.file, 'file'):
                validator.validate(self.file.file)

        super().save(*args, **kwargs)

        # BUG FIX: the scan's own save(update_fields=['verified']) re-enters
        # save(), which used to spawn yet another scan, forever. Only scan
        # documents that have not been verified yet.
        if not self.verified:
            self.async_security_scan()

    def async_security_scan(self):
        """Run a best-effort security scan in a daemon thread.

        Hook point for antivirus / content-moderation integrations; on
        success it marks the document verified, on failure it only logs.
        """
        from django.core.mail import mail_admins
        import threading

        def scan_task():
            try:
                # Placeholder for a real scan.
                import time
                time.sleep(1)  # simulated scan time

                # Notify admins when something suspicious is found:
                # mail_admins("安全警告", f"发现可疑文件: {self.file.name}")

                self.verified = True
                self.save(update_fields=['verified'])

            except Exception as e:
                import logging
                logger = logging.getLogger(__name__)
                logger.error(f"安全扫描失败: {e}")

        thread = threading.Thread(target=scan_task, daemon=True)
        thread.start()

访问控制

# 文件访问控制
from django.http import HttpResponse, Http404, HttpResponseForbidden
from django.contrib.auth.decorators import login_required
from django.utils.decorators import method_decorator
from django.views.generic import View
from django.contrib.auth.mixins import LoginRequiredMixin
import os
from django.conf import settings

class SecureFileDownloadView(LoginRequiredMixin, View):
    """Authenticated download endpoint with permission and path-safety checks."""

    # Extensions that must never be served, even to permitted users.
    DANGEROUS_EXTENSIONS = (
        '.exe', '.bat', '.com', '.scr', '.vbs', '.js', '.jar',
        '.pif', '.hta', '.msi', '.cmd', '.ps1', '.sh'
    )

    def get(self, request, file_path):
        """Serve *file_path* as an attachment once every check passes."""
        if not self.has_permission(request.user, file_path):
            return HttpResponseForbidden("无权访问此文件")

        safe_path = self.get_safe_file_path(file_path)
        if not safe_path or not os.path.exists(safe_path):
            raise Http404("文件不存在")

        if self.is_dangerous_file(safe_path):
            return HttpResponseForbidden("禁止下载此类型文件")

        with open(safe_path, 'rb') as fh:
            payload = fh.read()
        response = HttpResponse(payload, content_type='application/octet-stream')
        response['Content-Disposition'] = f'attachment; filename="{os.path.basename(safe_path)}"'
        return response

    def has_permission(self, user, file_path):
        """True when *user* owns the Document, is staff, or holds the download perm."""
        from .models import Document

        try:
            document = Document.objects.get(file=file_path)
        except Document.DoesNotExist:
            return False

        if document.owner == user:
            return True
        return user.is_staff or user.has_perm('myapp.download_document')

    def get_safe_file_path(self, file_path):
        """Map a requested path to an absolute path under MEDIA_ROOT, or None.

        Rejects traversal sequences and absolute paths, then double-checks
        that the normalized result still lives inside MEDIA_ROOT.
        """
        if '..' in file_path or file_path.startswith('/'):
            return None

        candidate = os.path.normpath(os.path.join(settings.MEDIA_ROOT, file_path))

        # Belt-and-braces containment check after normalization.
        if not candidate.startswith(os.path.normpath(settings.MEDIA_ROOT)):
            return None
        return candidate

    def is_dangerous_file(self, file_path):
        """True when the file's extension is on the blocked list."""
        return os.path.splitext(file_path.lower())[1] in self.DANGEROUS_EXTENSIONS

# 带权限检查的模型
class AccessControlledFile(models.Model):
    """File with per-object access control: owner, shared users/groups, or public."""
    title = models.CharField(max_length=200)
    file = models.FileField(upload_to='controlled/')
    owner = models.ForeignKey('auth.User', on_delete=models.CASCADE)
    # Visibility tier; 'shared' consults allowed_users / allowed_groups.
    access_level = models.CharField(
        max_length=20,
        choices=[
            ('private', '私有'),
            ('shared', '共享'),
            ('public', '公开'),
        ],
        default='private'
    )
    allowed_users = models.ManyToManyField('auth.User', related_name='allowed_files', blank=True)
    allowed_groups = models.ManyToManyField('auth.Group', blank=True)
    uploaded_at = models.DateTimeField(auto_now_add=True)

    def can_access(self, user):
        """Decide whether *user* may access this file.

        Public files are open to everyone and owners always pass; 'shared'
        files admit listed users and members of allowed groups; staff can
        access everything else.
        """
        if self.access_level == 'public' or user == self.owner:
            return True

        if self.access_level == 'shared':
            # Short-circuits: the group query only runs when the user
            # is not individually listed.
            if (user in self.allowed_users.all()
                    or user.groups.filter(
                        id__in=self.allowed_groups.values_list('id', flat=True)
                    ).exists()):
                return True

        return user.is_staff

# 访问控制装饰器
def require_file_permission(permission_level='read'):
    """Decorator factory: gate a view on access to the file named by `file_id`.

    Looks up ``file_id`` in the URL kwargs or the query string; when present,
    the referenced AccessControlledFile must grant access (and, for
    ``permission_level='download'``, download rights) to ``request.user``.

    Fixes over the original:
    - ``Http404`` is an *exception*; the original returned an instance of it
      from the view, which Django would not render as a 404. It is now raised.
    - ``functools.wraps`` preserves the wrapped view's name and metadata.
    """
    from functools import wraps

    def decorator(view_func):
        @wraps(view_func)
        def wrapper(request, *args, **kwargs):
            file_id = kwargs.get('file_id') or request.GET.get('file_id')

            if file_id:
                from .models import AccessControlledFile
                try:
                    file_obj = AccessControlledFile.objects.get(id=file_id)
                except AccessControlledFile.DoesNotExist:
                    # BUG FIX: was `return Http404(...)`.
                    raise Http404("文件不存在")

                if not file_obj.can_access(request.user):
                    return HttpResponseForbidden("无权访问此文件")

                # NOTE(review): can_download() is not defined on the model
                # shown in this file — confirm it exists before relying on it.
                if permission_level == 'download' and not file_obj.can_download(request.user):
                    return HttpResponseForbidden("无权下载此文件")

            return view_func(request, *args, **kwargs)
        return wrapper
    return decorator

# 临时访问链接
import uuid
from datetime import datetime, timedelta

class TemporaryAccessLink(models.Model):
    """Expiring, download-limited access link for an AccessControlledFile."""
    file = models.ForeignKey(AccessControlledFile, on_delete=models.CASCADE)
    access_key = models.UUIDField(default=uuid.uuid4, unique=True)
    created_by = models.ForeignKey('auth.User', on_delete=models.CASCADE)
    expires_at = models.DateTimeField()
    max_downloads = models.PositiveIntegerField(default=1)
    downloads_count = models.PositiveIntegerField(default=0)
    is_active = models.BooleanField(default=True)  # flipped off once the quota is used up

    @classmethod
    def create_link(cls, file, creator, hours=24, max_downloads=1):
        """Create a link valid for *hours* hours and *max_downloads* uses.

        BUG FIX: uses django.utils.timezone.now() instead of the naive
        datetime.now(); naive datetimes cannot be compared with the aware
        values Django stores when USE_TZ=True.
        """
        from django.utils import timezone
        return cls.objects.create(
            file=file,
            created_by=creator,
            expires_at=timezone.now() + timedelta(hours=hours),
            max_downloads=max_downloads
        )

    def is_valid(self):
        """True while the link is active, unexpired, and under its quota."""
        from django.utils import timezone
        return (self.is_active and
                self.expires_at > timezone.now() and
                self.downloads_count < self.max_downloads)

    def use_link(self):
        """Consume one download; deactivate the link when the quota is reached."""
        if not self.is_valid():
            return False
        self.downloads_count += 1
        if self.downloads_count >= self.max_downloads:
            self.is_active = False
        self.save()
        return True

# 临时链接视图
class TemporaryDownloadView(View):
    """Serve a file via a temporary access link keyed by UUID."""

    def get(self, request, access_key):
        """Validate the link, consume one use, and return the file as an attachment."""
        try:
            link = TemporaryAccessLink.objects.get(access_key=access_key)
        except TemporaryAccessLink.DoesNotExist:
            # BUG FIX: Http404 is an exception; returning it sent an
            # exception object to the client instead of a 404 response.
            raise Http404("链接不存在")

        if not link.is_valid():
            return HttpResponseForbidden("链接已失效")

        # Atomically-ish consume one use (re-validates internally).
        if not link.use_link():
            return HttpResponseForbidden("无法使用链接")

        file_path = link.file.file.path
        with open(file_path, 'rb') as f:
            response = HttpResponse(f.read(), content_type='application/octet-stream')
            response['Content-Disposition'] = f'attachment; filename="{os.path.basename(file_path)}"'
            return response

云存储集成

AWS S3集成

# AWS S3存储集成
"""
pip install django-storages[boto3]
pip install boto3
"""

# settings.py 配置
"""
# AWS配置
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')
AWS_STORAGE_BUCKET_NAME = os.environ.get('AWS_STORAGE_BUCKET_NAME')
AWS_S3_REGION_NAME = os.environ.get('AWS_S3_REGION_NAME', 'us-east-1')
AWS_S3_CUSTOM_DOMAIN = f'{AWS_STORAGE_BUCKET_NAME}.s3.amazonaws.com'

# S3设置
AWS_S3_OBJECT_PARAMETERS = {
    'CacheControl': 'max-age=86400',
}
AWS_DEFAULT_ACL = 'public-read'
AWS_S3_VERIFY = True
# (AWS_S3_REGION_NAME 已在上方通过环境变量设置,无需重复赋值)

# 静态文件S3存储
STATICFILES_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'

# 媒体文件S3存储
DEFAULT_FILE_STORAGE = 'storages.backends.s3boto3.S3Boto3Storage'
"""

# S3存储配置类
class S3StorageConfig:
    """Builders for django-storages settings dicts targeting S3 buckets."""

    @staticmethod
    def get_public_media_settings():
        """Settings for the world-readable media bucket (long browser cache)."""
        bucket = os.environ.get('AWS_PUBLIC_BUCKET')
        return {
            'AWS_STORAGE_BUCKET_NAME': bucket,
            'AWS_S3_CUSTOM_DOMAIN': f'{bucket}.s3.amazonaws.com',
            'AWS_DEFAULT_ACL': 'public-read',
            'AWS_S3_OBJECT_PARAMETERS': {
                'CacheControl': 'max-age=86400',  # 1 day
            },
        }

    @staticmethod
    def get_private_media_settings():
        """Settings for the private bucket: private ACL, no overwrite, short cache."""
        return {
            'AWS_STORAGE_BUCKET_NAME': os.environ.get('AWS_PRIVATE_BUCKET'),
            'AWS_DEFAULT_ACL': 'private',
            'AWS_S3_FILE_OVERWRITE': False,
            'AWS_S3_OBJECT_PARAMETERS': {
                'CacheControl': 'max-age=60',  # 1 minute
            },
        }

# S3文件操作类
import boto3
from botocore.exceptions import ClientError
from django.core.files.storage import default_storage

class S3FileManager:
    """Thin wrapper around boto3 for common S3 object operations.

    Every operation converts botocore's ClientError into a plain Exception
    with a Chinese, user-facing message prefix.
    """

    def __init__(self):
        # One client per manager; credentials and region come from settings.
        self.s3_client = boto3.client(
            's3',
            aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
            region_name=settings.AWS_S3_REGION_NAME
        )
        self.bucket_name = settings.AWS_STORAGE_BUCKET_NAME

    def _wrap(self, action, label):
        """Run *action*; re-raise ClientError as Exception('<label>: <err>')."""
        try:
            return action()
        except ClientError as e:
            raise Exception(f"{label}: {e}")

    def upload_file(self, file_obj, s3_key, extra_args=None):
        """Upload *file_obj* under *s3_key*; return the object's https URL."""
        resolved_args = extra_args or {
            'ContentType': getattr(file_obj, 'content_type', 'binary/octet-stream'),
            'ACL': 'public-read'
        }

        def _do():
            self.s3_client.upload_fileobj(
                file_obj, self.bucket_name, s3_key, ExtraArgs=resolved_args
            )
            return f"https://{self.bucket_name}.s3.{settings.AWS_S3_REGION_NAME}.amazonaws.com/{s3_key}"

        return self._wrap(_do, "S3上传失败")

    def download_file(self, s3_key, local_path):
        """Download *s3_key* into *local_path* on the local filesystem."""
        self._wrap(
            lambda: self.s3_client.download_file(self.bucket_name, s3_key, local_path),
            "S3下载失败",
        )

    def generate_presigned_url(self, s3_key, expiration=3600):
        """Return a time-limited GET URL for a (possibly private) object."""
        return self._wrap(
            lambda: self.s3_client.generate_presigned_url(
                'get_object',
                Params={'Bucket': self.bucket_name, 'Key': s3_key},
                ExpiresIn=expiration
            ),
            "生成预签名URL失败",
        )

    def delete_file(self, s3_key):
        """Delete *s3_key* from the bucket; returns True on success."""
        def _do():
            self.s3_client.delete_object(Bucket=self.bucket_name, Key=s3_key)
            return True

        return self._wrap(_do, "S3删除失败")

    def get_file_info(self, s3_key):
        """Return size / last-modified / content-type / etag metadata for *s3_key*."""
        def _do():
            meta = self.s3_client.head_object(Bucket=self.bucket_name, Key=s3_key)
            return {
                'size': meta['ContentLength'],
                'last_modified': meta['LastModified'],
                'content_type': meta['ContentType'],
                'etag': meta['ETag'].strip('"')
            }

        return self._wrap(_do, "获取文件信息失败")

# S3优化的模型字段
from storages.backends.s3boto3 import S3Boto3Storage

class PublicMediaStorage(S3Boto3Storage):
    """S3 storage for world-readable media files under the 'media/' prefix."""
    location = 'media'  # key prefix inside the bucket
    default_acl = 'public-read'  # objects are publicly fetchable
    file_overwrite = False  # keep both files on a name collision (suffix added)
    signature_version = 's3v4'  # signing scheme required by newer AWS regions

class PrivateMediaStorage(S3Boto3Storage):
    """S3 storage for private files under the 'private/' prefix (signed access only)."""
    location = 'private'  # key prefix inside the bucket
    default_acl = 'private'  # no anonymous access
    file_overwrite = False  # keep both files on a name collision (suffix added)
    signature_version = 's3v4'  # signing scheme required by newer AWS regions

# 使用S3存储的模型
class S3Document(models.Model):
    """Document with an optional public and an optional private S3-backed file."""
    title = models.CharField(max_length=200)
    # Served directly from the public bucket prefix.
    public_file = models.FileField(
        upload_to='public_docs/',
        storage=PublicMediaStorage(),
        blank=True,
        null=True
    )
    # Stored under the private prefix; access requires signed URLs.
    private_file = models.FileField(
        upload_to='private_docs/',
        storage=PrivateMediaStorage(),
        blank=True,
        null=True
    )
    uploaded_at = models.DateTimeField(auto_now_add=True)

# S3信号处理器
@receiver(post_save, sender=S3Document)
def optimize_s3_file(sender, instance, **kwargs):
    """post_save hook: placeholder for per-file optimization of S3Document files.

    FIX: the original constructed an S3FileManager — and therefore a boto3
    client — on every save even though it was never used. Create one lazily
    inside the branches once real optimization work is added.
    """
    if instance.public_file:
        # TODO: compress / transcode the public file here
        # (instantiate S3FileManager() on demand when needed).
        pass

    if instance.private_file:
        # TODO: apply extra security parameters to the private file here.
        pass

其他云存储

# Google Cloud Storage集成
"""
pip install django-storages[google]
pip install google-cloud-storage
"""

# settings.py 配置
"""
# Google Cloud配置
GS_BUCKET_NAME = 'your-gcs-bucket'
DEFAULT_FILE_STORAGE = 'storages.backends.gcloud.GoogleCloudStorage'
GS_DEFAULT_ACL = 'publicRead'
"""

# Azure Storage集成
"""
pip install django-storages[azure]
pip install azure-storage-blob
"""

# settings.py 配置
"""
# Azure配置
AZURE_ACCOUNT_NAME = 'your-account-name'
AZURE_ACCOUNT_KEY = 'your-account-key'
AZURE_CONTAINER = 'your-container'
DEFAULT_FILE_STORAGE = 'storages.backends.azure_storage.AzureStorage'
"""

# 多云存储策略
class MultiCloudStorage:
    """Route file saves to whichever installed cloud backend suits the file best."""

    # (registry key, module path, storage class name)
    _BACKENDS = (
        ('aws', 'storages.backends.s3boto3', 'S3Boto3Storage'),
        ('gcs', 'storages.backends.gcloud', 'GoogleCloudStorage'),
        ('azure', 'storages.backends.azure_storage', 'AzureStorage'),
    )

    def __init__(self):
        self.storages = {}
        self._initialize_storages()

    def _initialize_storages(self):
        """Register every backend whose package (and its dependencies) import cleanly."""
        for key, module_path, class_name in self._BACKENDS:
            try:
                module = __import__(module_path, fromlist=[class_name])
                self.storages[key] = getattr(module, class_name)()
            except ImportError:
                # Backend package (or its SDK) not installed — skip it.
                pass

    def select_storage(self, file_info):
        """Pick a backend for *file_info* (dict with 'size' / 'content_type').

        Strategy: AWS for files over 100MB, GCS for images, Azure otherwise —
        falling back to any registered backend, or None when none exist.
        """
        if not self.storages:
            return None

        size = file_info.get('size', 0)
        mime = file_info.get('content_type', '')

        if size > 100 * 1024 * 1024:  # over 100MB
            return self.storages.get('aws')
        if mime.startswith('image/'):
            return self.storages.get('gcs')
        return self.storages.get('azure') or next(iter(self.storages.values()))

    def save_file(self, file_obj, filename, **file_info):
        """Save *file_obj* via the selected backend, or default_storage as fallback."""
        backend = self.select_storage(file_info)
        if not backend:
            # No cloud backend available — degrade to the project default.
            return default_storage.save(filename, file_obj)
        return backend.save(filename, file_obj)

# CDN集成
class CDNIntegration:
    """Map storage URLs onto a CDN domain and manage CDN cache invalidation."""

    def __init__(self, cdn_domain=None):
        # Explicit domain wins; otherwise fall back to settings.CDN_DOMAIN (may be None).
        self.cdn_domain = cdn_domain or getattr(settings, 'CDN_DOMAIN', None)

    def get_optimized_url(self, file_url):
        """Rewrite *file_url* to go through the CDN, when one is configured.

        FIX: dropped the unused `import re` the original left behind.
        NOTE(review): replace() substitutes the MEDIA_URL fragment anywhere in
        the string and can produce doubled slashes — confirm the expected URL
        shapes before hardening this.
        """
        if self.cdn_domain:
            return file_url.replace(settings.MEDIA_URL.lstrip('/'), f'https://{self.cdn_domain}/')
        return file_url

    def invalidate_cache(self, file_path):
        """Invalidate *file_path* on the CDN (hook for the provider's purge API)."""
        if self.cdn_domain:
            # Integrate the concrete purge API here
            # (Cloudflare, Akamai, AWS CloudFront, ...).
            pass

# 全局CDN管理器
cdn_manager = CDNIntegration()

文件处理与优化

图像处理优化

# 图像处理和优化
from PIL import Image
from io import BytesIO
import os
from django.core.files.base import ContentFile

class ImageProcessor:
    """Stateless helpers for resizing, compressing, cropping and watermarking images."""

    @staticmethod
    def _jpeg_safe(img):
        """Return *img* converted to RGB when its mode cannot be written as JPEG.

        Pillow raises OSError when saving RGBA/LA/P images as JPEG, so any
        path that may emit JPEG must normalize the mode first.
        """
        if img.mode in ('RGBA', 'LA', 'P'):
            return img.convert('RGB')
        return img

    @staticmethod
    def resize_image(image_file, max_width=800, max_height=600, quality=85):
        """Downscale to fit within (max_width, max_height), preserving aspect ratio."""
        img = Image.open(image_file)
        img_format = img.format or 'JPEG'  # capture before any conversion

        img.thumbnail((max_width, max_height), Image.Resampling.LANCZOS)

        # BUG FIX: JPEG cannot store alpha/palette modes; convert first.
        if img_format == 'JPEG':
            img = ImageProcessor._jpeg_safe(img)

        output = BytesIO()
        img.save(output, format=img_format, quality=quality, optimize=True)

        output.seek(0)
        return ContentFile(output.read(), name=image_file.name)

    @staticmethod
    def compress_image(image_file, target_size_kb=500):
        """Binary-search the JPEG quality that fits under *target_size_kb*.

        Returns the original file unchanged when even quality 10 is too large.
        """
        img = Image.open(image_file)
        # BUG FIX: output is always JPEG here, so alpha/palette images
        # (RGBA/LA/P) must be converted or Pillow raises OSError.
        img = ImageProcessor._jpeg_safe(img)

        low_quality, high_quality = 10, 95
        best_file = None

        while low_quality <= high_quality:
            mid_quality = (low_quality + high_quality) // 2

            output = BytesIO()
            img.save(output, format='JPEG', quality=mid_quality, optimize=True)
            size_kb = len(output.getvalue()) / 1024

            if size_kb <= target_size_kb:
                # Fits: remember this rendition and try a higher quality.
                best_file = ContentFile(output.getvalue(), name=image_file.name)
                low_quality = mid_quality + 1
            else:
                high_quality = mid_quality - 1

        return best_file or image_file

    @staticmethod
    def crop_image(image_file, box):
        """Crop to *box* = (left, top, right, bottom)."""
        img = Image.open(image_file)
        # crop() returns a new Image whose .format is None — capture it now.
        img_format = img.format or 'JPEG'
        cropped_img = img.crop(box)

        if img_format == 'JPEG':
            cropped_img = ImageProcessor._jpeg_safe(cropped_img)

        output = BytesIO()
        cropped_img.save(output, format=img_format)
        output.seek(0)

        return ContentFile(output.read(), name=image_file.name)

    @staticmethod
    def add_watermark(image_file, watermark_path, position='bottom-right'):
        """Paste the image at *watermark_path* onto the image at a named *position*."""
        img = Image.open(image_file)
        img_format = img.format or 'JPEG'
        watermark = Image.open(watermark_path)

        watermark_width, watermark_height = watermark.size
        img_width, img_height = img.size

        # Named anchor points, inset 10px from the relevant edges.
        positions = {
            'top-left': (10, 10),
            'top-right': (img_width - watermark_width - 10, 10),
            'bottom-left': (10, img_height - watermark_height - 10),
            'bottom-right': (img_width - watermark_width - 10, img_height - watermark_height - 10),
            'center': ((img_width - watermark_width) // 2, (img_height - watermark_height) // 2)
        }
        position_coords = positions.get(position, positions['bottom-right'])

        if watermark.mode != 'RGBA':
            watermark = watermark.convert('RGBA')

        # Third argument uses the watermark's alpha channel as the paste mask.
        img.paste(watermark, position_coords, watermark)

        if img_format == 'JPEG':
            img = ImageProcessor._jpeg_safe(img)

        output = BytesIO()
        img.save(output, format=img_format)
        output.seek(0)

        return ContentFile(output.read(), name=image_file.name)

# 图像模型和信号
class ProcessedImage(models.Model):
    """Image model storing the original plus derived renditions.

    `processed_image` (max 1200x800) and `thumbnail` (max 200x200) are
    generated from `original_image` via ImageProcessor.resize_image.
    """
    title = models.CharField(max_length=200)
    original_image = models.ImageField(upload_to='originals/')
    # Derived renditions; empty until process_and_save_images() has run.
    processed_image = models.ImageField(upload_to='processed/', blank=True, null=True)
    thumbnail = models.ImageField(upload_to='thumbnails/', blank=True, null=True)
    # Pixel dimensions of the original, filled in during processing.
    width = models.PositiveIntegerField(null=True, blank=True)
    height = models.PositiveIntegerField(null=True, blank=True)
    file_size = models.PositiveIntegerField(null=True, blank=True)
    
    def save(self, *args, **kwargs):
        """Persist the row, then derive renditions when none exist yet.

        NOTE(review): the post_save handler in this file also schedules
        processing for newly created rows, so a new image may be processed
        twice (once here, once in a background thread) — confirm and
        deduplicate one of the trigger paths.
        """
        super().save(*args, **kwargs)
        
        if self.original_image and not self.processed_image:
            self.process_and_save_images()
    
    def process_and_save_images(self):
        """Build the display rendition and thumbnail, then record dimensions."""
        # Display rendition, capped at 1200x800 (aspect ratio preserved).
        processed_file = ImageProcessor.resize_image(
            self.original_image.file,
            max_width=1200,
            max_height=800
        )
        
        # save=False defers the DB write to the single save() at the end.
        self.processed_image.save(
            f"processed_{self.original_image.name}",
            processed_file,
            save=False
        )
        
        # Thumbnail, capped at 200x200.
        thumbnail_file = ImageProcessor.resize_image(
            self.original_image.file,
            max_width=200,
            max_height=200
        )
        
        self.thumbnail.save(
            f"thumb_{self.original_image.name}",
            thumbnail_file,
            save=False
        )
        
        # Record the original's pixel size.
        # NOTE(review): .path assumes local FileSystemStorage; remote
        # backends (e.g. S3) raise NotImplementedError here — confirm.
        img = Image.open(self.original_image.path)
        self.width, self.height = img.size
        
        # super().save() bypasses this class's save() override, so this
        # write cannot re-trigger the processing above.
        super().save(update_fields=['processed_image', 'thumbnail', 'width', 'height'])

# 图像处理信号
@receiver(post_save, sender=ProcessedImage)
def process_image_on_save(sender, instance, created, **kwargs):
    """Background fallback processing for newly created images.

    FIX: ProcessedImage.save() already derives the renditions synchronously
    right after super().save() — the call that fires this signal — so the
    original handler re-processed every new image a second time. The worker
    now re-checks just before running and skips images save() has already
    handled. (A small race window remains; the proper fix is to keep only
    one of the two trigger paths.)
    """
    if created and instance.original_image:
        import threading

        def process_images():
            # Skip when the synchronous path has already produced renditions.
            if not instance.processed_image:
                instance.process_and_save_images()

        worker = threading.Thread(target=process_images, daemon=True)
        worker.start()

# 批量图像处理器
class BulkImageProcessor:
    """Apply a shared set of processing options to many images."""

    @staticmethod
    def process_image_batch(image_files, processing_options):
        """Process every file in *image_files* with the same *processing_options*."""
        return [
            BulkImageProcessor.process_single_image(image_file, processing_options)
            for image_file in image_files
        ]

    @staticmethod
    def process_single_image(image_file, options):
        """Apply resize/crop/rotate/format/quality options to one image.

        *options* keys: 'resize' (w, h), 'crop' (l, t, r, b), 'rotate'
        (degrees), 'format' (output format), 'quality' (default 85).

        FIX: Pillow's resize()/crop()/rotate() return NEW Image objects whose
        .format is None, so the original fallback `img.format or 'JPEG'`
        always produced JPEG after any transform. The source format is now
        captured before transforming.
        """
        img = Image.open(image_file)
        source_format = img.format  # capture before transforms erase it

        if options.get('resize'):
            img = img.resize(options['resize'], Image.Resampling.LANCZOS)

        if options.get('crop'):
            img = img.crop(options['crop'])

        if options.get('rotate'):
            # expand=True grows the canvas so the rotated image is not clipped.
            img = img.rotate(options['rotate'], expand=True)

        target_format = options.get('format') or source_format or 'JPEG'

        output = BytesIO()
        img.save(output, format=target_format, quality=options.get('quality', 85))
        output.seek(0)

        return ContentFile(output.read(), name=image_file.name)

文件转换和格式化

# 文件转换和格式化
import subprocess
import fitz  # PyMuPDF
from PIL import Image
import pandas as pd
import json

class FileConverter:
    """Conversions between image, PDF, Office, CSV/Excel and JSON/CSV formats."""

    @staticmethod
    def image_to_pdf(image_file):
        """Render a single image as a one-page PDF."""
        img = Image.open(image_file)
        # FIX: PDF pages cannot carry alpha/palette data; normalize to RGB
        # or Pillow raises on RGBA/P sources.
        if img.mode in ('RGBA', 'LA', 'P'):
            img = img.convert('RGB')

        pdf_output = BytesIO()
        img.save(pdf_output, 'PDF', resolution=100.0, save_all=False)
        pdf_output.seek(0)

        return ContentFile(pdf_output.read(), name=f"{image_file.name.rsplit('.', 1)[0]}.pdf")

    @staticmethod
    def pdf_to_images(pdf_file, dpi=200):
        """Rasterize each PDF page to a PNG ContentFile at *dpi*."""
        doc = fitz.open(stream=pdf_file.read(), filetype="pdf")
        images = []

        for page_num in range(len(doc)):
            page = doc[page_num]
            # PDF native resolution is 72 DPI; scale up to the requested dpi.
            mat = fitz.Matrix(dpi / 72, dpi / 72)
            pix = page.get_pixmap(matrix=mat)

            img_data = pix.tobytes("png")
            images.append(ContentFile(img_data, name=f"page_{page_num + 1}.png"))

        return images

    @staticmethod
    def office_to_pdf(office_file):
        """Convert an Office document to PDF via headless LibreOffice.

        Requires the `libreoffice` binary on PATH. FIX: intermediate files
        are now removed in a `finally` clause, so a failed conversion no
        longer leaks temp files (the original unlinked only on success).
        """
        input_path = office_file.temporary_file_path() if hasattr(office_file, 'temporary_file_path') else None
        made_temp = False

        if not input_path:
            # No on-disk copy available (in-memory upload): spill to a temp file.
            import tempfile
            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(office_file.name)[1]) as tmp:
                for chunk in office_file.chunks():
                    tmp.write(chunk)
                input_path = tmp.name
            made_temp = True

        pdf_path = input_path.rsplit('.', 1)[0] + '.pdf'
        try:
            subprocess.run([
                'libreoffice', '--headless', '--convert-to', 'pdf',
                '--outdir', os.path.dirname(input_path), input_path
            ], check=True)

            with open(pdf_path, 'rb') as pdf_file:
                return ContentFile(pdf_file.read(), name=f"{office_file.name.rsplit('.', 1)[0]}.pdf")
        except subprocess.CalledProcessError:
            raise Exception("文档转换失败,可能需要安装LibreOffice")
        except FileNotFoundError:
            raise Exception("找不到LibreOffice,请确保已安装")
        finally:
            # Clean up the spilled input (only if we created it) and the output.
            if made_temp and os.path.exists(input_path):
                os.unlink(input_path)
            if os.path.exists(pdf_path):
                os.unlink(pdf_path)

    @staticmethod
    def csv_to_excel(csv_file):
        """Convert a CSV upload into a single-sheet .xlsx workbook."""
        df = pd.read_csv(csv_file)

        excel_output = BytesIO()
        with pd.ExcelWriter(excel_output, engine='openpyxl') as writer:
            df.to_excel(writer, index=False, sheet_name='Sheet1')

        excel_output.seek(0)
        return ContentFile(
            excel_output.read(),
            name=f"{csv_file.name.rsplit('.', 1)[0]}.xlsx"
        )

    @staticmethod
    def json_to_csv(json_file):
        """Convert a JSON array of flat objects into a UTF-8 CSV file.

        Raises ValueError unless the payload is a non-empty list of objects.

        BUG FIX: csv.DictWriter writes *text*; the original handed it a
        BytesIO, which raises TypeError on the first write. The CSV is now
        built in a StringIO and encoded to UTF-8 at the end.
        """
        import csv
        from io import StringIO

        content = json_file.read()
        if isinstance(content, bytes):
            content = content.decode('utf-8')

        data = json.loads(content)

        if not (isinstance(data, list) and len(data) > 0):
            raise ValueError("JSON数据格式不正确,应该是对象数组")

        text_buffer = StringIO()
        writer = csv.DictWriter(text_buffer, fieldnames=data[0].keys())
        writer.writeheader()
        writer.writerows(data)

        return ContentFile(
            text_buffer.getvalue().encode('utf-8'),
            name=f"{json_file.name.rsplit('.', 1)[0]}.csv"
        )

# 文件转换模型
class ConvertedFile(models.Model):
    """Tracks an original upload together with an optional converted copy."""
    ORIGINAL = 'original'
    PROCESSED = 'processed'
    CONVERSION_CHOICES = [
        (ORIGINAL, '原始文件'),
        (PROCESSED, '处理后文件'),
    ]
    
    original_file = models.FileField(upload_to='originals/')
    converted_file = models.FileField(upload_to='converted/', blank=True, null=True)
    conversion_type = models.CharField(max_length=50, choices=CONVERSION_CHOICES)
    file_format = models.CharField(max_length=10, blank=True)  # e.g. 'pdf', 'jpg'
    converted_at = models.DateTimeField(auto_now_add=True)
    
    def convert_file(self, target_format):
        """Convert ``original_file`` to *target_format* and persist the result.

        Supported: image (.jpg/.jpeg/.png) -> pdf, and pdf -> jpg (first
        page only).

        Raises:
            ValueError: unsupported source/target combination, or the PDF
                rendered no pages.
        """
        converter = FileConverter()
        # Compare case-insensitively so '.JPG' uploads convert too.
        source_name = self.original_file.name.lower()

        if target_format == 'pdf' and source_name.endswith(('.jpg', '.jpeg', '.png')):
            converted = converter.image_to_pdf(self.original_file.file)
        elif target_format == 'jpg' and source_name.endswith('.pdf'):
            pages = converter.pdf_to_images(self.original_file.file)
            if not pages:
                raise ValueError("PDF转换失败")
            # Only the first page is kept as a sample conversion.
            converted = pages[0]
        else:
            raise ValueError(f"不支持的转换格式: {target_format}")

        filename = f"{self.original_file.name.rsplit('.', 1)[0]}.{target_format}"
        # save=False: FieldFile.save() defaults to saving the whole model,
        # which combined with the explicit save() below wrote the row twice.
        self.converted_file.save(filename, converted, save=False)
        self.file_format = target_format
        self.save(update_fields=['converted_file', 'file_format'])

# 异步文件转换任务
from celery import shared_task

@shared_task
def async_file_conversion(file_id, target_format):
    """Celery task: run ConvertedFile.convert_file for the given record.

    Always returns a human-readable status string (never raises), so the
    task result stays serializable and failures are visible in the backend.
    """
    try:
        record = ConvertedFile.objects.get(id=file_id)
        record.convert_file(target_format)
        # Notification / status-update hooks could be added here.
        return f"文件转换成功: {record.original_file.name} -> {target_format}"
    except ConvertedFile.DoesNotExist:
        return f"文件不存在: {file_id}"
    except Exception as e:
        return f"转换失败: {str(e)}"

# 文件优化管道
class FileProcessingPipeline:
    """Applies a chain of processor callables to a file object.

    Each processor receives the output of the previous one, so the
    pipeline composes transformations left to right.
    """
    
    def __init__(self):
        # (callable, kwargs) pairs, applied in insertion order.
        self.processors = []
    
    def add_processor(self, processor_func, **kwargs):
        """Append a processing step; returns self to allow chaining."""
        self.processors.append((processor_func, kwargs))
        return self
    
    def process(self, file_obj):
        """Run every registered processor over *file_obj* and return the result."""
        result = file_obj
        for step, step_kwargs in self.processors:
            result = step(result, **step_kwargs)
        return result

# Usage example: build a resize + compress pipeline for uploaded images.
pipeline = FileProcessingPipeline()
pipeline.add_processor(ImageProcessor.resize_image, max_width=800, max_height=600)
pipeline.add_processor(ImageProcessor.compress_image, target_size_kb=500)

# processed_file = pipeline.process(original_file)

文件管理与清理

文件生命周期管理

# 文件生命周期管理
from datetime import datetime, timedelta
from django.utils import timezone
import os
import shutil

class FileManager:
    """Filesystem maintenance helpers: file age, cleanup, sizing, sorting."""
    
    @staticmethod
    def get_file_age(file_path):
        """Return how long ago *file_path* was last modified, as a timedelta.

        Bug fix: the original subtracted a naive ``datetime.fromtimestamp``
        result from the timezone-aware ``timezone.now()``, which raises
        ``TypeError`` whenever ``USE_TZ = True``. Both operands are now
        naive local datetimes.
        """
        file_time = datetime.fromtimestamp(os.path.getmtime(file_path))
        return datetime.now() - file_time
    
    @staticmethod
    def cleanup_old_files(directory, days_old=30):
        """Delete files under *directory* older than *days_old* days.

        Returns the number of files removed; files that cannot be removed
        are reported and skipped.
        """
        # Naive local cutoff to match datetime.fromtimestamp below (the
        # original aware/naive comparison raised TypeError under USE_TZ).
        cutoff_date = datetime.now() - timedelta(days=days_old)
        deleted_count = 0
        
        for root, dirs, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                file_mod_time = datetime.fromtimestamp(os.path.getmtime(file_path))
                if file_mod_time < cutoff_date:
                    try:
                        os.remove(file_path)
                        deleted_count += 1
                    except OSError as e:
                        print(f"无法删除文件 {file_path}: {e}")
        
        return deleted_count
    
    @staticmethod
    def get_directory_size(directory):
        """Return the total size in bytes of all files under *directory*."""
        total_size = 0
        for dirpath, dirnames, filenames in os.walk(directory):
            for filename in filenames:
                filepath = os.path.join(dirpath, filename)
                # Guard against files removed between walk() and getsize().
                if os.path.exists(filepath):
                    total_size += os.path.getsize(filepath)
        return total_size
    
    @staticmethod
    def organize_files_by_type(source_dir, target_dir):
        """Move files from *source_dir* into per-category subfolders of *target_dir*.

        Category is decided by extension; anything unrecognized goes to
        'others'. Subdirectories of *source_dir* are left untouched.
        """
        type_dirs = {
            'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp'],
            'documents': ['.pdf', '.doc', '.docx', '.txt', '.xls', '.xlsx'],
            'videos': ['.mp4', '.avi', '.mov', '.mkv'],
            'audio': ['.mp3', '.wav', '.flac', '.aac']
        }
        
        for filename in os.listdir(source_dir):
            file_path = os.path.join(source_dir, filename)
            if not os.path.isfile(file_path):
                continue
            _, ext = os.path.splitext(filename.lower())
            
            # Pick the category folder; default to 'others'.
            target_subdir = 'others'
            for type_name, extensions in type_dirs.items():
                if ext in extensions:
                    target_subdir = type_name
                    break
            
            dest_dir = os.path.join(target_dir, target_subdir)
            os.makedirs(dest_dir, exist_ok=True)
            shutil.move(file_path, os.path.join(dest_dir, filename))

# 文件清理任务
class FileCleanupTask:
    """Scheduled maintenance jobs built on top of FileManager."""
    
    @staticmethod
    def cleanup_temporary_files():
        """Delete files in the system temp dir older than 7 days."""
        import tempfile
        temp_dir = tempfile.gettempdir()
        
        # 清理7天前的临时文件
        old_files_deleted = FileManager.cleanup_old_files(temp_dir, days_old=7)
        print(f"删除了 {old_files_deleted} 个临时文件")
    
    @staticmethod
    def cleanup_unused_media():
        """Remove files under MEDIA_ROOT that no Document row references.

        NOTE(review): only the Document model is consulted; files owned by
        any other file-bearing model under MEDIA_ROOT would be deleted as
        well — confirm the model list before running in production.
        """
        from django.conf import settings
        from .models import Document
        
        media_root = settings.MEDIA_ROOT
        
        # Collect every absolute path still referenced by a Document.
        used_files = set()
        for doc in Document.objects.all():
            if doc.file:
                used_files.add(doc.file.path)
        
        # Walk the media tree and delete anything unreferenced.
        deleted_count = 0
        for root, dirs, files in os.walk(media_root):
            for file in files:
                file_path = os.path.join(root, file)
                if file_path not in used_files:
                    try:
                        os.remove(file_path)
                        deleted_count += 1
                    except OSError:
                        pass  # 文件正在使用或其他错误
        
        print(f"删除了 {deleted_count} 个未使用的媒体文件")
    
    @staticmethod
    def generate_cleanup_report():
        """Return a snapshot dict of media/static directory sizes.

        Fixes: dropped the unused ``import humanize`` (a third-party module
        that crashed this method when not installed); the hasattr guard let
        ``STATIC_ROOT = None`` through to ``os.walk``, now handled by
        getattr + truthiness.
        """
        from django.conf import settings
        
        static_root = getattr(settings, 'STATIC_ROOT', None)
        
        report = {
            'timestamp': timezone.now(),
            'media_directory_size': FileManager.get_directory_size(settings.MEDIA_ROOT),
            'static_directory_size': FileManager.get_directory_size(static_root) if static_root else 0,
        }
        
        return report

# 定期清理任务
from celery import shared_task

@shared_task
def scheduled_file_cleanup():
    """Periodic Celery job chaining the cleanup steps and reporting."""
    print("开始执行定期文件清理...")
    
    # 清理临时文件
    FileCleanupTask.cleanup_temporary_files()
    # 清理未使用的媒体文件
    FileCleanupTask.cleanup_unused_media()
    
    # 生成报告
    summary = FileCleanupTask.generate_cleanup_report()
    print(f"清理完成,生成报告: {summary}")
    
    return "文件清理任务完成"

存储监控和报告

# 存储监控和报告
from django.core.management.base import BaseCommand
from django.conf import settings
import psutil
import os
from datetime import datetime

class StorageMonitor:
    """Reports disk usage and media-file statistics with recommendations."""
    
    def __init__(self):
        self.media_root = settings.MEDIA_ROOT
        self.static_root = getattr(settings, 'STATIC_ROOT', '')
    
    def get_storage_usage(self):
        """Return volume-level usage for the disks holding media/static roots."""
        usage = {}
        
        if os.path.exists(self.media_root):
            media_usage = psutil.disk_usage(self.media_root)
            usage['media'] = {
                'total': media_usage.total,
                'used': media_usage.used,
                'free': media_usage.free,
                'percent_used': (media_usage.used / media_usage.total) * 100
            }
        
        if self.static_root and os.path.exists(self.static_root):
            static_usage = psutil.disk_usage(self.static_root)
            usage['static'] = {
                'total': static_usage.total,
                'used': static_usage.used,
                'free': static_usage.free,
                'percent_used': (static_usage.used / static_usage.total) * 100
            }
        
        # Directory-level sizes (the figures above count the whole volume).
        usage['media_dir_size'] = FileManager.get_directory_size(self.media_root)
        if self.static_root:
            usage['static_dir_size'] = FileManager.get_directory_size(self.static_root)
        
        return usage
    
    def get_file_statistics(self):
        """Return counts, total size, per-extension breakdown and top-10 files.

        Performance fix: the original walked the entire media tree twice
        (a second full os.walk just to find the largest files); this version
        collects everything in a single pass.
        """
        stats = {
            'total_files': 0,
            'total_size': 0,
            'file_types': {},
            'largest_files': []
        }
        all_files = []
        
        for root, dirs, files in os.walk(self.media_root):
            for file in files:
                file_path = os.path.join(root, file)
                # File may vanish between walk() and getsize().
                if not os.path.exists(file_path):
                    continue
                file_size = os.path.getsize(file_path)
                _, ext = os.path.splitext(file.lower())
                
                stats['total_files'] += 1
                stats['total_size'] += file_size
                
                type_info = stats['file_types'].setdefault(ext, {'count': 0, 'size': 0})
                type_info['count'] += 1
                type_info['size'] += file_size
                
                all_files.append((file_path, file_size))
        
        # Ten largest files, biggest first.
        stats['largest_files'] = sorted(all_files, key=lambda x: x[1], reverse=True)[:10]
        
        return stats
    
    def generate_report(self):
        """Combine usage, statistics and recommendations into one report dict."""
        storage_usage = self.get_storage_usage()
        file_stats = self.get_file_statistics()
        
        return {
            'generated_at': datetime.now().isoformat(),
            'storage_usage': storage_usage,
            'file_statistics': file_stats,
            'recommendations': self.get_recommendations(storage_usage, file_stats)
        }
    
    def get_recommendations(self, storage_usage, file_stats):
        """Derive optimization advice from thresholds over usage/statistics."""
        recommendations = []
        
        # Volume nearly full?
        if 'media' in storage_usage:
            media_usage = storage_usage['media']
            if media_usage['percent_used'] > 80:
                recommendations.append("媒体存储使用率超过80%,建议清理旧文件或扩容")
        
        # Oversized single files?
        if file_stats['largest_files']:
            largest_size = file_stats['largest_files'][0][1]
            if largest_size > 50 * 1024 * 1024:  # 50MB
                recommendations.append("发现大于50MB的文件,考虑压缩或优化")
        
        # Extension distribution — guard the division against an empty tree.
        total_files = file_stats['total_files']
        if total_files:
            for ext, info in file_stats['file_types'].items():
                if info['count'] / total_files > 0.5 and ext in ['.tmp', '.log', '.bak']:
                    recommendations.append(f"发现过多的{ext}文件,建议定期清理")
        
        return recommendations

# 存储监控命令
class Command(BaseCommand):
    """Management command: print the storage report to stdout."""
    
    def handle(self, *args, **options):
        report = StorageMonitor().generate_report()
        out = self.stdout.write
        
        out("存储使用报告:")
        out(f"生成时间: {report['generated_at']}")
        
        media = report['storage_usage'].get('media')
        if media is not None:
            out(f"媒体存储 - 总计: {media['total']}, 已用: {media['used']}, 使用率: {media['percent_used']:.2f}%")
        
        file_stats = report['file_statistics']
        out(f"总文件数: {file_stats['total_files']}")
        out(f"总大小: {file_stats['total_size']}")
        
        out("\n存储优化建议:")
        for rec in report['recommendations']:
            out(f"- {rec}")

常见问题与解决方案

问题1:文件上传超时

症状:大文件上传时出现超时错误

解决方案

# 1. Adjust server-side upload limits (settings.py)
DATA_UPLOAD_MAX_NUMBER_FIELDS = 1000  # raise the allowed number of form fields
FILE_UPLOAD_MAX_MEMORY_SIZE = 10 * 1024 * 1024  # 10MB in-memory upload limit
DATA_UPLOAD_MAX_MEMORY_SIZE = 10 * 1024 * 1024  # 10MB request-body limit

# 2. Nginx configuration (when serving behind Nginx)
"""
# nginx.conf
client_max_body_size 100M;
client_body_timeout 120s;
client_header_timeout 120s;
"""

# 3. 处理大文件上传的视图
def large_file_upload_view(request):
    """Handle a large upload by spooling it to a temp file, then storing it.

    Returns a JSON {'success': True, 'path': ...} on POST with a file,
    otherwise renders the upload page.

    Fixes over the original:
    - the handle passed to ``default_storage.save`` is closed (it leaked);
    - the temp file is removed in a ``finally``, so a failing save no
      longer strands it on disk;
    - storage happens after the temp file is closed, which also works on
      Windows where an open file cannot be reopened.
    """
    if request.method == 'POST':
        uploaded_file = request.FILES.get('file')
        
        if uploaded_file:
            import tempfile
            # Spool to disk in chunks so the upload never sits fully in memory.
            with tempfile.NamedTemporaryFile(delete=False) as temp_file:
                for chunk in uploaded_file.chunks():
                    temp_file.write(chunk)
                temp_file_path = temp_file.name
            
            try:
                # Asynchronous processing of the temp file could happen here.
                from django.core.files.storage import default_storage
                with open(temp_file_path, 'rb') as spooled:
                    final_path = default_storage.save(uploaded_file.name, spooled)
                return JsonResponse({'success': True, 'path': final_path})
            finally:
                os.unlink(temp_file_path)
    
    return render(request, 'large_upload.html')

问题2:文件名冲突

症状:上传同名文件时覆盖原有文件

解决方案

# 1. 使用UUID生成唯一文件名
import uuid
from django.utils import timezone

def unique_upload_path(instance, filename):
    """Return a unique upload path: uploads/YYYY/MM/DD/<uuid>.<ext>.

    Bug fix: the original returned the literal string "(unknown)" instead
    of interpolating the generated name, so every upload mapped to the
    same path and files overwrote each other.
    """
    ext = filename.split('.')[-1]
    unique_name = f"{uuid.uuid4().hex}.{ext}"
    return f"uploads/{timezone.now().strftime('%Y/%m/%d')}/{unique_name}"

class UniqueFileModel(models.Model):
    """Model whose files get collision-free names via unique_upload_path."""
    # upload_to callable produces uploads/YYYY/MM/DD/<uuid>.<ext>
    file = models.FileField(upload_to=unique_upload_path)

# 2. 自定义存储后端避免覆盖
from django.core.files.storage import FileSystemStorage

class NoOverwriteStorage(FileSystemStorage):
    """Storage backend that appends _1, _2, ... instead of overwriting."""
    
    def get_available_name(self, name, max_length=None):
        """Return a free name by suffixing a counter before the extension.

        Bug fix: the original split with ``rsplit('.', 1)``, so a dot inside
        a *directory* component (e.g. 'v1.2/readme') was mistaken for the
        extension boundary and produced a corrupted path. ``os.path.splitext``
        only considers the final component, and also unifies the with- and
        without-extension branches.

        NOTE(review): max_length is accepted for API compatibility but not
        enforced, matching the original behavior — confirm whether long
        names must be truncated for the backing database field.
        """
        if not self.exists(name):
            return name
        stem, ext = os.path.splitext(name)
        counter = 1
        while self.exists(f"{stem}_{counter}{ext}"):
            counter += 1
        return f"{stem}_{counter}{ext}"

class NoOverwriteFileModel(models.Model):
    """Model whose storage backend never overwrites an existing file."""
    # NoOverwriteStorage suffixes a counter when the target name exists.
    file = models.FileField(upload_to='uploads/', storage=NoOverwriteStorage())

问题3:内存不足

症状:上传大文件时消耗过多内存

解决方案

# 1. Lower the in-memory threshold for uploads
# settings.py
FILE_UPLOAD_MAX_MEMORY_SIZE = 5 * 1024 * 1024  # 5MB; larger uploads spill to a temp file

# 2. 流式处理大文件
def stream_process_large_file(uploaded_file, chunk_size=8192):
    """Hash an upload chunk-by-chunk, then store it under a hash-prefixed name.

    Returns the storage path of the saved file.

    Bug fix: after the hashing pass the upload's file pointer sat at EOF,
    so ``default_storage.save`` wrote an *empty* file — the file is now
    rewound with ``seek(0)`` before saving. The pointless inner generator
    wrapper around ``chunks()`` was also removed.
    """
    import hashlib
    
    # Hash chunk by chunk so the file is never fully resident in memory.
    # md5 is used only as a content fingerprint here, not for security.
    hasher = hashlib.md5()
    for chunk in uploaded_file.chunks(chunk_size=chunk_size):
        hasher.update(chunk)
    file_hash = hasher.hexdigest()
    
    # Rewind before handing the file to storage — hashing consumed it.
    uploaded_file.seek(0)
    
    from django.core.files.storage import default_storage
    safe_filename = f"{file_hash}_{uploaded_file.name}"
    saved_path = default_storage.save(safe_filename, uploaded_file)
    
    return saved_path

# 3. 使用临时文件处理
def process_with_temp_file(uploaded_file):
    """Spool a large upload to a temp file, process it, then persist it.

    Returns the storage path of the saved file; the temp file is always
    removed.

    Bug fix: the read handle passed to ``default_storage.save`` is now
    closed via a ``with`` block — the original leaked it.
    """
    import tempfile
    import os
    
    # Spool the upload to disk in chunks.
    with tempfile.NamedTemporaryFile(delete=False) as temp_file:
        for chunk in uploaded_file.chunks():
            temp_file.write(chunk)
        temp_path = temp_file.name
    
    try:
        # Validation / conversion / analysis of the temp file goes here.
        
        from django.core.files.storage import default_storage
        with open(temp_path, 'rb') as spooled:
            final_path = default_storage.save(uploaded_file.name, spooled)
        
        return final_path
    finally:
        # Always remove the spool file, even if the save failed.
        os.unlink(temp_path)

问题4:安全性问题

症状:上传恶意文件导致安全漏洞

解决方案

# 1. 完整的安全验证链
class ComprehensiveFileValidator:
    """Layered upload validator: size, filename, MIME type, content, headers."""
    
    def __init__(self):
        # python-magic is required for real content-type sniffing.
        self.magic = magic.Magic(mime=True)
    
    def validate_completely(self, uploaded_file):
        """Run every check; any failure raises ValidationError."""
        # 1. basic checks
        self.validate_size(uploaded_file)
        self.validate_filename(uploaded_file)
        
        # 2. MIME sniffing
        self.validate_mime_type(uploaded_file)
        
        # 3. content scan
        self.scan_content(uploaded_file)
        
        # 4. file-header (magic number) check
        self.validate_file_headers(uploaded_file)
    
    def validate_size(self, uploaded_file, max_size=10*1024*1024):
        """Reject files larger than *max_size* bytes (default 10MB)."""
        if uploaded_file.size > max_size:
            raise ValidationError(f"文件太大,最大支持{max_size/(1024*1024)}MB")
    
    def validate_filename(self, uploaded_file):
        """Block path-traversal characters and executable extensions."""
        filename = uploaded_file.name
        if '..' in filename or '/' in filename or '\\' in filename:
            raise ValidationError("文件名包含非法字符")
        
        dangerous_exts = ['.exe', '.bat', '.sh', '.php', '.jsp', '.asp']
        ext = os.path.splitext(filename)[1].lower()
        if ext in dangerous_exts:
            raise ValidationError("不允许的文件扩展名")
    
    def validate_mime_type(self, uploaded_file):
        """Sniff the real MIME type from the first 1KB and whitelist it."""
        real_mime = self.magic.from_buffer(uploaded_file.read(1024))
        uploaded_file.seek(0)  # rewind after sampling
        
        allowed_mimes = [
            'image/jpeg', 'image/png', 'image/gif',
            'application/pdf', 'text/plain',
            'application/msword',
            'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
        ]
        
        if real_mime not in allowed_mimes:
            raise ValidationError(f"不允许的文件类型: {real_mime}")
    
    def scan_content(self, uploaded_file):
        """Look for script/injection markers in the first 1KB of content."""
        header = uploaded_file.read(1024)
        uploaded_file.seek(0)
        
        malicious_patterns = [
            b'<?php', b'<script', b'javascript:', b'onerror=', 
            b'eval(', b'exec(', b'system('
        ]
        
        lowered = header.lower()
        for pattern in malicious_patterns:
            if pattern in lowered:
                raise ValidationError("文件可能包含恶意代码")
    
    def validate_file_headers(self, uploaded_file):
        """Check that a known extension matches its magic-number signature.

        Bug fix: ``validate_completely`` called this method but the original
        class never defined it, so every full validation run died with
        ``AttributeError``. Extensions without a known signature pass.
        """
        signatures = {
            '.jpg': (b'\xff\xd8\xff',),
            '.jpeg': (b'\xff\xd8\xff',),
            '.png': (b'\x89PNG\r\n\x1a\n',),
            '.gif': (b'GIF87a', b'GIF89a'),
            '.pdf': (b'%PDF-',),
        }
        ext = os.path.splitext(uploaded_file.name)[1].lower()
        expected = signatures.get(ext)
        if expected:
            head = uploaded_file.read(16)
            uploaded_file.seek(0)
            if not any(head.startswith(sig) for sig in expected):
                raise ValidationError("文件头与扩展名不匹配")

# 2. 使用安全验证器
class SecureUploadForm(forms.Form):
    """Upload form that delegates checks to ComprehensiveFileValidator."""
    
    file = forms.FileField()
    
    def clean_file(self):
        """Validate the upload; any failed check raises ValidationError."""
        uploaded = self.cleaned_data.get('file')
        if not uploaded:
            return uploaded
        ComprehensiveFileValidator().validate_completely(uploaded)
        return uploaded

本章小结

在本章中,我们深入学习了Django文件上传与存储系统:

  1. 文件上传基础:理解了Django文件上传的工作原理和基本概念
  2. 文件处理架构:掌握了Django文件处理的核心组件和架构
  3. 文件字段与表单:学会了如何在模型和表单中处理文件字段
  4. 存储配置:了解了本地和云存储的各种配置方式
  5. 安全上传:学习了文件验证和访问控制的安全实践
  6. 云存储集成:掌握了AWS S3、GCS等云存储的集成方法
  7. 文件优化:了解了图像处理、文件转换等优化技术
  8. 文件管理:学习了文件生命周期管理和清理策略

核心要点回顾

"""
本章核心要点:

1. Django提供了完整的文件上传和存储框架
2. 文件上传需要设置正确的表单编码和CSRF保护
3. 大文件上传需要考虑内存和超时限制
4. 文件安全性是重中之重,需要多重验证
5. 云存储提供了更好的可扩展性和可靠性
6. 图像处理和文件转换可以提升用户体验
7. 定期清理和监控存储使用情况很重要
8. 正确的文件组织策略有利于维护
"""

💡 核心要点:文件处理是Web应用的重要功能,既要保证功能完善,又要确保安全可靠。正确使用Django的文件处理框架,结合安全验证和适当的存储策略,可以构建稳定高效的文件管理系统。

SEO优化策略

  1. 关键词布局: 在标题、内容中合理布局"Django文件上传", "文件存储", "文件处理", "云存储", "Django媒体文件"等关键词
  2. 内容结构: 使用清晰的标题层级(H1-H3),便于搜索引擎理解内容结构
  3. 内部链接: 建立与其他相关教程的内部链接,提升页面权重
  4. 元数据优化: 在页面头部包含描述性的标题、描述和标签

🏷️ 标签云: Django文件上传 文件存储 文件处理 云存储 Django媒体文件 安全上传 图像处理 文件验证 S3存储