MinIO 实战教程


什么是MinIO?

MinIO是一个高性能的对象存储系统,专为人工智能、机器学习和大数据工作负载而设计。它兼容Amazon S3 API,可以作为私有云存储解决方案,提供高可用性、可扩展性和安全性。

MinIO的主要特点:

  • S3兼容:完全兼容Amazon S3 API
  • 高性能:针对现代硬件优化,提供卓越的吞吐量
  • 分布式架构:支持多节点部署,实现高可用性
  • 安全性:内置身份验证和加密功能
  • 简单部署:易于安装和配置
  • 开源免费:采用AGPLv3许可证

1. MinIO安装与配置

1.1 Docker方式安装

# 拉取MinIO镜像
docker pull minio/minio

# 单节点模式运行MinIO
docker run -d \
  --name minio-server \
  -p 9000:9000 \
  -p 9001:9001 \
  -e "MINIO_ROOT_USER=minioadmin" \
  -e "MINIO_ROOT_PASSWORD=minioadmin123" \
  -v /data/minio:/data \
  minio/minio server /data --console-address ":9001"

# 检查MinIO状态
docker logs minio-server

1.2 Docker Compose方式安装

# docker-compose.yml
version: '3.8'

services:
  minio:
    image: minio/minio:RELEASE.2024-04-06T05-26-02Z
    container_name: minio-server
    restart: unless-stopped
    ports:
      - "9000:9000"  # API端口
      - "9001:9001"  # 控制台端口
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123
    volumes:
      - minio_data:/data
    command: server /data --console-address ":9001"
    healthcheck:
      test: ["CMD", "mc", "ready", "local"]
      interval: 5s
      timeout: 5s
      retries: 5

volumes:
  minio_data:

1.3 分布式模式安装

# docker-compose-distributed.yml
version: '3.8'

services:
  minio1:
    image: minio/minio:RELEASE.2024-04-06T05-26-02Z
    hostname: minio1
    volumes:
      - minio1_data:/data1
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123
    command: server --address :9000 http://minio{1...4}/data{1...2}
    networks:
      - minio_network

  minio2:
    image: minio/minio:RELEASE.2024-04-06T05-26-02Z
    hostname: minio2
    volumes:
      - minio2_data:/data1
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123
    command: server --address :9000 http://minio{1...4}/data{1...2}
    networks:
      - minio_network

  minio3:
    image: minio/minio:RELEASE.2024-04-06T05-26-02Z
    hostname: minio3
    volumes:
      - minio3_data:/data1
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123
    command: server --address :9000 http://minio{1...4}/data{1...2}
    networks:
      - minio_network

  minio4:
    image: minio/minio:RELEASE.2024-04-06T05-26-02Z
    hostname: minio4
    volumes:
      - minio4_data:/data1
    environment:
      MINIO_ROOT_USER: minioadmin
      MINIO_ROOT_PASSWORD: minioadmin123
    command: server --address :9000 http://minio{1...4}/data{1...2}
    networks:
      - minio_network

volumes:
  minio1_data:
  minio2_data:
  minio3_data:
  minio4_data:

networks:
  minio_network:
    driver: bridge

1.4 配置文件详解

# config.json - MinIO配置文件示例
{
  "version": "34",
  "credential": {
    "accessKey": "minioadmin",
    "secretKey": "minioadmin123"
  },
  "region": "",
  "browser": "on",
  "domain": "",
  "sse": {
    "type": "vault",
    "vault": {
      "endpoint": "",
      "auth": {
        "type": "approle",
        "approle": {
          "id": "",
          "secret": ""
        }
      },
      "engine": {
        "name": "transit",
        "key": "my-minio-key"
      },
      "prefix": "/minio"
    },
    "kms": {
      "vault": {
        "endpoint": "",
        "auth": {
          "type": "kubernetes",
          "kubernetes": {
            "role": "my-minio-role",
            "jwt": ""
          }
        },
        "engine": {
          "name": "transit",
          "key": "my-minio-key"
        },
        "prefix": "/minio"
      }
    }
  },
  "notify": {
    "webhook": {
      "1": {
        "enable": false,
        "endpoint": "http://localhost:3000/"
      }
    }
  }
}

2. MinIO基础概念

2.1 核心概念

概念说明类比传统存储
Bucket存储桶,对象的容器目录/文件夹
Object存储的对象(文件)文件
Policy访问控制策略权限设置
User访问用户用户账号
Group用户组用户组
Tenant多租户隔离租户空间

2.2 MinIO Client (mc) 命令行工具

# 安装mc工具
wget https://dl.min.io/client/mc/release/linux-amd64/mc
chmod +x mc
sudo mv mc /usr/local/bin/

# 配置服务器别名
mc alias set myminio http://localhost:9000 minioadmin minioadmin123

# 测试连接
mc admin info myminio

# 列出所有存储桶
mc ls myminio

# 创建存储桶
mc mb myminio/mybucket

# 上传文件
mc cp /path/to/local/file myminio/mybucket/

# 下载文件
mc cp myminio/mybucket/file /path/to/download/

# 同步目录
mc mirror /path/to/local/dir myminio/mybucket/

# 删除文件
mc rm myminio/mybucket/file

# 删除存储桶
mc rb myminio/mybucket --force

3. 存储桶管理

3.1 创建和配置存储桶

# 创建存储桶
mc mb myminio/my-images
mc mb myminio/my-documents
mc mb myminio/my-videos

# 设置存储桶策略
mc anonymous set public myminio/my-public-bucket
mc anonymous set download myminio/my-download-bucket
mc anonymous set none myminio/my-private-bucket

# 查看存储桶信息
mc stat myminio/my-images

# 设置存储桶配额
mc quota set myminio/my-images --size 10GB

# 查看存储桶配额
mc quota info myminio/my-images

3.2 生命周期管理

# 设置生命周期规则
mc ilm rule add myminio/my-images --expiry-days 30
mc ilm rule add myminio/my-images --transition-days 7 --transition-tier WARM-TIER
mc ilm rule add myminio/my-images --noncurrent-expiry-days 30

# 查看生命周期规则
mc ilm rule list myminio/my-images

# 删除生命周期规则
mc ilm rule rm myminio/my-images RULE_ID

3.3 版本控制

# 启用版本控制
mc version enable myminio/my-versioned-bucket

# 暂停版本控制
mc version suspend myminio/my-versioned-bucket

# 查看版本控制状态
mc version info myminio/my-versioned-bucket

# 列出对象版本
mc ls --versions myminio/my-versioned-bucket/

4. 对象操作

4.1 基本对象操作

# 上传文件
mc cp /home/user/document.pdf myminio/my-documents/
mc cp --encrypt-key="myminio=my-access-key:my-secret-key" /home/user/secret.pdf myminio/my-documents/

# 上传目录
mc cp --recursive /home/user/photos/ myminio/my-images/

# 下载文件
mc cp myminio/my-documents/document.pdf /home/user/downloads/

# 下载目录
mc cp --recursive myminio/my-images/ /home/user/downloads/images/

# 移动/重命名对象
mc mv myminio/my-documents/old-name.pdf myminio/my-documents/new-name.pdf

# 删除对象
mc rm myminio/my-documents/document.pdf
mc rm --recursive --force myminio/my-images/old-folder/

# 获取对象信息
mc stat myminio/my-documents/document.pdf

# 计算对象校验和
mc sum256 myminio/my-documents/document.pdf

4.2 预签名URL

# 生成预签名URL(默认24小时)
mc share download myminio/my-documents/document.pdf

# 生成预签名URL(指定过期时间)
mc share download myminio/my-documents/document.pdf --expire=1h

# 生成上传预签名URL
mc share upload myminio/my-documents/new-document.pdf --expire=1h

4.3 对象元数据

# 上传时设置元数据
mc cp --attr "x-amz-meta-author=John Doe;x-amz-meta-category=important" /home/user/doc.pdf myminio/my-documents/

# 复制对象并修改元数据
mc cp --attr "x-amz-meta-new-key=new-value" myminio/my-documents/old.pdf myminio/my-documents/new.pdf

# 查看对象元数据
mc stat myminio/my-documents/document.pdf

5. 访问控制与安全

5.1 用户管理

# 创建用户
mc admin user add myminio newuser newpassword

# 列出用户
mc admin user list myminio

# 删除用户
mc admin user remove myminio newuser

# 启用/禁用用户
mc admin user disable myminio username
mc admin user enable myminio username

# 查看用户信息
mc admin user info myminio username

5.2 策略管理

# 创建自定义策略
cat > readonly-policy.json <<EOF
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::my-bucket/*",
        "arn:aws:s3:::my-bucket"
      ]
    }
  ]
}
EOF

# 上传策略
mc admin policy add myminio readonly readonly-policy.json

# 列出策略
mc admin policy list myminio

# 删除策略
mc admin policy remove myminio readonly

# 将策略分配给用户
mc admin policy attach myminio readonly --user=newuser

5.3 组管理

# 创建组
mc admin group add myminio mygroup user1 user2 user3

# 列出组
mc admin group list myminio

# 将策略分配给组
mc admin policy attach myminio readonly --group=mygroup

# 从组中添加/移除用户
mc admin group add myminio mygroup newuser
mc admin group remove myminio mygroup olduser

6. Python与MinIO集成

6.1 安装MinIO客户端

pip install minio
pip install urllib3  # 确保安装urllib3

6.2 基本连接与操作

from minio import Minio
from minio.error import S3Error
import os
from datetime import timedelta

# 初始化MinIO客户端
def init_minio_client():
    client = Minio(
        "localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin123",
        secure=False  # 生产环境应设为True
    )
    return client

# 检查存储桶是否存在
def bucket_exists(client, bucket_name):
    return client.bucket_exists(bucket_name)

# 创建存储桶
def create_bucket(client, bucket_name):
    if not client.bucket_exists(bucket_name):
        client.make_bucket(bucket_name)
        print(f"存储桶 {bucket_name} 创建成功")
    else:
        print(f"存储桶 {bucket_name} 已存在")

# 上传文件
def upload_file(client, bucket_name, object_name, file_path):
    client.fput_object(
        bucket_name,
        object_name,
        file_path,
        content_type="application/octet-stream"
    )
    print(f"文件 {file_path} 上传成功至 {bucket_name}/{object_name}")

# 下载文件
def download_file(client, bucket_name, object_name, file_path):
    client.fget_object(bucket_name, object_name, file_path)
    print(f"文件 {bucket_name}/{object_name} 下载成功至 {file_path}")

# 上传对象(从内存)
def upload_object(client, bucket_name, object_name, data):
    client.put_object(
        bucket_name,
        object_name,
        data,
        length=len(data),
        content_type="text/plain"
    )

# 获取对象
def get_object(client, bucket_name, object_name):
    try:
        response = client.get_object(bucket_name, object_name)
        data = response.read()
        response.close()
        response.release_conn()
        return data
    except S3Error as e:
        print(f"获取对象失败: {e}")
        return None

# 列出对象
def list_objects(client, bucket_name, prefix=""):
    objects = client.list_objects(bucket_name, prefix=prefix, recursive=True)
    for obj in objects:
        print(f"Object: {obj.object_name}, Size: {obj.size}")

# 删除对象
def delete_object(client, bucket_name, object_name):
    client.remove_object(bucket_name, object_name)
    print(f"对象 {bucket_name}/{object_name} 删除成功")

# 生成预签名URL
def presigned_get_url(client, bucket_name, object_name, expires=timedelta(hours=1)):
    return client.presigned_get_object(
        bucket_name,
        object_name,
        expires=expires
    )

def presigned_put_url(client, bucket_name, object_name, expires=timedelta(hours=1)):
    return client.presigned_put_object(
        bucket_name,
        object_name,
        expires=expires
    )

6.3 高级功能

# 设置对象生命周期
def set_lifecycle_config(client, bucket_name):
    from minio.commonconfig import ENABLED
    from minio.lifecycleconfig import LifecycleConfig, Rule, Expiration
    from datetime import datetime
    
    config = LifecycleConfig([
        Rule(
            ENABLED,
            rule_filter=LifecycleRuleFilter(prefix="logs/"),
            rule_id="expire-logs-after-30-days",
            expiration=Expiration(days=30)
        )
    ])
    client.set_bucket_lifecycle(bucket_name, config)

# 设置存储桶通知
def set_bucket_notification(client, bucket_name):
    from minio.notificationconfig import NotificationConfig
    from minio.event import ObjectCreatedPut
    
    notification_config = NotificationConfig(
        queue_config_list=[
            QueueConfig(
                "QUEUE-ARN-OF-REDIS",
                [ObjectCreatedPut()],
                config_id="1",
                prefix_filter_rule=PrefixFilterRule("photos/"),
            ),
        ],
    )
    client.set_bucket_notification(bucket_name, notification_config)

# 获取存储桶统计信息
def get_bucket_stats(client, bucket_name):
    # 获取存储桶策略
    policy = client.get_bucket_policy(bucket_name)
    print(f"Bucket Policy: {policy}")
    
    # 获取存储桶配置
    versioning = client.get_bucket_versioning(bucket_name)
    print(f"Versioning Status: {versioning.status}")
    
    # 获取生命周期配置
    lifecycle = client.get_bucket_lifecycle(bucket_name)
    print(f"Lifecycle Config: {lifecycle}")

6.4 实际应用场景

6.4.1 文件上传服务

from flask import Flask, request, jsonify
import uuid
from werkzeug.utils import secure_filename
import mimetypes
import os

app = Flask(__name__)
minio_client = init_minio_client()

# 确保存储桶存在
create_bucket(minio_client, "uploads")

ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'mp4', 'mov'}

def allowed_file(filename):
    return '.' in filename and \
           filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@app.route('/upload', methods=['POST'])
def upload_file_api():
    try:
        if 'file' not in request.files:
            return jsonify({'error': 'No file part'}), 400
        
        file = request.files['file']
        if file.filename == '':
            return jsonify({'error': 'No selected file'}), 400
        
        if file and allowed_file(file.filename):
            filename = secure_filename(file.filename)
            # 使用UUID生成唯一文件名
            unique_filename = f"{uuid.uuid4()}_{filename}"
            bucket_name = "uploads"
            
            # 上传到MinIO
            file_content = file.read()
            minio_client.put_object(
                bucket_name,
                unique_filename,
                file_content,
                length=len(file_content),
                content_type=file.content_type or mimetypes.guess_type(filename)[0] or 'application/octet-stream'
            )
            
            # 生成访问URL
            file_url = f"http://localhost:9000/{bucket_name}/{unique_filename}"
            
            return jsonify({
                'success': True,
                'filename': unique_filename,
                'original_filename': filename,
                'url': file_url,
                'size': len(file_content)
            })
        
        return jsonify({'error': 'File type not allowed'}), 400
    
    except S3Error as e:
        return jsonify({'error': f'MinIO Error: {str(e)}'}), 500
    except Exception as e:
        return jsonify({'error': f'Server Error: {str(e)}'}), 500

@app.route('/download/<filename>')
def download_file_api(filename):
    try:
        bucket_name = "uploads"
        
        # 生成预签名URL
        presigned_url = minio_client.presigned_get_object(
            bucket_name,
            filename,
            expires=timedelta(minutes=10)  # 10分钟有效期
        )
        
        return jsonify({
            'download_url': presigned_url
        })
    
    except S3Error as e:
        return jsonify({'error': f'MinIO Error: {str(e)}'}), 500

if __name__ == '__main__':
    app.run(debug=True)

6.4.2 图片处理服务

from PIL import Image
import io
import base64
from typing import Tuple

class ImageStorageService:
    def __init__(self, minio_client, bucket_name="images"):
        self.client = minio_client
        self.bucket_name = bucket_name
        self.allowed_formats = {'JPEG', 'PNG', 'GIF', 'WEBP'}
        
        # 确保存储桶存在
        if not self.client.bucket_exists(self.bucket_name):
            self.client.make_bucket(self.bucket_name)
    
    def upload_image(self, image_data: bytes, filename: str, resize: Tuple[int, int] = None):
        """上传并可选地调整图片大小"""
        # 打开并验证图片
        image_stream = io.BytesIO(image_data)
        image = Image.open(image_stream)
        
        if image.format not in self.allowed_formats:
            raise ValueError(f"Unsupported image format: {image.format}")
        
        # 调整大小(如果指定)
        if resize:
            image = image.resize(resize, Image.Resampling.LANCZOS)
        
        # 保存到字节流
        output_stream = io.BytesIO()
        image.save(output_stream, format=image.format)
        output_stream.seek(0)
        
        # 上传到MinIO
        self.client.put_object(
            self.bucket_name,
            filename,
            output_stream,
            length=output_stream.getbuffer().nbytes,
            content_type=f"image/{image.format.lower()}"
        )
        
        return f"{self.bucket_name}/{filename}"
    
    def get_image_info(self, filename: str):
        """获取图片信息"""
        stat = self.client.stat_object(self.bucket_name, filename)
        image_data = self.get_image_bytes(filename)
        
        image_stream = io.BytesIO(image_data)
        image = Image.open(image_stream)
        
        return {
            'size': stat.size,
            'modified': stat.last_modified,
            'content_type': stat.content_type,
            'width': image.width,
            'height': image.height,
            'format': image.format
        }
    
    def get_image_bytes(self, filename: str) -> bytes:
        """获取图片字节数据"""
        response = self.client.get_object(self.bucket_name, filename)
        try:
            return response.read()
        finally:
            response.close()
            response.release_conn()
    
    def create_thumbnail(self, original_filename: str, thumbnail_size: Tuple[int, int] = (150, 150)):
        """创建缩略图"""
        # 获取原始图片
        original_data = self.get_image_bytes(original_filename)
        image = Image.open(io.BytesIO(original_data))
        
        # 创建缩略图
        image.thumbnail(thumbnail_size, Image.Resampling.LANCZOS)
        
        # 保存缩略图到字节流
        thumb_stream = io.BytesIO()
        image.save(thumb_stream, format=image.format)
        thumb_stream.seek(0)
        
        # 生成缩略图文件名
        name_parts = original_filename.rsplit('.', 1)
        if len(name_parts) == 2:
            thumbnail_filename = f"{name_parts[0]}_thumb.{name_parts[1]}"
        else:
            thumbnail_filename = f"{original_filename}_thumb"
        
        # 上传缩略图
        self.client.put_object(
            self.bucket_name,
            thumbnail_filename,
            thumb_stream,
            length=thumb_stream.getbuffer().nbytes,
            content_type=f"image/{image.format.lower()}"
        )
        
        return thumbnail_filename

# 使用示例
image_service = ImageStorageService(minio_client)

# 上传图片并创建缩略图
with open("example.jpg", "rb") as f:
    image_data = f.read()
    
original_path = image_service.upload_image(image_data, "example.jpg", resize=(800, 600))
thumbnail_path = image_service.create_thumbnail("example.jpg")

print(f"原始图片: {original_path}")
print(f"缩略图: {thumbnail_path}")

6.4.3 备份与归档服务

import zipfile
import tarfile
import gzip
from datetime import datetime, timedelta
import shutil

class BackupService:
    def __init__(self, minio_client, backup_bucket="backups"):
        self.client = minio_client
        self.backup_bucket = backup_bucket
        
        if not self.client.bucket_exists(self.backup_bucket):
            self.client.make_bucket(self.backup_bucket)
    
    def backup_directory(self, local_dir_path: str, archive_name: str = None):
        """备份目录到MinIO"""
        if not archive_name:
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            archive_name = f"backup_{timestamp}.tar.gz"
        
        # 创建压缩包
        archive_path = f"/tmp/{archive_name}"
        
        with tarfile.open(archive_path, "w:gz") as tar:
            tar.add(local_dir_path, arcname=".")
        
        # 上传到MinIO
        with open(archive_path, "rb") as f:
            self.client.put_object(
                self.backup_bucket,
                archive_name,
                f,
                length=os.path.getsize(archive_path),
                content_type="application/gzip"
            )
        
        # 清理临时文件
        os.remove(archive_path)
        
        return f"{self.backup_bucket}/{archive_name}"
    
    def restore_backup(self, archive_name: str, restore_path: str):
        """从MinIO恢复备份"""
        # 下载备份文件
        backup_data = self.get_backup_bytes(archive_name)
        
        # 创建临时文件
        temp_archive_path = f"/tmp/restore_{archive_name}"
        with open(temp_archive_path, "wb") as f:
            f.write(backup_data)
        
        # 解压到目标路径
        if archive_name.endswith('.tar.gz'):
            with tarfile.open(temp_archive_path, "r:gz") as tar:
                tar.extractall(path=restore_path)
        elif archive_name.endswith('.zip'):
            with zipfile.ZipFile(temp_archive_path, 'r') as zip_ref:
                zip_ref.extractall(restore_path)
        
        # 清理临时文件
        os.remove(temp_archive_path)
        
        return restore_path
    
    def get_backup_bytes(self, archive_name: str) -> bytes:
        """获取备份文件字节数据"""
        response = self.client.get_object(self.backup_bucket, archive_name)
        try:
            return response.read()
        finally:
            response.close()
            response.release_conn()
    
    def list_backups(self):
        """列出所有备份"""
        backups = []
        objects = self.client.list_objects(self.backup_bucket, recursive=True)
        for obj in objects:
            backups.append({
                'name': obj.object_name,
                'size': obj.size,
                'last_modified': obj.last_modified,
                'etag': obj.etag
            })
        return sorted(backups, key=lambda x: x['last_modified'], reverse=True)
    
    def cleanup_old_backups(self, days_to_keep: int = 30):
        """清理旧备份"""
        cutoff_date = datetime.now() - timedelta(days=days_to_keep)
        backups = self.list_backups()
        
        deleted_count = 0
        for backup in backups:
            if backup['last_modified'].replace(tzinfo=None) < cutoff_date:
                self.client.remove_object(self.backup_bucket, backup['name'])
                deleted_count += 1
        
        return deleted_count

# 使用示例
backup_service = BackupService(minio_client)

# 创建备份
backup_path = backup_service.backup_directory("/path/to/data", "daily_backup.tar.gz")
print(f"备份创建成功: {backup_path}")

# 列出备份
backups = backup_service.list_backups()
print(f"共有 {len(backups)} 个备份")

# 清理30天前的备份
deleted = backup_service.cleanup_old_backups(days_to_keep=30)
print(f"清理了 {deleted} 个旧备份")

7. 性能优化

7.1 客户端优化

# 优化MinIO客户端配置
def create_optimized_client():
    client = Minio(
        "localhost:9000",
        access_key="minioadmin",
        secret_key="minioadmin123",
        secure=False,
        # 连接池配置
        http_client=requests.Session(),
    )
    
    # 配置HTTP会话以提高性能
    from requests.adapters import HTTPAdapter
    from urllib3.util.retry import Retry
    
    retry_strategy = Retry(
        total=3,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
    )
    adapter = HTTPAdapter(max_retries=retry_strategy, pool_connections=100, pool_maxsize=100)
    
    client.set_http_client(adapter)
    return client

7.2 服务端优化

# MinIO服务端性能优化配置
export MINIO_CACHE_DRIVES="/mnt/drive1,/mnt/drive2,/mnt/drive3,/mnt/drive4"
export MINIO_CACHE_EXCLUDE="*.tmp,*.log"
export MINIO_CACHE_QUOTA=80
export MINIO_CACHE_AFTER=0
export MINIO_CACHE_WATERMARK_LOW=70
export MINIO_CACHE_WATERMARK_HIGH=80

# 启动优化后的MinIO
minio server --address :9000 /mnt/drive{1...4}/export{1...32}

7.3 大文件上传优化

def upload_large_file(client, bucket_name, object_name, file_path, part_size=100*1024*1024):  # 100MB chunks
    """大文件分片上传"""
    from minio.helpers import get_part_info
    
    file_size = os.path.getsize(file_path)
    
    # 开始多部分上传
    upload_id = client._create_multipart_upload(bucket_name, object_name, {})
    
    uploaded_parts = []
    part_number = 1
    bytes_uploaded = 0
    
    with open(file_path, 'rb') as file:
        while bytes_uploaded < file_size:
            length = min(part_size, file_size - bytes_uploaded)
            data = file.read(length)
            
            # 上传分片
            part = client._upload_part(
                bucket_name, object_name, upload_id, part_number, data, len(data), {}
            )
            
            uploaded_parts.append({
                "PartNumber": part_number,
                "ETag": part.etag
            })
            
            bytes_uploaded += length
            part_number += 1
            
            print(f"上传进度: {bytes_uploaded}/{file_size} ({bytes_uploaded/file_size*100:.1f}%)")
    
    # 完成分片上传
    client._complete_multipart_upload(
        bucket_name, object_name, upload_id, uploaded_parts, {}
    )
    
    print(f"大文件上传完成: {object_name}")

8. 监控与运维

8.1 使用mc命令监控

# 查看服务器信息
mc admin info myminio

# 查看服务器磁盘使用情况
mc admin info myminio --json | jq '.disk'

# 查看服务器配置
mc admin config get myminio

# 查看服务器统计信息
mc admin service status myminio

# 重启服务器
mc admin service restart myminio

# 服务停止/启动
mc admin service stop myminio
mc admin service start myminio

# 查看服务器日志
mc admin logs myminio

# 性能分析
mc support perf myminio

8.2 健康检查

def health_check(client):
    """MinIO健康检查"""
    try:
        # 检查连接
        client.list_buckets()
        
        # 检查存储桶
        buckets = client.list_buckets()
        
        # 简单的读写测试
        test_bucket = "health-check-test"
        test_object = "health_test.txt"
        test_data = b"health check data"
        
        # 创建测试存储桶
        if not client.bucket_exists(test_bucket):
            client.make_bucket(test_bucket)
        
        # 上传测试对象
        client.put_object(
            test_bucket,
            test_object,
            test_data,
            length=len(test_data)
        )
        
        # 下载测试对象
        response = client.get_object(test_bucket, test_object)
        downloaded_data = response.read()
        response.close()
        response.release_conn()
        
        # 验证数据完整性
        if downloaded_data == test_data:
            # 清理测试对象
            client.remove_object(test_bucket, test_object)
            print("MinIO健康检查: 通过")
            return True
        else:
            print("MinIO健康检查: 数据不一致")
            return False
            
    except Exception as e:
        print(f"MinIO健康检查: 失败 - {str(e)}")
        return False

8.3 备份策略

# MinIO配置备份
mc admin config export myminio > minio-config-backup.json

# 恢复配置
mc admin config import myminio minio-config-backup.json

# 使用mc mirror进行跨集群备份
mc mirror myminio/source-bucket primary-backup/
mc mirror myminio/source-bucket secondary-backup/

9. 最佳实践

9.1 安全最佳实践

"""
MinIO安全最佳实践:
1. 使用强密码:访问密钥和秘密密钥应足够复杂
2. TLS加密:在生产环境中启用HTTPS
3. 最小权限原则:为每个应用创建专用用户和策略
4. 定期轮换密钥:定期更换访问密钥
5. 网络隔离:使用防火墙限制访问
6. 监控审计:启用访问日志记录
"""

# 安全的客户端初始化
def create_secure_client():
    # 使用环境变量存储凭据
    import os
    
    client = Minio(
        os.getenv('MINIO_ENDPOINT', 'localhost:9000'),
        access_key=os.getenv('MINIO_ACCESS_KEY'),
        secret_key=os.getenv('MINIO_SECRET_KEY'),
        secure=os.getenv('MINIO_SECURE', 'true').lower() == 'true'
    )
    return client

9.2 性能最佳实践

"""
MinIO性能最佳实践:
1. 使用SSD存储:获得更好的I/O性能
2. 网络优化:使用高速网络连接
3. 分布式部署:使用多个节点提高可用性和性能
4. 适当分片:大文件使用分片上传
5. 缓存配置:启用适当的缓存策略
6. 对象大小:避免过多小文件,合并小对象
"""

# 批量操作优化
def batch_upload_files(client, bucket_name, file_paths):
    """批量上传文件"""
    import concurrent.futures
    from threading import Lock
    
    upload_results = []
    lock = Lock()
    
    def upload_single_file(file_path):
        filename = os.path.basename(file_path)
        try:
            client.fput_object(bucket_name, filename, file_path)
            with lock:
                upload_results.append({'file': filename, 'status': 'success'})
        except Exception as e:
            with lock:
                upload_results.append({'file': filename, 'status': 'failed', 'error': str(e)})
    
    # 使用线程池并行上传
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(upload_single_file, fp) for fp in file_paths]
        concurrent.futures.wait(futures)
    
    return upload_results

9.3 应用场景总结

场景说明推荐配置
文件存储文档、图片、视频等文件存储标准存储,生命周期管理
备份归档数据备份和长期归档低成本存储,版本控制
内容分发静态资源CDN源站公共读取权限,缓存策略
数据湖大数据分析和机器学习大容量,高吞吐量
应用附件用户上传的各类附件访问控制,大小限制

总结

MinIO是一个功能强大、高性能的对象存储解决方案,特别适合需要S3兼容性的应用场景。通过合理的配置、安全措施和性能优化,可以构建可靠、高效的对象存储系统。掌握MinIO的核心概念和最佳实践,能够帮助开发者构建出色的数据存储和管理解决方案。