#Redis 实战教程
#什么是Redis?
Redis(Remote Dictionary Server)是一个开源的内存数据结构存储系统,可以用作数据库、缓存和消息中间件。它支持多种数据结构,如字符串(strings)、哈希(hashes)、列表(lists)、集合(sets)、有序集合(sorted sets)等。
#Redis的主要特点:
- 高性能:基于内存操作,读写速度极快
- 丰富的数据结构:支持多种数据类型
- 持久化:支持RDB和AOF两种持久化方式
- 主从复制:支持数据复制和故障恢复
- 集群支持:支持分布式集群模式
- 事务支持:支持简单的事务操作
#1. Redis安装与配置
#1.1 Windows环境下安装
Windows环境下可以通过WSL(Windows Subsystem for Linux)安装Redis:
# 使用WSL Ubuntu
sudo apt update
sudo apt install redis-server
# 启动Redis服务
sudo systemctl start redis-server
sudo systemctl enable redis-server
# 检查Redis状态
sudo systemctl status redis-server#1.2 Docker方式安装
# 拉取Redis镜像
docker pull redis:latest
# 运行Redis容器
docker run -d --name my-redis -p 6379:6379 redis:latest
# 指定配置文件运行
docker run -d --name my-redis -p 6379:6379 -v /path/to/redis.conf:/etc/redis/redis.conf redis:latest redis-server /etc/redis/redis.conf#1.3 Redis配置文件详解
# 基本配置
bind 127.0.0.1 # 绑定IP地址
port 6379 # 端口号
timeout 300 # 客户端闲置多久后关闭连接
tcp-keepalive 60 # TCP keepalive时间
# 日志配置
loglevel notice # 日志级别
logfile /var/log/redis/redis-server.log
# 数据库配置
databases 16 # 数据库数量
# RDB持久化配置
save 900 1 # 900秒内至少有1个key发生变化则保存
save 300 10 # 300秒内至少有10个key发生变化则保存
save 60 10000 # 60秒内至少有10000个key发生变化则保存
dbfilename dump.rdb # RDB文件名
dir /var/lib/redis # RDB文件存放目录
# 安全配置
requirepass yourpassword # 设置密码
maxclients 10000 # 最大客户端连接数
maxmemory 2gb # 最大内存限制
maxmemory-policy allkeys-lru # 内存淘汰策略#2. Redis数据类型与操作
#2.1 字符串(String)
# 设置键值对
SET name "John Doe"
SET age 25 EX 3600 NX # 设置过期时间1小时,仅当key不存在时设置
# 获取值
GET name
# 数值操作
INCR counter # 自增1
DECR counter # 自减1
INCRBY counter 5 # 自增5
DECRBY counter 2 # 自减2
# 批量操作
MSET key1 value1 key2 value2
MGET key1 key2
# 追加字符串
APPEND name " Jr." # 在原字符串后追加#2.2 哈希(Hash)
# 设置哈希字段
HSET user:1000 name "Alice" age 30 email "alice@example.com"
# 获取哈希字段
HGET user:1000 name
HMGET user:1000 name age email
HGETALL user:1000
# 删除哈希字段
HDEL user:1000 email
# 检查字段是否存在
HEXISTS user:1000 name
# 获取字段数量
HLEN user:1000#2.3 列表(List)
# 从左侧插入
LPUSH mylist "first"
LPUSH mylist "second"
# 从右侧插入
RPUSH mylist "third"
RPUSH mylist "fourth"
# 查看列表
LRANGE mylist 0 -1 # 显示所有元素
# 弹出元素
LPOP mylist # 从左侧弹出
RPOP mylist # 从右侧弹出
# 阻塞弹出(用于消息队列)
BLPOP mylist 0 # 阻塞直到有元素可用
BRPOP mylist 0#2.4 集合(Set)
# 添加元素
SADD myset "apple"
SADD myset "banana" "orange"
# 获取所有元素
SMEMBERS myset
# 检查元素是否存在
SISMEMBER myset "apple"
# 随机获取元素
SRANDMEMBER myset 2 # 随机获取2个元素
# 集合运算
SUNION set1 set2 # 并集
SINTER set1 set2 # 交集
SDIFF set1 set2 # 差集#2.5 有序集合(Sorted Set)
# 添加成员(带分数)
ZADD leaderboard 100 "player1"
ZADD leaderboard 95 "player2" 120 "player3"
# 获取排名
ZRANK leaderboard "player2" # 获取排名(从0开始)
ZREVRANK leaderboard "player2" # 获取倒序排名
# 获取分数
ZSCORE leaderboard "player1"
# 获取指定范围的成员
ZRANGE leaderboard 0 1 WITHSCORES # 获取前2名及分数
ZREVRANGE leaderboard 0 1 WITHSCORES # 倒序获取前2名#3. Python与Redis集成
#3.1 安装Redis客户端
pip install redis
pip install redis[hiredis] # 可选:安装hiredis加速#3.2 基本连接与操作
import redis
# 连接Redis
r = redis.Redis(
host='localhost',
port=6379,
db=0,
password=None, # 如果设置了密码
decode_responses=True # 自动解码响应
)
# 字符串操作
r.set('name', 'Alice')
name = r.get('name')
# 哈希操作
r.hset('user:1000', mapping={
'name': 'Bob',
'age': 25,
'email': 'bob@example.com'
})
user_info = r.hgetall('user:1000')
# 列表操作
r.lpush('tasks', 'task1', 'task2', 'task3')
task = r.rpop('tasks')
# 集合操作
r.sadd('tags', 'python', 'redis', 'web')
all_tags = r.smembers('tags')
# 有序集合操作
r.zadd('scores', {'Alice': 95, 'Bob': 87, 'Charlie': 92})
top_scores = r.zrevrange('scores', 0, 2, withscores=True)#3.3 Redis连接池
import redis
# 创建连接池
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=20
)
# 使用连接池
r = redis.Redis(connection_pool=pool)
# 在FastAPI中的应用
from fastapi import FastAPI
import redis.asyncio as redis_async
app = FastAPI()
@app.on_event("startup")
async def startup_event():
app.state.redis = await redis_async.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
@app.on_event("shutdown")
async def shutdown_event():
await app.state.redis.close()#3.4 实际应用场景
#3.4.1 缓存实现
import json
from functools import wraps
def cache_result(expire_time=300):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# 生成缓存键
cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"
# 尝试从缓存获取结果
cached_result = await app.state.redis.get(cache_key)
if cached_result:
return json.loads(cached_result)
# 执行函数并缓存结果
result = await func(*args, **kwargs)
await app.state.redis.setex(
cache_key,
expire_time,
json.dumps(result)
)
return result
return wrapper
return decorator
# 使用缓存装饰器
@cache_result(expire_time=600)
async def get_user_data(user_id: int):
# 模拟数据库查询
return {"id": user_id, "name": f"User{user_id}"}#3.4.2 会话管理
import uuid
import time
from typing import Dict, Any
class SessionManager:
def __init__(self, redis_client, session_timeout=3600):
self.redis = redis_client
self.timeout = session_timeout
async def create_session(self, user_id: str) -> str:
"""创建新会话"""
session_id = str(uuid.uuid4())
session_data = {
"user_id": user_id,
"created_at": time.time(),
"last_access": time.time()
}
await self.redis.hset(f"session:{session_id}", mapping=session_data)
await self.redis.expire(f"session:{session_id}", self.timeout)
return session_id
async def get_session(self, session_id: str) -> Dict[str, Any]:
"""获取会话数据"""
session_data = await self.redis.hgetall(f"session:{session_id}")
if session_data:
# 更新最后访问时间
await self.redis.hset(f"session:{session_id}", "last_access", time.time())
await self.redis.expire(f"session:{session_id}", self.timeout)
return session_data
async def destroy_session(self, session_id: str):
"""销毁会话"""
await self.redis.delete(f"session:{session_id}")
# 使用示例
session_manager = SessionManager(app.state.redis)#3.4.3 分布式锁
import asyncio
import time
import random
class DistributedLock:
def __init__(self, redis_client, lock_name: str, timeout: int = 10):
self.redis = redis_client
self.lock_name = f"lock:{lock_name}"
self.timeout = timeout
self.identifier = str(uuid.uuid4())
async def acquire(self) -> bool:
"""获取锁"""
end_time = time.time() + self.timeout
while time.time() < end_time:
# 使用SET命令的NX和EX选项实现原子操作
if await self.redis.set(
self.lock_name,
self.identifier,
nx=True,
ex=self.timeout
):
return True
await asyncio.sleep(0.001) # 短暂休眠避免忙等待
return False
async def release(self):
"""释放锁"""
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
await self.redis.eval(lua_script, 1, self.lock_name, self.identifier)
# 使用示例
async def critical_section():
lock = DistributedLock(app.state.redis, "critical_resource", timeout=10)
if await lock.acquire():
try:
# 执行临界区代码
print("执行关键操作...")
await asyncio.sleep(2)
finally:
await lock.release()
else:
print("获取锁失败")#4. Redis高级特性
#4.1 发布订阅模式
import asyncio
import threading
class RedisPublisherSubscriber:
def __init__(self, redis_client):
self.redis = redis_client
async def publish_message(self, channel: str, message: str):
"""发布消息"""
await self.redis.publish(channel, message)
async def subscribe_channel(self, channel: str):
"""订阅频道"""
pubsub = self.redis.pubsub()
await pubsub.subscribe(channel)
async for message in pubsub.listen():
if message['type'] == 'message':
print(f"收到消息: {message['data']}")
# 使用示例
publisher_subscriber = RedisPublisherSubscriber(app.state.redis)
# 发布消息
await publisher_subscriber.publish_message("news", "最新新闻")
# 订阅消息(通常在单独的协程中)
async def listen_news():
await publisher_subscriber.subscribe_channel("news")#4.2 事务操作
# 使用管道(Pipeline)提高性能
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.incr('counter')
results = pipe.execute() # 批量执行
# 使用MULTI/EXEC事务
def transfer_money(redis_client, from_account, to_account, amount):
pipe = redis_client.pipeline()
pipe.watch(from_account) # 监视账户余额
balance = int(redis_client.get(from_account) or 0)
if balance >= amount:
pipe.multi() # 开始事务
pipe.decrby(from_account, amount)
pipe.incrby(to_account, amount)
pipe.execute() # 执行事务
return True
else:
pipe.unwatch() # 放弃监视
return False#4.3 Lua脚本
# 原子操作示例:自增计数器并返回新值
lua_script = """
local current = redis.call('GET', KEYS[1])
if current == false then
current = 0
else
current = tonumber(current)
end
current = current + ARGV[1]
redis.call('SET', KEYS[1], current)
return current
"""
# 执行Lua脚本
result = r.eval(lua_script, 1, 'counter', 1)
print(f"计数器新值: {result}")#5. Redis性能优化
#5.1 内存优化策略
# 1. 使用合适的数据类型
# 错误:使用字符串存储大量小数据
# 正确:使用哈希存储对象
r.hset('user:1000', 'name', 'Alice') # 推荐
# 而不是 r.set('user:1000:name', 'Alice') * n
# 2. 合理设置过期时间
r.setex('temp_data', 300, 'temporary value') # 5分钟后过期
# 3. 批量操作减少网络往返
pipe = r.pipeline()
for i in range(1000):
pipe.set(f'key:{i}', f'value:{i}')
pipe.execute()#5.2 连接优化
# 连接池配置
pool = redis.ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=50,
retry_on_timeout=True,
health_check_interval=30
)
# 监控连接使用情况
def monitor_redis_connections():
info = r.info('clients')
print(f"已连接客户端数: {info['connected_clients']}")
print(f"最大客户端数: {info['maxclients']}")#5.3 持久化策略
# RDB vs AOF 选择
"""
RDB(快照):
- 优点:文件紧凑,恢复速度快
- 缺点:可能丢失最后一次快照后的数据
- 适用:备份、灾难恢复
AOF(追加日志):
- 优点:数据完整性更好,可读性强
- 缺点:文件较大,恢复较慢
- 适用:数据安全性要求高的场景
混合持久化(Redis 4.0+):
- 结合RDB和AOF的优点
- 推荐:生产环境使用
"""#6. Redis集群与高可用
#6.1 Redis Sentinel(哨兵模式)
# 配置Sentinel
import redis.sentinel
sentinels = [('localhost', 26379), ('localhost', 26380), ('localhost', 26381)]
sentinel = redis.sentinel.Sentinel(sentinels)
# 获取主节点
master = sentinel.master_for('mymaster', socket_timeout=0.1)
# 获取从节点
slave = sentinel.slave_for('mymaster', socket_timeout=0.1)
# 读写分离
master.set('key', 'value') # 写入主节点
value = slave.get('key') # 从节点读取#6.2 Redis Cluster(集群模式)
from redis.cluster import RedisCluster
# 连接Redis集群
startup_nodes = [
{"host": "127.0.0.1", "port": "7000"},
{"host": "127.0.0.1", "port": "7001"}
]
rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)
# 集群操作
rc.set("key", "value")
result = rc.get("key")#7. Redis监控与运维
#7.1 性能监控
def monitor_redis_performance():
# 获取Redis统计信息
info = r.info()
print(f"Redis版本: {info['redis_version']}")
print(f"运行时间: {info['uptime_in_days']}天")
print(f"已连接客户端: {info['connected_clients']}")
print(f"内存使用: {info['used_memory_human']}")
print(f"命中率: {info['keyspace_hits']/(info['keyspace_hits']+info['keyspace_misses'])*100:.2f}%")
# 慢查询日志
slow_logs = r.slowlog_get(10) # 获取最近10条慢查询
for log in slow_logs:
print(f"慢查询: {log['command']}, 耗时: {log['duration']}微秒")#7.2 备份与恢复
# 手动触发RDB快照
redis-cli BGSAVE
# 查看持久化文件位置
redis-cli CONFIG GET dir
redis-cli CONFIG GET dbfilename
# 恢复数据:将dump.rdb文件复制到Redis数据目录并重启服务#8. Redis最佳实践
#8.1 键命名规范
"""
键命名最佳实践:
1. 使用冒号分隔层级:user:profile:1000
2. 包含业务含义:session:user_token_123
3. 考虑过期时间:临时数据设置TTL
4. 避免过长键名:影响内存使用
5. 统一命名风格:团队约定
"""
# 示例键命名
user_profile_key = f"user:profile:{user_id}"
session_key = f"session:{token}"
cache_key = f"cache:{namespace}:{resource_id}"
rate_limit_key = f"rate_limit:{user_id}:{endpoint}"#8.2 安全配置
# redis.conf 安全配置
bind 127.0.0.1 # 限制绑定IP
protected-mode yes # 启用保护模式
requirepass your_strong_password # 设置强密码
rename-command FLUSHDB "" # 重命名危险命令
rename-command FLUSHALL "" # 重命名危险命令
rename-command DEBUG "" # 重命名调试命令#8.3 应用场景总结
| 场景 | 推荐数据类型 | 说明 |
|---|---|---|
| 缓存 | String | 最常用的缓存场景 |
| 会话存储 | Hash | 存储用户会话信息 |
| 排行榜 | Sorted Set | 按分数排序的排行榜 |
| 消息队列 | List | LPUSH/BRPOP实现队列 |
| 计数器 | String | INCR/DECR实现计数 |
| 标签系统 | Set | 存储不重复的标签 |
| 实时统计 | String/Hash | 频繁更新的统计数据 |
#总结
Redis是一个功能强大、性能优异的内存数据存储系统。通过合理使用其丰富的数据类型和特性,可以在Web开发中实现高性能的缓存、会话管理、消息队列等功能。掌握Redis的基本操作、高级特性和最佳实践,能够显著提升应用程序的性能和用户体验。

