MongoDB 实战教程


什么是MongoDB?

MongoDB是一个基于文档的开源NoSQL数据库,属于非关系型数据库。它以BSON(Binary JSON)格式存储数据,具有高可扩展性、高性能和灵活的数据模型等特点。

MongoDB的主要特点:

  • 文档存储:使用类似JSON的BSON格式存储数据
  • 动态模式:无需预定义表结构,模式灵活
  • 高可用性:支持副本集和自动故障转移
  • 水平扩展:支持分片集群,可处理海量数据
  • 丰富的查询语言:支持复杂的查询和聚合操作
  • 索引支持:支持多种类型的索引优化查询

1. MongoDB安装与配置

1.1 Docker方式安装

# 拉取MongoDB镜像
docker pull mongo:7.0

# 运行MongoDB容器
docker run -d --name mongodb \
  -p 27017:27017 \
  -e MONGO_INITDB_ROOT_USERNAME=admin \
  -e MONGO_INITDB_ROOT_PASSWORD=password \
  -v mongodb_data:/data/db \
  mongo:7.0

# 连接到MongoDB容器
docker exec -it mongodb mongosh -u admin -p password

1.2 Docker Compose方式安装

# docker-compose.yml
version: '3.8'
services:
  mongodb:
    image: mongo:7.0
    container_name: mongodb
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data:/data/db
      - ./mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js

volumes:
  mongodb_data:

1.3 配置文件详解

# mongod.conf - MongoDB configuration file (YAML, so comments start with '#')
storage:
  engine: wiredTiger
  wiredTiger:
    engineConfig:
      cacheSizeGB: 1
  dbPath: /var/lib/mongodb
  # storage.journal.enabled is intentionally absent: the option was removed in
  # MongoDB 6.1, where journaling is always enabled.

systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.log

net:
  port: 27017
  # 0.0.0.0 listens on every network interface. Keep security.authorization
  # enabled and firewall the port, or narrow this to specific addresses.
  bindIp: 0.0.0.0

processManagement:
  fork: true
  pidFilePath: /var/run/mongodb/mongod.pid

setParameter:
  enableLocalhostAuthBypass: false

security:
  authorization: enabled

2. MongoDB基础概念

2.1 核心概念

| 概念 | 说明 | 类比关系型数据库 |
| --- | --- | --- |
| Database | 数据库 | Database |
| Collection | 文档集合 | Table |
| Document | 数据记录 | Row |
| Field | 字段 | Column |
| Index | 索引 | Index |
| Cursor | 游标 | ResultSet |

2.2 BSON数据类型

// BSON支持的数据类型示例
{
  // 字符串
  name: "张三",
  
  // 整数
  age: 25,
  
  // 浮点数
  salary: 5000.50,
  
  // 布尔值
  isActive: true,
  
  // 日期
  createdAt: new Date(),
  
  // ObjectId
  _id: ObjectId("507f1f77bcf86cd799439011"),
  
  // 数组
  skills: ["Python", "MongoDB", "JavaScript"],
  
  // 嵌套文档
  address: {
    street: "北京路123号",
    city: "广州",
    country: "中国"
  },
  
  // null值
  middleName: null,
  
  // 二进制数据
  avatar: BinData(0, "encoded_binary_data_here")
}

3. 数据库操作

3.1 基本数据库操作

// 切换到指定数据库(use 只切换当前数据库上下文,并不建立新连接)
use myapp_database

// 查看所有数据库
show dbs

// 查看当前数据库
db.getName()

// 查看当前数据库状态
db.stats()

// 创建数据库(首次插入数据时自动创建)
use ecommerce_db

// 删除数据库
db.dropDatabase()

// 列出所有集合
show collections

// 创建集合
db.createCollection("users")
db.createCollection("products", {
  capped: true,
  size: 100000,
  max: 100
})

// 删除集合
db.products.drop()

3.2 集合管理

// 查看集合信息
db.users.stats()

// 重命名集合
db.users.renameCollection("customers")

// 查看集合大小
db.users.dataSize()
db.users.storageSize()

// 集合验证
db.users.validate()

4. 文档操作

4.1 插入文档

// 插入单个文档
db.users.insertOne({
  name: "张三",
  email: "zhangsan@example.com",
  age: 25,
  hobbies: ["读书", "游泳", "旅行"],
  address: {
    city: "北京",
    district: "朝阳区"
  },
  createdAt: new Date()
})

// 插入多个文档
db.users.insertMany([
  {
    name: "李四",
    email: "lisi@example.com",
    age: 30,
    hobbies: ["摄影", "电影"],
    address: {
      city: "上海",
      district: "浦东新区"
    }
  },
  {
    name: "王五",
    email: "wangwu@example.com",
    age: 28,
    hobbies: ["音乐", "运动"],
    address: {
      city: "深圳",
      district: "南山区"
    }
  }
])

// Insert-or-replace by _id. The legacy db.collection.save() helper is
// deprecated and not available in mongosh, so use replaceOne with upsert:
// the document is replaced when the _id exists and inserted otherwise.
db.users.replaceOne(
  {_id: ObjectId("507f1f77bcf86cd799439011")},
  {
    name: "赵六",
    email: "zhaoliu@example.com"
  },
  {upsert: true}
)

4.2 查询文档

// 查询所有文档
db.users.find()

// 查询单个文档
db.users.findOne({name: "张三"})

// 条件查询
db.users.find({age: {$gt: 25}})  // 年龄大于25
db.users.find({age: {$gte: 25, $lt: 35}})  // 年龄在25-35之间
db.users.find({name: /^张/})  // 名字以"张"开头
db.users.find({hobbies: "读书"})  // 爱好包含"读书"

// 复杂查询
db.users.find({
  $and: [
    {age: {$gte: 20}},
    {age: {$lte: 40}},
    {hobbies: {$in: ["读书", "游泳"]}}
  ]
})

// 投影(只返回指定字段)
db.users.find({}, {name: 1, email: 1, _id: 0})

// 限制结果数量
db.users.find().limit(10)

// 跳过结果
db.users.find().skip(5).limit(10)

// 排序
db.users.find().sort({age: 1})  // 1升序,-1降序
db.users.find().sort({age: -1, name: 1})

// 统计文档数量
db.users.countDocuments({age: {$gte: 25}})
db.users.estimatedDocumentCount()  // 估算总数(更快)

// 检查文档是否存在
db.users.findOne({email: "zhangsan@example.com"}) != null

4.3 更新文档

// 更新单个文档
db.users.updateOne(
  {name: "张三"},  // 查询条件
  {$set: {age: 26}}  // 更新操作
)

// 更新多个文档
db.users.updateMany(
  {age: {$lt: 30}},  // 查询条件
  {$inc: {age: 1}}  // 将年龄增加1
)

// 替换整个文档
db.users.replaceOne(
  {name: "李四"},
  {
    name: "李四更新",
    email: "lisi_new@example.com",
    age: 31,
    updatedAt: new Date()
  }
)

// upsert操作(如果不存在则插入)
db.users.updateOne(
  {email: "newuser@example.com"},
  {
    $set: {
      name: "新用户",
      email: "newuser@example.com",
      age: 20
    }
  },
  {upsert: true}
)

// Array operators. A single update may not apply two operators to the same
// field path (the server rejects it as a conflict), so the $push and the
// $pull on `hobbies` are sent as two separate updates.
db.users.updateOne(
  {name: "张三"},
  {
    $push: {hobbies: "登山"},       // append to the array
    $addToSet: {skills: "Python"}  // append only if not already present
  }
)
db.users.updateOne(
  {name: "张三"},
  {$pull: {hobbies: "游泳"}}        // remove every matching element
)

// Atomic field operators. Each operator must target a different field path;
// combining $inc, $min and $max on the same field in one update is rejected
// as a conflict.
db.users.updateOne(
  {name: "张三"},
  {
    $inc: {age: 1},            // atomic increment
    $mul: {salary: 1.1},       // atomic multiplication
    $min: {lowestScore: 60},   // writes 60 only if it is lower than the stored value
    $max: {highestScore: 95}   // writes 95 only if it is higher than the stored value
  }
)

4.4 删除文档

// 删除单个文档
db.users.deleteOne({name: "赵六"})

// 删除多个文档
db.users.deleteMany({age: {$lt: 20}})

// 删除所有文档
db.users.deleteMany({})

// 删除符合条件的文档并返回被删除的文档
db.users.findOneAndDelete({name: "张三"})

5. 索引管理

5.1 创建索引

// 创建单字段索引
db.users.createIndex({name: 1})  // 1表示升序,-1表示降序

// 创建复合索引
db.users.createIndex({name: 1, age: -1})

// 创建唯一索引
db.users.createIndex({email: 1}, {unique: true})

// 创建稀疏索引(只索引包含字段的文档)
db.users.createIndex({phone: 1}, {sparse: true})

// 创建TTL索引(自动过期)
db.sessions.createIndex({createdAt: 1}, {expireAfterSeconds: 3600})

// 创建文本索引
db.articles.createIndex({title: "text", content: "text"})

// 创建地理位置索引
db.places.createIndex({location: "2dsphere"})

// 创建多键索引(数组字段)
db.users.createIndex({hobbies: 1})

// 创建部分索引
db.users.createIndex(
  {age: 1},
  {partialFilterExpression: {status: "active"}}
)

// Building an index on a large collection. Since MongoDB 4.2 every index
// build uses an optimized process that holds an exclusive lock only briefly
// at the start and end, and the legacy `background` option is ignored, so it
// is no longer passed.
db.large_collection.createIndex({field: 1})

5.2 索引操作

// 查看集合的所有索引
db.users.getIndexes()

// 查看索引大小
db.users.totalIndexSize()

// 删除索引
db.users.dropIndex("name_1")
db.users.dropIndex({name: 1})  // 通过索引规范删除
db.users.dropIndexes()  // 删除所有索引(保留_id索引)

// 重建索引
db.users.reIndex()

// 查看索引使用情况
db.users.aggregate([{$indexStats: {}}])

// 分析查询计划
db.users.find({name: "张三"}).explain("executionStats")

6. 聚合管道

6.1 基础聚合操作

// 简单聚合
db.orders.aggregate([
  {$match: {status: "completed"}},
  {$group: {_id: "$customer_id", total_orders: {$sum: 1}}},
  {$sort: {total_orders: -1}},
  {$limit: 10}
])

// 字段投影
db.users.aggregate([
  {$project: {
    name: 1,
    email: 1,
    age: 1,
    fullName: {$concat: ["$firstName", " ", "$lastName"]}
  }}
])

// 字段重命名
db.users.aggregate([
  {$project: {
    _id: 0,
    customerName: "$name",
    customerEmail: "$email"
  }}
])

// 条件操作
db.orders.aggregate([
  {$project: {
    customer_id: 1,
    order_amount: 1,
    discount_category: {
      $switch: {
        branches: [
          {case: {$gte: ["$order_amount", 1000]}, then: "VIP"},
          {case: {$gte: ["$order_amount", 500]}, then: "Premium"},
          {case: {$gte: ["$order_amount", 100]}, then: "Regular"}
        ],
        default: "Basic"
      }
    }
  }}
])

6.2 高级聚合操作

// 数组操作
db.posts.aggregate([
  {$unwind: "$comments"},  // 展开数组
  {$group: {
    _id: "$author",
    commentCount: {$sum: 1},
    avgRating: {$avg: "$comments.rating"}
  }}
])

// 查找重复数据
db.users.aggregate([
  {$group: {
    _id: "$email",
    count: {$sum: 1},
    docs: {$push: "$$ROOT"}
  }},
  {$match: {count: {$gt: 1}}}
])

// 连接其他集合
db.orders.aggregate([
  {$lookup: {
    from: "customers",
    localField: "customer_id",
    foreignField: "_id",
    as: "customer_info"
  }},
  {$unwind: "$customer_info"},
  {$project: {
    order_id: 1,
    customer_name: "$customer_info.name",
    customer_email: "$customer_info.email",
    order_date: 1
  }}
])

// Window functions ($setWindowFields requires MongoDB 5.0+)
db.sales.aggregate([
  {$setWindowFields: {
    partitionBy: "$region",
    sortBy: {sale_date: 1},
    output: {
      cumulative_sales: {
        $sum: "$amount",
        // Running total: from the first document of the partition up to and
        // including the current one.
        window: {documents: ["unbounded", "current"]}
      }
    }
  }}
])

6.3 聚合优化

// 优化聚合管道
db.orders.aggregate([
  // 尽早过滤数据
  {$match: {order_date: {$gte: ISODate("2026-01-01")}}},
  
  // 使用索引友好的操作
  {$sort: {customer_id: 1, order_date: -1}},
  
  // 分组操作
  {$group: {
    _id: "$customer_id",
    orders: {$push: "$$ROOT"},
    total_spent: {$sum: "$amount"},
    order_count: {$sum: 1}
  }},
  
  // 限制结果
  {$limit: 1000}
])

7. Python与MongoDB集成

7.1 安装驱动程序

pip install pymongo
pip install motor  # 异步驱动
pip install dnspython  # 支持SRV记录的连接字符串

7.2 基本连接与操作

from pymongo import MongoClient
from bson import ObjectId
from datetime import datetime
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

# Synchronous connection
client = MongoClient(
    "mongodb://admin:password@localhost:27017/",
    serverSelectionTimeoutMS=5000  # server selection timeout of 5000 ms
)

# Database and collection handles used by the helper functions below
db = client.myapp_database
users_collection = db.users

# 异步连接
async def get_async_client():
    """Create and return a Motor client for the local MongoDB instance."""
    return AsyncIOMotorClient("mongodb://admin:password@localhost:27017/")

# 插入文档
def insert_user(user_data):
    """Insert ``user_data`` into the users collection and return its new ``_id``."""
    return users_collection.insert_one(user_data).inserted_id

# 批量插入
def insert_multiple_users(users_list):
    """Insert every document in ``users_list`` and return the list of new ``_id`` values."""
    return users_collection.insert_many(users_list).inserted_ids

# 查询文档
def find_user_by_email(email):
    """Return the first user document whose ``email`` field equals ``email``, or None."""
    return users_collection.find_one({"email": email})

def find_users_by_age(min_age, max_age):
    """Return users with ``min_age <= age <= max_age`` as a list, sorted by age ascending."""
    age_filter = {"age": {"$gte": min_age, "$lte": max_age}}
    cursor = users_collection.find(age_filter).sort("age", 1)
    return list(cursor)

# 更新文档
def update_user_age(user_id, new_age):
    """Set ``age`` on the user identified by ``user_id`` and stamp ``updatedAt``.

    Args:
        user_id: Value accepted by ``ObjectId(...)`` (e.g. a 24-character hex string).
        new_age: New value for the ``age`` field.

    Returns:
        The number of documents modified by the update.
    """
    # Function-scope import keeps this snippet self-contained.
    from datetime import timezone

    result = users_collection.update_one(
        {"_id": ObjectId(user_id)},
        # MongoDB stores dates as UTC. A naive datetime.now() is local wall-clock
        # time that would be saved as if it were UTC, so use an aware UTC value.
        {"$set": {"age": new_age, "updatedAt": datetime.now(timezone.utc)}}
    )
    return result.modified_count

# 删除文档
def delete_user(user_id):
    """Delete the user whose ``_id`` is ``ObjectId(user_id)`` and return the deleted count."""
    target = {"_id": ObjectId(user_id)}
    outcome = users_collection.delete_one(target)
    return outcome.deleted_count

# 聚合查询
def get_user_statistics():
    """Aggregate the users collection into one summary document.

    Returns:
        A dict with ``total_users``, ``avg_age``, ``max_age`` and ``min_age``
        (plus ``_id: None``), or an empty dict when the aggregation yields nothing.
    """
    # Grouping on _id None folds every document into a single group.
    summary_stage = {
        "$group": {
            "_id": None,
            "total_users": {"$sum": 1},
            "avg_age": {"$avg": "$age"},
            "max_age": {"$max": "$age"},
            "min_age": {"$min": "$age"}
        }
    }
    rows = list(users_collection.aggregate([summary_stage]))
    if not rows:
        return {}
    return rows[0]

7.3 高级功能

# 使用连接池
from pymongo.errors import ConnectionFailure

def create_client_with_pool():
    """Create a MongoClient with explicit pool and timeout settings.

    The connection is probed with a ``ping`` command and the outcome is printed.
    The client is returned in both cases, including when the probe fails.
    """
    pool_options = {
        "maxPoolSize": 50,        # upper bound on pooled connections
        "minPoolSize": 10,        # lower bound on pooled connections
        "maxIdleTimeMS": 30000,   # maximum idle time of a pooled connection
        "serverSelectionTimeoutMS": 5000,
        "connectTimeoutMS": 20000,
        "socketTimeoutMS": 20000,
    }
    client = MongoClient("mongodb://admin:password@localhost:27017/", **pool_options)

    # Probe the connection
    try:
        client.admin.command('ping')
    except ConnectionFailure:
        print("MongoDB server not available")
    else:
        print("MongoDB connection successful")

    return client

# 事务操作(MongoDB 4.0+)
def perform_transaction():
    """Move 100 from account ``acc1`` to account ``acc2`` in one transaction.

    Both updates run inside a single session transaction. Any exception raised
    inside the ``start_transaction`` block aborts the transaction, so neither
    update is persisted. Multi-document transactions require a replica set or
    sharded cluster deployment.

    Raises:
        Exception: "Insufficient funds" when ``acc1`` is missing or holds less
            than 100; "Destination account not found" when ``acc2`` is missing.
    """
    client = create_client_with_pool()
    try:
        db = client.myapp_database

        with client.start_session() as session:
            with session.start_transaction():
                # Debit: the balance guard in the filter makes check-and-decrement atomic.
                debit = db.accounts.update_one(
                    {"account_id": "acc1", "balance": {"$gte": 100}},
                    {"$inc": {"balance": -100}},
                    session=session
                )

                if debit.modified_count == 0:
                    raise Exception("Insufficient funds")

                # Credit: without this check a missing destination account would
                # let the debit commit while the money is credited to nobody.
                credit = db.accounts.update_one(
                    {"account_id": "acc2"},
                    {"$inc": {"balance": 100}},
                    session=session
                )

                if credit.matched_count == 0:
                    raise Exception("Destination account not found")

        # The commit happens when the start_transaction block exits cleanly,
        # so success is only reported after that point.
        print("Transaction completed successfully")
    finally:
        # Release the pooled connections opened for this one-off operation.
        client.close()

# 异步操作
async def async_operations():
    """Insert a user with Motor, read it back, then stamp ``updated_at`` on it.

    Returns:
        The document as read back right after the insert, i.e. before the
        ``updated_at`` field is set.
    """
    # Function-scope import keeps this snippet self-contained.
    from datetime import timezone

    client = AsyncIOMotorClient("mongodb://admin:password@localhost:27017/")
    try:
        collection = client.myapp_database.users

        # Asynchronous insert. Timestamps are timezone-aware UTC because MongoDB
        # stores dates as UTC and would treat a naive local time as UTC.
        user_doc = {
            "name": "异步用户",
            "email": "async@example.com",
            "created_at": datetime.now(timezone.utc)
        }
        result = await collection.insert_one(user_doc)

        # Asynchronous read-back by the generated _id
        user = await collection.find_one({"_id": result.inserted_id})

        # A failure of the follow-up update is reported but not propagated.
        try:
            await collection.find_one_and_update(
                {"email": "async@example.com"},
                {"$set": {"updated_at": datetime.now(timezone.utc)}}
            )
        except Exception as e:
            print(f"Update error: {e}")

        return user
    finally:
        # Close the client created above so its connections are not leaked.
        client.close()

# 监听变更流
def watch_changes():
    """Open a change stream on ``db.users`` and print every event received.

    Iterates the stream until it ends or raises. Insert, update and delete
    events get an extra, more specific line.
    """
    users = db.users

    with users.watch() as change_stream:
        for event in change_stream:
            print(f"Change detected: {event}")
            operation = event['operationType']
            if operation == 'insert':
                print(f"New document inserted: {event['fullDocument']}")
            elif operation == 'update':
                print(f"Document updated: {event['documentKey']}")
            elif operation == 'delete':
                print(f"Document deleted: {event['documentKey']}")

7.4 实际应用场景

7.4.1 用户管理系统

from pymongo import MongoClient
from bson import ObjectId
from datetime import datetime
from typing import Optional, List, Dict
import hashlib

class UserManager:
    """User storage and authentication on top of the ``users`` collection.

    Passwords are stored as salted PBKDF2-HMAC-SHA256 hashes in the form
    ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``. Plain SHA-256 hex
    digests written by the earlier implementation are still accepted by
    ``verify_password`` so existing accounts keep working.
    """

    # Work factor used for newly created password hashes.
    PBKDF2_ITERATIONS = 600_000

    def __init__(self, connection_string: str, database_name: str):
        """Connect, select ``database_name`` and make sure the indexes exist."""
        self.client = MongoClient(connection_string)
        self.db = self.client[database_name]
        self.collection = self.db.users

        # Unique indexes enforce one account per email / username.
        self.collection.create_index("email", unique=True)
        self.collection.create_index("username", unique=True)
        self.collection.create_index("created_at")

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        # MongoDB stores dates as UTC; a naive local datetime would be saved
        # as if it were UTC. Function-scope import keeps the block self-contained.
        from datetime import timezone
        return datetime.now(timezone.utc)

    def hash_password(self, password: str) -> str:
        """Hash ``password`` with a fresh random salt.

        Returns:
            ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``. Because the
            salt is random, two calls with the same password return different
            strings; compare with ``verify_password`` instead of ``==``.
        """
        import os

        salt = os.urandom(16)
        iterations = self.PBKDF2_ITERATIONS
        digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
        return f"pbkdf2_sha256${iterations}${salt.hex()}${digest.hex()}"

    def verify_password(self, password: str, stored_hash: str) -> bool:
        """Return True when ``password`` matches ``stored_hash``.

        Accepts both the PBKDF2 format produced by ``hash_password`` and the
        legacy unsalted SHA-256 hex digest. Malformed hashes yield False.
        """
        import hmac

        parts = stored_hash.split("$")
        if len(parts) == 4 and parts[0] == "pbkdf2_sha256":
            try:
                iterations = int(parts[1])
                salt = bytes.fromhex(parts[2])
                candidate = hashlib.pbkdf2_hmac(
                    "sha256", password.encode(), salt, iterations
                ).hex()
            except ValueError:
                # Non-numeric or non-positive iteration count, or a bad salt encoding.
                return False
            # Constant-time comparison avoids leaking how many characters matched.
            return hmac.compare_digest(candidate.encode(), parts[3].encode())

        # Legacy format: plain SHA-256 hex digest.
        legacy = hashlib.sha256(password.encode()).hexdigest()
        return hmac.compare_digest(legacy.encode(), stored_hash.encode())

    def create_user(self, user_data: Dict) -> str:
        """Store a new user and return its ``_id`` as a string.

        ``user_data`` must contain a plaintext ``password``. The caller's dict
        is not modified: the hash, timestamps and ``is_active`` flag are written
        to a copy, so retrying with the same dict cannot double-hash the password.
        """
        document = dict(user_data)
        document['password'] = self.hash_password(document['password'])
        now = self._utcnow()
        document['created_at'] = now
        document['updated_at'] = now
        document['is_active'] = True

        result = self.collection.insert_one(document)
        return str(result.inserted_id)

    def get_user_by_email(self, email: str) -> Optional[Dict]:
        """Return the user document with this email, or None."""
        return self.collection.find_one({"email": email})

    def authenticate_user(self, email: str, password: str) -> Optional[Dict]:
        """Return the active user matching ``email`` and ``password``, else None."""
        # Reject non-string input so a dict such as {"$ne": ""} coming from
        # untrusted JSON cannot be interpreted as a query operator.
        if not isinstance(email, str) or not isinstance(password, str):
            return None

        user = self.collection.find_one({"email": email, "is_active": True})
        if user is None:
            return None

        stored_hash = user.get("password")
        if not isinstance(stored_hash, str):
            return None
        if not self.verify_password(password, stored_hash):
            return None
        return user

    def update_user_profile(self, user_id: str, profile_data: Dict) -> int:
        """Apply ``profile_data`` with ``$set``, refresh ``updated_at``, return the modified count."""
        result = self.collection.update_one(
            {"_id": ObjectId(user_id)},
            {
                "$set": {
                    **profile_data,
                    "updated_at": self._utcnow()
                }
            }
        )
        return result.modified_count

    def get_users_paginated(self, page: int = 1, limit: int = 10) -> List[Dict]:
        """Return one page of users, newest first.

        ``page`` is 1-based; values below 1 are treated as the first page
        instead of producing a negative skip.
        """
        skip = max(0, (page - 1) * limit)
        cursor = (
            self.collection.find({})
            # _id breaks ties between equal created_at values so consecutive
            # pages neither repeat nor drop documents.
            .sort([("created_at", -1), ("_id", -1)])
            .skip(skip)
            .limit(limit)
        )
        return list(cursor)

    def get_user_statistics(self) -> Dict:
        """Return a single summary document for the whole collection.

        Contains ``total_users``, ``active_users``, ``avg_age`` and
        ``users_by_city`` (one pushed ``address.city`` value per user that has
        one, not a per-city count). Returns an empty dict when the aggregation
        yields nothing.
        """
        pipeline = [
            {
                "$group": {
                    "_id": None,
                    "total_users": {"$sum": 1},
                    "active_users": {
                        "$sum": {"$cond": [{"$eq": ["$is_active", True]}, 1, 0]}
                    },
                    "avg_age": {"$avg": "$age"},
                    "users_by_city": {
                        "$push": "$address.city"
                    }
                }
            }
        ]
        result = list(self.collection.aggregate(pipeline))
        return result[0] if result else {}

# 使用示例
user_manager = UserManager(
    "mongodb://admin:password@localhost:27017/", 
    "myapp_database"
)

# Create a user; create_user returns the new document's _id as a string
user_id = user_manager.create_user({
    "username": "john_doe",
    "email": "john@example.com",
    "password": "secure_password",
    "age": 25,
    "address": {
        "city": "北京",
        "district": "朝阳区"
    }
})

# Authenticate the user; a falsy result means no matching active user was found
user = user_manager.authenticate_user("john@example.com", "secure_password")
if user:
    print(f"用户认证成功: {user['username']}")

7.4.2 内容管理系统

class ContentManager:
    """Blog-style content store backed by the ``posts`` and ``comments`` collections."""

    def __init__(self, connection_string: str, database_name: str):
        """Connect, select ``database_name`` and make sure the indexes exist."""
        self.client = MongoClient(connection_string)
        self.db = self.client[database_name]
        self.posts_collection = self.db.posts
        self.comments_collection = self.db.comments

        self.posts_collection.create_index("slug", unique=True)
        # NOTE(review): this TTL index makes the server DELETE posts 30 days
        # after created_at; comments of an expired post are not removed with
        # it. Confirm this is intended before using it for permanent content.
        self.posts_collection.create_index("created_at", expireAfterSeconds=3600*24*30)
        self.comments_collection.create_index("post_id")
        # Text index used by search_posts.
        self.posts_collection.create_index([("title", "text"), ("content", "text")])

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current time as a timezone-aware UTC datetime."""
        # MongoDB stores dates as UTC and the TTL monitor compares against UTC,
        # so a naive local time would shift expiry by the local UTC offset.
        from datetime import timezone
        return datetime.now(timezone.utc)

    def create_post(self, post_data: Dict) -> str:
        """Store a new post and return its ``_id`` as a string.

        ``post_data`` must contain ``title``. Timestamps, the slug and zeroed
        counters are added to a copy, so the caller's dict is left untouched.
        """
        document = dict(post_data)
        now = self._utcnow()
        document['created_at'] = now
        document['updated_at'] = now
        document['slug'] = self.generate_slug(document['title'])
        document['view_count'] = 0
        document['like_count'] = 0
        document['comment_count'] = 0

        result = self.posts_collection.insert_one(document)
        return str(result.inserted_id)

    def generate_slug(self, title: str) -> str:
        """Turn ``title`` into a lowercase, hyphen-separated slug.

        Characters other than word characters, whitespace and hyphens are
        dropped, runs of whitespace/hyphens collapse to one hyphen, and leading
        or trailing hyphens are trimmed (e.g. ``"  A  B "`` becomes ``"a-b"``).
        """
        import re
        slug = re.sub(r'[^\w\s-]', '', title.lower())
        slug = re.sub(r'[-\s]+', '-', slug)
        # Surrounding whitespace or punctuation would otherwise leave "-a-b-".
        return slug.strip('-')

    def get_post_with_comments(self, post_id: str) -> Optional[Dict]:
        """Return the post with its comments (newest first) under ``comments``, or None."""
        post_oid = ObjectId(post_id)
        post = self.posts_collection.find_one({"_id": post_oid})
        if not post:
            return None

        comments = list(
            self.comments_collection.find({"post_id": post_oid}).sort("created_at", -1)
        )

        post['comments'] = comments
        return post

    def add_comment(self, post_id: str, comment_data: Dict) -> str:
        """Store a comment for ``post_id`` and bump the post's ``comment_count``.

        The insert and the counter update are two separate writes, not a
        transaction. The caller's dict is not modified.

        Returns:
            The new comment's ``_id`` as a string.
        """
        post_oid = ObjectId(post_id)
        document = dict(comment_data)
        document['post_id'] = post_oid
        document['created_at'] = self._utcnow()

        result = self.comments_collection.insert_one(document)

        # Keep the denormalized counter on the post in step with the comments.
        self.posts_collection.update_one(
            {"_id": post_oid},
            {"$inc": {"comment_count": 1}}
        )

        return str(result.inserted_id)

    def search_posts(self, query: str, page: int = 1, limit: int = 10) -> List[Dict]:
        """Full-text search over posts, best match first, then newest first.

        ``page`` is 1-based; values below 1 are treated as the first page
        instead of producing a negative skip.
        """
        skip = max(0, (page - 1) * limit)

        cursor = (
            self.posts_collection.find({"$text": {"$search": query}})
            .sort([("score", {"$meta": "textScore"}), ("created_at", -1)])
            .skip(skip)
            .limit(limit)
        )

        return list(cursor)

    def get_popular_posts(self, days: int = 7, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` posts from the last ``days`` days, most popular first.

        Popularity is ``view_count * 1 + like_count * 2 + comment_count * 3``
        and is returned on each document as ``popularity_score``.
        """
        from datetime import timedelta

        # Same UTC clock as the stored created_at values.
        cutoff_date = self._utcnow() - timedelta(days=days)

        pipeline = [
            {
                "$match": {
                    "created_at": {"$gte": cutoff_date}
                }
            },
            {
                "$addFields": {
                    "popularity_score": {
                        "$add": [
                            {"$multiply": ["$view_count", 1]},
                            {"$multiply": ["$like_count", 2]},
                            {"$multiply": ["$comment_count", 3]}
                        ]
                    }
                }
            },
            {
                "$sort": {"popularity_score": -1}
            },
            {
                "$limit": limit
            }
        ]

        return list(self.posts_collection.aggregate(pipeline))

# 使用示例
content_manager = ContentManager(
    "mongodb://admin:password@localhost:27017/",
    "blog_database"
)

# Create a post; create_post returns the new document's _id as a string
post_id = content_manager.create_post({
    "title": "MongoDB实战指南",
    "content": "MongoDB是一个强大的NoSQL数据库...",
    "author": "张三",
    "tags": ["mongodb", "nosql", "database"],
    "category": "技术"
})

# Add a comment to that post
content_manager.add_comment(post_id, {
    "author": "李四",
    "email": "lisi@example.com",
    "content": "写得很详细,感谢分享!",
    "parent_id": None  # None here; presumably a parent comment id for nested replies
})

# Search posts through the text index
search_results = content_manager.search_posts("mongodb")
print(f"找到 {len(search_results)} 篇相关文章")

8. 性能优化

8.1 查询优化

// 优化前的查询
db.users.find({"address.city": "北京", "age": {$gte: 25}})

// 优化后的查询(需要复合索引)
db.users.createIndex({"address.city": 1, "age": 1})
db.users.find({"address.city": "北京", "age": {$gte: 25}})

// 使用hint指定索引
db.users.find({"name": "张三", "age": 30}).hint({"name": 1})

// 避免全表扫描
// 不好:db.users.find({$where: "this.age > 25"})
// 好:db.users.find({age: {$gt: 25}})

// 限制返回字段
db.users.find({}, {name: 1, email: 1, _id: 0})

8.2 索引优化策略

// 复合索引设计原则
// 1. 等值条件字段在前
// 2. 排序字段居中
// 3. 范围条件字段在后

// 好的复合索引顺序
db.orders.createIndex({status: 1, created_at: -1, amount: 1})

// 查询示例
db.orders.find({
  status: "completed",  // 等值条件
  created_at: {$gte: ISODate("2026-01-01")},  // 范围条件
  amount: {$gte: 100}  // 范围条件
}).sort({created_at: -1})  // 排序

// 覆盖查询(covered query):查询条件和返回字段都包含在索引中
db.users.createIndex({name: 1, age: 1, email: 1})
// 以下查询完全由索引覆盖,不需要访问文档
db.users.find({name: "张三"}, {name: 1, age: 1, _id: 0})

8.3 写入性能优化

// 批量写入优化
// 避免逐个插入
for (let i = 0; i < 1000; i++) {
  db.users.insertOne({name: `User${i}`, age: i});
}

// 使用批量插入
let bulkOps = [];
for (let i = 0; i < 1000; i++) {
  bulkOps.push({
    insertOne: {
      document: {name: `User${i}`, age: i}
    }
  });
}
db.users.bulkWrite(bulkOps);

// 写关注选项
db.users.insertOne(
  {name: "张三", age: 25},
  {writeConcern: {w: "majority", j: true}}  // 等待多数节点确认并写入日志
)

9. 集群与分片

9.1 副本集配置

// 副本集配置示例
config = {
  _id: "rs0",
  members: [
    {_id: 0, host: "mongo1:27017"},
    {_id: 1, host: "mongo2:27017"},
    {_id: 2, host: "mongo3:27017"}
  ]
}

// 初始化副本集
rs.initiate(config)

// Inspect replica set state and replication lag of the secondaries.
// rs.printSlaveReplicationInfo() was removed; mongosh provides
// rs.printSecondaryReplicationInfo() instead.
rs.status()
rs.printSecondaryReplicationInfo()

9.2 分片配置

// 启用分片
sh.enableSharding("myapp_database")

// 为集合启用分片
sh.shardCollection("myapp_database.users", {"_id": "hashed"})

// 或使用范围分片
sh.shardCollection("myapp_database.orders", {"user_id": 1, "order_date": -1})

// 添加分片
sh.addShard("shard1/mongo-shard1:27017,mongo-shard2:27017,mongo-shard3:27017")

// 查看分片信息
sh.status()
sh.getBalancerState()

9.3 Python连接集群

# 连接副本集
client = MongoClient(
    "mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
    replicaSet="rs0",
    readPreference="secondaryPreferred"
)

# 连接分片集群
client = MongoClient(
    "mongodb://mongos1:27017,mongos2:27017",
    readPreference="nearest"
)

# 读取偏好设置
from pymongo.read_preferences import ReadPreference

# PyMongo cursors have no read_preference() method. The read preference is a
# property of the client, database or collection handle; with_options()
# returns a copy of the collection with the override applied.

# Read from the primary (the default)
db.collection.with_options(read_preference=ReadPreference.PRIMARY).find()

# Read from a secondary (takes load off the primary)
db.collection.with_options(read_preference=ReadPreference.SECONDARY).find()

# Read from the nearest member
db.collection.with_options(read_preference=ReadPreference.NEAREST).find()

10. 监控与运维

10.1 性能监控

// 查看当前运行的操作
db.currentOp()

// Profile slow operations only. Level 1 records operations slower than
// `slowms` (100 ms here); level 2 would record every operation regardless of
// duration and adds considerable overhead.
db.setProfilingLevel(1, {slowms: 100})

// 查看最近的慢查询
db.system.profile.find().sort({ts: -1}).limit(5)

// 查看数据库统计
db.stats()

// 查看集合统计
db.users.stats()

// 查看索引使用情况
db.users.aggregate([{$indexStats: {}}])

10.2 备份与恢复

# The admin user lives in the `admin` database. When --db names another
# database the tools look for the user there unless --authenticationDatabase
# is given, so every command below passes it explicitly.
# Tip: leave the value off --password to be prompted instead of exposing the
# password in shell history and the process list.

# Back up one database
mongodump --host localhost:27017 --username admin --password password --authenticationDatabase admin --db myapp_database --out /backup/

# Restore that database
mongorestore --host localhost:27017 --username admin --password password --authenticationDatabase admin --db myapp_database /backup/myapp_database/

# Back up a single collection
mongodump --host localhost:27017 --username admin --password password --authenticationDatabase admin --collection users --db myapp_database --out /backup/

# Export the documents matching a query
mongoexport --host localhost:27017 --username admin --password password --authenticationDatabase admin --db myapp_database --collection users --query '{"age": {"$gte": 25}}' --out users_export.json

# Import data
mongoimport --host localhost:27017 --username admin --password password --authenticationDatabase admin --db myapp_database --collection users --file users_export.json

11. 最佳实践

11.1 设计最佳实践

"""
MongoDB设计最佳实践:
1. 合理的文档结构:嵌套与引用的权衡
2. 索引策略:避免过度索引,关注查询模式
3. 分片键选择:均匀分布数据
4. 读写关注:根据业务需求调整一致性级别
5. 连接池管理:合理配置连接数
"""

# 嵌套vs引用示例
# 嵌套 - 适合数据变化少、查询频繁的场景
user_with_address = {
    "name": "张三",
    "address": {
        "street": "北京路123号",
        "city": "北京",
        "country": "中国"
    }
}

# Reference - suits data that changes often or is shared between documents.
# ObjectId() only accepts a 24-character hex string (or 12 bytes); any other
# string raises bson.errors.InvalidId, so a well-formed placeholder id is used.
user_with_address_ref = {
    "name": "张三",
    "address_id": ObjectId("507f1f77bcf86cd799439011")
}

11.2 安全配置

// 创建用户
use admin
db.createUser({
  user: "app_user",
  pwd: "secure_password",
  roles: [
    {role: "readWrite", db: "myapp_database"},
    {role: "read", db: "other_database"}
  ]
})

// 启用身份验证
// 在mongod.conf中设置
security:
  authorization: "enabled"

11.3 应用场景总结

| 场景 | 说明 | 推荐配置 |
| --- | --- | --- |
| 内容管理 | 博客、新闻、文档存储 | 动态模式,文本索引 |
| 实时分析 | 日志、指标、用户行为 | TTL索引,聚合管道 |
| 电商应用 | 商品、订单、用户信息 | 复合索引,分片 |
| IoT数据 | 传感器、设备数据 | 时间序列,TTL索引 |
| 移动应用 | 用户数据、配置信息 | 地理索引,压缩存储 |

总结

MongoDB是一个功能强大、灵活的NoSQL数据库,适用于各种数据存储需求。通过合理的文档设计、索引策略和集群配置,可以构建高性能、可扩展的应用程序。掌握MongoDB的核心概念和最佳实践,能够帮助开发者充分利用其优势,构建出色的数据存储解决方案。