Как масштабировать инфраструктуру скрапинга

Архитектурные паттерны масштабирования скрапинга: очередные системы, конвейерный дизайн, горизонтальное масштабирование контейнерами и управление прокси. Код на Python, Node.js и Go.

Как масштабировать инфраструктуру скрапинга

Почему инфраструктура скрапинга требует выделенной архитектуры

Однопоточный скрипт, обращающийся к одному сайту, работает для небольших задач. Но когда нужно ежедневно скрапить миллионы страниц с десятков целей, такой скрипт становится узким местом. Масштабирование инфраструктуры скрапинга требует перехода от линейных скриптов к распределённым, очередным архитектурам, которые грамотно обрабатывают ошибки, управляют ротацией прокси и максимизируют пропускную способность.

Это руководство охватывает архитектурные паттерны, системы очередей, стратегии горизонтального масштабирования и техники управления прокси для скрапинга промышленного уровня.

Статья опирается на концепции из нашего Полного руководства по прокси для веб-скрапинга. О расчёте пула прокси — Сколько прокси нужно для скрапинга?

Архитектурные паттерны масштабируемого скрапинга

Паттерн 1: Очередной скрапинг

Основа масштабируемого скрапинга — очередь сообщений, разделяющая обнаружение URL от получения данных. Воркеры берут задачи из очереди, получают страницы через прокси и отправляют результаты в хранилище.

# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
#                               → Worker 2 →
#                               → Worker N →
#                               ↓
#                        [Dead Letter Queue]
#                        (failed requests)

Преимущества этого паттерна:

  • Горизонтальное масштабирование: добавляйте или убирайте воркеров без изменения системы
  • Отказоустойчивость: неудачные задачи возвращаются в очередь для повторной попытки
  • Контроль скорости: регулируйте количество воркеров для управления общей пропускной способностью
  • Наблюдаемость: глубина очереди показывает отставание; скорость завершения — состояние системы

Реализация на Python с Redis Queue

import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
    url: str
    target: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
class ScrapingQueue:
    def __init__(self, redis_url: str):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scrape:tasks"
        self.results_key = "scrape:results"
        self.dlq_key = "scrape:dead_letter"
    def enqueue(self, task: ScrapeTask):
        self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
    def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
        result = self.redis.brpop(self.queue_key, timeout=timeout)
        if result:
            data = json.loads(result[1])
            return ScrapeTask(**data)
        return None
    def store_result(self, url: str, data: dict):
        self.redis.hset(self.results_key, url, json.dumps(data))
    def send_to_dlq(self, task: ScrapeTask, error: str):
        task_data = task.__dict__
        task_data["error"] = error
        self.redis.lpush(self.dlq_key, json.dumps(task_data))
    @property
    def pending_count(self) -> int:
        return self.redis.llen(self.queue_key)
class ScrapingWorker:
    def __init__(self, queue: ScrapingQueue, worker_id: int):
        self.queue = queue
        self.worker_id = worker_id
        self.session = requests.Session()
        self.session.proxies = {"http": PROXY, "https": PROXY}
    def process_task(self, task: ScrapeTask) -> bool:
        try:
            resp = self.session.get(task.url, timeout=30)
            if resp.status_code == 200:
                self.queue.store_result(task.url, {
                    "status": 200,
                    "body": resp.text[:10000],  # Truncate for storage
                    "target": task.target,
                })
                return True
            elif resp.status_code in [429, 503]:
                # Retry with backoff
                if task.retries < task.max_retries:
                    task.retries += 1
                    time.sleep(2 ** task.retries)
                    self.queue.enqueue(task)
                else:
                    self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
            else:
                self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
        except Exception as e:
            if task.retries < task.max_retries:
                task.retries += 1
                self.queue.enqueue(task)
            else:
                self.queue.send_to_dlq(task, str(e))
        return False
    def run(self):
        print(f"Worker {self.worker_id} started")
        while True:
            task = self.queue.dequeue(timeout=5)
            if task:
                self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
    queue.enqueue(ScrapeTask(
        url=f"https://example.com/product/{i}",
        target="example.com"
    ))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
    workers = [ScrapingWorker(queue, i) for i in range(10)]
    for worker in workers:
        executor.submit(worker.run)

Реализация на Node.js с Bull Queue

const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => {  // 10 concurrent workers
  const { url, target } = job.data;
  try {
    const res = await fetch(url, { agent, timeout: 30000 });
    if (res.ok) {
      const body = await res.text();
      return { url, status: res.status, body: body.slice(0, 10000) };
    }
    if (res.status === 429 || res.status === 503) {
      throw new Error(`Rate limited: HTTP ${res.status}`);
    }
    return { url, status: res.status, body: null };
  } catch (err) {
    throw err; // Bull will retry based on job options
  }
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
  for (const url of urls) {
    await scrapeQueue.add(
      { url, target },
      {
        attempts: 3,
        backoff: { type: 'exponential', delay: 2000 },
        removeOnComplete: 1000,
        removeOnFail: false,
      }
    );
  }
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
  console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
  console.error(`Failed: ${job.data.url} - ${err.message}`);
});

Паттерн 2: Конвейерная архитектура

Для сложных рабочих процессов используйте конвейер, где каждый этап отвечает за свою задачу:

# Pipeline stages:
#
# Stage 1: URL Discovery    → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction  → parses HTML, extracts data
# Stage 4: Data Validation  → checks quality, deduplicates
# Stage 5: Storage          → saves to database/warehouse

Реализация на Go

package main
import (
    "encoding/json"
    "fmt"
    "io"
    "net/http"
    "net/url"
    "sync"
    "time"
)
type ScrapeResult struct {
    URL    string `json:"url"`
    Status int    `json:"status"`
    Body   string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
    defer wg.Done()
    proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
    client := &http.Client{
        Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
        Timeout:   30 * time.Second,
    }
    for u := range urls {
        resp, err := client.Get(u)
        if err != nil {
            results <- ScrapeResult{URL: u, Status: 0}
            continue
        }
        body, _ := io.ReadAll(resp.Body)
        resp.Body.Close()
        results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
    }
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
    for result := range results {
        if result.Status == 200 {
            // Extract and store data
            data, _ := json.Marshal(result)
            fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
        }
    }
    done <- true
}
func main() {
    urls := make(chan string, 1000)
    results := make(chan ScrapeResult, 1000)
    done := make(chan bool)
    // Start 20 fetcher workers
    var wg sync.WaitGroup
    for i := 0; i < 20; i++ {
        wg.Add(1)
        go fetcher(urls, results, &wg)
    }
    // Start processor
    go processor(results, done)
    // Feed URLs
    go func() {
        for i := 0; i < 10000; i++ {
            urls <- fmt.Sprintf("https://example.com/product/%d", i)
        }
        close(urls)
    }()
    // Wait for all fetchers, then close results
    wg.Wait()
    close(results)
    <-done
}

Стратегии горизонтального масштабирования

Стратегия 1: Многомашинное развёртывание

Распределите воркеров между несколькими машинами. Очередь служит точкой координации:

КомпонентРазвёртываниеМасштабирование
Очередь (Redis/RabbitMQ)Выделенный сервер или управляемый сервисВертикальное (больше RAM)
ВоркерыНесколько машин или контейнеровГоризонтальное (добавление экземпляров)
Хранилище результатовБД или объектное хранилищеВертикальное + шардирование
МониторингЦентрализованная панельОдин экземпляр

Стратегия 2: Контейнерное масштабирование

Используйте Docker и Kubernetes для эластичного масштабирования. Каждый воркер работает в контейнере, который можно реплицировать:

# docker-compose.yml for scraping workers
version: '3.8'
services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
  scraper-worker:
    build: .
    environment:
      - PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
      - REDIS_URL=redis://redis:6379/0
      - WORKER_CONCURRENCY=10
    deploy:
      replicas: 5        # 5 containers × 10 concurrency = 50 parallel requests
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
    depends_on:
      - redis

Управление прокси в масштабе

При масштабировании управление прокси становится критичным компонентом системы. Ключевые аспекты:

Пулинг соединений

Переиспользуйте соединения с прокси-шлюзом вместо создания новых на каждый запрос. Это снижает задержку и нагрузку на соединения:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
    """Create a session with connection pooling and retry logic."""
    session = requests.Session()
    # Connection pool: keep 20 connections, max 50
    adapter = HTTPAdapter(
        pool_connections=20,
        pool_maxsize=50,
        max_retries=Retry(
            total=3,
            backoff_factor=1,
            status_forcelist=[500, 502, 503],
        )
    )
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    session.proxies = {
        "http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
        "https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
    }
    return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
    resp = session.get(url, timeout=30)

Мониторинг состояния

Мониторьте производительность прокси в реальном времени для раннего обнаружения проблем:

import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
    """Track proxy health metrics for monitoring."""
    requests_total: int = 0
    requests_success: int = 0
    requests_blocked: int = 0
    requests_timeout: int = 0
    latency_samples: list = field(default_factory=list)
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    def record_request(self, status_code: int, latency_ms: float):
        self.requests_total += 1
        self.status_codes[status_code] += 1
        self.latency_samples.append(latency_ms)
        if status_code == 200:
            self.requests_success += 1
        elif status_code in [403, 429]:
            self.requests_blocked += 1
        # Keep only last 1000 samples
        if len(self.latency_samples) > 1000:
            self.latency_samples = self.latency_samples[-1000:]
    @property
    def success_rate(self) -> float:
        return self.requests_success / self.requests_total if self.requests_total else 0
    @property
    def avg_latency(self) -> float:
        return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
    def report(self) -> str:
        return (
            f"Total: {self.requests_total}, "
            f"Success: {self.success_rate:.1%}, "
            f"Blocked: {self.requests_blocked}, "
            f"Avg Latency: {self.avg_latency:.0f}ms"
        )

Хранение данных в масштабе

Тип хранилищаЛучше всего дляМасштаб
PostgreSQLСтруктурированные данные о товарах/ценахМиллионы строк
MongoDBПолуструктурированные/переменные схемыМиллиарды документов
S3/Объектное хранилищеАрхивы сырого HTMLПетабайты
ElasticsearchПолнотекстовый поиск по даннымМиллиарды документов
ClickHouseАналитика по большим наборам данныхТриллионы строк

Чек-лист масштабирования

  • Разделите обнаружение URL и получение данных. Используйте очередь сообщений между этапами.
  • Реализуйте правильную логику повторов. Экспоненциальная задержка с очередями мёртвых писем для постоянных ошибок.
  • Мониторьте всё. Глубину очереди, процент успеха, задержку, частоту ошибок по каждому целевому домену.
  • Используйте пулинг соединений. Переиспользуйте прокси-соединения вместо создания новых на каждый запрос.
  • Планируйте отказы. Воркеры падают, прокси блокируются, цели меняют структуру. Заложите устойчивость в каждый слой.
  • Тестируйте в масштабе перед запуском. Система, работающая на 100 RPM, может не справиться на 10 000 RPM из-за памяти, лимитов соединений или узких мест очереди.

О стратегиях ротации прокси для масштабной архитектуры читайте Стратегии ротации прокси для масштабного скрапинга. Об управлении лимитами скорости при масштабировании — Лимиты скорости при скрапинге.

Используйте Python SDK, Node SDK или Go SDK для интеграции прокси в продакшене и изучите тарифы ProxyHat для высоконагруженного скрапинга.

Часто задаваемые вопросы

Какая система очередей лучше для масштабного скрапинга?

Redis с Bull (Node.js) или RQ (Python) хорошо работает до миллионов задач в день. Для большего масштаба Apache Kafka или RabbitMQ обеспечивают лучшую надёжность и пропускную способность. Выбирайте на основе существующей инфраструктуры и опыта команды.

Сколько параллельных воркеров запускать?

Начните с 10-20 воркеров и масштабируйте на основе ёмкости прокси и толерантности целевого сайта. Мониторьте процент успеха — если он падает ниже 90%, снижайте конкурентность, прежде чем добавлять воркеров. Каждый воркер через ProxyHat получает автоматическую ротацию IP.

Использовать async или threading для воркеров?

Для I/O-привязанного скрапинга (большинство случаев) async (Python asyncio, Node.js) обеспечивает лучшую эффективность ресурсов. Используйте threading или multiprocessing только при CPU-интенсивном парсинге. Горутины Go отлично подходят для обоих паттернов.

Как обрабатывать изменения структуры целевого сайта?

Реализуйте валидацию данных в конвейере. Когда парсинг не проходит валидацию (отсутствующие поля, неверные типы), оповестите команду и поставьте затронутые URL на повторную обработку с обновлёнными парсерами. Версионируйте парсеры для возможности отката.

Готовы начать?

Доступ к более чем 50 млн резидентных IP в 148+ странах с AI-фильтрацией.

Смотреть ценыРезидентные прокси
← Вернуться в Блог