Dlaczego Scraping Infrastructure Needs Dedykowana Architektura
Pojedynczy skrypt uderzający w jedną stronę działa dobrze dla małych zadań. Ale kiedy musisz codziennie zeskrobywać miliony stron na dziesiątki celów, ten scenariusz staje się wąskim gardłem. Infrastruktura skalibrowania wymaga przejścia ze skryptów liniowych na rozproszone, oparte na kolejkach architektury, które obsługują błędy wdzięcznie, zarządzać rotacją proxy i maksymalizować przepustowość.
Niniejszy przewodnik obejmuje wzorce architektury, systemy kolejki, poziome strategie skalowania oraz techniki zarządzania proxy, które produkują energię w skali skrobania.
Ten artykuł opiera się na koncepcjach z naszych Kompletny przewodnik do Web Scraping Proxies. Dla proxy rozmiar puli, patrz Ile nagród potrzebujesz do skrobania?
Wzory architektoniczne do skalowalnego skrobania
Wzór 1: Rozdrabnianie oparte na kolejkach
Podstawą drapania skalowalnego jest kolejka wiadomości który oddziela odkrycie URL od pobierania danych. Pracownicy wyciągają zadania z kolejki, pobierają strony przez proxy i wpychają wyniki do magazynu.
# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
# → Worker 2 →
# → Worker N →
# ↓
# [Dead Letter Queue]
# (failed requests)Korzyści z tego wzoru:
- Poziome skalowanie: Dodawanie lub usuwanie pracowników bez zmiany systemu
- Tolerancja błędu: Nieudany powrót zadań do kolejki dla powtórki
- Kontrola szybkości: Dostosowanie liczby pracowników do kontroli ogólnej przepustowości
- Widoczność: Głębokość kolejki pokazuje zaległości; wskaźnik ukończenia pokazuje zdrowie
Wdrażanie Pythona z kolejką Redis
import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
url: str
target: str
priority: int = 0
retries: int = 0
max_retries: int = 3
class ScrapingQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_key = "scrape:tasks"
self.results_key = "scrape:results"
self.dlq_key = "scrape:dead_letter"
def enqueue(self, task: ScrapeTask):
self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
result = self.redis.brpop(self.queue_key, timeout=timeout)
if result:
data = json.loads(result[1])
return ScrapeTask(**data)
return None
def store_result(self, url: str, data: dict):
self.redis.hset(self.results_key, url, json.dumps(data))
def send_to_dlq(self, task: ScrapeTask, error: str):
task_data = task.__dict__
task_data["error"] = error
self.redis.lpush(self.dlq_key, json.dumps(task_data))
@property
def pending_count(self) -> int:
return self.redis.llen(self.queue_key)
class ScrapingWorker:
def __init__(self, queue: ScrapingQueue, worker_id: int):
self.queue = queue
self.worker_id = worker_id
self.session = requests.Session()
self.session.proxies = {"http": PROXY, "https": PROXY}
def process_task(self, task: ScrapeTask) -> bool:
try:
resp = self.session.get(task.url, timeout=30)
if resp.status_code == 200:
self.queue.store_result(task.url, {
"status": 200,
"body": resp.text[:10000], # Truncate for storage
"target": task.target,
})
return True
elif resp.status_code in [429, 503]:
# Retry with backoff
if task.retries < task.max_retries:
task.retries += 1
time.sleep(2 ** task.retries)
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
except Exception as e:
if task.retries < task.max_retries:
task.retries += 1
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, str(e))
return False
def run(self):
print(f"Worker {self.worker_id} started")
while True:
task = self.queue.dequeue(timeout=5)
if task:
self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
queue.enqueue(ScrapeTask(
url=f"https://example.com/product/{i}",
target="example.com"
))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
workers = [ScrapingWorker(queue, i) for i in range(10)]
for worker in workers:
executor.submit(worker.run)Wdrażanie Node.js z kolejką Bull
const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => { // 10 concurrent workers
const { url, target } = job.data;
try {
const res = await fetch(url, { agent, timeout: 30000 });
if (res.ok) {
const body = await res.text();
return { url, status: res.status, body: body.slice(0, 10000) };
}
if (res.status === 429 || res.status === 503) {
throw new Error(`Rate limited: HTTP ${res.status}`);
}
return { url, status: res.status, body: null };
} catch (err) {
throw err; // Bull will retry based on job options
}
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
for (const url of urls) {
await scrapeQueue.add(
{ url, target },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: false,
}
);
}
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
console.error(`Failed: ${job.data.url} - ${err.message}`);
});Wzór 2: Architektura rurociągów
W celu uzyskania złożonych strumieni roboczych należy użyć rurociąg w przypadku gdy każdy etap zajmuje się innym problemem:
# Pipeline stages:
#
# Stage 1: URL Discovery → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction → parses HTML, extracts data
# Stage 4: Data Validation → checks quality, deduplicates
# Stage 5: Storage → saves to database/warehouseWykonanie
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type ScrapeResult struct {
URL string `json:"url"`
Status int `json:"status"`
Body string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
defer wg.Done()
proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
for u := range urls {
resp, err := client.Get(u)
if err != nil {
results <- ScrapeResult{URL: u, Status: 0}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
}
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
for result := range results {
if result.Status == 200 {
// Extract and store data
data, _ := json.Marshal(result)
fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
}
}
done <- true
}
func main() {
urls := make(chan string, 1000)
results := make(chan ScrapeResult, 1000)
done := make(chan bool)
// Start 20 fetcher workers
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go fetcher(urls, results, &wg)
}
// Start processor
go processor(results, done)
// Feed URLs
go func() {
for i := 0; i < 10000; i++ {
urls <- fmt.Sprintf("https://example.com/product/%d", i)
}
close(urls)
}()
// Wait for all fetchers, then close results
wg.Wait()
close(results)
<-done
}Strategie skalowania poziomego
Strategia 1: Wdrożenie wielu maszyn
Rozprowadzanie pracowników na wielu maszynach. Kolejka działa jako punkt koordynacji:
| Składnik | Wdrożenie | Skalowanie |
|---|---|---|
| Kolejka (Redis / RabbitMQ) | Serwer dedykowany lub zarządzana usługa | Pionowe (więcej RAM) |
| Pracownicy | Maszyny wielofunkcyjne lub kontenery | Poziome (dodać instancje) |
| Przechowywanie wyników | Baza danych lub magazyn obiektów | Pionowe + ostrzenie |
| Monitorowanie | Centralna deska rozdzielcza | Pojedynczy przypadek |
Strategia 2: Skalowanie oparte na pojemniku
Użyj Docker i Kubernetes do elastycznego skalowania. Każdy pracownik biegnie w kontenerze, który może być replikowany:
# docker-compose.yml for scraping workers
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scraper-worker:
build: .
environment:
- PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
- REDIS_URL=redis://redis:6379/0
- WORKER_CONCURRENCY=10
deploy:
replicas: 5 # 5 containers × 10 concurrency = 50 parallel requests
resources:
limits:
memory: 512M
cpus: '0.5'
depends_on:
- redisZarządzanie proxy w skali
W skali zarządzanie proxy staje się kluczowym składnikiem systemu. Kluczowe względy:
Pooling połączeń
Ponowne użycie połączeń z bramą proxy zamiast tworzenia nowych połączeń na żądanie. Zmniejsza to opóźnienie i podłączenie:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
"""Create a session with connection pooling and retry logic."""
session = requests.Session()
# Connection pool: keep 20 connections, max 50
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=50,
max_retries=Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503],
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = {
"http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
"https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
}
return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
resp = session.get(url, timeout=30)Monitorowanie zdrowia
Monitoruj wydajność proxy w czasie rzeczywistym, aby wykryć problemy wcześnie:
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
"""Track proxy health metrics for monitoring."""
requests_total: int = 0
requests_success: int = 0
requests_blocked: int = 0
requests_timeout: int = 0
latency_samples: list = field(default_factory=list)
status_codes: dict = field(default_factory=lambda: defaultdict(int))
def record_request(self, status_code: int, latency_ms: float):
self.requests_total += 1
self.status_codes[status_code] += 1
self.latency_samples.append(latency_ms)
if status_code == 200:
self.requests_success += 1
elif status_code in [403, 429]:
self.requests_blocked += 1
# Keep only last 1000 samples
if len(self.latency_samples) > 1000:
self.latency_samples = self.latency_samples[-1000:]
@property
def success_rate(self) -> float:
return self.requests_success / self.requests_total if self.requests_total else 0
@property
def avg_latency(self) -> float:
return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
def report(self) -> str:
return (
f"Total: {self.requests_total}, "
f"Success: {self.success_rate:.1%}, "
f"Blocked: {self.requests_blocked}, "
f"Avg Latency: {self.avg_latency:.0f}ms"
)Przechowywanie danych w skali
| Rodzaj przechowywania | Najlepsze dla | Skala |
|---|---|---|
| PostgreSQL | Zorganizowane dane dotyczące produktu / cen | Miliony wierszy |
| MongoB | Schematy półstrukturalne / zmienne | Miliardy dokumentów |
| Przechowywanie S3 / Obiekt | Surowe archiwa HTML | Petabajty |
| Elasticsearch | Wyszukiwarka pełnotekstowa nad zeskrobywanymi danymi | Miliardy dokumentów |
| ClickHouse | Analiza dużych zbiorów danych | Miliardy wierszy |
Skalowanie listy kontrolnej
- Decoupe odkrycie URL z pobierania. Użyj kolejki wiadomości pomiędzy etapami.
- Wdrożenie prawidłowej logiki powtórki. Eksponatyczna kopia zapasowa z martwymi kolejkami liter dla trwałych awarii.
- Monitoruj wszystko. Głębokość kolejki, wskaźniki sukcesu, opóźnienia, poziomy błędów w poszczególnych domenach.
- Użyj łączenia połączeń. Ponowne wykorzystanie połączeń proxy zamiast tworzenia nowych na żądanie.
- Plan na porażkę. Pracownicy się rozbijają, proxy zostają zablokowane, cele zmieniają strukturę. Zbuduj odporność na każdą warstwę.
- Test na skalę przed startem. System pracujący przy 100 RPM może się nie udać przy 10 000 RPM ze względu na pamięć, ograniczenia połączeń lub wąskie gardła w kolejce.
Dla strategii rotacji proxy, które uzupełniają skalowanie architektury, czytaj Strategie rotacji proxy dla rozdrabniania na dużą skalę. Aby poradzić sobie z limitami szybkości jak skalować, zobacz Rozdrapywanie wartości granicznych.
Użyj Python SDK, Węzeł SDKlub Go SDK do integracji proxy produkcji, i zbadać Plany ProxyHat do drapania o dużej objętości.
Często zadawane pytania
Jaki system kolejki jest najlepszy do skrobania w skali?
Redis z Bull (Node.js) lub RQ (Python) działa dobrze do milionów zadań dziennie. Dla większej skali, Apache Kafka lub RabbitMQ zapewnia lepszą trwałość i przepustowość. Wybierz na podstawie istniejącej infrastruktury i wiedzy zespołu.
Ilu pracowników mam prowadzić?
Zacznij od 10- 20 pracowników i skali w oparciu o możliwości proxy i tolerancji miejsca docelowego. Monitorowanie wskaźnika sukcesu - jeśli spadnie poniżej 90%, zmniejszyć współzależność przed dodaniem większej liczby pracowników. Każdy pracownik przez ProxyHat dostaje automatyczną rotację IP.
Czy powinienem używać asynku czy gwintowania dla pracowników?
Dla I / O- scrating (większość przypadków), async (Python asyncio, Node.js) zapewnia lepszą wydajność zasobów niż gwintowanie. Używać gwintowania lub wielokrotnego przetwarzania tylko wtedy, gdy potrzebujesz CPU- ciężkie parsowanie obok pobierania. Goroutines są najlepsze w obu wzorach.
Jak poradzić sobie ze zmianami struktury obiektu docelowego?
Wdrożenie walidacji danych w twoim rurociągu. Gdy dane przelane nie są sprawdzane (brakujące pola, niewłaściwe typy), zaalarmuj swój zespół i kolejkę, które mają wpływ na adresy URL w celu ponownego przetwarzania z zaktualizowanymi parserami. Wersja twoich parserów, dzięki czemu możesz wrócić w razie potrzeby.






