Почему инфраструктура скрапинга требует выделенной архитектуры
Однопоточный скрипт, обращающийся к одному сайту, работает для небольших задач. Но когда нужно ежедневно скрапить миллионы страниц с десятков целей, такой скрипт становится узким местом. Масштабирование инфраструктуры скрапинга требует перехода от линейных скриптов к распределённым, очередным архитектурам, которые грамотно обрабатывают ошибки, управляют ротацией прокси и максимизируют пропускную способность.
Это руководство охватывает архитектурные паттерны, системы очередей, стратегии горизонтального масштабирования и техники управления прокси для скрапинга промышленного уровня.
Статья опирается на концепции из нашего Полного руководства по прокси для веб-скрапинга. О расчёте пула прокси — Сколько прокси нужно для скрапинга?
Архитектурные паттерны масштабируемого скрапинга
Паттерн 1: Очередной скрапинг
Основа масштабируемого скрапинга — очередь сообщений, разделяющая обнаружение URL от получения данных. Воркеры берут задачи из очереди, получают страницы через прокси и отправляют результаты в хранилище.
# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
# → Worker 2 →
# → Worker N →
# ↓
# [Dead Letter Queue]
# (failed requests)
Преимущества этого паттерна:
- Горизонтальное масштабирование: добавляйте или убирайте воркеров без изменения системы
- Отказоустойчивость: неудачные задачи возвращаются в очередь для повторной попытки
- Контроль скорости: регулируйте количество воркеров для управления общей пропускной способностью
- Наблюдаемость: глубина очереди показывает отставание; скорость завершения — состояние системы
Реализация на Python с Redis Queue
import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
url: str
target: str
priority: int = 0
retries: int = 0
max_retries: int = 3
class ScrapingQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_key = "scrape:tasks"
self.results_key = "scrape:results"
self.dlq_key = "scrape:dead_letter"
def enqueue(self, task: ScrapeTask):
self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
result = self.redis.brpop(self.queue_key, timeout=timeout)
if result:
data = json.loads(result[1])
return ScrapeTask(**data)
return None
def store_result(self, url: str, data: dict):
self.redis.hset(self.results_key, url, json.dumps(data))
def send_to_dlq(self, task: ScrapeTask, error: str):
task_data = task.__dict__
task_data["error"] = error
self.redis.lpush(self.dlq_key, json.dumps(task_data))
@property
def pending_count(self) -> int:
return self.redis.llen(self.queue_key)
class ScrapingWorker:
def __init__(self, queue: ScrapingQueue, worker_id: int):
self.queue = queue
self.worker_id = worker_id
self.session = requests.Session()
self.session.proxies = {"http": PROXY, "https": PROXY}
def process_task(self, task: ScrapeTask) -> bool:
try:
resp = self.session.get(task.url, timeout=30)
if resp.status_code == 200:
self.queue.store_result(task.url, {
"status": 200,
"body": resp.text[:10000], # Truncate for storage
"target": task.target,
})
return True
elif resp.status_code in [429, 503]:
# Retry with backoff
if task.retries < task.max_retries:
task.retries += 1
time.sleep(2 ** task.retries)
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
except Exception as e:
if task.retries < task.max_retries:
task.retries += 1
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, str(e))
return False
def run(self):
print(f"Worker {self.worker_id} started")
while True:
task = self.queue.dequeue(timeout=5)
if task:
self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
queue.enqueue(ScrapeTask(
url=f"https://example.com/product/{i}",
target="example.com"
))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
workers = [ScrapingWorker(queue, i) for i in range(10)]
for worker in workers:
executor.submit(worker.run)
Реализация на Node.js с Bull Queue
const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => { // 10 concurrent workers
const { url, target } = job.data;
try {
const res = await fetch(url, { agent, timeout: 30000 });
if (res.ok) {
const body = await res.text();
return { url, status: res.status, body: body.slice(0, 10000) };
}
if (res.status === 429 || res.status === 503) {
throw new Error(`Rate limited: HTTP ${res.status}`);
}
return { url, status: res.status, body: null };
} catch (err) {
throw err; // Bull will retry based on job options
}
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
for (const url of urls) {
await scrapeQueue.add(
{ url, target },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: false,
}
);
}
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
console.error(`Failed: ${job.data.url} - ${err.message}`);
});
Паттерн 2: Конвейерная архитектура
Для сложных рабочих процессов используйте конвейер, где каждый этап отвечает за свою задачу:
# Pipeline stages:
#
# Stage 1: URL Discovery → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction → parses HTML, extracts data
# Stage 4: Data Validation → checks quality, deduplicates
# Stage 5: Storage → saves to database/warehouse
Реализация на Go
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type ScrapeResult struct {
URL string `json:"url"`
Status int `json:"status"`
Body string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
defer wg.Done()
proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
for u := range urls {
resp, err := client.Get(u)
if err != nil {
results <- ScrapeResult{URL: u, Status: 0}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
}
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
for result := range results {
if result.Status == 200 {
// Extract and store data
data, _ := json.Marshal(result)
fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
}
}
done <- true
}
func main() {
urls := make(chan string, 1000)
results := make(chan ScrapeResult, 1000)
done := make(chan bool)
// Start 20 fetcher workers
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go fetcher(urls, results, &wg)
}
// Start processor
go processor(results, done)
// Feed URLs
go func() {
for i := 0; i < 10000; i++ {
urls <- fmt.Sprintf("https://example.com/product/%d", i)
}
close(urls)
}()
// Wait for all fetchers, then close results
wg.Wait()
close(results)
<-done
}
Стратегии горизонтального масштабирования
Стратегия 1: Многомашинное развёртывание
Распределите воркеров между несколькими машинами. Очередь служит точкой координации:
| Компонент | Развёртывание | Масштабирование |
|---|---|---|
| Очередь (Redis/RabbitMQ) | Выделенный сервер или управляемый сервис | Вертикальное (больше RAM) |
| Воркеры | Несколько машин или контейнеров | Горизонтальное (добавление экземпляров) |
| Хранилище результатов | БД или объектное хранилище | Вертикальное + шардирование |
| Мониторинг | Централизованная панель | Один экземпляр |
Стратегия 2: Контейнерное масштабирование
Используйте Docker и Kubernetes для эластичного масштабирования. Каждый воркер работает в контейнере, который можно реплицировать:
# docker-compose.yml for scraping workers
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scraper-worker:
build: .
environment:
- PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
- REDIS_URL=redis://redis:6379/0
- WORKER_CONCURRENCY=10
deploy:
replicas: 5 # 5 containers × 10 concurrency = 50 parallel requests
resources:
limits:
memory: 512M
cpus: '0.5'
depends_on:
- redis
Управление прокси в масштабе
При масштабировании управление прокси становится критичным компонентом системы. Ключевые аспекты:
Пулинг соединений
Переиспользуйте соединения с прокси-шлюзом вместо создания новых на каждый запрос. Это снижает задержку и нагрузку на соединения:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
"""Create a session with connection pooling and retry logic."""
session = requests.Session()
# Connection pool: keep 20 connections, max 50
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=50,
max_retries=Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503],
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = {
"http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
"https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
}
return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
resp = session.get(url, timeout=30)
Мониторинг состояния
Мониторьте производительность прокси в реальном времени для раннего обнаружения проблем:
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
"""Track proxy health metrics for monitoring."""
requests_total: int = 0
requests_success: int = 0
requests_blocked: int = 0
requests_timeout: int = 0
latency_samples: list = field(default_factory=list)
status_codes: dict = field(default_factory=lambda: defaultdict(int))
def record_request(self, status_code: int, latency_ms: float):
self.requests_total += 1
self.status_codes[status_code] += 1
self.latency_samples.append(latency_ms)
if status_code == 200:
self.requests_success += 1
elif status_code in [403, 429]:
self.requests_blocked += 1
# Keep only last 1000 samples
if len(self.latency_samples) > 1000:
self.latency_samples = self.latency_samples[-1000:]
@property
def success_rate(self) -> float:
return self.requests_success / self.requests_total if self.requests_total else 0
@property
def avg_latency(self) -> float:
return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
def report(self) -> str:
return (
f"Total: {self.requests_total}, "
f"Success: {self.success_rate:.1%}, "
f"Blocked: {self.requests_blocked}, "
f"Avg Latency: {self.avg_latency:.0f}ms"
)
Хранение данных в масштабе
| Тип хранилища | Лучше всего для | Масштаб |
|---|---|---|
| PostgreSQL | Структурированные данные о товарах/ценах | Миллионы строк |
| MongoDB | Полуструктурированные/переменные схемы | Миллиарды документов |
| S3/Объектное хранилище | Архивы сырого HTML | Петабайты |
| Elasticsearch | Полнотекстовый поиск по данным | Миллиарды документов |
| ClickHouse | Аналитика по большим наборам данных | Триллионы строк |
Чек-лист масштабирования
- Разделите обнаружение URL и получение данных. Используйте очередь сообщений между этапами.
- Реализуйте правильную логику повторов. Экспоненциальная задержка с очередями мёртвых писем для постоянных ошибок.
- Мониторьте всё. Глубину очереди, процент успеха, задержку, частоту ошибок по каждому целевому домену.
- Используйте пулинг соединений. Переиспользуйте прокси-соединения вместо создания новых на каждый запрос.
- Планируйте отказы. Воркеры падают, прокси блокируются, цели меняют структуру. Заложите устойчивость в каждый слой.
- Тестируйте в масштабе перед запуском. Система, работающая на 100 RPM, может не справиться на 10 000 RPM из-за памяти, лимитов соединений или узких мест очереди.
О стратегиях ротации прокси для масштабной архитектуры читайте Стратегии ротации прокси для масштабного скрапинга. Об управлении лимитами скорости при масштабировании — Лимиты скорости при скрапинге.
Используйте Python SDK, Node SDK или Go SDK для интеграции прокси в продакшене и изучите тарифы ProxyHat для высоконагруженного скрапинга.
Часто задаваемые вопросы
Какая система очередей лучше для масштабного скрапинга?
Redis с Bull (Node.js) или RQ (Python) хорошо работает до миллионов задач в день. Для большего масштаба Apache Kafka или RabbitMQ обеспечивают лучшую надёжность и пропускную способность. Выбирайте на основе существующей инфраструктуры и опыта команды.
Сколько параллельных воркеров запускать?
Начните с 10-20 воркеров и масштабируйте на основе ёмкости прокси и толерантности целевого сайта. Мониторьте процент успеха — если он падает ниже 90%, снижайте конкурентность, прежде чем добавлять воркеров. Каждый воркер через ProxyHat получает автоматическую ротацию IP.
Использовать async или threading для воркеров?
Для I/O-привязанного скрапинга (большинство случаев) async (Python asyncio, Node.js) обеспечивает лучшую эффективность ресурсов. Используйте threading или multiprocessing только при CPU-интенсивном парсинге. Горутины Go отлично подходят для обоих паттернов.
Как обрабатывать изменения структуры целевого сайта?
Реализуйте валидацию данных в конвейере. Когда парсинг не проходит валидацию (отсутствующие поля, неверные типы), оповестите команду и поставьте затронутые URL на повторную обработку с обновлёнными парсерами. Версионируйте парсеры для возможности отката.






