Modern proxy pool design and implementation guide

1. Overview

A proxy pool acts as a protective layer for a crawler system: it spreads requests across many IP addresses so the target website is less likely to block any one of them, and it lets you combine proxies from different regions to collect region-specific data. This article walks through technology selection, core implementation, and optimization practices to help you quickly build an asynchronous, layered, highly available, lightweight proxy pool.

2. Technology selection

Quick overview of core components

Selection principles: lightweight, asynchronous first, easy to maintain and extend

| Functional module | Technical solution |
| --- | --- |
| Storage layer | Redis (a Sorted Set naturally supports proxy priority/score management, with efficient random and range queries) |
| Asynchronous I/O tool | aiohttp (asynchronous HTTP requests with proxy support and connection reuse) |
| API service layer | FastAPI (automatically generated interface documentation, async-friendly, generally outperforms Flask on asynchronous workloads) |
| Log system | loguru (colored console output, automatic file rotation, no complex configuration required) |
| Containerization/deployment | Docker (one-command startup of the development/production environment); Kubernetes (optional, for multi-node high availability) |

One-click installation of dependencies

pip install aiohttp redis pyquery fastapi loguru uvicorn

3. System architecture

The proxy pool combines modular scheduling with asynchronous collaboration, and the components are loosely coupled and highly cohesive:

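graph TD
    A[Collection module<br>Asynchronously crawls multiple proxy sources] --> B[Storage module<br>Redis Sorted Set]
    C[Detection module<br>Batch-verifies availability and scores proxies] --> B
    B --> D[API module<br>FastAPI serves random/layered queries]
    E[Scheduling module<br>asyncio coordinates collection/detection frequency]
    E --> A
    E --> C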
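
The article does not show code for the scheduling module, so the following is only a minimal sketch of how asyncio could drive the collection and detection loops at different frequencies. The names run_periodically and schedule, and the rounds parameter (used to stop the loops in demos and tests), are hypothetical and not part of the original design.

```python
import asyncio
from typing import Awaitable, Callable, Optional


async def run_periodically(
    job: Callable[[], Awaitable[None]],
    interval: float,
    rounds: Optional[int] = None,
) -> None:
    """Run `job` every `interval` seconds; stop after `rounds` runs if given."""
    done = 0
    while rounds is None or done < rounds:
        try:
            await job()
        except Exception as exc:
            # One failed round should not stop the loop
            print(f"job failed: {exc!r}")
        done += 1
        if rounds is None or done < rounds:
            await asyncio.sleep(interval)


async def schedule(
    collect: Callable[[], Awaitable[None]],
    detect: Callable[[], Awaitable[None]],
    collect_interval: float,
    detect_interval: float,
    rounds: Optional[int] = None,
) -> None:
    """Run the collection and detection loops concurrently."""
    await asyncio.gather(
        run_periodically(collect, collect_interval, rounds),
        run_periodically(detect, detect_interval, rounds),
    )
```

In a real deployment, collect would wrap a crawler run that stores its results, and detect would wrap one detection pass over the pool; both are left abstract here.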

4. Core implementation

4.1 Storage module: prioritizing proxies with a Redis Sorted Set

The score field of the Sorted Set serves as the proxy quality score. 100 is the best score, and a proxy is removed automatically once a failed check would push its score below 1. This design naturally provides:

  • Priority access to high-scoring proxies
  • Dynamic adjustment of proxy scores
  • Automatic removal of expired/invalid proxies

import random
from typing import List, Optional

import redis

# Constants managed in one place
PROXY_KEY = "proxies"
MAX_SCORE = 100
MIN_SCORE = 1
INIT_SCORE = 10

class RedisProxyPool:
    def __init__(self, host: str = "localhost", port: int = 6379,
                 password: Optional[str] = None, db: int = 0):
        self.db = redis.StrictRedis(
            host=host, port=port, password=password, db=db,
            decode_responses=True
        )

    def add(self, proxy: str, score: int = INIT_SCORE) -> int:
        """Add a new proxy with the initial score; known proxies keep their score."""
        # nx=True only adds new members, so re-crawling a known proxy
        # does not reset a score it has already earned
        return self.db.zadd(PROXY_KEY, {proxy: score}, nx=True)

    def random(self) -> Optional[str]:
        """Return a random full-score proxy, else a random one from the top 100."""
        full_score_proxies = self.db.zrangebyscore(PROXY_KEY, MAX_SCORE, MAX_SCORE)
        if full_score_proxies:
            return random.choice(full_score_proxies)
        top_proxies = self.db.zrevrange(PROXY_KEY, 0, 99)
        return random.choice(top_proxies) if top_proxies else None

    def decrease(self, proxy: str) -> Optional[float]:
        """Deduct one point on failure; remove the proxy at the minimum score."""
        current_score = self.db.zscore(PROXY_KEY, proxy)
        # zscore returns None for an unknown member; compare explicitly so
        # that a score of 0 is not mistaken for "missing"
        if current_score is None:
            return None
        if current_score > MIN_SCORE:
            return self.db.zincrby(PROXY_KEY, -1, proxy)
        self.db.zrem(PROXY_KEY, proxy)
        return None

    def max(self, proxy: str) -> int:
        """Set the score to the maximum after a successful check."""
        return self.db.zadd(PROXY_KEY, {proxy: MAX_SCORE})

    def count(self) -> int:
        """Return the total number of proxies."""
        return self.db.zcard(PROXY_KEY)

    def all(self) -> List[str]:
        """Return all proxies for batch detection."""
        return self.db.zrange(PROXY_KEY, 0, -1)
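
To make the scoring lifecycle concrete without a running Redis server, here is a small dict-based stand-in that mirrors the rules described above. It is an illustration only: InMemoryProxyPool and the address 10.0.0.1:8080 are hypothetical, and the sketch assumes that re-adding a known proxy leaves its score unchanged.

```python
from typing import Dict, Optional

MAX_SCORE, MIN_SCORE, INIT_SCORE = 100, 1, 10


class InMemoryProxyPool:
    """Dict-based stand-in that mirrors the scoring rules, for illustration only."""

    def __init__(self) -> None:
        self.scores: Dict[str, int] = {}

    def add(self, proxy: str) -> None:
        # New proxies start at INIT_SCORE; known proxies keep their score
        self.scores.setdefault(proxy, INIT_SCORE)

    def max(self, proxy: str) -> None:
        # A successful check promotes the proxy straight to full score
        self.scores[proxy] = MAX_SCORE

    def decrease(self, proxy: str) -> Optional[int]:
        # A failed check costs one point; at MIN_SCORE the proxy is removed
        current = self.scores.get(proxy)
        if current is None:
            return None
        if current > MIN_SCORE:
            self.scores[proxy] = current - 1
            return current - 1
        del self.scores[proxy]
        return None


pool = InMemoryProxyPool()
pool.add("10.0.0.1:8080")  # hypothetical proxy address
failures = 0
while "10.0.0.1:8080" in pool.scores:
    pool.decrease("10.0.0.1:8080")
    failures += 1
print(failures)  # consecutive failed checks before removal
```

Under these rules a proxy with score s is removed on its s-th consecutive failure, so a freshly added proxy tolerates far fewer failures than one that has been verified and promoted to 100.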

4.2 Collection module: abstract base class + asynchronous crawling

The BaseProxyCrawler abstract base class defines the collection interface. To add a new proxy source, inherit from it and implement the urls property and the parse method, which keeps the module easy to extend.

from abc import ABC, abstractmethod
from typing import List

import aiohttp
from loguru import logger
from pyquery import PyQuery as pq

class BaseProxyCrawler(ABC):
    """Abstract base class for proxy collection."""
    @property
    @abstractmethod
    def urls(self) -> List[str]:
        """URLs of the proxy-list pages to crawl."""
        pass

    async def crawl(self) -> List[str]:
        """Fetch every URL asynchronously and collect the parsed proxies."""
        # aiohttp expects a ClientTimeout object rather than a bare number
        timeout = aiohttp.ClientTimeout(total=15)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            proxies = []
            for url in self.urls:
                try:
                    async with session.get(url) as resp:
                        text = await resp.text()
                        proxies.extend(self.parse(text))
                except Exception as e:
                    # One failing source must not abort the whole crawl
                    logger.warning(f"Crawl failed {url}: {str(e)[:50]}")
            return proxies

    @abstractmethod
    def parse(self, html: str) -> List[str]:
        """Parse the HTML and extract proxies (format: ip:port)."""
        pass

# Example: crawler for the Kuaidaili free proxy list
class KuaiProxyCrawler(BaseProxyCrawler):
    @property
    def urls(self) -> List[str]:
        return [f"https://www.kuaidaili.com/free/inha/{i}/" for i in range(1, 3)]

    def parse(self, html: str) -> List[str]:
        doc = pq(html)
        proxies = []
        for tr in doc("table tbody tr").items():
            ip = tr("td:nth-child(1)").text()
            port = tr("td:nth-child(2)").text()
            # Skip rows where either cell is missing or empty
            if ip and port:
                proxies.append(f"{ip}:{port}")
        return proxies
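
Not every source publishes an HTML table; some simply list ip:port entries as text. The sketch below shows one possible shape for the body of a parse method for such a source, with basic validation and de-duplication. The function name parse_plain_text and the sample data are assumptions made for illustration.

```python
import ipaddress
import re
from typing import List

# Matches candidates such as "1.2.3.4:8080" anywhere in the text
PROXY_PATTERN = re.compile(r"\b(\d{1,3}(?:\.\d{1,3}){3}):(\d{1,5})\b")


def parse_plain_text(text: str) -> List[str]:
    """Extract unique, well-formed ip:port entries, preserving first-seen order."""
    seen = set()
    result = []
    for ip, port in PROXY_PATTERN.findall(text):
        try:
            ipaddress.IPv4Address(ip)  # rejects octets above 255
        except ValueError:
            continue
        if not 1 <= int(port) <= 65535:  # rejects impossible ports
            continue
        proxy = f"{ip}:{port}"
        if proxy not in seen:
            seen.add(proxy)
            result.append(proxy)
    return result


sample = "10.0.0.1:8080\n999.1.1.1:80\n10.0.0.1:8080\n10.0.0.2:70000\n10.0.0.3:3128"
print(parse_plain_text(sample))  # → ['10.0.0.1:8080', '10.0.0.3:3128']
```

Filtering malformed entries at collection time keeps obviously unusable candidates out of the pool, so the detection module does not spend timeouts on them.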

4.3 Detection module: batch asynchronous verification + intelligent fault tolerance

Checking proxies one at a time is slow, so the tester runs checks in parallel batches with asyncio.gather and caps the number of proxies checked at once to avoid putting excessive pressure on Redis or the test site.

import asyncio
import aiohttp
from loguru import logger

# Batch detection constants
TEST_URL = "https://www.baidu.com"   # Target website or a general-purpose test site
BATCH_SIZE = 30                      # Proxies checked in parallel per batch
TIMEOUT = 8                          # Timeout per proxy, in seconds
TEST_INTERVAL = 60                   # Detection cycle, in seconds

class ProxyTester:
    def __init__(self, redis_pool: RedisProxyPool):
        self.redis = redis_pool

    async def test_single(self, proxy: str):
        """Test whether a single proxy is usable."""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    TEST_URL,
                    proxy=f"http://{proxy}",
                    # aiohttp expects a ClientTimeout object rather than a bare number
                    timeout=aiohttp.ClientTimeout(total=TIMEOUT)
                ) as resp:
                    if resp.status == 200:
                        self.redis.max(proxy)
                        logger.success(f"✅ Valid proxy: {proxy}")
                    else:
                        self.redis.decrease(proxy)
                        logger.warning(f"⚠️ Unexpected status code: {proxy} ({resp.status})")
        except Exception as e:
            self.redis.decrease(proxy)
            logger.error(f"❌ Proxy failed: {proxy} ({str(e)[:30]})")

    async def run(self):
        """Check all proxies in batches, in an endless loop."""
        logger.info("🚀 Proxy detection module started")
        while True:
            all_proxies = self.redis.all()
            if not all_proxies:
                await asyncio.sleep(TEST_INTERVAL)
                continue
            # Check in batches
            for i in range(0, len(all_proxies), BATCH_SIZE):
                batch = all_proxies[i:i+BATCH_SIZE]
                tasks = [self.test_single(p) for p in batch]
                await asyncio.gather(*tasks)
            await asyncio.sleep(TEST_INTERVAL)
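
With fixed batches, each batch waits for its slowest proxy before the next batch starts. One alternative, which is not the approach used in the article, is to cap concurrency with asyncio.Semaphore so that a new check starts as soon as any slot frees up. The sketch below is a minimal version of that idea; check_all and the check callback are hypothetical names, and check stands in for a function such as test_single that reports success as a boolean.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def check_all(
    proxies: List[str],
    check: Callable[[str], Awaitable[bool]],
    limit: int = 30,
) -> Dict[str, bool]:
    """Check every proxy with at most `limit` checks in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(proxy: str) -> bool:
        async with semaphore:
            try:
                return await check(proxy)
            except Exception:
                return False  # any error counts as a failed check

    # gather preserves input order, so results line up with proxies
    results = await asyncio.gather(*(guarded(p) for p in proxies))
    return dict(zip(proxies, results))
```

The limit parameter plays the same role as BATCH_SIZE: it bounds how many outbound connections are open at any moment.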

4.4 API module: FastAPI minimalist implementation

FastAPI automatically generates online interface documentation (visit http://localhost:8000/docs or http://localhost:8000/redoc). The CORS middleware configured below allows cross-origin requests, which is convenient for debugging.

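import os
from functools import lru_cache

from fastapi import FastAPI, Depends
from fastapi.middleware.cors import CORSMiddleware

# Initialize FastAPI
app = FastAPI(title="Efficient Proxy Pool API", description="Interface documentation for the lightweight asynchronous proxy pool", version="1.0.0")

# Allow cross-origin requests from any origin (restrict to a whitelist in production)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET"],
    allow_headers=["*"],
)

# Dependency injection: share one RedisProxyPool (and its connections) across requests
@lru_cache
def get_redis_pool() -> RedisProxyPool:
    # REDIS_HOST / REDIS_PORT are the variables set in docker-compose.yml;
    # fall back to a local Redis when they are not defined
    return RedisProxyPool(
        host=os.getenv("REDIS_HOST", "localhost"),
        port=int(os.getenv("REDIS_PORT", "6379")),
    )

@app.get("/")
async def root():
    return {"status": "running", "docs": "/docs"}

@app.get("/random")
async def get_random_proxy(redis: RedisProxyPool = Depends(get_redis_pool)) -> dict:
    """Return a random usable proxy (full-score proxies first)."""
    proxy = redis.random()
    return {"code": 200, "proxy": proxy} if proxy else {"code": 404, "msg": "No proxy available"}

@app.get("/count")
async def get_proxy_count(redis: RedisProxyPool = Depends(get_redis_pool)) -> dict:
    """Return the total number of proxies currently in the pool."""
    return {"code": 200, "count": redis.count()}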
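
On the crawler side, consuming the API could look roughly like the sketch below, which uses only the standard library. The helper names and the default address http://localhost:8000 are assumptions; the payload shape follows the /random endpoint above.

```python
import json
import urllib.request
from typing import Dict, Optional


def proxies_from_payload(payload: dict) -> Optional[Dict[str, str]]:
    """Turn a /random response body into a scheme-to-proxy-URL mapping."""
    if payload.get("code") != 200 or not payload.get("proxy"):
        return None  # the pool reported that no proxy is available
    proxy_url = f"http://{payload['proxy']}"
    return {"http": proxy_url, "https": proxy_url}


def fetch_random_proxy(api_base: str = "http://localhost:8000") -> Optional[Dict[str, str]]:
    """Ask the pool for a proxy; `api_base` is an assumed deployment address."""
    with urllib.request.urlopen(f"{api_base}/random", timeout=5) as resp:
        return proxies_from_payload(json.load(resp))
```

The returned mapping has the shape accepted by urllib.request.ProxyHandler and by the proxies argument of the requests library. Note that the endpoint signals an empty pool through the code field in the body, so the client checks that field rather than the HTTP status.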

5. Best Practices

5.1 Multi-source collection ensures diversity

Connect at least 5-10 different free proxy sources (Kuaidaili, Xici, 89IP, etc.). Paid proxies can be stored as a separate layer in their own Sorted Set, proxies_paid, and the API can accept a layer=paid parameter to fetch from that layer.
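
One simple way to support layers is to map each layer name to its own Sorted Set key and reject anything else. This is a sketch under assumptions: only the key proxies_paid and the value layer=paid come from the text, while the layer name free, the LAYER_KEYS table, and key_for_layer are hypothetical.

```python
from typing import Dict

# Layer names are illustrative; "proxies" is the key used by the storage module
LAYER_KEYS: Dict[str, str] = {
    "free": "proxies",
    "paid": "proxies_paid",
}


def key_for_layer(layer: str = "free") -> str:
    """Return the Sorted Set key for a layer, rejecting unknown layer names."""
    try:
        return LAYER_KEYS[layer]
    except KeyError:
        raise ValueError(f"unknown layer: {layer!r}") from None
```

Validating the layer against a fixed table means a query parameter can never be used to read an arbitrary Redis key.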

5.2 Automatic collection trigger

When the number of available proxies (full-score proxies, or the top 100) falls below a threshold (for example, 10), the scheduling module automatically triggers a collection task so that the supply of proxies is not interrupted.
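
The trigger can be sketched as a single check-and-crawl step that the scheduler calls periodically. Everything here is an assumed shape rather than the article's implementation: refill_if_low and its three callbacks are hypothetical, and count_available could, for example, wrap a count of full-score members in the Sorted Set.

```python
import asyncio
from typing import Awaitable, Callable, List

REFILL_THRESHOLD = 10  # example threshold from the text


async def refill_if_low(
    count_available: Callable[[], int],
    crawl: Callable[[], Awaitable[List[str]]],
    add: Callable[[str], object],
    threshold: int = REFILL_THRESHOLD,
) -> int:
    """Run one crawl when too few proxies are available.

    Returns the number of proxies submitted to the pool (0 if no crawl ran).
    """
    if count_available() >= threshold:
        return 0
    proxies = await crawl()
    for proxy in proxies:
        add(proxy)
    return len(proxies)
```

Passing the three operations in as callables keeps the trigger independent of Redis and of any particular crawler, which also makes it easy to test with stand-ins.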

5.3 Monitoring alarms

Use loguru's file handler together with Prometheus + Grafana to monitor:

  • Total number of proxies in the pool
  • Number of full-score proxies
  • Proxy expiration/addition rate
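
As a rough illustration of the first two metrics, the sketch below renders them in the Prometheus text exposition format from a mapping of proxy to score. The metric names proxy_pool_total and proxy_pool_full_score and the function render_metrics are assumptions; the rate metric would additionally need counters that are updated whenever a proxy is added or removed, which is not shown.

```python
from typing import Dict

MAX_SCORE = 100


def render_metrics(scores: Dict[str, float]) -> str:
    """Render pool gauges in the Prometheus text exposition format."""
    total = len(scores)
    full = sum(1 for s in scores.values() if s >= MAX_SCORE)
    lines = [
        "# HELP proxy_pool_total Number of proxies in the pool.",
        "# TYPE proxy_pool_total gauge",
        f"proxy_pool_total {total}",
        "# HELP proxy_pool_full_score Number of proxies at full score.",
        "# TYPE proxy_pool_full_score gauge",
        f"proxy_pool_full_score {full}",
    ]
    return "\n".join(lines) + "\n"


print(render_metrics({"10.0.0.1:8080": 100, "10.0.0.2:3128": 9}))
```

Served from an HTTP endpoint, text in this format can be scraped by Prometheus and charted in Grafana.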

6. Rapid deployment

6.1 Docker single node deployment

Create a Dockerfile and requirements.txt:

FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
# Assumes all core code is consolidated into main.py
COPY main.py .

RUN pip install --no-cache-dir -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple

EXPOSE 8000

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

6.2 Docker Compose starts Redis + proxy pool

Create docker-compose.yml:

version: '3.8'

services:
  redis:
    image: redis:7-alpine
    container_name: proxy-pool-redis
    restart: always
    ports:
      - "6379:6379"
    volumes:
      - redis-data:/data

  proxy-pool:
    build: .
    container_name: proxy-pool
    restart: always
    ports:
      - "8000:8000"
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379

volumes:
  redis-data:

Start command:

docker-compose up -d

Following this guide, you can build a usable lightweight proxy pool in about 10 minutes. Later, you can extend it with layered management, finer-grained quality scoring, multi-node deployment, and other features as needed.