分布式去重与调度 - 高效去重算法与分布式协调机制详解
📂 所属阶段:第五阶段 — 战力升级(分布式与进阶篇)
🔗 相关章节:Scrapy-Redis分布式架构 · Spider中间件深度定制 · 大规模爬虫优化
目录
分布式去重的核心挑战与选型
当爬虫从单机扩展为多节点协同工作时,重复抓取就成了最大的成本黑洞。你不仅要保证同一个节点不会重复处理相同的URL,还要确保跨节点之间也不会“撞车”。这就带来了几个实实在在的挑战。
三大核心挑战
主流去重方案对比
下表列出了几种常见的方案,看完你就知道什么时候该选哪种。
假阳性的意思是:过滤器可能会偶尔告诉你“这个URL已经存在”,但实际上它是新的。这在大数据量下通常可以接受,因为我们还能通过其他手段二次确认。
布隆过滤器:空间效率之王
布隆过滤器(Bloom Filter)是一种非常聪明的概率型数据结构。它有两个坚定不移的特性:
- 判断“不存在”100%准确 – 只要它说URL没处理过,那你就放心吧。
- 判断“存在”有极低误判率 – 偶尔会“冤枉”一个新URL,但概率可以调控。
它的工作原理可以理解为:每个URL被多个哈希函数映射到一个很长的二进制数组上;添加时把对应的位置置为1,查询时检查这些位置是否都为1。
简化版本地布隆过滤器
下面的实现省略了复杂的参数推导,直接使用经过实战验证的预配置值(位数组长度和哈希函数个数)。你可以把它看作一个开箱即用的工具。
import mmh3
from bitarray import bitarray
class SimpleBloomFilter:
def __init__(self, capacity=1000000, error_rate=0.01):
"""
初始化布隆过滤器
capacity : 预期插入的元素数量
error_rate: 可接受的最大假阳性率(默认1%)
"""
# 预配置的常用参数组合(位数组大小 m,哈希函数个数 k)
self.configs = {
(1000000, 0.01): (14377587, 10),
(10000000, 0.001): (143775871, 10),
}
default_m, default_k = 14377587, 10
self.m, self.k = self.configs.get((capacity, error_rate), (default_m, default_k))
self.bit_array = bitarray(self.m)
self.bit_array.setall(0)
def _get_hashes(self, item):
"""用不同种子生成 k 个哈希位置"""
return [mmh3.hash(item, i) % self.m for i in range(self.k)]
def add(self, item):
"""将URL标记为已处理"""
for idx in self._get_hashes(item):
self.bit_array[idx] = 1
def contains(self, item):
"""检查URL是否可能已经存在"""
return all(self.bit_array[idx] for idx in self._get_hashes(item))
# ---- 使用示例 ----
bf = SimpleBloomFilter()
urls = ["https://a.com", "https://b.com", "https://a.com"]
for url in urls:
if not bf.contains(url):
print(f"✅ 新URL: {url}")
bf.add(url)
else:
print(f"❌ 可能重复: {url}")
运行结果类似:
✅ 新URL: https://a.com
✅ 新URL: https://b.com
❌ 可能重复: https://a.com
Redis分布式去重落地
本地布隆过滤器跑得飞快,但只活在单个进程的内存里。要让它服务于整个爬虫集群,我们需要把位数组搬到 Redis 中——所有节点通过 Redis 读写同一个布隆过滤器。
Redis版布隆过滤器(单实例)
import redis
import mmh3
class RedisBloomFilter:
def __init__(self, redis_client, key_prefix="crawler_bloom", capacity=1000000, error_rate=0.01):
self.redis = redis_client
self.key = f"{key_prefix}:bitmap"
# 沿用和本地布隆相同的预配置参数
configs = {(1000000, 0.01): (14377587, 10)}
self.m, self.k = configs.get((capacity, error_rate), (14377587, 10))
def _get_hashes(self, item):
return [mmh3.hash(item, i) % self.m for i in range(self.k)]
def add(self, item):
"""批量执行 Redis 位操作,减少网络往返"""
hashes = self._get_hashes(item)
pipe = self.redis.pipeline()
for idx in hashes:
pipe.setbit(self.key, idx, 1)
pipe.execute()
def contains(self, item):
hashes = self._get_hashes(item)
return all(self.redis.getbit(self.key, idx) for idx in hashes)
# ---- 连接 Redis 并初始化 ----
redis_client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=False)
rbf = RedisBloomFilter(redis_client)
利用 Redis 的 setbit 和 getbit 操作,它只占用极小的内存,却能支撑几十亿URL的判重。
分布式锁:保证并发安全
多个爬虫节点同时往 Redis 里添加 URL,或者同时去取新的任务种子,很容易出现数据竞争。比如:两个节点都认为自己是第一个拿到某个种子的人,结果把一个 URL 爬了两次。
这时候就需要分布式锁来控制并发。Redis 提供的 SET key value NX EX 命令可以原子性地实现“只有在 key 不存在时才设置,并自动设置过期时间”。
最简实用Redis分布式锁
import uuid
import time
class SimpleRedisLock:
def __init__(self, redis_client, lock_key, timeout=30):
self.redis = redis_client
self.lock_key = lock_key
self.timeout = timeout # 锁自动过期时间,防止死锁
self.identifier = str(uuid.uuid4()) # 唯一标识,防止误删别人的锁
def acquire(self):
"""原子性加锁,仅在锁不存在时成功"""
return self.redis.set(
self.lock_key,
self.identifier,
nx=True,
ex=self.timeout
)
def release(self):
"""安全释放锁:先检查是不是自己的锁,再删除"""
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
return self.redis.eval(lua_script, 1, self.lock_key, self.identifier)
# ---- 典型用法 ----
lock = SimpleRedisLock(redis_client, "crawler:seed:lock")
if lock.acquire():
try:
print("🔒 获得锁,操作共享资源")
# 这里写获取种子、分派任务等关键操作
finally:
lock.release()
print("🔓 释放锁")
else:
print("⚠️ 获取锁失败,稍后重试")
这个实现虽然简单,但已经解决了最核心的两个问题:
- 加锁原子性 – 多节点不会同时获得锁
- 自动过期 – 持有锁的节点如果崩溃,锁不会永久占用
释放锁使用了 Lua 脚本,保证“检查标识”和“删除”两步操作的原子性,避免误删其他节点刚刚获得的锁。
一致性哈希:负载均衡神器
当我们将爬虫任务分配给不同节点时,一般会采用哈希取模的方式(hash(task) % N)。但一旦节点数量发生变化(扩容、缩容、故障),几乎所有任务都需要重新分配——这会引发大规模的缓存失效和状态迁移。
一致性哈希把节点和任务都映射到一个环上,每个任务被顺时针分配给第一个够得着的节点。节点增减时,只有相邻节点的任务需要迁移,其他大部分保持不变。
简化版一致性哈希环
import hashlib
import bisect
class ConsistentHashRing:
def __init__(self, nodes=None, virtual_nodes=150):
"""
nodes : 真实节点列表(如 ["node1", "node2"])
virtual_nodes : 每个真实节点的虚拟节点数量,用于均衡分布
"""
self.virtual_nodes = virtual_nodes
self.ring = {}
self.sorted_keys = []
if nodes:
for node in nodes:
self.add_node(node)
def _hash(self, key):
"""将任意字符串映射为一个整数"""
return int(hashlib.md5(key.encode()).hexdigest(), 16)
def add_node(self, node):
"""添加一个真实节点,并为其生成多个虚拟节点"""
for i in range(self.virtual_nodes):
virtual_key = f"{node}:vnode:{i}"
hash_key = self._hash(virtual_key)
self.ring[hash_key] = node
self.sorted_keys.append(hash_key)
self.sorted_keys.sort()
def get_node(self, key):
"""根据任务ID找到应该负责的节点"""
if not self.ring:
return None
hash_key = self._hash(key)
idx = bisect.bisect_right(self.sorted_keys, hash_key)
# 环形结构,若超出范围则回到第一个节点
if idx == len(self.sorted_keys):
idx = 0
return self.ring[self.sorted_keys[idx]]
# ---- 演示任务分配 ----
ring = ConsistentHashRing(["node1", "node2", "node3"])
tasks = [f"task_{i}" for i in range(10)]
for task in tasks:
node = ring.get_node(task)
print(f"📋 任务 {task} -> 节点 {node}")
输出示例:
📋 任务 task_0 -> 节点 node2
📋 任务 task_1 -> 节点 node1
📋 任务 task_2 -> 节点 node3
📋 任务 task_3 -> 节点 node1
...
每个节点大约承担三分之一的负载。而且当你增加第4个节点时,大约只有四分之一的原有任务会被迁移到新节点,大幅降低了系统抖动。
核心优化策略
掌握了基本工具后,下面是一些生产环境中立竿见影的优化技巧。
1. 批量处理
Redis 的每一次网络往返都是开销。在 RedisBloomFilter.add() 中我们已经用了 pipeline 将多个 setbit 合并成一次通信。同样,查询批量 URL 时也可以使用 pipeline 一次性获取所有位。
2. 本地缓存
对于重复出现率高的 URL(例如列表页、目录页链接),可以在进程内加一层 LRU 缓存,避免每次都去查询 Redis 布隆过滤器。
from cachetools import LRUCache
class CachedRedisBloomFilter(RedisBloomFilter):
def __init__(self, *args, cache_size=10000, **kwargs):
super().__init__(*args, **kwargs)
self.cache = LRUCache(maxsize=cache_size)
def contains(self, item):
if item in self.cache:
return self.cache[item]
res = super().contains(item)
self.cache[item] = res
return res
def add(self, item):
super().add(item)
self.cache[item] = True # 新添加的直接标记为存在
3. 分段设计
- 分段锁:把共享资源拆成多段,每段一把锁,减少竞争
- 分段布隆:单一大 Redis 布隆可能成为性能瓶颈,可以按URL前缀或哈希值将其拆分为多个布隆过滤器,分散在不同 Redis 实例上
最佳实践总结
设计原则一览
常见问题快速定位
这套“去重 + 调度”组合拳已经支撑了多个日均数亿URL的爬虫集群。你可以根据自身业务直接套用,也可以按需替换其中的组件(例如将 Redis 换成 Redis Cluster 或替代品)。关键是掌握分层思想、概率与精确的搭配、以及分布式协调的原子性保障。
🔗 相关教程推荐
🏷️ 标签云: 分布式去重 布隆过滤器 分布式锁 一致性哈希 Redis 爬虫优化