Come Scalare le Infrastrutture di Scraping

Modelli di architettura per scaling web scraping: sistemi basati sulla coda, progettazione pipeline, scalabilità orizzontale con contenitori e gestione proxy in scala. Codice in Python, Node.js, and Go.

Come Scalare le Infrastrutture di Scraping

Perché Scraping Infrastructure ha bisogno di architettura dedicata

Uno script single-threaded che colpisce un sito web funziona bene per piccole attività. Ma quando è necessario raschiare milioni di pagine ogni giorno attraverso decine di obiettivi, che lo script diventa un collo di bottiglia. Infrastrutture di raschiamento richiede di passare da script lineari a architetture distribuite, basate sulla coda che gestiscono i guasti con grazia, gestire la rotazione del proxy e massimizzare il throughput.

Questa guida copre i modelli di architettura, i sistemi di coda, le strategie di scaling orizzontale e le tecniche di gestione proxy che alimentano la demolizione di livello di produzione su scala.

Questo articolo si basa sui concetti del nostro Guida completa ai proxy Web Scraping. Per il dimensionamento del pool proxy, vedere Quanti Proxies hai bisogno di Scraping?

Modelli di architettura per Scraping scalabile

Modello 1: Scraping basato su coda

Il fondamento della raschiatura scalabile è un coda di messaggio che decouples URL scoperta da data fetching. I lavoratori tirano le attività dalla coda, mettono le pagine attraverso i proxy e spingono i risultati alla memorizzazione.

# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
#                               → Worker 2 →
#                               → Worker N →
#                               ↓
#                        [Dead Letter Queue]
#                        (failed requests)

Vantaggi di questo modello:

  • Ridimensionamento orizzontale: Aggiungere o rimuovere i lavoratori senza cambiare il sistema
  • Tolleranza di guasto: Le attività fallite ritornano alla coda per la riprova
  • Controllo delle tariffe: Regolare il conteggio dei lavoratori per controllare la produttività complessiva
  • Visibilità: La profondità di coda mostra il backlog; il tasso di completamento mostra la salute

Attuazione Python con Redis Queue

import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
    url: str
    target: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
class ScrapingQueue:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:tasks"
        self.results_key = "scrape:results"
        self.dlq_key = "scrape:dead_letter"
    def enqueue(self, task: ScrapeTask):
        self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
    def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
        result = self.redis.brpop(self.queue_key, timeout=timeout)
        if result:
            data = json.loads(result[1])
            return ScrapeTask(**data)
        return None
    def store_result(self, url: str, data: dict):
        self.redis.hset(self.results_key, url, json.dumps(data))
    def send_to_dlq(self, task: ScrapeTask, error: str):
        task_data = task.__dict__
        task_data["error"] = error
        self.redis.lpush(self.dlq_key, json.dumps(task_data))
    @property
    def pending_count(self) -> int:
        return self.redis.llen(self.queue_key)
class ScrapingWorker:
    def __init__(self, queue: ScrapingQueue, worker_id: int):
        self.queue = queue
        self.worker_id = worker_id
        self.session = requests.Session()
        self.session.proxies = {"http": PROXY, "https": PROXY}
    def process_task(self, task: ScrapeTask) -> bool:
        try:
            resp = self.session.get(task.url, timeout=30)
            if resp.status_code == 200:
                self.queue.store_result(task.url, {
                    "status": 200,
                    "body": resp.text[:10000],  # Truncate for storage
                    "target": task.target,
                })
                return True
            elif resp.status_code in [429, 503]:
                # Retry with backoff
                if task.retries < task.max_retries:
                    task.retries += 1
                    time.sleep(2 ** task.retries)
                    self.queue.enqueue(task)
                else:
                    self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
            else:
                self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
        except Exception as e:
            if task.retries < task.max_retries:
                task.retries += 1
                self.queue.enqueue(task)
            else:
                self.queue.send_to_dlq(task, str(e))
        return False
    def run(self):
        print(f"Worker {self.worker_id} started")
        while True:
            task = self.queue.dequeue(timeout=5)
            if task:
                self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
    queue.enqueue(ScrapeTask(
        url=f"https://example.com/product/{i}",
        target="example.com"
    ))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
    workers = [ScrapingWorker(queue, i) for i in range(10)]
    for worker in workers:
        executor.submit(worker.run)

Node.js Attuazione con Bull Queue

const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => {  // 10 concurrent workers
  const { url, target } = job.data;
  try {
    const res = await fetch(url, { agent, timeout: 30000 });
    if (res.ok) {
      const body = await res.text();
      return { url, status: res.status, body: body.slice(0, 10000) };
    }
    if (res.status === 429 || res.status === 503) {
      throw new Error(`Rate limited: HTTP ${res.status}`);
    }
    return { url, status: res.status, body: null };
  } catch (err) {
    throw err; // Bull will retry based on job options
  }
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
  for (const url of urls) {
    await scrapeQueue.add(
      { url, target },
      {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: 1000,
        removeOnFail: false,
      }
    );
  }
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
  console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
  console.error(`Failed: ${job.data.url} - ${err.message}`);
});

Modello 2: Pipeline Architettura

Per i flussi di lavoro di raschiatura complessi, utilizzare un pipeline dove ogni fase gestisce una diversa preoccupazione:

# Pipeline stages:
#
# Stage 1: URL Discovery    → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction  → parses HTML, extracts data
# Stage 4: Data Validation  → checks quality, deduplicates
# Stage 5: Storage          → saves to database/warehouse

Attuazione

package main
import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "sync"
    "time"
)
type ScrapeResult struct {
    URL    string `json:"url"`
    Status int    `json:"status"`
    Body   string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
    defer wg.Done()
    proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
    client := &http.Client{
        Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
        Timeout:   30 * time.Second,
    }
    for u := range urls {
        resp, err := client.Get(u)
        if err != nil {
            results <- ScrapeResult{URL: u, Status: 0}
            continue
        }
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
    }
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
    for result := range results {
        if result.Status == 200 {
            // Extract and store data
            data, _ := json.Marshal(result)
            fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
        }
    }
    done <- true
}
func main() {
    urls := make(chan string, 1000)
    results := make(chan ScrapeResult, 1000)
    done := make(chan bool)
    // Start 20 fetcher workers
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go fetcher(urls, results, &wg)
    }
    // Start processor
    go processor(results, done)
    // Feed URLs
    go func() {
        for i := 0; i < 10000; i++ {
            urls <- fmt.Sprintf("https://example.com/product/%d", i)
        }
        close(urls)
    }()
    // Wait for all fetchers, then close results
    wg.Wait()
    close(results)
    <-done
}

Strategie di scala orizzontale

Strategia 1: Distribuzione multi-macchina

Distribuire lavoratori su più macchine. La coda funge da punto di coordinamento:

ComponentiDistribuzioneScala
Queue (Redis/RabbitMQ)Server dedicato o servizio gestitoVerticale (più RAM)
LavoratoriMacchine multiple o contenitoriOrizzontale (istanze aggiuntive)
Risultati della conservazioneDatabase o negozio di oggettiVerticale + sharding
MonitoraggioCruscotto centralizzatoEsempio singolo

Strategia 2: Scala basata su container

Utilizzare Docker e Kubernetes per scaling elastico. Ogni lavoratore opera in un contenitore che può essere replicato:

# docker-compose.yml for scraping workers
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  scraper-worker:
    build: .
    environment:
      - PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
      - REDIS_URL=redis://redis:6379/0
      - WORKER_CONCURRENCY=10
    deploy:
      replicas: 5        # 5 containers × 10 concurrency = 50 parallel requests
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
    depends_on:
      - redis

Gestione del proxy in scala

In scala, la gestione dei proxy diventa un componente del sistema critico. Considerazioni chiave:

Connessione Piscina

Riutilizzare le connessioni al gateway proxy invece di crearne di nuove a richiesta. Questo riduce la latenza e la connessione in testa:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
    """Create a session with connection pooling and retry logic."""
    session = requests.Session()
    # Connection pool: keep 20 connections, max 50
    adapter = HTTPAdapter(
        pool_connections=20,
        pool_maxsize=50,
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503],
        )
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.proxies = {
        "http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
        "https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
    }
    return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
    resp = session.get(url, timeout=30)

Monitoraggio della salute

Monitorare le prestazioni del proxy in tempo reale per rilevare i problemi in anticipo:

import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
    """Track proxy health metrics for monitoring."""
    requests_total: int = 0
    requests_success: int = 0
    requests_blocked: int = 0
    requests_timeout: int = 0
    latency_samples: list = field(default_factory=list)
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    def record_request(self, status_code: int, latency_ms: float):
        self.requests_total += 1
        self.status_codes[status_code] += 1
        self.latency_samples.append(latency_ms)
        if status_code == 200:
            self.requests_success += 1
        elif status_code in [403, 429]:
            self.requests_blocked += 1
        # Keep only last 1000 samples
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
    @property
    def success_rate(self) -> float:
        return self.requests_success / self.requests_total if self.requests_total else 0
    @property
    def avg_latency(self) -> float:
        return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
    def report(self) -> str:
        return (
            f"Total: {self.requests_total}, "
            f"Success: {self.success_rate:.1%}, "
            f"Blocked: {self.requests_blocked}, "
            f"Avg Latency: {self.avg_latency:.0f}ms"
        )

Conservazione dei dati in scala

Tipo di stoccaggioMigliore perScala
PostgreSQLDati strutturati del prodotto/prezziMilioni di file
MongoliPiani semistrutturati/variabiliFatturazione dei documenti
storage S3/ObjectArchivio HTML crudoPetabyte
Ricerca elasticaRicerca full-text sui dati raschiatiFatturazione dei documenti
ClickHouseAnalisi su grandi set di datiTrilioni di righe

Lista di controllo scala

  • Decouple URL scoperta da fetching. Utilizzare una coda di messaggio tra le fasi.
  • Realizzare una corretta logica di riprovazione. Backoff esponenziale con code di lettere morte per guasti persistenti.
  • Controlla tutto. Profondità di coda, tassi di successo, latenza, tassi di errore per dominio di destinazione.
  • Utilizzare la connessione pooling. Riutilizzare le connessioni proxy invece di crearne di nuove a richiesta.
  • Piano di fallimento. I lavoratori crash, i proxy vengono bloccati, gli obiettivi cambiano la loro struttura. Costruisci la resilienza in ogni strato.
  • Prova su scala prima del lancio. Un sistema che funziona a 100 RPM può fallire a 10.000 RPM a causa di memoria, limiti di connessione o strozzature di coda.

Per le strategie di rotazione proxy che completano l'architettura di scaling, leggere Strategie di rotazione proxy per Scraping a grande scala. Per gestire i limiti di tasso come si scala, vedere Limiti di velocità di scorrimento spiegati.

Utilizzare Python SDKNode SDKo Vai SDK per l'integrazione dei proxy di produzione ed esplorare Piani ProxyHat per raschiatura ad alto volume.

Domande frequenti

Quale sistema di coda è meglio per raschiare in scala?

Redis con Bull (Node.js) o RQ (Python) funziona bene fino a milioni di compiti al giorno. Per dimensioni più grandi, Apache Kafka o RabbitMQ fornisce una migliore durata e produttività. Scegli in base alla tua infrastruttura esistente e alle competenze del team.

Quanti lavoratori concorrenti dovrei correre?

Inizia con 10-20 lavoratori e scala in base alla capacità del proxy e alla tolleranza del sito di destinazione. Monitorare i tassi di successo — se cadono al di sotto del 90%, ridurre la competitività prima di aggiungere più lavoratori. Ogni lavoratore tramite ProxyHat ottiene rotazione automatica dell'IP.

Dovrei usare asincrona o filettatura per i lavoratori?

Per la raschiatura I/O-bound (la maggior parte dei casi), asinc (Python asyncio, Node.js) fornisce una migliore efficienza delle risorse rispetto al threading. Utilizzare la filettatura o il multiprocessing solo quando è necessario la parsing CPU-heavy accanto a fetching. Vai goroutines eccellere a entrambi i modelli.

Come posso gestire i cambiamenti della struttura del sito di destinazione?

Implementare la convalida dei dati nella vostra pipeline. Quando i dati analizzati non riescono a convalidare (i campi mancanti, i tipi errati), avvisa il team e la coda gli URL interessati per il rielaborazione con i parser aggiornati. Versione il tuo parser in modo da poter tornare indietro se necessario.

Pronto per iniziare?

Accedi a oltre 50M di IP residenziali in oltre 148 paesi con filtraggio AI.

Vedi i prezziProxy residenziali
← Torna al Blog