如何扩展网络爬虫基础设施

缩放网刮的架构模式:基于队列的系统,管道设计,带容器的横向缩放,以及规模化的代理管理. 代码在Python,Node.js,和去.

如何扩展网络爬虫基础设施

为何需要专门建筑

一个单行脚本击中一个网站,对于小任务来说效果很好. 但是,当你需要每天刮去数百万页 跨越数十个目标, 这个脚本成为一个瓶颈。 扩大废旧基础设施 需要从线性脚本向分布式,基于队列的架构移动,这些架构优雅地处理失败,管理代理旋转,并最大限度地实现吞吐量.

本指南涵盖了架构模式,队列系统,水平缩放策略,以及电动生产级刮刮规模的代理管理技术.

这篇文章以我们从 完整网页搜索代理指南。关于代理池大小,参见 需要多少代理吗?

可缩放划痕的架构模式

模式1:基于队列的拼写

可缩放刮刮的基础是一个 消息队列 从数据采集中解开URL发现. 工人从队列中提取任务,通过代理获取页面,并将结果推向存储.

# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
#                               → Worker 2 →
#                               → Worker N →
#                               ↓
#                        [Dead Letter Queue]
#                        (failed requests)

这种模式的好处:

  • 水平缩放 : 在不改变系统的情况下添加或删除工人
  • 过失容忍度: 失败的任务返回队列进行重试
  • 费率控制 : 调整工人人数以控制总体吞吐量
  • 可见度 : 队列深度显示积压;完成率显示健康

用 Redis 队列执行 Python

import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
    url: str
    target: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
class ScrapingQueue:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:tasks"
        self.results_key = "scrape:results"
        self.dlq_key = "scrape:dead_letter"
    def enqueue(self, task: ScrapeTask):
        self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
    def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
        result = self.redis.brpop(self.queue_key, timeout=timeout)
        if result:
            data = json.loads(result[1])
            return ScrapeTask(**data)
        return None
    def store_result(self, url: str, data: dict):
        self.redis.hset(self.results_key, url, json.dumps(data))
    def send_to_dlq(self, task: ScrapeTask, error: str):
        task_data = task.__dict__
        task_data["error"] = error
        self.redis.lpush(self.dlq_key, json.dumps(task_data))
    @property
    def pending_count(self) -> int:
        return self.redis.llen(self.queue_key)
class ScrapingWorker:
    def __init__(self, queue: ScrapingQueue, worker_id: int):
        self.queue = queue
        self.worker_id = worker_id
        self.session = requests.Session()
        self.session.proxies = {"http": PROXY, "https": PROXY}
    def process_task(self, task: ScrapeTask) -> bool:
        try:
            resp = self.session.get(task.url, timeout=30)
            if resp.status_code == 200:
                self.queue.store_result(task.url, {
                    "status": 200,
                    "body": resp.text[:10000],  # Truncate for storage
                    "target": task.target,
                })
                return True
            elif resp.status_code in [429, 503]:
                # Retry with backoff
                if task.retries < task.max_retries:
                    task.retries += 1
                    time.sleep(2 ** task.retries)
                    self.queue.enqueue(task)
                else:
                    self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
            else:
                self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
        except Exception as e:
            if task.retries < task.max_retries:
                task.retries += 1
                self.queue.enqueue(task)
            else:
                self.queue.send_to_dlq(task, str(e))
        return False
    def run(self):
        print(f"Worker {self.worker_id} started")
        while True:
            task = self.queue.dequeue(timeout=5)
            if task:
                self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
    queue.enqueue(ScrapeTask(
        url=f"https://example.com/product/{i}",
        target="example.com"
    ))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
    workers = [ScrapingWorker(queue, i) for i in range(10)]
    for worker in workers:
        executor.submit(worker.run)

节点.js 用牛排执行

const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => {  // 10 concurrent workers
  const { url, target } = job.data;
  try {
    const res = await fetch(url, { agent, timeout: 30000 });
    if (res.ok) {
      const body = await res.text();
      return { url, status: res.status, body: body.slice(0, 10000) };
    }
    if (res.status === 429 || res.status === 503) {
      throw new Error(`Rate limited: HTTP ${res.status}`);
    }
    return { url, status: res.status, body: null };
  } catch (err) {
    throw err; // Bull will retry based on job options
  }
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
  for (const url of urls) {
    await scrapeQueue.add(
      { url, target },
      {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: 1000,
        removeOnFail: false,
      }
    );
  }
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
  console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
  console.error(`Failed: ${job.data.url} - ${err.message}`);
});

图案2:管道结构

对于复杂的刮碎工作流程,使用 管道 每个阶段处理不同的问题:

# Pipeline stages:
#
# Stage 1: URL Discovery    → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction  → parses HTML, extracts data
# Stage 4: Data Validation  → checks quality, deduplicates
# Stage 5: Storage          → saves to database/warehouse

执行

package main
import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "sync"
    "time"
)
type ScrapeResult struct {
    URL    string `json:"url"`
    Status int    `json:"status"`
    Body   string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
    defer wg.Done()
    proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
    client := &http.Client{
        Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
        Timeout:   30 * time.Second,
    }
    for u := range urls {
        resp, err := client.Get(u)
        if err != nil {
            results <- ScrapeResult{URL: u, Status: 0}
            continue
        }
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
    }
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
    for result := range results {
        if result.Status == 200 {
            // Extract and store data
            data, _ := json.Marshal(result)
            fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
        }
    }
    done <- true
}
func main() {
    urls := make(chan string, 1000)
    results := make(chan ScrapeResult, 1000)
    done := make(chan bool)
    // Start 20 fetcher workers
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go fetcher(urls, results, &wg)
    }
    // Start processor
    go processor(results, done)
    // Feed URLs
    go func() {
        for i := 0; i < 10000; i++ {
            urls <- fmt.Sprintf("https://example.com/product/%d", i)
        }
        close(urls)
    }()
    // Wait for all fetchers, then close results
    wg.Wait()
    close(results)
    <-done
}

水平缩放策略

战略1:多机制部署

将工人分散在多个机器上. 队列充当协调点 :

构成部分部署缩放
队列( Redis/ RabbitMQ)专用服务器或管理服务垂直( 更多内存)
工人多个机器或容器水平( 添加实例)
成果储存数据库或对象存储垂直加硬化
监测集中式仪表板单一实例

战略2:基于集装箱的扩大

使用Docker和Kubernetes进行弹性缩放. 每个工人在一个可以复制的容器中运行:

# docker-compose.yml for scraping workers
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  scraper-worker:
    build: .
    environment:
      - PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
      - REDIS_URL=redis://redis:6379/0
      - WORKER_CONCURRENCY=10
    deploy:
      replicas: 5        # 5 containers × 10 concurrency = 50 parallel requests
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
    depends_on:
      - redis

规模代理管理

在规模上,代理管理成为一个关键的系统组成部分. 主要考虑因素:

连接集合

重用代理网关的连接, 而不是每个请求创建新网关 。 这减少了延迟和连接间接费用:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
    """Create a session with connection pooling and retry logic."""
    session = requests.Session()
    # Connection pool: keep 20 connections, max 50
    adapter = HTTPAdapter(
        pool_connections=20,
        pool_maxsize=50,
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503],
        )
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.proxies = {
        "http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
        "https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
    }
    return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
    resp = session.get(url, timeout=30)

卫生监测

实时监视您的代理性能, 以便及早发现问题 :

import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
    """Track proxy health metrics for monitoring."""
    requests_total: int = 0
    requests_success: int = 0
    requests_blocked: int = 0
    requests_timeout: int = 0
    latency_samples: list = field(default_factory=list)
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    def record_request(self, status_code: int, latency_ms: float):
        self.requests_total += 1
        self.status_codes[status_code] += 1
        self.latency_samples.append(latency_ms)
        if status_code == 200:
            self.requests_success += 1
        elif status_code in [403, 429]:
            self.requests_blocked += 1
        # Keep only last 1000 samples
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
    @property
    def success_rate(self) -> float:
        return self.requests_success / self.requests_total if self.requests_total else 0
    @property
    def avg_latency(self) -> float:
        return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
    def report(self) -> str:
        return (
            f"Total: {self.requests_total}, "
            f"Success: {self.success_rate:.1%}, "
            f"Blocked: {self.requests_blocked}, "
            f"Avg Latency: {self.avg_latency:.0f}ms"
        )

数据按比例存储

存储类型最佳时间缩放
邮递SQL结构产品/定价数据百万行数
蒙哥达半结构/可变计划10亿份文件
S3/ 对象存储Raw HTML 归档字节
弹性搜索对已删除的数据进行全文搜索10亿份文件
点击之家对大数据集的分析数以千计的行数

缩放检查列表

  • 从获取中解析 URL 发现 。 在阶段之间使用信件队列 。
  • 执行正确的重试逻辑 。 以死信队列表示持续失败的反向指示.
  • 监视一切 排队深度,成功率,耐久性,每个目标域的错误率.
  • 使用连接集合 。 重复使用代理连接, 而不是每个请求创建新的连接 。
  • 计划失败。 工人坠机 代理被封锁 目标改变结构 建立每个层次的复原力。
  • 发射前进行规模测试. 一个工作在100 RPM的系统可能因为内存,连接限制,或队列瓶颈而以10,000 RPM失败.

将辅助您缩放架构的代理旋转策略改为 大型碎屑代用旋转策略。要处理比例限制,请参见 速率限制解释。 。 。

使用 Python SDK 键盘, (中文). 节点 SDK,或 冲啊 SDK 用于生产代理集成,并探索 代理哈特计划 用于大容量刮刮。

经常被问到的问题

哪个排队系统最适合按级刮?

Redis with Bull (Node.js) 或 RQ (Python) 工作良好,每天可完成数百万项任务. 对于更大的规模,Apache Kafka或RabbitMQ提供更好的耐久性和吞吐量. 根据您现有的基础设施和团队专长来选择 。

我该跑多少个同时工作的工人?

从10到20名工人开始,并根据你的代理能力和目标站点耐受性进行规模化. 监测成功率——如果低于90%,在增加更多工人之前减少货币。 每个工人通过ProxyHat获得自动IP旋转.

我该给工人用电动或线织吗?

对于I/O式刮刮(大多数案例),ASync(Python asyncio,Node.js)比线程提供更好的资源效率. 仅在需要CPU重度解析并同时获取时使用线程或多处理. 去Goroutines 优异的两种模式。

我如何处理目标地点结构的变化?

在您的管道中执行数据验证 。 当解析数据失败验证( 丢失字段, 错误类型) 时, 提醒您的团队和队列会影响 URL , 以便使用更新的解析器进行再处理 。 版本您的解析器, 以便在需要时可以滚回 。

准备开始了吗?

通过AI过滤访问148多个国家的5000多万个住宅IP。

查看价格住宅代理
← 返回博客