Jak rozdrabniać infrastrukturę

Wzory architektoniczne do skalowania skanowania stron internetowych: systemy oparte na kolejkach, projektowanie rurociągów, skalowanie poziome kontenerów oraz zarządzanie proxy w skali. Kod w Pythonie, Node.js i Go.

Jak rozdrabniać infrastrukturę

Dlaczego Scraping Infrastructure Needs Dedykowana Architektura

Pojedynczy skrypt uderzający w jedną stronę działa dobrze dla małych zadań. Ale kiedy musisz codziennie zeskrobywać miliony stron na dziesiątki celów, ten scenariusz staje się wąskim gardłem. Infrastruktura skalibrowania wymaga przejścia ze skryptów liniowych na rozproszone, oparte na kolejkach architektury, które obsługują błędy wdzięcznie, zarządzać rotacją proxy i maksymalizować przepustowość.

Niniejszy przewodnik obejmuje wzorce architektury, systemy kolejki, poziome strategie skalowania oraz techniki zarządzania proxy, które produkują energię w skali skrobania.

Ten artykuł opiera się na koncepcjach z naszych Kompletny przewodnik do Web Scraping Proxies. Dla proxy rozmiar puli, patrz Ile nagród potrzebujesz do skrobania?

Wzory architektoniczne do skalowalnego skrobania

Wzór 1: Rozdrabnianie oparte na kolejkach

Podstawą drapania skalowalnego jest kolejka wiadomości który oddziela odkrycie URL od pobierania danych. Pracownicy wyciągają zadania z kolejki, pobierają strony przez proxy i wpychają wyniki do magazynu.

# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
#                               → Worker 2 →
#                               → Worker N →
#                               ↓
#                        [Dead Letter Queue]
#                        (failed requests)

Korzyści z tego wzoru:

  • Poziome skalowanie: Dodawanie lub usuwanie pracowników bez zmiany systemu
  • Tolerancja błędu: Nieudany powrót zadań do kolejki dla powtórki
  • Kontrola szybkości: Dostosowanie liczby pracowników do kontroli ogólnej przepustowości
  • Widoczność: Głębokość kolejki pokazuje zaległości; wskaźnik ukończenia pokazuje zdrowie

Wdrażanie Pythona z kolejką Redis

import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
    url: str
    target: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
class ScrapingQueue:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:tasks"
        self.results_key = "scrape:results"
        self.dlq_key = "scrape:dead_letter"
    def enqueue(self, task: ScrapeTask):
        self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
    def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
        result = self.redis.brpop(self.queue_key, timeout=timeout)
        if result:
            data = json.loads(result[1])
            return ScrapeTask(**data)
        return None
    def store_result(self, url: str, data: dict):
        self.redis.hset(self.results_key, url, json.dumps(data))
    def send_to_dlq(self, task: ScrapeTask, error: str):
        task_data = task.__dict__
        task_data["error"] = error
        self.redis.lpush(self.dlq_key, json.dumps(task_data))
    @property
    def pending_count(self) -> int:
        return self.redis.llen(self.queue_key)
class ScrapingWorker:
    def __init__(self, queue: ScrapingQueue, worker_id: int):
        self.queue = queue
        self.worker_id = worker_id
        self.session = requests.Session()
        self.session.proxies = {"http": PROXY, "https": PROXY}
    def process_task(self, task: ScrapeTask) -> bool:
        try:
            resp = self.session.get(task.url, timeout=30)
            if resp.status_code == 200:
                self.queue.store_result(task.url, {
                    "status": 200,
                    "body": resp.text[:10000],  # Truncate for storage
                    "target": task.target,
                })
                return True
            elif resp.status_code in [429, 503]:
                # Retry with backoff
                if task.retries < task.max_retries:
                    task.retries += 1
                    time.sleep(2 ** task.retries)
                    self.queue.enqueue(task)
                else:
                    self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
            else:
                self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
        except Exception as e:
            if task.retries < task.max_retries:
                task.retries += 1
                self.queue.enqueue(task)
            else:
                self.queue.send_to_dlq(task, str(e))
        return False
    def run(self):
        print(f"Worker {self.worker_id} started")
        while True:
            task = self.queue.dequeue(timeout=5)
            if task:
                self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
    queue.enqueue(ScrapeTask(
        url=f"https://example.com/product/{i}",
        target="example.com"
    ))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
    workers = [ScrapingWorker(queue, i) for i in range(10)]
    for worker in workers:
        executor.submit(worker.run)

Wdrażanie Node.js z kolejką Bull

const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => {  // 10 concurrent workers
  const { url, target } = job.data;
  try {
    const res = await fetch(url, { agent, timeout: 30000 });
    if (res.ok) {
      const body = await res.text();
      return { url, status: res.status, body: body.slice(0, 10000) };
    }
    if (res.status === 429 || res.status === 503) {
      throw new Error(`Rate limited: HTTP ${res.status}`);
    }
    return { url, status: res.status, body: null };
  } catch (err) {
    throw err; // Bull will retry based on job options
  }
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
  for (const url of urls) {
    await scrapeQueue.add(
      { url, target },
      {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: 1000,
        removeOnFail: false,
      }
    );
  }
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
  console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
  console.error(`Failed: ${job.data.url} - ${err.message}`);
});

Wzór 2: Architektura rurociągów

W celu uzyskania złożonych strumieni roboczych należy użyć rurociąg w przypadku gdy każdy etap zajmuje się innym problemem:

# Pipeline stages:
#
# Stage 1: URL Discovery    → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction  → parses HTML, extracts data
# Stage 4: Data Validation  → checks quality, deduplicates
# Stage 5: Storage          → saves to database/warehouse

Wykonanie

package main
import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "sync"
    "time"
)
type ScrapeResult struct {
    URL    string `json:"url"`
    Status int    `json:"status"`
    Body   string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
    defer wg.Done()
    proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
    client := &http.Client{
        Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
        Timeout:   30 * time.Second,
    }
    for u := range urls {
        resp, err := client.Get(u)
        if err != nil {
            results <- ScrapeResult{URL: u, Status: 0}
            continue
        }
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
    }
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
    for result := range results {
        if result.Status == 200 {
            // Extract and store data
            data, _ := json.Marshal(result)
            fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
        }
    }
    done <- true
}
func main() {
    urls := make(chan string, 1000)
    results := make(chan ScrapeResult, 1000)
    done := make(chan bool)
    // Start 20 fetcher workers
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go fetcher(urls, results, &wg)
    }
    // Start processor
    go processor(results, done)
    // Feed URLs
    go func() {
        for i := 0; i < 10000; i++ {
            urls <- fmt.Sprintf("https://example.com/product/%d", i)
        }
        close(urls)
    }()
    // Wait for all fetchers, then close results
    wg.Wait()
    close(results)
    <-done
}

Strategie skalowania poziomego

Strategia 1: Wdrożenie wielu maszyn

Rozprowadzanie pracowników na wielu maszynach. Kolejka działa jako punkt koordynacji:

SkładnikWdrożenieSkalowanie
Kolejka (Redis / RabbitMQ)Serwer dedykowany lub zarządzana usługaPionowe (więcej RAM)
PracownicyMaszyny wielofunkcyjne lub konteneryPoziome (dodać instancje)
Przechowywanie wynikówBaza danych lub magazyn obiektówPionowe + ostrzenie
MonitorowanieCentralna deska rozdzielczaPojedynczy przypadek

Strategia 2: Skalowanie oparte na pojemniku

Użyj Docker i Kubernetes do elastycznego skalowania. Każdy pracownik biegnie w kontenerze, który może być replikowany:

# docker-compose.yml for scraping workers
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  scraper-worker:
    build: .
    environment:
      - PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
      - REDIS_URL=redis://redis:6379/0
      - WORKER_CONCURRENCY=10
    deploy:
      replicas: 5        # 5 containers × 10 concurrency = 50 parallel requests
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
    depends_on:
      - redis

Zarządzanie proxy w skali

W skali zarządzanie proxy staje się kluczowym składnikiem systemu. Kluczowe względy:

Pooling połączeń

Ponowne użycie połączeń z bramą proxy zamiast tworzenia nowych połączeń na żądanie. Zmniejsza to opóźnienie i podłączenie:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
    """Create a session with connection pooling and retry logic."""
    session = requests.Session()
    # Connection pool: keep 20 connections, max 50
    adapter = HTTPAdapter(
        pool_connections=20,
        pool_maxsize=50,
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503],
        )
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.proxies = {
        "http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
        "https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
    }
    return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
    resp = session.get(url, timeout=30)

Monitorowanie zdrowia

Monitoruj wydajność proxy w czasie rzeczywistym, aby wykryć problemy wcześnie:

import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
    """Track proxy health metrics for monitoring."""
    requests_total: int = 0
    requests_success: int = 0
    requests_blocked: int = 0
    requests_timeout: int = 0
    latency_samples: list = field(default_factory=list)
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    def record_request(self, status_code: int, latency_ms: float):
        self.requests_total += 1
        self.status_codes[status_code] += 1
        self.latency_samples.append(latency_ms)
        if status_code == 200:
            self.requests_success += 1
        elif status_code in [403, 429]:
            self.requests_blocked += 1
        # Keep only last 1000 samples
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
    @property
    def success_rate(self) -> float:
        return self.requests_success / self.requests_total if self.requests_total else 0
    @property
    def avg_latency(self) -> float:
        return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
    def report(self) -> str:
        return (
            f"Total: {self.requests_total}, "
            f"Success: {self.success_rate:.1%}, "
            f"Blocked: {self.requests_blocked}, "
            f"Avg Latency: {self.avg_latency:.0f}ms"
        )

Przechowywanie danych w skali

Rodzaj przechowywaniaNajlepsze dlaSkala
PostgreSQLZorganizowane dane dotyczące produktu / cenMiliony wierszy
MongoBSchematy półstrukturalne / zmienneMiliardy dokumentów
Przechowywanie S3 / ObiektSurowe archiwa HTMLPetabajty
ElasticsearchWyszukiwarka pełnotekstowa nad zeskrobywanymi danymiMiliardy dokumentów
ClickHouseAnaliza dużych zbiorów danychMiliardy wierszy

Skalowanie listy kontrolnej

  • Decoupe odkrycie URL z pobierania. Użyj kolejki wiadomości pomiędzy etapami.
  • Wdrożenie prawidłowej logiki powtórki. Eksponatyczna kopia zapasowa z martwymi kolejkami liter dla trwałych awarii.
  • Monitoruj wszystko. Głębokość kolejki, wskaźniki sukcesu, opóźnienia, poziomy błędów w poszczególnych domenach.
  • Użyj łączenia połączeń. Ponowne wykorzystanie połączeń proxy zamiast tworzenia nowych na żądanie.
  • Plan na porażkę. Pracownicy się rozbijają, proxy zostają zablokowane, cele zmieniają strukturę. Zbuduj odporność na każdą warstwę.
  • Test na skalę przed startem. System pracujący przy 100 RPM może się nie udać przy 10 000 RPM ze względu na pamięć, ograniczenia połączeń lub wąskie gardła w kolejce.

Dla strategii rotacji proxy, które uzupełniają skalowanie architektury, czytaj Strategie rotacji proxy dla rozdrabniania na dużą skalę. Aby poradzić sobie z limitami szybkości jak skalować, zobacz Rozdrapywanie wartości granicznych.

Użyj Python SDK, Węzeł SDKlub Go SDK do integracji proxy produkcji, i zbadać Plany ProxyHat do drapania o dużej objętości.

Często zadawane pytania

Jaki system kolejki jest najlepszy do skrobania w skali?

Redis z Bull (Node.js) lub RQ (Python) działa dobrze do milionów zadań dziennie. Dla większej skali, Apache Kafka lub RabbitMQ zapewnia lepszą trwałość i przepustowość. Wybierz na podstawie istniejącej infrastruktury i wiedzy zespołu.

Ilu pracowników mam prowadzić?

Zacznij od 10- 20 pracowników i skali w oparciu o możliwości proxy i tolerancji miejsca docelowego. Monitorowanie wskaźnika sukcesu - jeśli spadnie poniżej 90%, zmniejszyć współzależność przed dodaniem większej liczby pracowników. Każdy pracownik przez ProxyHat dostaje automatyczną rotację IP.

Czy powinienem używać asynku czy gwintowania dla pracowników?

Dla I / O- scrating (większość przypadków), async (Python asyncio, Node.js) zapewnia lepszą wydajność zasobów niż gwintowanie. Używać gwintowania lub wielokrotnego przetwarzania tylko wtedy, gdy potrzebujesz CPU- ciężkie parsowanie obok pobierania. Goroutines są najlepsze w obu wzorach.

Jak poradzić sobie ze zmianami struktury obiektu docelowego?

Wdrożenie walidacji danych w twoim rurociągu. Gdy dane przelane nie są sprawdzane (brakujące pola, niewłaściwe typy), zaalarmuj swój zespół i kolejkę, które mają wpływ na adresy URL w celu ponownego przetwarzania z zaktualizowanymi parserami. Wersja twoich parserów, dzięki czemu możesz wrócić w razie potrzeby.

Gotowy, aby zacząć?

Dostęp do ponad 50 mln rezydencjalnych IP w ponad 148 krajach z filtrowaniem AI.

Zobacz cenyProxy rezydencjalne
← Powrót do Bloga