Scrapy数据去重与增量更新完全指南

📂 所属阶段:第二阶段 — 数据流转(数据处理篇)
🔗 相关章节:Pipeline管道实战 · 数据清洗与校验

目录


快速入门场景

做爬虫时最常遇到的两个痛点:

  1. 重复爬取同一页面或内容,浪费带宽、时间和服务器资源;
  2. 全量重爬,无法第一时间捕获新增或变更的数据。

本文会覆盖从简单单机去重带智能调度的增量爬取的完整方案,所有代码均可直接复用,帮你快速落地高性价比的爬虫策略。


Redis指纹去重实现

基础可复用Pipeline

这是最通用的Redis指纹去重方案,支持自定义指纹字段和过期时间。

import hashlib
import redis
from scrapy.exceptions import DropItem
from itemadapter import ItemAdapter

class RedisFingerprintPipeline:
    """基于Redis SHA256指纹的通用去重Pipeline"""
    
    def __init__(self, redis_conf):
        self.redis_conf = redis_conf
        self.redis_conn = None
        self.expire_days = redis_conf.get("expire_days", 7)   # 指纹默认保留7天
        self.dedup_fields = redis_conf.get("dedup_fields", ["url"])  # 默认按URL去重

    @classmethod
    def from_crawler(cls, crawler):
        """从Scrapy项目配置中读取Redis连接参数和去重策略"""
        redis_conf = {
            "host": crawler.settings.get("REDIS_HOST", "localhost"),
            "port": crawler.settings.getint("REDIS_PORT", 6379),
            "db": crawler.settings.getint("REDIS_DB", 0),
            "password": crawler.settings.get("REDIS_PASSWORD"),
            "expire_days": crawler.settings.getint("DUPLICATE_EXPIRE_DAYS", 7),
            "dedup_fields": crawler.settings.getlist("DUPLICATE_DEDUP_FIELDS", ["url"])
        }
        return cls(redis_conf)

    def open_spider(self, spider):
        """爬虫启动时建立Redis连接"""
        self.redis_conn = redis.Redis(
            host=self.redis_conf["host"],
            port=self.redis_conf["port"],
            db=self.redis_conf["db"],
            password=self.redis_conf["password"],
            decode_responses=False   # 哈希值用字节存储更省空间
        )

    def process_item(self, item, spider):
        """核心去重逻辑:计算指纹 -> 检查存在 -> 存入或丢弃"""
        fp = self._gen_fingerprint(item)
        # 检查Redis中是否已有相同指纹
        if self.redis_conn.exists(fp):
            spider.logger.warning(f"检测到重复数据,指纹前缀: {fp.decode()[:16]}")
            raise DropItem(f"Duplicate item: {fp.decode()[:16]}")
        # 新指纹写入Redis并设置过期时间
        self.redis_conn.setex(fp, 86400 * self.expire_days, b"1")
        return item

    def close_spider(self, spider):
        """爬虫关闭时断开Redis连接"""
        if self.redis_conn:
            self.redis_conn.close()

    def _gen_fingerprint(self, item):
        """根据配置的去重字段生成SHA256指纹"""
        adapter = ItemAdapter(item)
        parts = []
        # 按字段名排序,避免因字段顺序不同导致指纹不同
        for field in sorted(self.dedup_fields):
            if field in adapter:
                val = adapter[field]
                if val is not None:
                    parts.append(f"{field}={str(val)}")
        # 若没有配置去重字段,就使用整个item的字典生成指纹
        if not parts:
            item_sorted = str(sorted(adapter.asdict().items()))
            parts.append(item_sorted)
        raw = "|".join(parts).encode("utf-8")
        return hashlib.sha256(raw).hexdigest().encode("utf-8")

核心思路

  • 将Item中你关心的字段(如urltitle)组合成一个固定顺序的字符串;
  • 计算SHA256哈希值作为“数据指纹”;
  • 用Redis的SETEX命令将指纹保存一段时间,下次遇到相同指纹直接丢弃。

配置文件示例

settings.py 中添加如下配置:

# Pipeline启用(数字表示优先级,越小越先执行)
ITEM_PIPELINES = {
    "myproject.pipelines.RedisFingerprintPipeline": 300,
}

# Redis连接与去重配置
REDIS_HOST = "127.0.0.1"
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_PASSWORD = None
DUPLICATE_EXPIRE_DAYS = 7          # 7天后指纹自动清理,防止Redis内存持续增长
DUPLICATE_DEDUP_FIELDS = ["url", "title"]  # 按url+标题组合去重,提高准确性

URL标准化与请求级去重

同一个页面可能以多种不同的URL形式出现:

  • 有的带www,有的不带;
  • 末尾有无斜杠;
  • 带各类追踪参数(utm_sourcegclid等)。

如果直接进行去重,这些URL会被认为是不同的页面,导致重复爬取。因此,在指纹计算前先做URL标准化,能大幅提升去重效果。

URL标准化Pipeline

from urllib.parse import urlparse, parse_qs, urlencode

class URLNormalizationPipeline:
    """URL标准化Pipeline,应在去重Pipeline之前执行"""
    
    def process_item(self, item, spider):
        if "url" not in item:
            return item
        item["url"] = self._normalize(item["url"])
        return item

    def _normalize(self, url):
        """标准化URL的主要步骤"""
        parsed = urlparse(url)
        # 1. 协议、域名统一小写
        scheme = parsed.scheme.lower()
        netloc = parsed.netloc.lower()
        # 2. 路径:移除末尾斜杠(但保留根路径"/")
        path = parsed.path.rstrip("/") if parsed.path != "/" else "/"
        # 3. 查询参数按字母排序
        qs = parse_qs(parsed.query, keep_blank_values=True)
        # 4. 移除常见的追踪参数(这里用前缀匹配)
        tracking_prefixes = ["utm_", "gclid", "fbclid", "ref", "from", "via"]
        filtered_qs = {
            k: v for k, v in qs.items()
            if not any(k.startswith(prefix) for prefix in tracking_prefixes)
        }
        sorted_qs = urlencode(sorted(filtered_qs.items()), doseq=True)
        # 5. 重建标准化URL
        normalized = f"{scheme}://{netloc}{path}"
        if sorted_qs:
            normalized += f"?{sorted_qs}"
        return normalized

优先级调整:在 settings.py 中确保标准化Pipeline在去重Pipeline之前执行,例如:

ITEM_PIPELINES = {
    "myproject.pipelines.URLNormalizationPipeline": 200,  # 先标准化
    "myproject.pipelines.RedisFingerprintPipeline": 300,  # 后去重
}

本地/Redis布隆过滤器优化

布隆过滤器是一种高效的概率型数据结构,用极少的内存快速判断一个元素是否可能存在于集合中。

  • 优点:千万级数据去重时内存开销极小,查询速度极快;
  • 缺点:存在极低的误判率——把一个不存在的元素误判为存在(但反过来不会,已存在的元素绝不会被漏判)。

你可以根据数据规模选择本地内存版布隆过滤器,或者使用Redis提供的布隆过滤器模块。

本地布隆过滤器(适合单机千万级以下)

依赖 mmh3bitarray 库:

pip install mmh3 bitarray
import mmh3
from bitarray import bitarray
from scrapy.exceptions import DropItem

class LocalBloomFilterPipeline:
    """本地内存布隆过滤器Pipeline(注意:应用重启后过滤器会清空)"""
    
    def __init__(self, capacity=10_000_000, error_rate=0.001):
        self.capacity = capacity         # 预期存储的指纹个数
        self.error_rate = error_rate     # 期望的误判率,例如0.001即0.1%
        self._init_bloom()

    def _init_bloom(self):
        # 根据容量和误判率估算位数组大小和哈希函数个数
        # 这里使用预先算好的系数,直接给出常见的对应值,避免公式
        if self.error_rate == 0.001:
            m = int(self.capacity * 14.37)   # 近似系数
            k = 7
        elif self.error_rate == 0.01:
            m = int(self.capacity * 7.21)    # 近似系数
            k = 5
        else:
            # 默认采用安全参数,防止手误配置
            m = int(self.capacity * 14.37)
            k = 7
        self.m = m
        self.k = k
        self.bitarray = bitarray(m)
        self.bitarray.setall(0)

    @classmethod
    def from_crawler(cls, crawler):
        return cls(
            capacity=crawler.settings.getint("BLOOM_CAPACITY", 10_000_000),
            error_rate=crawler.settings.getfloat("BLOOM_ERROR_RATE", 0.001)
        )

    def process_item(self, item, spider):
        # 用url生成指纹,如果item没有url则使用整个item字典
        fp = item.get("url", str(sorted(ItemAdapter(item).asdict().items())))
        # 检查布隆过滤器是否已记录
        for i in range(self.k):
            pos = mmh3.hash(fp, i) % self.m
            if not self.bitarray[pos]:
                break
        else:
            # 所有hash位都已置1,说明极大概率是重复数据
            spider.logger.warning(f"布隆过滤器可能检测到重复: {fp[:50]}")
            raise DropItem(f"Possible duplicate: {fp[:50]}")
        # 将指纹加入布隆过滤器
        for i in range(self.k):
            self.bitarray[mmh3.hash(fp, i) % self.m] = 1
        return item

⚠️ 注意:本地布隆过滤器在进程重启后会丢失所有记录,只适合短期大量去重或对重启不敏感的临时场景。如需持久化,可以考虑将布隆过滤器数据定期保存到磁盘或改用Redis版布隆过滤器。


时间戳+智能增量抓取

增量抓取的核心思路:只抓取上次爬取之后新增或更新的数据,从而避免重复抓取历史内容。

时间戳增量Spider

下面是一个基于Redis记录“上次抓取时间戳”的增量爬虫示例,适合文章、新闻等发布时间明确的数据源。

import scrapy
import redis
from datetime import datetime
from myproject.items import MyItem

class IncrementalSpider(scrapy.Spider):
    name = "incremental_spider"
    allowed_domains = ["example.com"]
    start_urls = ["https://example.com/news"]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.redis_conn = redis.Redis(
            host="127.0.0.1",
            port=6379,
            db=0,
            decode_responses=True   # 时间戳用字符串保存方便对比
        )
        self.last_crawl_key = f"last_crawl:{self.name}"

    def parse(self, response):
        # 1. 获取上次爬取时间(首次运行时默认30天前,确保能抓到近期的历史数据)
        last_crawl_ts = float(
            self.redis_conn.get(self.last_crawl_key) or
            (datetime.now().timestamp() - 86400 * 30)
        )
        # 2. 遍历列表页中的每一条数据
        for news in response.css("div.news-item"):
            pub_time_str = news.css(".pub-time::text").get().strip()
            pub_time_ts = datetime.strptime(pub_time_str, "%Y-%m-%d %H:%M").timestamp()
            if pub_time_ts > last_crawl_ts:   # 只抓比上次时间更新的内容
                detail_url = news.css("a.title::attr(href)").get()
                yield response.follow(detail_url, callback=self.parse_detail)
        # 3. 抓取完成后,更新本次爬取时间
        self.redis_conn.set(self.last_crawl_key, datetime.now().timestamp())

    def parse_detail(self, response):
        """解析详情页,产出Item"""
        item = MyItem()
        item["url"] = response.url
        item["title"] = response.css("h1::text").get().strip()
        item["content"] = "".join(response.css("div.content p::text").getall()).strip()
        yield item

要点解析

  • 时间基准:用Redis存储上一次爬取的完成时间,每次爬完更新;
  • 过滤逻辑:仅处理发布时间晚于该基准的数据;
  • 兜底策略:首次运行时设置一个较旧的初始时间(如30天前),避免遗漏近期刚更新的数据。

核心性能优化

内存LRU缓存 + Redis批量管道

这是最立竿见影的性能优化方案:

  1. 内存LRU缓存:快速过滤最近爬过的热门数据,减少对Redis的频繁访问;
  2. Redis批量管道:将多条指纹写入命令批量发送,减少网络往返次数。
import time

# 在基础Pipeline中增加LRU缓存和批量操作
class OptimizedRedisPipeline(RedisFingerprintPipeline):
    def __init__(self, redis_conf):
        super().__init__(redis_conf)
        self.lru_cache = dict()
        self.lru_max = 10000          # 缓存最多10000条指纹
        self.pending_fps = []         # 待批量写入的指纹
        self.batch_size = 100         # 每累积100条指纹执行一次批量写入

    def process_item(self, item, spider):
        fp = self._gen_fingerprint(item)
        # 1. LRU缓存快速去重
        if fp in self.lru_cache:
            spider.logger.debug("LRU缓存命中,跳过重复")
            raise DropItem("LRU duplicate")
        # 2. 检查Redis
        if self.redis_conn.exists(fp):
            spider.logger.debug("Redis命中,跳过重复")
            raise DropItem("Redis duplicate")
        # 3. 加入LRU缓存(并淘汰最久未使用的条目)
        self.lru_cache[fp] = time.time()
        if len(self.lru_cache) > self.lru_max:
            self.lru_cache.pop(next(iter(self.lru_cache)))
        # 4. 添加到待批量写入列表
        self.pending_fps.append(fp)
        if len(self.pending_fps) >= self.batch_size:
            self._flush_pending(spider)
        return item

    def _flush_pending(self, spider):
        """批量将指纹写入Redis,使用Pipeline管道"""
        if not self.pending_fps:
            return
        pipe = self.redis_conn.pipeline()
        expire = 86400 * self.expire_days
        for fp in self.pending_fps:
            pipe.setex(fp, expire, b"1")
        pipe.execute()
        spider.logger.debug(f"批量存入 {len(self.pending_fps)} 个指纹到Redis")
        self.pending_fps.clear()

    def close_spider(self, spider):
        # 爬虫关闭时确保所有待写入指纹都被处理
        self._flush_pending(spider)
        super().close_spider(spider)

2个高频问题解决

问题1:长时间运行内存溢出

原因:LRU缓存无限增长或Redis指纹过期时间过长,导致内存不断膨胀。

解决方法

  • 调整LRU缓存大小,比如根据服务器内存情况设置 lru_max = 5000
  • 缩短Redis指纹过期时间 DUPLICATE_EXPIRE_DAYS = 3
  • 定期(如每周)人工清理Redis去重库,或者使用Redis的FLUSHDB命令。

问题2:Redis连接超时/断开

原因:网络抖动或Redis服务负载过高。

解决方法:在创建Redis连接时增加超时、重试和健康检查配置。

from redis.backoff import ExponentialBackoff
from redis.retry import Retry

def open_spider(self, spider):
    self.redis_conn = redis.Redis(
        host=self.redis_conf["host"],
        port=self.redis_conf["port"],
        db=self.redis_conf["db"],
        password=self.redis_conf["password"],
        decode_responses=False,
        socket_connect_timeout=5,
        socket_timeout=5,
        retry_on_timeout=True,
        retry=Retry(ExponentialBackoff(cap=10), 3),  # 指数退避重试,最多3次
        health_check_interval=30                     # 每30秒检查一次连接健康状态
    )

指数退避重试的意思是:第一次重试等待1秒,第二次等待2秒,第三次等待4秒(最多等待10秒),能有效避免瞬时连接失败导致整个爬虫中断。


💡 核心总结

  1. 简单场景:URL标准化 + SHA256 Redis指纹去重,就能解决大多数重复问题;
  2. 大规模场景:本地布隆过滤器 + Redis批量管道,实现低成本高性能去重;
  3. 增量爬取:优先使用基于时间戳的增量方案,有条件再加入动态调度;
  4. 容错与监控:务必配置Redis重试和健康检查,防止单点故障导致爬虫瘫痪。