分布式去重与调度 - 高效去重算法与分布式协调机制详解

📂 所属阶段:第五阶段 — 战力升级(分布式与进阶篇)
🔗 相关章节:Scrapy-Redis分布式架构 · Spider中间件深度定制 · 大规模爬虫优化

目录


分布式去重的核心挑战与选型

当爬虫从单机扩展为多节点协同工作时,重复抓取就成了最大的成本黑洞。你不仅要保证同一个节点不会重复处理相同的URL,还要确保跨节点之间也不会“撞车”。这就带来了几个实实在在的挑战。

三大核心挑战

挑战维度为什么难搞
数据一致性所有节点需要共享一份“已处理URL”的清单,谁改了什么、什么时候改的,必须清楚无误
性能与存储上亿级URL的去重判断要快(毫秒级),同时还不能把内存吃光
可扩展性节点随时可能增减,去重方案不能推倒重来,必须能平滑适应

主流去重方案对比

下表列出了几种常见的方案,看完你就知道什么时候该选哪种。

方案类型优势劣势适用场景
本地Set / 布隆速度极快,零网络开销无法跨节点共享去重信息单机测试、临时小任务
Redis Set精确去重,天生支持分布式海量URL时内存开销巨大中等规模(< 1000万URL)
Redis 布隆过滤器空间效率极高,多节点共享存在极低概率的“假阳性”误判超大规模(> 1亿URL)的首选

假阳性的意思是:过滤器可能会偶尔告诉你“这个URL已经存在”,但实际上它是新的。这在大数据量下通常可以接受,因为我们还能通过其他手段二次确认。


布隆过滤器:空间效率之王

布隆过滤器(Bloom Filter)是一种非常聪明的概率型数据结构。它有两个坚定不移的特性:

  1. 判断“不存在”100%准确 – 只要它说URL没处理过,那你就放心吧。
  2. 判断“存在”有极低误判率 – 偶尔会“冤枉”一个新URL,但概率可以调控。

它的工作原理可以理解为:每个URL被多个哈希函数映射到一个很长的二进制数组上;添加时把对应的位置置为1,查询时检查这些位置是否都为1。

简化版本地布隆过滤器

下面的实现省略了复杂的参数推导,直接使用经过实战验证的预配置值(位数组长度和哈希函数个数)。你可以把它看作一个开箱即用的工具。

import mmh3
from bitarray import bitarray

class SimpleBloomFilter:
    def __init__(self, capacity=1000000, error_rate=0.01):
        """
        初始化布隆过滤器
        capacity  : 预期插入的元素数量
        error_rate: 可接受的最大假阳性率(默认1%)
        """
        # 预配置的常用参数组合(位数组大小 m,哈希函数个数 k)
        self.configs = {
            (1000000, 0.01):   (14377587, 10),
            (10000000, 0.001): (143775871, 10),
        }
        default_m, default_k = 14377587, 10
        self.m, self.k = self.configs.get((capacity, error_rate), (default_m, default_k))

        self.bit_array = bitarray(self.m)
        self.bit_array.setall(0)

    def _get_hashes(self, item):
        """用不同种子生成 k 个哈希位置"""
        return [mmh3.hash(item, i) % self.m for i in range(self.k)]

    def add(self, item):
        """将URL标记为已处理"""
        for idx in self._get_hashes(item):
            self.bit_array[idx] = 1

    def contains(self, item):
        """检查URL是否可能已经存在"""
        return all(self.bit_array[idx] for idx in self._get_hashes(item))

# ---- 使用示例 ----
bf = SimpleBloomFilter()
urls = ["https://a.com", "https://b.com", "https://a.com"]
for url in urls:
    if not bf.contains(url):
        print(f"✅ 新URL: {url}")
        bf.add(url)
    else:
        print(f"❌ 可能重复: {url}")

运行结果类似:

✅ 新URL: https://a.com
✅ 新URL: https://b.com
❌ 可能重复: https://a.com

Redis分布式去重落地

本地布隆过滤器跑得飞快,但只活在单个进程的内存里。要让它服务于整个爬虫集群,我们需要把位数组搬到 Redis 中——所有节点通过 Redis 读写同一个布隆过滤器。

Redis版布隆过滤器(单实例)

import redis
import mmh3

class RedisBloomFilter:
    def __init__(self, redis_client, key_prefix="crawler_bloom", capacity=1000000, error_rate=0.01):
        self.redis = redis_client
        self.key = f"{key_prefix}:bitmap"
        # 沿用和本地布隆相同的预配置参数
        configs = {(1000000, 0.01): (14377587, 10)}
        self.m, self.k = configs.get((capacity, error_rate), (14377587, 10))

    def _get_hashes(self, item):
        return [mmh3.hash(item, i) % self.m for i in range(self.k)]

    def add(self, item):
        """批量执行 Redis 位操作,减少网络往返"""
        hashes = self._get_hashes(item)
        pipe = self.redis.pipeline()
        for idx in hashes:
            pipe.setbit(self.key, idx, 1)
        pipe.execute()

    def contains(self, item):
        hashes = self._get_hashes(item)
        return all(self.redis.getbit(self.key, idx) for idx in hashes)

# ---- 连接 Redis 并初始化 ----
redis_client = redis.Redis(host="localhost", port=6379, db=0, decode_responses=False)
rbf = RedisBloomFilter(redis_client)

利用 Redis 的 setbitgetbit 操作,它只占用极小的内存,却能支撑几十亿URL的判重。


分布式锁:保证并发安全

多个爬虫节点同时往 Redis 里添加 URL,或者同时去取新的任务种子,很容易出现数据竞争。比如:两个节点都认为自己是第一个拿到某个种子的人,结果把一个 URL 爬了两次。

这时候就需要分布式锁来控制并发。Redis 提供的 SET key value NX EX 命令可以原子性地实现“只有在 key 不存在时才设置,并自动设置过期时间”。

最简实用Redis分布式锁

import uuid
import time

class SimpleRedisLock:
    def __init__(self, redis_client, lock_key, timeout=30):
        self.redis = redis_client
        self.lock_key = lock_key
        self.timeout = timeout          # 锁自动过期时间,防止死锁
        self.identifier = str(uuid.uuid4())  # 唯一标识,防止误删别人的锁

    def acquire(self):
        """原子性加锁,仅在锁不存在时成功"""
        return self.redis.set(
            self.lock_key,
            self.identifier,
            nx=True,
            ex=self.timeout
        )

    def release(self):
        """安全释放锁:先检查是不是自己的锁,再删除"""
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        return self.redis.eval(lua_script, 1, self.lock_key, self.identifier)

# ---- 典型用法 ----
lock = SimpleRedisLock(redis_client, "crawler:seed:lock")
if lock.acquire():
    try:
        print("🔒 获得锁,操作共享资源")
        # 这里写获取种子、分派任务等关键操作
    finally:
        lock.release()
        print("🔓 释放锁")
else:
    print("⚠️ 获取锁失败,稍后重试")

这个实现虽然简单,但已经解决了最核心的两个问题:

  1. 加锁原子性 – 多节点不会同时获得锁
  2. 自动过期 – 持有锁的节点如果崩溃,锁不会永久占用

释放锁使用了 Lua 脚本,保证“检查标识”和“删除”两步操作的原子性,避免误删其他节点刚刚获得的锁。


一致性哈希:负载均衡神器

当我们将爬虫任务分配给不同节点时,一般会采用哈希取模的方式(hash(task) % N)。但一旦节点数量发生变化(扩容、缩容、故障),几乎所有任务都需要重新分配——这会引发大规模的缓存失效和状态迁移。

一致性哈希把节点和任务都映射到一个环上,每个任务被顺时针分配给第一个够得着的节点。节点增减时,只有相邻节点的任务需要迁移,其他大部分保持不变。

简化版一致性哈希环

import hashlib
import bisect

class ConsistentHashRing:
    def __init__(self, nodes=None, virtual_nodes=150):
        """
        nodes         : 真实节点列表(如 ["node1", "node2"])
        virtual_nodes : 每个真实节点的虚拟节点数量,用于均衡分布
        """
        self.virtual_nodes = virtual_nodes
        self.ring = {}
        self.sorted_keys = []
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        """将任意字符串映射为一个整数"""
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        """添加一个真实节点,并为其生成多个虚拟节点"""
        for i in range(self.virtual_nodes):
            virtual_key = f"{node}:vnode:{i}"
            hash_key = self._hash(virtual_key)
            self.ring[hash_key] = node
            self.sorted_keys.append(hash_key)
        self.sorted_keys.sort()

    def get_node(self, key):
        """根据任务ID找到应该负责的节点"""
        if not self.ring:
            return None
        hash_key = self._hash(key)
        idx = bisect.bisect_right(self.sorted_keys, hash_key)
        # 环形结构,若超出范围则回到第一个节点
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

# ---- 演示任务分配 ----
ring = ConsistentHashRing(["node1", "node2", "node3"])
tasks = [f"task_{i}" for i in range(10)]
for task in tasks:
    node = ring.get_node(task)
    print(f"📋 任务 {task} -> 节点 {node}")

输出示例:

📋 任务 task_0 -> 节点 node2
📋 任务 task_1 -> 节点 node1
📋 任务 task_2 -> 节点 node3
📋 任务 task_3 -> 节点 node1
...

每个节点大约承担三分之一的负载。而且当你增加第4个节点时,大约只有四分之一的原有任务会被迁移到新节点,大幅降低了系统抖动。


核心优化策略

掌握了基本工具后,下面是一些生产环境中立竿见影的优化技巧。

1. 批量处理

Redis 的每一次网络往返都是开销。在 RedisBloomFilter.add() 中我们已经用了 pipeline 将多个 setbit 合并成一次通信。同样,查询批量 URL 时也可以使用 pipeline 一次性获取所有位。

2. 本地缓存

对于重复出现率高的 URL(例如列表页、目录页链接),可以在进程内加一层 LRU 缓存,避免每次都去查询 Redis 布隆过滤器。

from cachetools import LRUCache

class CachedRedisBloomFilter(RedisBloomFilter):
    def __init__(self, *args, cache_size=10000, **kwargs):
        super().__init__(*args, **kwargs)
        self.cache = LRUCache(maxsize=cache_size)

    def contains(self, item):
        if item in self.cache:
            return self.cache[item]
        res = super().contains(item)
        self.cache[item] = res
        return res

    def add(self, item):
        super().add(item)
        self.cache[item] = True   # 新添加的直接标记为存在

3. 分段设计

  • 分段锁:把共享资源拆成多段,每段一把锁,减少竞争
  • 分段布隆:单一大 Redis 布隆可能成为性能瓶颈,可以按URL前缀或哈希值将其拆分为多个布隆过滤器,分散在不同 Redis 实例上

最佳实践总结

设计原则一览

原则说明
分层去重本地 LRU 缓存 -> Redis 布隆过滤器 -> Redis Set(精确兜底,误判时使用)
性能优先概率型数据结构扛住海量压力,精确结构只用于少量确认
容错设计锁必须带自动过期;布隆过滤器定期备份到磁盘,防止数据丢失
监控完备关注去重命中率、响应延迟、Redis 内存使用、布隆过滤器填充率等关键指标

常见问题快速定位

现象可能原因解决方案
布隆过滤器假阳性率过高容量估算不足或填充太满增大 capacity 或降低 error_rate
Redis 内存占用过大使用了 Set 而非布隆用 Redis 布隆过滤器替换 Set
分布式锁竞争激烈锁粒度过粗或持有时间过长分段锁、缩小临界区
节点增减后大量任务重分配使用了简单哈希取模切换为一致性哈希

这套“去重 + 调度”组合拳已经支撑了多个日均数亿URL的爬虫集群。你可以根据自身业务直接套用,也可以按需替换其中的组件(例如将 Redis 换成 Redis Cluster 或替代品)。关键是掌握分层思想、概率与精确的搭配、以及分布式协调的原子性保障。


🔗 相关教程推荐

🏷️ 标签云: 分布式去重 布隆过滤器 分布式锁 一致性哈希 Redis 爬虫优化