#MongoDB 实战教程
#什么是MongoDB?
MongoDB是一个基于文档的开源NoSQL数据库,属于非关系型数据库。它以BSON(Binary JSON)格式存储数据,具有高可扩展性、高性能和灵活的数据模型等特点。
#MongoDB的主要特点:
- 文档存储:使用类似JSON的BSON格式存储数据
- 动态模式:无需预定义表结构,模式灵活
- 高可用性:支持副本集和自动故障转移
- 水平扩展:支持分片集群,可处理海量数据
- 丰富的查询语言:支持复杂的查询和聚合操作
- 索引支持:支持多种类型的索引优化查询
#1. MongoDB安装与配置
#1.1 Docker方式安装
# 拉取MongoDB镜像
docker pull mongo:7.0
# 运行MongoDB容器
docker run -d --name mongodb \
-p 27017:27017 \
-e MONGO_INITDB_ROOT_USERNAME=admin \
-e MONGO_INITDB_ROOT_PASSWORD=password \
-v mongodb_data:/data/db \
mongo:7.0
# 连接到MongoDB容器
docker exec -it mongodb mongosh -u admin -p password
#1.2 Docker Compose方式安装
# docker-compose.yml
# Single MongoDB 7.0 service whose data directory lives in a named volume.
version: '3.8'
services:
  mongodb:
    image: mongo:7.0
    container_name: mongodb
    restart: always
    environment:
      # Demo credentials only; replace them before using this outside a tutorial.
      MONGO_INITDB_ROOT_USERNAME: admin
      MONGO_INITDB_ROOT_PASSWORD: password
    ports:
      - "27017:27017"
    volumes:
      - mongodb_data:/data/db
      # mongo-init.js must exist next to this compose file.
      - ./mongo-init.js:/docker-entrypoint-initdb.d/mongo-init.js
# Top-level declaration of the named volume referenced by the service above.
volumes:
  mongodb_data:
#1.3 配置文件详解
# mongod.conf - MongoDB configuration file (YAML, so comments start with '#')
storage:
  engine: wiredTiger
  wiredTiger:
    engineConfig:
      cacheSizeGB: 1
  dbPath: /var/lib/mongodb
  # storage.journal.enabled is intentionally omitted: the option was removed in
  # MongoDB 6.1 (journaling is always on), and this tutorial targets 7.0.
systemLog:
  destination: file
  logAppend: true
  path: /var/log/mongodb/mongod.log
net:
  port: 27017
  # 0.0.0.0 listens on every network interface; narrow this in production.
  bindIp: 0.0.0.0
processManagement:
  fork: true
  pidFilePath: /var/run/mongodb/mongod.pid
setParameter:
  enableLocalhostAuthBypass: false
security:
  authorization: enabled
#2. MongoDB基础概念
#2.1 核心概念
| 概念 | 说明 | 类比关系型数据库 |
|---|---|---|
| Database | 数据库 | Database |
| Collection | 文档集合 | Table |
| Document | 数据记录 | Row |
| Field | 字段 | Column |
| Index | 索引 | Index |
| Cursor | 游标 | ResultSet |
#2.2 BSON数据类型
// BSON支持的数据类型示例
{
// 字符串
name: "张三",
// 整数
age: 25,
// 浮点数
salary: 5000.50,
// 布尔值
isActive: true,
// 日期
createdAt: new Date(),
// ObjectId
_id: ObjectId("507f1f77bcf86cd799439011"),
// 数组
skills: ["Python", "MongoDB", "JavaScript"],
// 嵌套文档
address: {
street: "北京路123号",
city: "广州",
country: "中国"
},
// null值
middleName: null,
// 二进制数据
avatar: BinData(0, "encoded_binary_data_here")
}
#3. 数据库操作
#3.1 基本数据库操作
// 连接到MongoDB
use myapp_database
// 查看所有数据库
show dbs
// 查看当前数据库
db.getName()
// 查看当前数据库状态
db.stats()
// 创建数据库(首次插入数据时自动创建)
use ecommerce_db
// 删除数据库
db.dropDatabase()
// 列出所有集合
show collections
// 创建集合
db.createCollection("users")
db.createCollection("products", {
capped: true,
size: 100000,
max: 100
})
// 删除集合
db.products.drop()
#3.2 集合管理
// 查看集合信息
db.users.stats()
// 重命名集合
db.users.renameCollection("customers")
// 查看集合大小
db.users.dataSize()
db.users.storageSize()
// 集合验证
db.users.validate()
#4. 文档操作
#4.1 插入文档
// 插入单个文档
db.users.insertOne({
name: "张三",
email: "zhangsan@example.com",
age: 25,
hobbies: ["读书", "游泳", "旅行"],
address: {
city: "北京",
district: "朝阳区"
},
createdAt: new Date()
})
// 插入多个文档
db.users.insertMany([
{
name: "李四",
email: "lisi@example.com",
age: 30,
hobbies: ["摄影", "电影"],
address: {
city: "上海",
district: "浦东新区"
}
},
{
name: "王五",
email: "wangwu@example.com",
age: 28,
hobbies: ["音乐", "运动"],
address: {
city: "深圳",
district: "南山区"
}
}
])
// 使用save方法(如果_id存在则更新,否则插入)
db.users.save({
_id: ObjectId("507f1f77bcf86cd799439011"),
name: "赵六",
email: "zhaoliu@example.com"
})#4.2 查询文档
// 查询所有文档
db.users.find()
// 查询单个文档
db.users.findOne({name: "张三"})
// 条件查询
db.users.find({age: {$gt: 25}}) // 年龄大于25
db.users.find({age: {$gte: 25, $lt: 35}}) // 年龄在25-35之间
db.users.find({name: /^张/}) // 名字以"张"开头
db.users.find({hobbies: "读书"}) // 爱好包含"读书"
// 复杂查询
db.users.find({
$and: [
{age: {$gte: 20}},
{age: {$lte: 40}},
{hobbies: {$in: ["读书", "游泳"]}}
]
})
// 投影(只返回指定字段)
db.users.find({}, {name: 1, email: 1, _id: 0})
// 限制结果数量
db.users.find().limit(10)
// 跳过结果
db.users.find().skip(5).limit(10)
// 排序
db.users.find().sort({age: 1}) // 1升序,-1降序
db.users.find().sort({age: -1, name: 1})
// 统计文档数量
db.users.countDocuments({age: {$gte: 25}})
db.users.estimatedDocumentCount() // 估算总数(更快)
// 检查文档是否存在
db.users.findOne({email: "zhangsan@example.com"}) != null
#4.3 更新文档
// 更新单个文档
db.users.updateOne(
{name: "张三"}, // 查询条件
{$set: {age: 26}} // 更新操作
)
// 更新多个文档
db.users.updateMany(
{age: {$lt: 30}}, // 查询条件
{$inc: {age: 1}} // 将年龄增加1
)
// 替换整个文档
db.users.replaceOne(
{name: "李四"},
{
name: "李四更新",
email: "lisi_new@example.com",
age: 31,
updatedAt: new Date()
}
)
// upsert操作(如果不存在则插入)
db.users.updateOne(
{email: "newuser@example.com"},
{
$set: {
name: "新用户",
email: "newuser@example.com",
age: 20
}
},
{upsert: true}
)
// 数组操作
db.users.updateOne(
{name: "张三"},
{
$push: {hobbies: "登山"}, // 添加到数组
$pull: {hobbies: "游泳"}, // 从数组移除
$addToSet: {skills: "Python"} // 只添加不存在的元素
}
)
// 原子操作
db.users.updateOne(
{name: "张三"},
{
$inc: {age: 1}, // 原子递增
$mul: {salary: 1.1}, // 原子乘法
$min: {age: 30}, // 只在新值更小时更新
$max: {age: 50} // 只在新值更大时更新
}
)#4.4 删除文档
// 删除单个文档
db.users.deleteOne({name: "赵六"})
// 删除多个文档
db.users.deleteMany({age: {$lt: 20}})
// 删除所有文档
db.users.deleteMany({})
// 删除符合条件的文档并返回被删除的文档
db.users.findOneAndDelete({name: "张三"})
#5. 索引管理
#5.1 创建索引
// 创建单字段索引
db.users.createIndex({name: 1}) // 1表示升序,-1表示降序
// 创建复合索引
db.users.createIndex({name: 1, age: -1})
// 创建唯一索引
db.users.createIndex({email: 1}, {unique: true})
// 创建稀疏索引(只索引包含字段的文档)
db.users.createIndex({phone: 1}, {sparse: true})
// 创建TTL索引(自动过期)
db.sessions.createIndex({createdAt: 1}, {expireAfterSeconds: 3600})
// 创建文本索引
db.articles.createIndex({title: "text", content: "text"})
// 创建地理位置索引
db.places.createIndex({location: "2dsphere"})
// 创建多键索引(数组字段)
db.users.createIndex({hobbies: 1})
// 创建部分索引
db.users.createIndex(
{age: 1},
{partialFilterExpression: {status: "active"}}
)
// 创建后台索引(不影响数据库操作)
db.large_collection.createIndex({field: 1}, {background: true})#5.2 索引操作
// 查看集合的所有索引
db.users.getIndexes()
// 查看索引大小
db.users.totalIndexSize()
// 删除索引
db.users.dropIndex("name_1")
db.users.dropIndex({name: 1}) // 通过索引规范删除
db.users.dropIndexes() // 删除所有索引(保留_id索引)
// 重建索引
db.users.reIndex()
// 查看索引使用情况
db.users.aggregate([{$indexStats: {}}])
// 分析查询计划
db.users.find({name: "张三"}).explain("executionStats")
#6. 聚合管道
#6.1 基础聚合操作
// 简单聚合
db.orders.aggregate([
{$match: {status: "completed"}},
{$group: {_id: "$customer_id", total_orders: {$sum: 1}}},
{$sort: {total_orders: -1}},
{$limit: 10}
])
// 字段投影
db.users.aggregate([
{$project: {
name: 1,
email: 1,
age: 1,
fullName: {$concat: ["$firstName", " ", "$lastName"]}
}}
])
// 字段重命名
db.users.aggregate([
{$project: {
_id: 0,
customerName: "$name",
customerEmail: "$email"
}}
])
// 条件操作
db.orders.aggregate([
{$project: {
customer_id: 1,
order_amount: 1,
discount_category: {
$switch: {
branches: [
{case: {$gte: ["$order_amount", 1000]}, then: "VIP"},
{case: {$gte: ["$order_amount", 500]}, then: "Premium"},
{case: {$gte: ["$order_amount", 100]}, then: "Regular"}
],
default: "Basic"
}
}
}}
])
#6.2 高级聚合操作
// 数组操作
db.posts.aggregate([
{$unwind: "$comments"}, // 展开数组
{$group: {
_id: "$author",
commentCount: {$sum: 1},
avgRating: {$avg: "$comments.rating"}
}}
])
// 查找重复数据
db.users.aggregate([
{$group: {
_id: "$email",
count: {$sum: 1},
docs: {$push: "$$ROOT"}
}},
{$match: {count: {$gt: 1}}}
])
// 连接其他集合
db.orders.aggregate([
{$lookup: {
from: "customers",
localField: "customer_id",
foreignField: "_id",
as: "customer_info"
}},
{$unwind: "$customer_info"},
{$project: {
order_id: 1,
customer_name: "$customer_info.name",
customer_email: "$customer_info.email",
order_date: 1
}}
])
// 窗口函数(MongoDB 4.2+)
db.sales.aggregate([
{$setWindowFields: {
partitionBy: "$region",
sortBy: {sale_date: 1},
output: {
cumulative_sales: {
$sum: "$amount",
window: {$documentNumber: [0, 0]}
}
}
}}
])#6.3 聚合优化
// 优化聚合管道
db.orders.aggregate([
// 尽早过滤数据
{$match: {order_date: {$gte: ISODate("2026-01-01")}}},
// 使用索引友好的操作
{$sort: {customer_id: 1, order_date: -1}},
// 分组操作
{$group: {
_id: "$customer_id",
orders: {$push: "$$ROOT"},
total_spent: {$sum: "$amount"},
order_count: {$sum: 1}
}},
// 限制结果
{$limit: 1000}
])
#7. Python与MongoDB集成
#7.1 安装驱动程序
pip install pymongo
pip install motor # 异步驱动
pip install dnspython # 支持SRV记录的连接字符串
#7.2 基本连接与操作
from pymongo import MongoClient
from bson import ObjectId
from datetime import datetime
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
# 同步连接
client = MongoClient(
"mongodb://admin:password@localhost:27017/",
serverSelectionTimeoutMS=5000 # 5秒超时
)
# 获取数据库
db = client.myapp_database
users_collection = db.users
# 异步连接
async def get_async_client():
    """Build and return a Motor client for the local MongoDB instance."""
    return AsyncIOMotorClient("mongodb://admin:password@localhost:27017/")
# 插入文档
def insert_user(user_data):
    """Insert one document into ``users_collection`` and return its ``inserted_id``."""
    return users_collection.insert_one(user_data).inserted_id
# 批量插入
def insert_multiple_users(users_list):
    """Insert every document in ``users_list`` with a single ``insert_many`` call.

    Returns the ``inserted_ids`` reported by the driver.
    """
    return users_collection.insert_many(users_list).inserted_ids
# 查询文档
def find_user_by_email(email):
    """Return the first user document whose ``email`` equals ``email`` (or ``None``)."""
    email_filter = {"email": email}
    return users_collection.find_one(email_filter)
def find_users_by_age(min_age, max_age):
    """Return users with ``min_age <= age <= max_age`` as a list, youngest first."""
    age_range = {"age": {"$gte": min_age, "$lte": max_age}}
    cursor = users_collection.find(age_range).sort("age", 1)
    return list(cursor)
# 更新文档
def update_user_age(user_id, new_age):
    """Set ``age`` on one user and refresh its ``updatedAt`` timestamp.

    Args:
        user_id: Value accepted by ``ObjectId`` (for example a 24-character hex string).
        new_age: New value for the ``age`` field.

    Returns:
        ``modified_count`` from the update result.
    """
    # Local import: the surrounding file only imports ``datetime`` itself.
    from datetime import timezone

    result = users_collection.update_one(
        {"_id": ObjectId(user_id)},
        # Timezone-aware UTC: PyMongo stores naive datetimes as if they were UTC,
        # so a naive local time would be saved shifted by the local offset.
        {"$set": {"age": new_age, "updatedAt": datetime.now(timezone.utc)}},
    )
    return result.modified_count
# 删除文档
def delete_user(user_id):
    """Delete the user whose ``_id`` is ``ObjectId(user_id)``; return ``deleted_count``."""
    id_filter = {"_id": ObjectId(user_id)}
    return users_collection.delete_one(id_filter).deleted_count
# 聚合查询
def get_user_statistics():
    """Aggregate count, average, maximum and minimum ``age`` over all users.

    Returns the single summary document, or ``{}`` when the aggregation
    yields no rows.
    """
    # Grouping on ``None`` folds the whole collection into one group.
    summary_stage = {
        "$group": {
            "_id": None,
            "total_users": {"$sum": 1},
            "avg_age": {"$avg": "$age"},
            "max_age": {"$max": "$age"},
            "min_age": {"$min": "$age"},
        }
    }
    rows = list(users_collection.aggregate([summary_stage]))
    if not rows:
        return {}
    return rows[0]
#7.3 高级功能
# 使用连接池
from pymongo.errors import ConnectionFailure
def create_client_with_pool():
    """Create a ``MongoClient`` with explicit pool and timeout settings.

    A ``ping`` is sent to report whether the server is reachable; the client
    is returned in either case.
    """
    pool_settings = dict(
        maxPoolSize=50,        # upper bound on pooled connections
        minPoolSize=10,        # lower bound on pooled connections
        maxIdleTimeMS=30000,   # maximum idle time of a pooled connection
        serverSelectionTimeoutMS=5000,
        connectTimeoutMS=20000,
        socketTimeoutMS=20000,
    )
    pooled_client = MongoClient(
        "mongodb://admin:password@localhost:27017/", **pool_settings
    )
    # Connectivity probe.
    try:
        pooled_client.admin.command('ping')
    except ConnectionFailure:
        print("MongoDB server not available")
    else:
        print("MongoDB connection successful")
    return pooled_client
# 事务操作(MongoDB 4.0+)
def perform_transaction():
    """Move 100 from account ``acc1`` to account ``acc2`` in one transaction.

    Raises:
        Exception: when the debit matches no document (``acc1`` missing or its
            balance is below 100) or when ``acc2`` does not exist. Raising
            inside ``start_transaction()`` aborts the transaction, so neither
            write is kept.
    """
    # Using the client as a context manager closes its connection pool on exit.
    with create_client_with_pool() as client:
        db = client.myapp_database
        with client.start_session() as session:
            with session.start_transaction():
                # Debit: the balance guard is part of the filter, so the check
                # and the decrement happen in one operation.
                debit = db.accounts.update_one(
                    {"account_id": "acc1", "balance": {"$gte": 100}},
                    {"$inc": {"balance": -100}},
                    session=session
                )
                if debit.modified_count == 0:
                    raise Exception("Insufficient funds")
                # Credit: verify the destination exists, otherwise the debit
                # would commit without a matching credit.
                credit = db.accounts.update_one(
                    {"account_id": "acc2"},
                    {"$inc": {"balance": 100}},
                    session=session
                )
                if credit.matched_count == 0:
                    raise Exception("Destination account not found")
            print("Transaction completed successfully")
# 异步操作
async def async_operations():
    """Insert a demo user with Motor, read it back, then stamp ``updated_at``.

    Returns:
        The document as read back right after the insert (before the update).
    """
    # Local import: the surrounding file only imports ``datetime`` itself.
    from datetime import timezone

    client = AsyncIOMotorClient("mongodb://admin:password@localhost:27017/")
    try:
        collection = client.myapp_database.users
        # Async insert. Timestamps are timezone-aware UTC so they are not
        # stored shifted by the local offset.
        user_doc = {
            "name": "异步用户",
            "email": "async@example.com",
            "created_at": datetime.now(timezone.utc)
        }
        result = await collection.insert_one(user_doc)
        # Async read-back by the generated _id.
        user = await collection.find_one({"_id": result.inserted_id})
        # Best-effort update: a failure is reported, not propagated.
        try:
            await collection.find_one_and_update(
                {"email": "async@example.com"},
                {"$set": {"updated_at": datetime.now(timezone.utc)}}
            )
        except Exception as e:
            print(f"Update error: {e}")
        return user
    finally:
        # Release the client's connections even when an operation fails.
        client.close()
# 监听变更流
def watch_changes():
    """Print every change event emitted for the ``users`` collection.

    Iterates the change stream opened by ``watch()`` and prints an extra line
    for insert, update and delete events.
    """
    users = db.users
    with users.watch() as stream:
        for event in stream:
            print(f"Change detected: {event}")
            kind = event['operationType']
            if kind == 'insert':
                print(f"New document inserted: {event['fullDocument']}")
                continue
            if kind == 'update':
                print(f"Document updated: {event['documentKey']}")
                continue
            if kind == 'delete':
                print(f"Document deleted: {event['documentKey']}")
#7.4 实际应用场景
#7.4.1 用户管理系统
from pymongo import MongoClient
from bson import ObjectId
from datetime import datetime
from typing import Optional, List, Dict
import hashlib
class UserManager:
    """User account storage on top of the ``users`` collection.

    Passwords are stored as salted PBKDF2-HMAC-SHA256 hashes in the form
    ``pbkdf2_sha256$<iterations>$<salt hex>$<digest hex>``. Plain SHA-256 hex
    digests written by earlier versions are still accepted on verification.
    """

    # Work factor; it is embedded in every hash so it can be raised later
    # without invalidating existing records.
    _PBKDF2_ITERATIONS = 600_000
    _HASH_PREFIX = "pbkdf2_sha256"

    def __init__(self, connection_string: str, database_name: str):
        """Connect and create the indexes on ``email``, ``username`` and ``created_at``."""
        self.client = MongoClient(connection_string)
        self.db = self.client[database_name]
        self.collection = self.db.users
        self.collection.create_index("email", unique=True)
        self.collection.create_index("username", unique=True)
        self.collection.create_index("created_at")

    def hash_password(self, password: str) -> str:
        """Return a salted PBKDF2 hash of ``password``.

        A fresh random salt is used on every call, so hashing the same
        password twice gives different strings; compare with
        ``verify_password`` instead of string equality.
        """
        import secrets

        salt = secrets.token_bytes(16)
        iterations = self._PBKDF2_ITERATIONS
        digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, iterations)
        return "$".join([self._HASH_PREFIX, str(iterations), salt.hex(), digest.hex()])

    def verify_password(self, password: str, stored_hash: str) -> bool:
        """Check ``password`` against a hash produced by ``hash_password``.

        Also accepts a legacy unsalted SHA-256 hex digest. Returns ``False``
        for malformed or non-string stored values instead of raising.
        """
        import hmac

        if not isinstance(stored_hash, str):
            return False
        if not stored_hash.startswith(self._HASH_PREFIX + "$"):
            # Legacy format: plain SHA-256 hex digest.
            legacy = hashlib.sha256(password.encode()).hexdigest()
            return hmac.compare_digest(legacy.encode(), stored_hash.encode())
        try:
            _, iterations, salt_hex, digest_hex = stored_hash.split("$")
            expected = bytes.fromhex(digest_hex)
            candidate = hashlib.pbkdf2_hmac(
                "sha256", password.encode(), bytes.fromhex(salt_hex), int(iterations)
            )
        except (ValueError, OverflowError):
            return False
        # Constant-time comparison.
        return hmac.compare_digest(candidate, expected)

    def create_user(self, user_data: Dict) -> str:
        """Insert a new user and return its id as a string.

        ``user_data`` must contain ``password``; it is hashed before storage.
        The caller's dict is not modified.
        """
        from datetime import timezone

        now = datetime.now(timezone.utc)
        document = dict(user_data)  # shallow copy: leave the caller's dict alone
        document['password'] = self.hash_password(document['password'])
        document['created_at'] = now
        document['updated_at'] = now
        document['is_active'] = True
        result = self.collection.insert_one(document)
        return str(result.inserted_id)

    def get_user_by_email(self, email: str) -> Optional[Dict]:
        """Return the user document with this ``email``, or ``None``."""
        return self.collection.find_one({"email": email})

    def authenticate_user(self, email: str, password: str) -> Optional[Dict]:
        """Return the active user matching ``email`` and ``password``, else ``None``."""
        user = self.collection.find_one({"email": email, "is_active": True})
        if user is None:
            return None
        # Salted hashes cannot be matched inside the query; verify in Python.
        if not self.verify_password(password, user.get("password", "")):
            return None
        return user

    def update_user_profile(self, user_id: str, profile_data: Dict) -> int:
        """Apply ``profile_data`` with ``$set``, refresh ``updated_at``; return ``modified_count``."""
        from datetime import timezone

        result = self.collection.update_one(
            {"_id": ObjectId(user_id)},
            {
                "$set": {
                    **profile_data,
                    "updated_at": datetime.now(timezone.utc)
                }
            }
        )
        return result.modified_count

    def get_users_paginated(self, page: int = 1, limit: int = 10) -> List[Dict]:
        """Return one page of users, newest first. Pages are 1-based."""
        # Clamp so page values below 1 do not produce a negative skip.
        skip = (max(page, 1) - 1) * limit
        users = list(
            self.collection.find({})
            .skip(skip)
            .limit(limit)
            .sort("created_at", -1)
        )
        return users

    def get_user_statistics(self) -> Dict:
        """Return totals over all users, or ``{}`` when the collection is empty.

        ``users_by_city`` is built with ``$push``, so it holds one entry per
        user that has ``address.city`` (duplicates included).
        """
        pipeline = [
            {
                "$group": {
                    "_id": None,
                    "total_users": {"$sum": 1},
                    "active_users": {
                        "$sum": {"$cond": [{"$eq": ["$is_active", True]}, 1, 0]}
                    },
                    "avg_age": {"$avg": "$age"},
                    "users_by_city": {
                        "$push": "$address.city"
                    }
                }
            }
        ]
        result = list(self.collection.aggregate(pipeline))
        return result[0] if result else {}
# 使用示例
user_manager = UserManager(
"mongodb://admin:password@localhost:27017/",
"myapp_database"
)
# 创建用户
user_id = user_manager.create_user({
"username": "john_doe",
"email": "john@example.com",
"password": "secure_password",
"age": 25,
"address": {
"city": "北京",
"district": "朝阳区"
}
})
# 认证用户
user = user_manager.authenticate_user("john@example.com", "secure_password")
if user:
    print(f"用户认证成功: {user['username']}")
#7.4.2 内容管理系统
class ContentManager:
    """Blog-style content store on top of the ``posts`` and ``comments`` collections."""

    def __init__(self, connection_string: str, database_name: str):
        """Connect and create the indexes used by the methods below."""
        self.client = MongoClient(connection_string)
        self.db = self.client[database_name]
        self.posts_collection = self.db.posts
        self.comments_collection = self.db.comments
        self.posts_collection.create_index("slug", unique=True)
        # NOTE(review): expireAfterSeconds makes this a TTL index, so posts are
        # removed by the server 30 days after created_at - confirm this expiry
        # is really intended for posts.
        self.posts_collection.create_index("created_at", expireAfterSeconds=3600*24*30)
        self.comments_collection.create_index("post_id")
        self.posts_collection.create_index([("title", "text"), ("content", "text")])

    def create_post(self, post_data: Dict) -> str:
        """Insert a post and return its id as a string.

        ``post_data`` must contain ``title``. Timestamps, slug and the three
        counters are filled in here. The caller's dict is not modified.
        """
        from datetime import timezone

        # Timezone-aware UTC: PyMongo stores naive datetimes as if they were
        # UTC, which would shift both the timestamps and the TTL expiry.
        now = datetime.now(timezone.utc)
        post = dict(post_data)  # shallow copy: leave the caller's dict alone
        post['created_at'] = now
        post['updated_at'] = now
        post['slug'] = self.generate_slug(post['title'])
        post['view_count'] = 0
        post['like_count'] = 0
        post['comment_count'] = 0
        result = self.posts_collection.insert_one(post)
        return str(result.inserted_id)

    def generate_slug(self, title: str) -> str:
        """Build a URL-friendly slug from ``title``.

        Lower-cases the title, drops everything except word characters,
        whitespace and hyphens, collapses whitespace/hyphen runs into a single
        hyphen and trims hyphens from both ends.
        """
        import re

        slug = re.sub(r'[^\w\s-]', '', title.lower())
        slug = re.sub(r'[-\s]+', '-', slug)
        # Leading/trailing whitespace or punctuation would otherwise leave
        # dangling hyphens such as "-hello-".
        return slug.strip('-')

    def get_post_with_comments(self, post_id: str) -> Optional[Dict]:
        """Return the post with its comments (newest first) under ``comments``, or ``None``."""
        post_oid = ObjectId(post_id)
        post = self.posts_collection.find_one({"_id": post_oid})
        if not post:
            return None
        post['comments'] = list(
            self.comments_collection.find({"post_id": post_oid}).sort("created_at", -1)
        )
        return post

    def add_comment(self, post_id: str, comment_data: Dict) -> str:
        """Insert a comment for ``post_id`` and increment the post's ``comment_count``.

        Returns the new comment id as a string. The caller's dict is not
        modified. The two writes are separate operations (no transaction).
        """
        from datetime import timezone

        post_oid = ObjectId(post_id)
        comment = dict(comment_data)  # shallow copy: leave the caller's dict alone
        comment['post_id'] = post_oid
        comment['created_at'] = datetime.now(timezone.utc)
        result = self.comments_collection.insert_one(comment)
        # Keep the denormalised counter on the post in step.
        self.posts_collection.update_one(
            {"_id": post_oid},
            {"$inc": {"comment_count": 1}}
        )
        return str(result.inserted_id)

    def search_posts(self, query: str, page: int = 1, limit: int = 10) -> List[Dict]:
        """Full-text search over posts, best match first, then newest. Pages are 1-based."""
        # Clamp so page values below 1 do not produce a negative skip.
        skip = (max(page, 1) - 1) * limit
        posts = list(
            self.posts_collection.find({
                "$text": {"$search": query}
            })
            .skip(skip)
            .limit(limit)
            .sort([("score", {"$meta": "textScore"}), ("created_at", -1)])
        )
        return posts

    def get_popular_posts(self, days: int = 7, limit: int = 10) -> List[Dict]:
        """Return up to ``limit`` posts created in the last ``days`` days, most popular first.

        Popularity is ``view_count + 2 * like_count + 3 * comment_count``.
        """
        from datetime import timedelta, timezone

        cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)
        pipeline = [
            {
                "$match": {
                    "created_at": {"$gte": cutoff_date}
                }
            },
            {
                "$addFields": {
                    "popularity_score": {
                        "$add": [
                            {"$multiply": ["$view_count", 1]},
                            {"$multiply": ["$like_count", 2]},
                            {"$multiply": ["$comment_count", 3]}
                        ]
                    }
                }
            },
            {
                "$sort": {"popularity_score": -1}
            },
            {
                "$limit": limit
            }
        ]
        posts = list(self.posts_collection.aggregate(pipeline))
        return posts
# 使用示例
content_manager = ContentManager(
"mongodb://admin:password@localhost:27017/",
"blog_database"
)
# 创建文章
post_id = content_manager.create_post({
"title": "MongoDB实战指南",
"content": "MongoDB是一个强大的NoSQL数据库...",
"author": "张三",
"tags": ["mongodb", "nosql", "database"],
"category": "技术"
})
# 添加评论
content_manager.add_comment(post_id, {
"author": "李四",
"email": "lisi@example.com",
"content": "写得很详细,感谢分享!",
"parent_id": None # 支持嵌套评论
})
# 搜索文章
search_results = content_manager.search_posts("mongodb")
print(f"找到 {len(search_results)} 篇相关文章")
#8. 性能优化
#8.1 查询优化
// 优化前的查询
db.users.find({"address.city": "北京", "age": {$gte: 25}})
// 优化后的查询(需要复合索引)
db.users.createIndex({"address.city": 1, "age": 1})
db.users.find({"address.city": "北京", "age": {$gte: 25}})
// 使用hint指定索引
db.users.find({"name": "张三", "age": 30}).hint({"name": 1})
// 避免全表扫描
// 不好:db.users.find({$where: "this.age > 25"})
// 好:db.users.find({age: {$gt: 25}})
// 限制返回字段
db.users.find({}, {name: 1, email: 1, _id: 0})
#8.2 索引优化策略
// 复合索引设计原则
// 1. 等值条件字段在前
// 2. 排序字段居中
// 3. 范围条件字段在后
// 好的复合索引顺序
db.orders.createIndex({status: 1, created_at: -1, amount: 1})
// 查询示例
db.orders.find({
status: "completed", // 等值条件
created_at: {$gte: ISODate("2026-01-01")}, // 范围条件
amount: {$gte: 100} // 范围条件
}).sort({created_at: -1}) // 排序
// 索引覆盖率查询
db.users.createIndex({name: 1, age: 1, email: 1})
// 以下查询完全由索引覆盖,不需要访问文档
db.users.find({name: "张三"}, {name: 1, age: 1, _id: 0})
#8.3 写入性能优化
// 批量写入优化
// 避免逐个插入
for (let i = 0; i < 1000; i++) {
db.users.insertOne({name: `User${i}`, age: i});
}
// 使用批量插入
let bulkOps = [];
for (let i = 0; i < 1000; i++) {
bulkOps.push({
insertOne: {
document: {name: `User${i}`, age: i}
}
});
}
db.users.bulkWrite(bulkOps);
// 写关注选项
db.users.insertOne(
{name: "张三", age: 25},
{writeConcern: {w: "majority", j: true}} // 等待多数节点确认并写入日志
)
#9. 集群与分片
#9.1 副本集配置
// 副本集配置示例
config = {
_id: "rs0",
members: [
{_id: 0, host: "mongo1:27017"},
{_id: 1, host: "mongo2:27017"},
{_id: 2, host: "mongo3:27017"}
]
}
// 初始化副本集
rs.initiate(config)
// 查看副本集状态
rs.status()
rs.printSlaveReplicationInfo()#9.2 分片配置
// 启用分片
sh.enableSharding("myapp_database")
// 为集合启用分片
sh.shardCollection("myapp_database.users", {"_id": "hashed"})
// 或使用范围分片
sh.shardCollection("myapp_database.orders", {"user_id": 1, "order_date": -1})
// 添加分片
sh.addShard("shard1/mongo-shard1:27017,mongo-shard2:27017,mongo-shard3:27017")
// 查看分片信息
sh.status()
sh.getBalancerState()
#9.3 Python连接集群
# 连接副本集
client = MongoClient(
"mongodb://mongo1:27017,mongo2:27017,mongo3:27017",
replicaSet="rs0",
readPreference="secondaryPreferred"
)
# 连接分片集群
client = MongoClient(
"mongodb://mongos1:27017,mongos2:27017",
readPreference="nearest"
)
# Read preference per collection handle.
# A PyMongo cursor has no read_preference() method; the preference is chosen
# on the collection (or database/client) and applies to the queries it issues.
from pymongo.read_preferences import ReadPreference
# Read from the primary (default)
db.collection.with_options(read_preference=ReadPreference.PRIMARY).find()
# Read from secondaries (takes load off the primary)
db.collection.with_options(read_preference=ReadPreference.SECONDARY).find()
# Read from the nearest member
db.collection.with_options(read_preference=ReadPreference.NEAREST).find()
#10. 监控与运维
#10.1 性能监控
// 查看当前运行的操作
db.currentOp()
// Profile slow operations only: level 1 records operations slower than
// slowms (here 100 ms); level 2 would record every operation.
db.setProfilingLevel(1, {slowms: 100})
// 查看最近的慢查询
db.system.profile.find().sort({ts: -1}).limit(5)
// 查看数据库统计
db.stats()
// 查看集合统计
db.users.stats()
// 查看索引使用情况
db.users.aggregate([{$indexStats: {}}])
#10.2 备份与恢复
# Back up one database.
# The admin user is defined in the "admin" database; without
# --authenticationDatabase the tools look for it in the database named by --db.
# Tip: omit the value of --password to be prompted instead of exposing it.
mongodump --host localhost:27017 --username admin --password password --authenticationDatabase admin --db myapp_database --out /backup/
# Restore the database from the dump directory (--nsInclude selects the namespaces)
mongorestore --host localhost:27017 --username admin --password password --authenticationDatabase admin --nsInclude 'myapp_database.*' /backup/
# Back up a single collection
mongodump --host localhost:27017 --username admin --password password --authenticationDatabase admin --db myapp_database --collection users --out /backup/
# Export the result of a query as JSON
mongoexport --host localhost:27017 --username admin --password password --authenticationDatabase admin --db myapp_database --collection users --query '{"age": {"$gte": 25}}' --out users_export.json
# Import the exported data
mongoimport --host localhost:27017 --username admin --password password --authenticationDatabase admin --db myapp_database --collection users --file users_export.json
#11. 最佳实践
#11.1 设计最佳实践
"""
MongoDB设计最佳实践:
1. 合理的文档结构:嵌套与引用的权衡
2. 索引策略:避免过度索引,关注查询模式
3. 分片键选择:均匀分布数据
4. 读写关注:根据业务需求调整一致性级别
5. 连接池管理:合理配置连接数
"""
# 嵌套vs引用示例
# 嵌套 - 适合数据变化少、查询频繁的场景
user_with_address = {
"name": "张三",
"address": {
"street": "北京路123号",
"city": "北京",
"country": "中国"
}
}
# 引用 - 适合数据变化频繁、共享数据的场景
user_with_address_ref = {
    "name": "张三",
    # ObjectId() accepts only a 24-character hex string (or 12 bytes); a
    # placeholder such as "address_object_id" raises InvalidId.
    "address_id": ObjectId("507f1f77bcf86cd799439012")
}
#11.2 安全配置
// 创建用户
use admin
db.createUser({
user: "app_user",
pwd: "secure_password",
roles: [
{role: "readWrite", db: "myapp_database"},
{role: "read", db: "other_database"}
]
})
// 启用身份验证
// 在mongod.conf中设置
security:
authorization: "enabled"
#11.3 应用场景总结
| 场景 | 说明 | 推荐配置 |
|---|---|---|
| 内容管理 | 博客、新闻、文档存储 | 动态模式,文本索引 |
| 实时分析 | 日志、指标、用户行为 | TTL索引,聚合管道 |
| 电商应用 | 商品、订单、用户信息 | 复合索引,分片 |
| IoT数据 | 传感器、设备数据 | 时间序列,TTL索引 |
| 移动应用 | 用户数据、配置信息 | 地理索引,压缩存储 |
#总结
MongoDB是一个功能强大、灵活的NoSQL数据库,适用于各种数据存储需求。通过合理的文档设计、索引策略和集群配置,可以构建高性能、可扩展的应用程序。掌握MongoDB的核心概念和最佳实践,能够帮助开发者充分利用其优势,构建出色的数据存储解决方案。

