Redis 实战教程


什么是Redis?

Redis(Remote Dictionary Server)是一个开源的内存数据结构存储系统,可以用作数据库、缓存和消息中间件。它支持多种数据结构,如字符串(strings)、哈希(hashes)、列表(lists)、集合(sets)、有序集合(sorted sets)等。

Redis的主要特点:

  • 高性能:基于内存操作,读写速度极快
  • 丰富的数据结构:支持多种数据类型
  • 持久化:支持RDB和AOF两种持久化方式
  • 主从复制:支持数据复制和故障恢复
  • 集群支持:支持分布式集群模式
  • 事务支持:支持简单的事务操作

1. Redis安装与配置

1.1 Windows环境下安装

Windows环境下可以通过WSL(Windows Subsystem for Linux)安装Redis:

# 使用WSL Ubuntu
sudo apt update
sudo apt install redis-server

# 启动Redis服务
sudo systemctl start redis-server
sudo systemctl enable redis-server

# 检查Redis状态
sudo systemctl status redis-server

1.2 Docker方式安装

# 拉取Redis镜像
docker pull redis:latest

# 运行Redis容器
docker run -d --name my-redis -p 6379:6379 redis:latest

# 指定配置文件运行
docker run -d --name my-redis -p 6379:6379 -v /path/to/redis.conf:/etc/redis/redis.conf redis:latest redis-server /etc/redis/redis.conf

1.3 Redis配置文件详解

# 基本配置
bind 127.0.0.1  # 绑定IP地址
port 6379       # 端口号
timeout 300     # 客户端闲置多久后关闭连接
tcp-keepalive 60 # TCP keepalive时间

# 日志配置
loglevel notice # 日志级别
logfile /var/log/redis/redis-server.log

# 数据库配置
databases 16    # 数据库数量

# RDB持久化配置
save 900 1      # 900秒内至少有1个key发生变化则保存
save 300 10     # 300秒内至少有10个key发生变化则保存
save 60 10000   # 60秒内至少有10000个key发生变化则保存
dbfilename dump.rdb # RDB文件名
dir /var/lib/redis  # RDB文件存放目录

# 安全配置
requirepass yourpassword # 设置密码
maxclients 10000         # 最大客户端连接数
maxmemory 2gb            # 最大内存限制
maxmemory-policy allkeys-lru # 内存淘汰策略

2. Redis数据类型与操作

2.1 字符串(String)

# 设置键值对
SET name "John Doe"
SET age 25 EX 3600 NX  # 设置过期时间1小时,仅当key不存在时设置

# 获取值
GET name

# 数值操作
INCR counter      # 自增1
DECR counter      # 自减1
INCRBY counter 5  # 自增5
DECRBY counter 2  # 自减2

# 批量操作
MSET key1 value1 key2 value2
MGET key1 key2

# 追加字符串
APPEND name " Jr."  # 在原字符串后追加

2.2 哈希(Hash)

# 设置哈希字段
HSET user:1000 name "Alice" age 30 email "alice@example.com"

# 获取哈希字段
HGET user:1000 name
HMGET user:1000 name age email
HGETALL user:1000

# 删除哈希字段
HDEL user:1000 email

# 检查字段是否存在
HEXISTS user:1000 name

# 获取字段数量
HLEN user:1000

2.3 列表(List)

# 从左侧插入
LPUSH mylist "first"
LPUSH mylist "second"

# 从右侧插入
RPUSH mylist "third"
RPUSH mylist "fourth"

# 查看列表
LRANGE mylist 0 -1  # 显示所有元素

# 弹出元素
LPOP mylist         # 从左侧弹出
RPOP mylist         # 从右侧弹出

# 阻塞弹出(用于消息队列)
BLPOP mylist 0      # 阻塞直到有元素可用
BRPOP mylist 0

2.4 集合(Set)

# 添加元素
SADD myset "apple"
SADD myset "banana" "orange"

# 获取所有元素
SMEMBERS myset

# 检查元素是否存在
SISMEMBER myset "apple"

# 随机获取元素
SRANDMEMBER myset 2  # 随机获取2个元素

# 集合运算
SUNION set1 set2     # 并集
SINTER set1 set2     # 交集
SDIFF set1 set2      # 差集

2.5 有序集合(Sorted Set)

# 添加成员(带分数)
ZADD leaderboard 100 "player1"
ZADD leaderboard 95 "player2" 120 "player3"

# 获取排名
ZRANK leaderboard "player2"     # 获取排名(从0开始)
ZREVRANK leaderboard "player2"  # 获取倒序排名

# 获取分数
ZSCORE leaderboard "player1"

# 获取指定范围的成员
ZRANGE leaderboard 0 1 WITHSCORES  # 获取前2名及分数
ZREVRANGE leaderboard 0 1 WITHSCORES # 倒序获取前2名

3. Python与Redis集成

3.1 安装Redis客户端

pip install redis
pip install redis[hiredis]  # 可选:安装hiredis加速

3.2 基本连接与操作

import redis

# 连接Redis
r = redis.Redis(
    host='localhost',
    port=6379,
    db=0,
    password=None,  # 如果设置了密码
    decode_responses=True  # 自动解码响应
)

# 字符串操作
r.set('name', 'Alice')
name = r.get('name')

# 哈希操作
r.hset('user:1000', mapping={
    'name': 'Bob',
    'age': 25,
    'email': 'bob@example.com'
})
user_info = r.hgetall('user:1000')

# 列表操作
r.lpush('tasks', 'task1', 'task2', 'task3')
task = r.rpop('tasks')

# 集合操作
r.sadd('tags', 'python', 'redis', 'web')
all_tags = r.smembers('tags')

# 有序集合操作
r.zadd('scores', {'Alice': 95, 'Bob': 87, 'Charlie': 92})
top_scores = r.zrevrange('scores', 0, 2, withscores=True)

3.3 Redis连接池

import redis

# 创建连接池
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=20
)

# 使用连接池
r = redis.Redis(connection_pool=pool)

# 在FastAPI中的应用
from fastapi import FastAPI
import redis.asyncio as redis_async

app = FastAPI()

@app.on_event("startup")
async def startup_event():
    app.state.redis = await redis_async.Redis(
        host='localhost',
        port=6379,
        db=0,
        decode_responses=True
    )

@app.on_event("shutdown")
async def shutdown_event():
    await app.state.redis.close()

3.4 实际应用场景

3.4.1 缓存实现

import json
from functools import wraps

def cache_result(expire_time=300):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # 生成缓存键
            cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"
            
            # 尝试从缓存获取结果
            cached_result = await app.state.redis.get(cache_key)
            if cached_result:
                return json.loads(cached_result)
            
            # 执行函数并缓存结果
            result = await func(*args, **kwargs)
            await app.state.redis.setex(
                cache_key, 
                expire_time, 
                json.dumps(result)
            )
            return result
        return wrapper
    return decorator

# 使用缓存装饰器
@cache_result(expire_time=600)
async def get_user_data(user_id: int):
    # 模拟数据库查询
    return {"id": user_id, "name": f"User{user_id}"}

3.4.2 会话管理

import uuid
import time
from typing import Dict, Any

class SessionManager:
    def __init__(self, redis_client, session_timeout=3600):
        self.redis = redis_client
        self.timeout = session_timeout
    
    async def create_session(self, user_id: str) -> str:
        """创建新会话"""
        session_id = str(uuid.uuid4())
        session_data = {
            "user_id": user_id,
            "created_at": time.time(),
            "last_access": time.time()
        }
        
        await self.redis.hset(f"session:{session_id}", mapping=session_data)
        await self.redis.expire(f"session:{session_id}", self.timeout)
        
        return session_id
    
    async def get_session(self, session_id: str) -> Dict[str, Any]:
        """获取会话数据"""
        session_data = await self.redis.hgetall(f"session:{session_id}")
        if session_data:
            # 更新最后访问时间
            await self.redis.hset(f"session:{session_id}", "last_access", time.time())
            await self.redis.expire(f"session:{session_id}", self.timeout)
        
        return session_data
    
    async def destroy_session(self, session_id: str):
        """销毁会话"""
        await self.redis.delete(f"session:{session_id}")

# 使用示例
session_manager = SessionManager(app.state.redis)

3.4.3 分布式锁

import asyncio
import time
import random

class DistributedLock:
    def __init__(self, redis_client, lock_name: str, timeout: int = 10):
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"
        self.timeout = timeout
        self.identifier = str(uuid.uuid4())
    
    async def acquire(self) -> bool:
        """获取锁"""
        end_time = time.time() + self.timeout
        
        while time.time() < end_time:
            # 使用SET命令的NX和EX选项实现原子操作
            if await self.redis.set(
                self.lock_name, 
                self.identifier, 
                nx=True, 
                ex=self.timeout
            ):
                return True
            await asyncio.sleep(0.001)  # 短暂休眠避免忙等待
        
        return False
    
    async def release(self):
        """释放锁"""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        await self.redis.eval(lua_script, 1, self.lock_name, self.identifier)

# 使用示例
async def critical_section():
    lock = DistributedLock(app.state.redis, "critical_resource", timeout=10)
    
    if await lock.acquire():
        try:
            # 执行临界区代码
            print("执行关键操作...")
            await asyncio.sleep(2)
        finally:
            await lock.release()
    else:
        print("获取锁失败")

4. Redis高级特性

4.1 发布订阅模式

import asyncio
import threading

class RedisPublisherSubscriber:
    def __init__(self, redis_client):
        self.redis = redis_client
    
    async def publish_message(self, channel: str, message: str):
        """发布消息"""
        await self.redis.publish(channel, message)
    
    async def subscribe_channel(self, channel: str):
        """订阅频道"""
        pubsub = self.redis.pubsub()
        await pubsub.subscribe(channel)
        
        async for message in pubsub.listen():
            if message['type'] == 'message':
                print(f"收到消息: {message['data']}")

# 使用示例
publisher_subscriber = RedisPublisherSubscriber(app.state.redis)

# 发布消息
await publisher_subscriber.publish_message("news", "最新新闻")

# 订阅消息(通常在单独的协程中)
async def listen_news():
    await publisher_subscriber.subscribe_channel("news")

4.2 事务操作

# 使用管道(Pipeline)提高性能
pipe = r.pipeline()
pipe.set('key1', 'value1')
pipe.set('key2', 'value2')
pipe.incr('counter')
results = pipe.execute()  # 批量执行

# 使用MULTI/EXEC事务
def transfer_money(redis_client, from_account, to_account, amount):
    pipe = redis_client.pipeline()
    pipe.watch(from_account)  # 监视账户余额
    
    balance = int(redis_client.get(from_account) or 0)
    if balance >= amount:
        pipe.multi()  # 开始事务
        pipe.decrby(from_account, amount)
        pipe.incrby(to_account, amount)
        pipe.execute()  # 执行事务
        return True
    else:
        pipe.unwatch()  # 放弃监视
        return False

4.3 Lua脚本

# 原子操作示例:自增计数器并返回新值
lua_script = """
local current = redis.call('GET', KEYS[1])
if current == false then
    current = 0
else
    current = tonumber(current)
end
current = current + ARGV[1]
redis.call('SET', KEYS[1], current)
return current
"""

# 执行Lua脚本
result = r.eval(lua_script, 1, 'counter', 1)
print(f"计数器新值: {result}")

5. Redis性能优化

5.1 内存优化策略

# 1. 使用合适的数据类型
# 错误:使用字符串存储大量小数据
# 正确:使用哈希存储对象
r.hset('user:1000', 'name', 'Alice')  # 推荐
# 而不是 r.set('user:1000:name', 'Alice') * n

# 2. 合理设置过期时间
r.setex('temp_data', 300, 'temporary value')  # 5分钟后过期

# 3. 批量操作减少网络往返
pipe = r.pipeline()
for i in range(1000):
    pipe.set(f'key:{i}', f'value:{i}')
pipe.execute()

5.2 连接优化

# 连接池配置
pool = redis.ConnectionPool(
    host='localhost',
    port=6379,
    db=0,
    max_connections=50,
    retry_on_timeout=True,
    health_check_interval=30
)

# 监控连接使用情况
def monitor_redis_connections():
    info = r.info('clients')
    print(f"已连接客户端数: {info['connected_clients']}")
    print(f"最大客户端数: {info['maxclients']}")

5.3 持久化策略

# RDB vs AOF 选择
"""
RDB(快照):
- 优点:文件紧凑,恢复速度快
- 缺点:可能丢失最后一次快照后的数据
- 适用:备份、灾难恢复

AOF(追加日志):
- 优点:数据完整性更好,可读性强
- 缺点:文件较大,恢复较慢
- 适用:数据安全性要求高的场景

混合持久化(Redis 4.0+):
- 结合RDB和AOF的优点
- 推荐:生产环境使用
"""

6. Redis集群与高可用

6.1 Redis Sentinel(哨兵模式)

# 配置Sentinel
import redis.sentinel

sentinels = [('localhost', 26379), ('localhost', 26380), ('localhost', 26381)]
sentinel = redis.sentinel.Sentinel(sentinels)

# 获取主节点
master = sentinel.master_for('mymaster', socket_timeout=0.1)
# 获取从节点
slave = sentinel.slave_for('mymaster', socket_timeout=0.1)

# 读写分离
master.set('key', 'value')  # 写入主节点
value = slave.get('key')    # 从节点读取

6.2 Redis Cluster(集群模式)

from redis.cluster import RedisCluster

# 连接Redis集群
startup_nodes = [
    {"host": "127.0.0.1", "port": "7000"},
    {"host": "127.0.0.1", "port": "7001"}
]

rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

# 集群操作
rc.set("key", "value")
result = rc.get("key")

7. Redis监控与运维

7.1 性能监控

def monitor_redis_performance():
    # 获取Redis统计信息
    info = r.info()
    
    print(f"Redis版本: {info['redis_version']}")
    print(f"运行时间: {info['uptime_in_days']}天")
    print(f"已连接客户端: {info['connected_clients']}")
    print(f"内存使用: {info['used_memory_human']}")
    print(f"命中率: {info['keyspace_hits']/(info['keyspace_hits']+info['keyspace_misses'])*100:.2f}%")
    
    # 慢查询日志
    slow_logs = r.slowlog_get(10)  # 获取最近10条慢查询
    for log in slow_logs:
        print(f"慢查询: {log['command']}, 耗时: {log['duration']}微秒")

7.2 备份与恢复

# 手动触发RDB快照
redis-cli BGSAVE

# 查看持久化文件位置
redis-cli CONFIG GET dir
redis-cli CONFIG GET dbfilename

# 恢复数据:将dump.rdb文件复制到Redis数据目录并重启服务

8. Redis最佳实践

8.1 键命名规范

"""
键命名最佳实践:
1. 使用冒号分隔层级:user:profile:1000
2. 包含业务含义:session:user_token_123
3. 考虑过期时间:临时数据设置TTL
4. 避免过长键名:影响内存使用
5. 统一命名风格:团队约定
"""

# 示例键命名
user_profile_key = f"user:profile:{user_id}"
session_key = f"session:{token}"
cache_key = f"cache:{namespace}:{resource_id}"
rate_limit_key = f"rate_limit:{user_id}:{endpoint}"

8.2 安全配置

# redis.conf 安全配置
bind 127.0.0.1                    # 限制绑定IP
protected-mode yes                 # 启用保护模式
requirepass your_strong_password   # 设置强密码
rename-command FLUSHDB ""          # 重命名危险命令
rename-command FLUSHALL ""         # 重命名危险命令
rename-command DEBUG ""            # 重命名调试命令

8.3 应用场景总结

场景推荐数据类型说明
缓存String最常用的缓存场景
会话存储Hash存储用户会话信息
排行榜Sorted Set按分数排序的排行榜
消息队列ListLPUSH/BRPOP实现队列
计数器StringINCR/DECR实现计数
标签系统Set存储不重复的标签
实时统计String/Hash频繁更新的统计数据

总结

Redis是一个功能强大、性能优异的内存数据存储系统。通过合理使用其丰富的数据类型和特性,可以在Web开发中实现高性能的缓存、会话管理、消息队列等功能。掌握Redis的基本操作、高级特性和最佳实践,能够显著提升应用程序的性能和用户体验。