Scrapy数据清洗与校验完全指南

📂 所属阶段:第二阶段 — 数据流转(数据处理篇)
🔗 相关章节:Pipeline管道实战 · 数据去重与增量更新

从网页抓回的原始数据,十有八九带着空格、乱码、格式错误甚至重复内容。 如果直接丢进数据库,轻则分析结果偏差,重则污染整个数据仓库。Scrapy 提供了 Pipeline 这套灵活的流水线机制,让我们可以在数据入库前集中完成清洗、校验和去重。这篇文章将带你搭建一条轻量高效的数据生产线,覆盖文本、数值、日期的常见清洗场景,以及必填、范围、一致性校验的核心技巧,同时给出性能优化和避坑建议,帮你轻松搞定 90% 的数据质量问题。


一、Pipeline 数据处理的核心架构

在 Scrapy 中,数据经过 Spider 产出后,会依次流过我们配置的 Pipeline 类。为了让逻辑清晰可维护,推荐按照 “清洗 → 校验 → 去重” 的顺序,用不同的 Pipeline 类各司其职:

# settings.py 配置Pipeline优先级(数字越小优先级越高)
ITEM_PIPELINES = {
    'myproject.pipelines.TextCleaningPipeline': 100,    # 1. 文本清洗
    'myproject.pipelines.NumericDateTimePipeline': 150, # 2. 数值/日期处理
    'myproject.pipelines.CoreValidationPipeline': 200,  # 3. 核心校验
    # 'myproject.pipelines.DuplicatesPipeline': 250,    # 4.(可选)去重
}

通过调整数字,你就能控制数据的处理顺序,而且每个 Pipeline 只做一件事,后期维护和扩展都会非常方便。


二、数据清洗基础实战

2.1 文本数据清洗 —— 最频繁的脏活累活

文本字段通常最容易出现问题:首尾空白、换行符、制表符、HTML 标签残留、实体编码(如  )、Unicode 混乱等等。下面这个 TextCleaningPipeline 几乎可以应对 80% 的文本清洗需求:

import re
import unicodedata
from html import unescape

class TextCleaningPipeline:
    def process_item(self, item, spider):
        # 只清洗声明过的文本字段,避免误改其他字段
        text_fields = getattr(item, 'text_fields', ['title', 'content', 'desc'])

        for field in text_fields:
            if field not in item or not isinstance(item[field], str):
                continue

            raw = item[field]
            # 1. Unicode 标准化(解决全角/半角混用、乱码)
            raw = unicodedata.normalize('NFKC', raw)
            # 2. 解码 HTML 实体(&nbsp; → 空格、&lt; → <)
            raw = unescape(raw)
            # 3. 去除 HTML 标签(轻量场景用正则即可)
            raw = re.sub(r'<[^>]+>', '', raw)
            # 4. 压缩所有空白字符(换行/制表符统一替换为空格)
            raw = re.sub(r'\s+', ' ', raw).strip()

            # 5. 可选:只保留中文、英文、数字和常用标点
            # raw = re.sub(r'[^\w\s\u4e00-\u9fff.,!?;:()""''\[\]{}\-—]', '', raw)

            item[field] = raw
        return item

核心思路:

  • getattr 动态获取 Item 中标记的文本字段,这样不同 Spider 可以灵活指定。
  • 按顺序执行规范化、实体解码、去标签、合并空白,一环套一环,尽力还原出干净的纯文本。
  • 最后预留了“去除非必要字符”的选项,可以按需启用,防止过度清洗。

2.2 数值和日期数据清洗 —— 从字符串到标准类型

要从一堆带符号、单位的字符串中提取出真正的数字和日期,推荐专门写一个 Pipeline 来处理:

from datetime import datetime, timedelta
import re

class NumericDateTimePipeline:
    def __init__(self):
        # 常见日期格式(可按网站实际调整)
        self.date_fmts = ['%Y-%m-%d', '%Y/%m/%d', '%d-%m-%Y', '%b %d, %Y']
        # 相对时间正则("3天前"、"2 hours ago" 等)
        self.relative_re = [
            (r'(\d+)\s*天前', lambda m: datetime.now() - timedelta(days=int(m.group(1)))),
            (r'(\d+)\s*小时前', lambda m: datetime.now() - timedelta(hours=int(m.group(1)))),
            (r'(\d+)\s*days?\s*ago', lambda m: datetime.now() - timedelta(days=int(m.group(1)))),
        ]

    def process_item(self, item, spider):
        num_fields = getattr(item, 'num_fields', ['price', 'rating', 'stock'])
        date_fields = getattr(item, 'date_fields', ['pub_time', 'update_time'])

        for field in num_fields:
            if field not in item:
                continue
            item[field] = self._clean_num(item[field])

        for field in date_fields:
            if field not in item:
                continue
            item[field] = self._clean_date(item[field])

        return item

    def _clean_num(self, raw):
        if isinstance(raw, (int, float)):
            return raw
        if not isinstance(raw, str):
            return None

        # 去除货币符号、单位、"元"、"评分"等干扰字符
        raw = re.sub(r'[¥$,€£₹%\s元个评分]', '', raw)
        try:
            return float(raw) if '.' in raw else int(raw)
        except ValueError:
            return None

    def _clean_date(self, raw):
        if isinstance(raw, datetime):
            return raw
        if not isinstance(raw, str):
            return None

        # 先匹配相对时间(例如 "5天前")
        for pattern, func in self.relative_re:
            match = re.search(pattern, raw, re.IGNORECASE)
            if match:
                return func(match)

        # 再尝试固定格式
        for fmt in self.date_fmts:
            try:
                return datetime.strptime(raw.strip(), fmt)
            except ValueError:
                continue
        return None

这样做的好处:

  • 数字变成 intfloat,日期变成 datetime,后续写入数据库、做数据比较都会非常方便。
  • 相对时间的处理让“刚刚发布”、“3天前”这类表述也能自动转换为准确时间,数据时效性大大提升。
  • 清洗失败的字段会返回 None,不会因为一个错误而中断整个 Item 的处理。

三、数据校验核心实战

数据洗干净了,还得确保它们是“能用”的。校验 Pipeline 负责检查必填字段、数值范围、逻辑一致性,不合格的直接丢弃或打上标记。

from scrapy.exceptions import DropItem

class CoreValidationPipeline:
    def process_item(self, item, spider):
        # 1. 必填字段检查
        required = getattr(item, 'required_fields', ['title', 'price', 'pub_time'])
        missing = [f for f in required if f not in item or item[f] is None]
        if missing:
            raise DropItem(f"Missing required fields: {', '.join(missing)}")

        # 2. 数值范围检查
        range_map = getattr(item, 'range_map', {'price': (0, 1000000), 'rating': (0, 5)})
        for f, (min_v, max_v) in range_map.items():
            if f not in item or not isinstance(item[f], (int, float)):
                continue
            if not (min_v <= item[f] <= max_v):
                spider.logger.warning(f"Field {f} out of range: {item[f]}")
                # 可选:直接丢弃,或者将其限制在边界值
                # item[f] = max(min_v, min(item[f], max_v))

        # 3. 简单一致性检查(如折扣价不能高于原价)
        if 'original_price' in item and 'discount_price' in item:
            if item['original_price'] and item['discount_price'] > item['original_price']:
                spider.logger.warning(
                    f"Discount > original: {item['discount_price']} > {item['original_price']}"
                )
                # 根据业务需要,可以选择 DropItem 或交换两个值

        return item

几个实用的原则:

  • 只丢弃真正无效的数据:对于可以挽救的字段(例如范围超限的值),可以记录警告并修正,而不是直接 DropItem
  • 通过 Item 属性配置:不同 Spider 可以定义不同的 required_fieldsrange_map,一套 Pipeline 多处复用。
  • 日志很重要spider.logger.warning 能让你快速发现数据异常,及时调整清洗策略。

四、性能优化小技巧

Pipeline 是数据流转的咽喉,处理不当很容易成为性能瓶颈。几个简单有效的优化方法:

  1. 避免在 Pipeline 中做 I/O 密集型操作
    不要直接查数据库、发 HTTP 请求。如果需要,把这些操作交给异步线程池,或者放到 Pipeline 之外用消息队列异步完成。

  2. 优先使用原生字符串方法,谨慎引入重量级库
    简单的去空白、替换用 Python 字符串方法最快;复杂模式才用正则;非必要不用 lxml 解析整个 HTML,除非确实需要。

  3. 适时进行垃圾回收
    close_spider 中或每处理 N 个 Item 后调用 gc.collect(),可以防止长期运行的内存膨胀。

  4. 用字段标注代替无差别遍历
    只清洗、校验你声明过的字段,避免对 Item 里每个字段都执行重复逻辑,既提升性能,也减少误伤。


五、常见踩坑指南

  1. 清洗过度,越洗越脏
    盲目用正则一刀切删除字符,可能把正常内容砍掉。建议先通过日志输出异常值,再决定如何处理。

  2. 忘记时区,日期时间错乱
    如果目标网站涉及多时区,单纯的 datetime 不够用,推荐使用 pytz 或 Python 3.9+ 内置的 zoneinfo,确保日期带有时区信息。

  3. DropItem 的滥用
    DropItem 会中断整个 Pipeline 链条,只有在数据完全不可用时才扔出去。部分字段无效时,可以赋值为 None 或打标记,让下游再处理。

  4. Pipeline 顺序混乱
    一定要先清洗再校验,否则校验出来的错误很可能是清洗没做到位,而不是数据本身的问题。

  5. 大文本处理性能
    对于包含长文本的 Item,清洗时尽量采用 re.compile 预编译正则,避免在 process_item 内重复编译。


相关推荐