Perché Scraping Infrastructure ha bisogno di architettura dedicata
Uno script single-threaded che colpisce un sito web funziona bene per piccole attività. Ma quando è necessario raschiare milioni di pagine ogni giorno attraverso decine di obiettivi, che lo script diventa un collo di bottiglia. Infrastrutture di raschiamento richiede di passare da script lineari a architetture distribuite, basate sulla coda che gestiscono i guasti con grazia, gestire la rotazione del proxy e massimizzare il throughput.
Questa guida copre i modelli di architettura, i sistemi di coda, le strategie di scaling orizzontale e le tecniche di gestione proxy che alimentano la demolizione di livello di produzione su scala.
Questo articolo si basa sui concetti del nostro Guida completa ai proxy Web Scraping. Per il dimensionamento del pool proxy, vedere Quanti Proxies hai bisogno di Scraping?
Modelli di architettura per Scraping scalabile
Modello 1: Scraping basato su coda
Il fondamento della raschiatura scalabile è un coda di messaggio che decouples URL scoperta da data fetching. I lavoratori tirano le attività dalla coda, mettono le pagine attraverso i proxy e spingono i risultati alla memorizzazione.
# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
# → Worker 2 →
# → Worker N →
# ↓
# [Dead Letter Queue]
# (failed requests)Vantaggi di questo modello:
- Ridimensionamento orizzontale: Aggiungere o rimuovere i lavoratori senza cambiare il sistema
- Tolleranza di guasto: Le attività fallite ritornano alla coda per la riprova
- Controllo delle tariffe: Regolare il conteggio dei lavoratori per controllare la produttività complessiva
- Visibilità: La profondità di coda mostra il backlog; il tasso di completamento mostra la salute
Attuazione Python con Redis Queue
import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
url: str
target: str
priority: int = 0
retries: int = 0
max_retries: int = 3
class ScrapingQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_key = "scrape:tasks"
self.results_key = "scrape:results"
self.dlq_key = "scrape:dead_letter"
def enqueue(self, task: ScrapeTask):
self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
result = self.redis.brpop(self.queue_key, timeout=timeout)
if result:
data = json.loads(result[1])
return ScrapeTask(**data)
return None
def store_result(self, url: str, data: dict):
self.redis.hset(self.results_key, url, json.dumps(data))
def send_to_dlq(self, task: ScrapeTask, error: str):
task_data = task.__dict__
task_data["error"] = error
self.redis.lpush(self.dlq_key, json.dumps(task_data))
@property
def pending_count(self) -> int:
return self.redis.llen(self.queue_key)
class ScrapingWorker:
def __init__(self, queue: ScrapingQueue, worker_id: int):
self.queue = queue
self.worker_id = worker_id
self.session = requests.Session()
self.session.proxies = {"http": PROXY, "https": PROXY}
def process_task(self, task: ScrapeTask) -> bool:
try:
resp = self.session.get(task.url, timeout=30)
if resp.status_code == 200:
self.queue.store_result(task.url, {
"status": 200,
"body": resp.text[:10000], # Truncate for storage
"target": task.target,
})
return True
elif resp.status_code in [429, 503]:
# Retry with backoff
if task.retries < task.max_retries:
task.retries += 1
time.sleep(2 ** task.retries)
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
except Exception as e:
if task.retries < task.max_retries:
task.retries += 1
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, str(e))
return False
def run(self):
print(f"Worker {self.worker_id} started")
while True:
task = self.queue.dequeue(timeout=5)
if task:
self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
queue.enqueue(ScrapeTask(
url=f"https://example.com/product/{i}",
target="example.com"
))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
workers = [ScrapingWorker(queue, i) for i in range(10)]
for worker in workers:
executor.submit(worker.run)Node.js Attuazione con Bull Queue
const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => { // 10 concurrent workers
const { url, target } = job.data;
try {
const res = await fetch(url, { agent, timeout: 30000 });
if (res.ok) {
const body = await res.text();
return { url, status: res.status, body: body.slice(0, 10000) };
}
if (res.status === 429 || res.status === 503) {
throw new Error(`Rate limited: HTTP ${res.status}`);
}
return { url, status: res.status, body: null };
} catch (err) {
throw err; // Bull will retry based on job options
}
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
for (const url of urls) {
await scrapeQueue.add(
{ url, target },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: false,
}
);
}
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
console.error(`Failed: ${job.data.url} - ${err.message}`);
});Modello 2: Pipeline Architettura
Per i flussi di lavoro di raschiatura complessi, utilizzare un pipeline dove ogni fase gestisce una diversa preoccupazione:
# Pipeline stages:
#
# Stage 1: URL Discovery → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction → parses HTML, extracts data
# Stage 4: Data Validation → checks quality, deduplicates
# Stage 5: Storage → saves to database/warehouseAttuazione
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type ScrapeResult struct {
URL string `json:"url"`
Status int `json:"status"`
Body string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
defer wg.Done()
proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
for u := range urls {
resp, err := client.Get(u)
if err != nil {
results <- ScrapeResult{URL: u, Status: 0}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
}
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
for result := range results {
if result.Status == 200 {
// Extract and store data
data, _ := json.Marshal(result)
fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
}
}
done <- true
}
func main() {
urls := make(chan string, 1000)
results := make(chan ScrapeResult, 1000)
done := make(chan bool)
// Start 20 fetcher workers
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go fetcher(urls, results, &wg)
}
// Start processor
go processor(results, done)
// Feed URLs
go func() {
for i := 0; i < 10000; i++ {
urls <- fmt.Sprintf("https://example.com/product/%d", i)
}
close(urls)
}()
// Wait for all fetchers, then close results
wg.Wait()
close(results)
<-done
}Strategie di scala orizzontale
Strategia 1: Distribuzione multi-macchina
Distribuire lavoratori su più macchine. La coda funge da punto di coordinamento:
| Componenti | Distribuzione | Scala |
|---|---|---|
| Queue (Redis/RabbitMQ) | Server dedicato o servizio gestito | Verticale (più RAM) |
| Lavoratori | Macchine multiple o contenitori | Orizzontale (istanze aggiuntive) |
| Risultati della conservazione | Database o negozio di oggetti | Verticale + sharding |
| Monitoraggio | Cruscotto centralizzato | Esempio singolo |
Strategia 2: Scala basata su container
Utilizzare Docker e Kubernetes per scaling elastico. Ogni lavoratore opera in un contenitore che può essere replicato:
# docker-compose.yml for scraping workers
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scraper-worker:
build: .
environment:
- PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
- REDIS_URL=redis://redis:6379/0
- WORKER_CONCURRENCY=10
deploy:
replicas: 5 # 5 containers × 10 concurrency = 50 parallel requests
resources:
limits:
memory: 512M
cpus: '0.5'
depends_on:
- redisGestione del proxy in scala
In scala, la gestione dei proxy diventa un componente del sistema critico. Considerazioni chiave:
Connessione Piscina
Riutilizzare le connessioni al gateway proxy invece di crearne di nuove a richiesta. Questo riduce la latenza e la connessione in testa:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
"""Create a session with connection pooling and retry logic."""
session = requests.Session()
# Connection pool: keep 20 connections, max 50
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=50,
max_retries=Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503],
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = {
"http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
"https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
}
return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
resp = session.get(url, timeout=30)Monitoraggio della salute
Monitorare le prestazioni del proxy in tempo reale per rilevare i problemi in anticipo:
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
"""Track proxy health metrics for monitoring."""
requests_total: int = 0
requests_success: int = 0
requests_blocked: int = 0
requests_timeout: int = 0
latency_samples: list = field(default_factory=list)
status_codes: dict = field(default_factory=lambda: defaultdict(int))
def record_request(self, status_code: int, latency_ms: float):
self.requests_total += 1
self.status_codes[status_code] += 1
self.latency_samples.append(latency_ms)
if status_code == 200:
self.requests_success += 1
elif status_code in [403, 429]:
self.requests_blocked += 1
# Keep only last 1000 samples
if len(self.latency_samples) > 1000:
self.latency_samples = self.latency_samples[-1000:]
@property
def success_rate(self) -> float:
return self.requests_success / self.requests_total if self.requests_total else 0
@property
def avg_latency(self) -> float:
return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
def report(self) -> str:
return (
f"Total: {self.requests_total}, "
f"Success: {self.success_rate:.1%}, "
f"Blocked: {self.requests_blocked}, "
f"Avg Latency: {self.avg_latency:.0f}ms"
)Conservazione dei dati in scala
| Tipo di stoccaggio | Migliore per | Scala |
|---|---|---|
| PostgreSQL | Dati strutturati del prodotto/prezzi | Milioni di file |
| Mongoli | Piani semistrutturati/variabili | Fatturazione dei documenti |
| storage S3/Object | Archivio HTML crudo | Petabyte |
| Ricerca elastica | Ricerca full-text sui dati raschiati | Fatturazione dei documenti |
| ClickHouse | Analisi su grandi set di dati | Trilioni di righe |
Lista di controllo scala
- Decouple URL scoperta da fetching. Utilizzare una coda di messaggio tra le fasi.
- Realizzare una corretta logica di riprovazione. Backoff esponenziale con code di lettere morte per guasti persistenti.
- Controlla tutto. Profondità di coda, tassi di successo, latenza, tassi di errore per dominio di destinazione.
- Utilizzare la connessione pooling. Riutilizzare le connessioni proxy invece di crearne di nuove a richiesta.
- Piano di fallimento. I lavoratori crash, i proxy vengono bloccati, gli obiettivi cambiano la loro struttura. Costruisci la resilienza in ogni strato.
- Prova su scala prima del lancio. Un sistema che funziona a 100 RPM può fallire a 10.000 RPM a causa di memoria, limiti di connessione o strozzature di coda.
Per le strategie di rotazione proxy che completano l'architettura di scaling, leggere Strategie di rotazione proxy per Scraping a grande scala. Per gestire i limiti di tasso come si scala, vedere Limiti di velocità di scorrimento spiegati.
Utilizzare Python SDK♪ Node SDKo Vai SDK per l'integrazione dei proxy di produzione ed esplorare Piani ProxyHat per raschiatura ad alto volume.
Domande frequenti
Quale sistema di coda è meglio per raschiare in scala?
Redis con Bull (Node.js) o RQ (Python) funziona bene fino a milioni di compiti al giorno. Per dimensioni più grandi, Apache Kafka o RabbitMQ fornisce una migliore durata e produttività. Scegli in base alla tua infrastruttura esistente e alle competenze del team.
Quanti lavoratori concorrenti dovrei correre?
Inizia con 10-20 lavoratori e scala in base alla capacità del proxy e alla tolleranza del sito di destinazione. Monitorare i tassi di successo — se cadono al di sotto del 90%, ridurre la competitività prima di aggiungere più lavoratori. Ogni lavoratore tramite ProxyHat ottiene rotazione automatica dell'IP.
Dovrei usare asincrona o filettatura per i lavoratori?
Per la raschiatura I/O-bound (la maggior parte dei casi), asinc (Python asyncio, Node.js) fornisce una migliore efficienza delle risorse rispetto al threading. Utilizzare la filettatura o il multiprocessing solo quando è necessario la parsing CPU-heavy accanto a fetching. Vai goroutines eccellere a entrambi i modelli.
Come posso gestire i cambiamenti della struttura del sito di destinazione?
Implementare la convalida dei dati nella vostra pipeline. Quando i dati analizzati non riescono a convalidare (i campi mancanti, i tipi errati), avvisa il team e la coda gli URL interessati per il rielaborazione con i parser aggiornati. Versione il tuo parser in modo da poter tornare indietro se necessario.






