现代代理池设计与实现指南

1. 概述

代理池是爬虫系统的「网络隐形盾」——既能分散目标网站的请求压力避免封IP,又能结合不同区域代理实现跨地域数据抓取。本文从技术选型到核心实现再到优化实践,带你快速搭建异步、分层、高可用的轻量级代理池。

2. 技术选型

核心组件速览

选择原则:轻量、异步优先、易维护扩展

功能模块技术方案
存储层Redis(Sorted Set 天然支持代理优先级/分数管理、随机/范围查询高效)
异步IO工具aiohttp(异步HTTP请求+代理连接复用)
API服务层FastAPI(自动生成接口文档、异步友好、性能远超Flask)
日志系统loguru(彩色控制台输出、自动文件轮转、无需复杂配置)
容器化/部署Docker(一键拉起开发/生产环境)、Kubernetes(多节点高可用可选)

一键安装依赖

pip install aiohttp redis-py pyquery fastapi loguru uvicorn

3. 系统架构

代理池采用模块化调度+异步协作的设计,各组件低耦合高内聚:

graph TD
    A[采集模块<br>异步爬取多源代理] --> B[存储模块<br>Redis Sorted Set]
    C[检测模块<br>批量验证可用性/评分] --> B
    B --> D[API模块<br>FastAPI 提供随机/分层查询]
    E[调度模块<br>asyncio 协调采集/检测频率]
    E --> A
    E --> C

4. 核心实现

4.1 存储模块:Redis Sorted Set 优先

用Sorted Set的score字段做代理质量评分,100分最优,1分以下自动清理,天然实现:

  • 优先获取高分代理
  • 代理分数动态调整
  • 过期/失效代理自动淘汰
import redis
import random
from typing import Optional, List

# 常量统一管理
PROXY_KEY = "proxies"
MAX_SCORE = 100
MIN_SCORE = 1
INIT_SCORE = 10

class RedisProxyPool:
    def __init__(self, host: str = "localhost", port: int = 6379, 
                 password: Optional[str] = None, db: int = 0):
        self.db = redis.StrictRedis(
            host=host, port=port, password=password, db=db,
            decode_responses=True
        )

    def add(self, proxy: str, score: int = INIT_SCORE) -> int:
        """添加新代理,默认初始分数"""
        return self.db.zadd(PROXY_KEY, {proxy: score})

    def random(self) -> Optional[str]:
        """优先随机获取100分代理,否则取前100名"""
        full_score_proxies = self.db.zrangebyscore(PROXY_KEY, MAX_SCORE, MAX_SCORE)
        if full_score_proxies:
            return random.choice(full_score_proxies)
        top_proxies = self.db.zrevrange(PROXY_KEY, 0, 99)
        return random.choice(top_proxies) if top_proxies else None

    def decrease(self, proxy: str) -> Optional[int]:
        """代理失效扣分,低于阈值删除"""
        current_score = self.db.zscore(PROXY_KEY, proxy)
        if not current_score:
            return None
        if current_score > MIN_SCORE:
            return self.db.zincrby(PROXY_KEY, -1, proxy)
        else:
            self.db.zrem(PROXY_KEY, proxy)
            return None

    def max(self, proxy: str) -> int:
        """验证成功设为满分"""
        return self.db.zadd(PROXY_KEY, {proxy: MAX_SCORE})

    def count(self) -> int:
        """获取代理总数"""
        return self.db.zcard(PROXY_KEY)

    def all(self) -> List[str]:
        """获取所有代理用于批量检测"""
        return self.db.zrange(PROXY_KEY, 0, -1)

4.2 采集模块:抽象基类+异步爬取

设计BaseProxyCrawler抽象基类,后续新增代理源只需继承实现urlsparse方法,扩展性拉满。

from abc import ABC, abstractmethod
import aiohttp
from pyquery import PyQuery as pq
from typing import List

class BaseProxyCrawler(ABC):
    """代理采集抽象基类"""
    @property
    @abstractmethod
    def urls(self) -> List[str]:
        """待采集的代理网站URL列表"""
        pass

    async def crawl(self) -> List[str]:
        """异步批量爬取代理"""
        async with aiohttp.ClientSession() as session:
            proxies = []
            for url in self.urls:
                try:
                    async with session.get(url, timeout=15) as resp:
                        text = await resp.text()
                        proxies.extend(self.parse(text))
                except Exception as e:
                    from loguru import logger
                    logger.warning(f"爬取失败 {url}: {str(e)[:50]}")
            return proxies

    @abstractmethod
    def parse(self, html: str) -> List[str]:
        """解析HTML提取代理(格式:ip:port)"""
        pass

# 示例:快代理免费代理采集
class KuaiProxyCrawler(BaseProxyCrawler):
    @property
    def urls(self) -> List[str]:
        return [f"https://www.kuaidaili.com/free/inha/{i}/" for i in range(1, 3)]

    def parse(self, html: str) -> List[str]:
        doc = pq(html)
        proxies = []
        for tr in doc("table tbody tr").items():
            ip = tr("td:nth-child(1)").text()
            port = tr("td:nth-child(2)").text()
            proxies.append(f"{ip}:{port}")
        return proxies

4.3 检测模块:批量异步验证+智能容错

单个代理检测耗时较高,采用asyncio.gather批量并行执行,同时限制单次检测数量避免Redis或代理源压力过大。

import asyncio
import aiohttp
from loguru import logger

# 批量检测常量
TEST_URL = "https://www.baidu.com"  # 目标网站或通用测试站
BATCH_SIZE = 30  # 单次并行检测的代理数
TIMEOUT = 8  # 单个代理的超时时间
TEST_INTERVAL = 60  # 检测周期(秒)

class ProxyTester:
    def __init__(self, redis_pool: RedisProxyPool):
        self.redis = redis_pool

    async def test_single(self, proxy: str):
        """测试单个代理的可用性"""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    TEST_URL,
                    proxy=f"http://{proxy}",
                    timeout=TIMEOUT
                ) as resp:
                    if resp.status == 200:
                        self.redis.max(proxy)
                        logger.success(f"✅ 有效代理: {proxy}")
                    else:
                        self.redis.decrease(proxy)
                        logger.warning(f"⚠️ 状态码异常: {proxy} ({resp.status})")
        except Exception as e:
            self.redis.decrease(proxy)
            logger.error(f"❌ 代理失效: {proxy} ({str(e)[:30]})")

    async def run(self):
        """循环批量检测所有代理"""
        logger.info("🚀 代理检测模块启动")
        while True:
            all_proxies = self.redis.all()
            if not all_proxies:
                await asyncio.sleep(TEST_INTERVAL)
                continue
            # 分批检测
            for i in range(0, len(all_proxies), BATCH_SIZE):
                batch = all_proxies[i:i+BATCH_SIZE]
                tasks = [self.test_single(p) for p in batch]
                await asyncio.gather(*tasks)
            await asyncio.sleep(TEST_INTERVAL)

4.4 API模块:FastAPI 极简实现

自动生成在线接口文档(访问http://localhost:8000/docshttp://localhost:8000/redoc),默认跨域方便调试。

from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware
from typing import Optional

# 初始化FastAPI
app = FastAPI(title="高效代理池API", description="轻量级异步代理池接口文档", version="1.0.0")

# 允许所有来源跨域(生产环境需改为IP白名单)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET"],
    allow_headers=["*"],
)

# 依赖注入:获取Redis连接池
def get_redis_pool() -> RedisProxyPool:
    return RedisProxyPool()

@app.get("/")
async def root():
    return {"status": "running", "docs": "/docs"}

@app.get("/random")
async def get_random_proxy(redis: RedisProxyPool = Depends(get_redis_pool)) -> dict:
    """随机获取一个有效代理(优先100分)"""
    proxy = redis.random()
    return {"code": 200, "proxy": proxy} if proxy else {"code": 404, "msg": "暂无可用代理"}

@app.get("/count")
async def get_proxy_count(redis: RedisProxyPool = Depends(get_redis_pool)) -> dict:
    """获取当前代理池的总代理数"""
    return {"code": 200, "count": redis.count()}

5. 最佳实践

5.1 多源采集保障多样性

至少接入5-10个不同的免费代理源(快代理、西刺、89IP等),付费代理可单独分层存储到proxies_paid的Sorted Set中,API支持指定layer=paid获取。

5.2 自动采集触发

当代理池可用代理(满分/前100名)不足阈值时(比如10个),调度模块自动触发采集任务,避免代理断供。

5.3 监控告警

用loguru的file_handler配合Prometheus+Grafana监控:

  • 代理池总数量
  • 满分代理数量
  • 代理失效/新增速率

6. 快速部署

6.1 Docker单节点部署

创建Dockerfilerequirements.txt

FROM python:3.11-slim

# 工作目录
WORKDIR /app

# 复制依赖文件
COPY requirements.txt .
COPY main.py .  # 假设所有核心代码整合到main.py

# 安装Python依赖
RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

# 暴露端口
EXPOSE 8000

# 启动命令
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

6.2 Docker Compose 拉起Redis+代理池

创建docker-compose.yml

version: '3.8'

services:
  redis:
    image: redis:7-alpine
    container_name: proxy-pool-redis
    restart: always
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  proxy-pool:
    build: .
    container_name: proxy-pool
    restart: always
    ports:
      - "8000:8000"
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379

volumes:
  redis-data:

启动命令:

docker-compose up -d

通过本指南,你可以在10分钟内搭建一个可用的轻量级代理池,后续可根据需求扩展分层管理、质量评分细化、多节点部署等功能。