За пределами скриптов: почему важна архитектура
Скрапер из одного файла отлично работает для сотни страниц. Но когда вам нужно собирать миллионы точек данных с десятков целевых сайтов по регулярному расписанию, вам нужна система, а не скрипт. Надёжная архитектура скрапинга разделяет задачи на независимые компоненты, которые можно масштабировать, мониторить и восстанавливать независимо.
Это руководство проведёт вас через проектирование продакшн-системы скрапинга — от планирования URL до управления прокси и хранения данных. Каждый компонент проиллюстрирован кодом и подключён к прокси-инфраструктуре ProxyHat.
Хорошо спроектированный скрапер рассматривает сбор данных как инженерную задачу, а не как хакинг. Каждый компонент имеет единственную ответственность, чёткие интерфейсы и наблюдаемое поведение.
Обзор архитектуры системы
Продакшн-система скрапинга состоит из шести основных компонентов:
┌─────────────┐ ┌──────────────┐ ┌─────────────────┐
│ Scheduler │────▶│ URL Queue │────▶│ Crawler Pool │
│ (cron/API) │ │ (Redis) │ │ (workers) │
└─────────────┘ └──────────────┘ └────────┬────────┘
│
┌────────▼────────┐
│ Proxy Layer │
│ (ProxyHat) │
└────────┬────────┘
│
┌────────▼────────┐
│ Parser │
│ (extract data) │
└────────┬────────┘
│
┌──────────────┐ ┌────────▼────────┐
│ Monitoring │◀─────│ Storage │
│ (metrics) │ │ (DB / files) │
└──────────────┘ └─────────────────┘
| Компонент | Ответственность | Технология |
|---|---|---|
| Scheduler | Решает, что и когда скрапить | Cron, Celery Beat, Bull |
| URL Queue | Буферизация URL с приоритетом и дедупликацией | Redis, RabbitMQ, SQS |
| Crawler Pool | Параллельная загрузка страниц | asyncio, горутины, worker-потоки |
| Proxy Layer | Маршрутизация запросов через ротационные прокси | Шлюз ProxyHat |
| Parser | Извлечение структурированных данных из HTML/JSON | BeautifulSoup, cheerio, goquery |
| Storage | Сохранение извлечённых данных | PostgreSQL, MongoDB, S3 |
| Monitoring | Отслеживание здоровья и производительности | Prometheus, логирование, алерты |
Компонент 1: Scheduler
Планировщик решает, какие URL сканировать и когда. Он управляет частотой сканирования, уровнями приоритета и гарантирует, что ни один URL не сканируется чаще необходимого.
import redis
import json
import time
from datetime import datetime, timedelta
class CrawlScheduler:
"""Manages crawl schedules and feeds URLs to the queue."""
def __init__(self, redis_url: str = "redis://localhost:6379"):
self.redis = redis.from_url(redis_url)
self.queue_key = "scraper:url_queue"
self.schedule_key = "scraper:schedules"
self.seen_key = "scraper:seen_urls"
def add_schedule(self, name: str, urls: list[str], interval_hours: int, priority: int = 5):
"""Register a recurring crawl job."""
self.redis.hset(self.schedule_key, name, json.dumps({
"urls": urls,
"interval_hours": interval_hours,
"priority": priority,
"last_run": None,
}))
def tick(self):
"""Check all schedules and enqueue URLs that are due."""
schedules = self.redis.hgetall(self.schedule_key)
enqueued = 0
for name, data in schedules.items():
schedule = json.loads(data)
last_run = schedule.get("last_run")
interval = timedelta(hours=schedule["interval_hours"])
if last_run and datetime.fromisoformat(last_run) + interval > datetime.utcnow():
continue
for url in schedule["urls"]:
self.enqueue(url, priority=schedule["priority"])
enqueued += 1
schedule["last_run"] = datetime.utcnow().isoformat()
self.redis.hset(self.schedule_key, name, json.dumps(schedule))
return enqueued
def enqueue(self, url: str, priority: int = 5):
"""Add a URL to the crawl queue with deduplication."""
# Skip if recently seen (within 1 hour)
if self.redis.sismember(self.seen_key, url):
return False
self.redis.zadd(self.queue_key, {
json.dumps({"url": url, "enqueued_at": time.time()}): priority
})
self.redis.sadd(self.seen_key, url)
self.redis.expire(self.seen_key, 3600) # 1-hour dedup window
return True
def dequeue(self, batch_size: int = 10) -> list[dict]:
"""Pull the highest-priority URLs from the queue."""
items = self.redis.zpopmax(self.queue_key, batch_size)
return [json.loads(item) for item, score in items]
@property
def queue_size(self) -> int:
return self.redis.zcard(self.queue_key)
# Usage
scheduler = CrawlScheduler()
scheduler.add_schedule(
name="product_prices",
urls=[f"https://example.com/product/{i}" for i in range(1, 1001)],
interval_hours=6,
priority=8,
)
scheduler.add_schedule(
name="competitor_pages",
urls=["https://competitor.com/pricing", "https://competitor.com/features"],
interval_hours=24,
priority=5,
)
# Run every minute via cron
enqueued = scheduler.tick()
print(f"Enqueued {enqueued} URLs, queue size: {scheduler.queue_size}")
Компонент 2: Очередь URL
Очередь разделяет планирование и краулинг. Она обеспечивает приоритетный порядок, backpressure и персистентность — чтобы никакие URL не потерялись при падении краулера.
// Node.js queue with Bull
const Queue = require('bull');
const crawlQueue = new Queue('crawl', 'redis://localhost:6379', {
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 100,
removeOnFail: 500,
},
});
// Add URLs with priority (lower number = higher priority)
async function enqueueUrls(urls, priority = 5) {
const jobs = urls.map(url =>
crawlQueue.add(
{ url, enqueuedAt: Date.now() },
{ priority, jobId: url } // jobId for deduplication
)
);
await Promise.all(jobs);
console.log(`Enqueued ${urls.length} URLs`);
}
// Worker processes URLs
crawlQueue.process(10, async (job) => {
const { url } = job.data;
// Crawl logic here (see Crawler Pool below)
return { url, status: 'completed' };
});
crawlQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed: ${err.message}`);
});
Компонент 3: Пул краулеров
Пул краулеров загружает страницы параллельно через промежуточный слой прокси. Он управляет лимитами конкурентности, обрабатывает повторные попытки и передаёт сырые ответы парсеру.
import asyncio
import aiohttp
import uuid
import time
from typing import Optional
class CrawlerPool:
"""Concurrent crawler with proxy rotation and retry logic."""
def __init__(
self,
concurrency: int = 30,
max_retries: int = 3,
timeout: int = 30,
):
self.concurrency = concurrency
self.max_retries = max_retries
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.semaphore = asyncio.Semaphore(concurrency)
self.stats = {"success": 0, "failed": 0, "retries": 0}
def _get_proxy(self, country: Optional[str] = None) -> str:
session_id = uuid.uuid4().hex[:8]
username = f"USERNAME-session-{session_id}"
if country:
username += f"-country-{country}"
return f"http://{username}:PASSWORD@gate.proxyhat.com:8080"
async def fetch(
self,
session: aiohttp.ClientSession,
url: str,
country: Optional[str] = None,
) -> dict:
async with self.semaphore:
for attempt in range(self.max_retries + 1):
proxy = self._get_proxy(country)
start = time.time()
try:
async with session.get(
url, proxy=proxy, timeout=self.timeout,
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
},
) as response:
body = await response.text()
latency = time.time() - start
if response.status in (403, 429, 503) and attempt < self.max_retries:
self.stats["retries"] += 1
await asyncio.sleep(2 ** attempt)
continue
if response.status < 400:
self.stats["success"] += 1
else:
self.stats["failed"] += 1
return {
"url": url,
"status": response.status,
"body": body,
"latency": latency,
"success": response.status < 400,
}
except Exception as e:
if attempt < self.max_retries:
self.stats["retries"] += 1
await asyncio.sleep(2 ** attempt)
continue
self.stats["failed"] += 1
return {
"url": url,
"error": str(e),
"latency": time.time() - start,
"success": False,
}
async def crawl(self, urls: list[str], country: Optional[str] = None) -> list[dict]:
async with aiohttp.ClientSession() as session:
tasks = [self.fetch(session, url, country) for url in urls]
return await asyncio.gather(*tasks)
# Usage
crawler = CrawlerPool(concurrency=30, max_retries=3)
urls = [f"https://example.com/product/{i}" for i in range(500)]
results = asyncio.run(crawler.crawl(urls, country="us"))
print(f"Success: {crawler.stats['success']}, Failed: {crawler.stats['failed']}")
Компонент 4: Парсер
Парсер преобразует сырой HTML в структурированные данные. Держите логику парсинга отдельно от краулинга — это делает её тестируемой, переиспользуемой и простой для обновления при изменении целевых сайтов.
from dataclasses import dataclass
from typing import Optional
from bs4 import BeautifulSoup
import json
@dataclass
class ProductData:
url: str
title: Optional[str] = None
price: Optional[float] = None
currency: Optional[str] = None
availability: Optional[str] = None
rating: Optional[float] = None
review_count: Optional[int] = None
parsed_at: Optional[str] = None
class ProductParser:
"""Extracts structured product data from HTML."""
def parse(self, url: str, html: str) -> ProductData:
soup = BeautifulSoup(html, "html.parser")
data = ProductData(url=url)
try:
data.title = self._extract_text(soup, "h1.product-title")
data.price = self._extract_price(soup)
data.currency = self._extract_currency(soup)
data.availability = self._extract_text(soup, ".availability-status")
data.rating = self._extract_rating(soup)
data.review_count = self._extract_review_count(soup)
data.parsed_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
except Exception as e:
logger.error(f"Parse error for {url}: {e}")
return data
def _extract_text(self, soup, selector: str) -> Optional[str]:
el = soup.select_one(selector)
return el.get_text(strip=True) if el else None
def _extract_price(self, soup) -> Optional[float]:
el = soup.select_one("[data-price], .price, .product-price")
if not el:
return None
price_text = el.get("data-price") or el.get_text(strip=True)
# Remove currency symbols and parse
cleaned = "".join(c for c in price_text if c.isdigit() or c == ".")
return float(cleaned) if cleaned else None
def _extract_currency(self, soup) -> Optional[str]:
el = soup.select_one("[data-currency], .currency")
return el.get("data-currency") or el.get_text(strip=True) if el else None
def _extract_rating(self, soup) -> Optional[float]:
el = soup.select_one("[data-rating], .rating-value")
if el:
val = el.get("data-rating") or el.get_text(strip=True)
try:
return float(val)
except ValueError:
return None
return None
def _extract_review_count(self, soup) -> Optional[int]:
el = soup.select_one(".review-count, [data-reviews]")
if el:
text = el.get("data-reviews") or el.get_text(strip=True)
digits = "".join(c for c in text if c.isdigit())
return int(digits) if digits else None
return None
# Usage
parser = ProductParser()
for result in results:
if result["success"]:
product = parser.parse(result["url"], result["body"])
print(f"{product.title}: ${product.price}")
Компонент 5: Хранилище
Сохраняйте парсированные данные с версионированием и дедупликацией. Используйте upsert-ы для корректной обработки повторных сканирований.
import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime
class DataStore:
"""Persists scraped data with upsert and versioning."""
def __init__(self, dsn: str):
self.conn = psycopg2.connect(dsn)
self._ensure_tables()
def _ensure_tables(self):
with self.conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS products (
url TEXT PRIMARY KEY,
title TEXT,
price NUMERIC(10,2),
currency VARCHAR(3),
availability TEXT,
rating NUMERIC(3,2),
review_count INTEGER,
first_seen_at TIMESTAMPTZ DEFAULT NOW(),
last_updated_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE IF NOT EXISTS price_history (
id SERIAL PRIMARY KEY,
url TEXT REFERENCES products(url),
price NUMERIC(10,2),
recorded_at TIMESTAMPTZ DEFAULT NOW()
);
""")
self.conn.commit()
def upsert_products(self, products: list[ProductData]):
"""Insert or update products, recording price changes."""
with self.conn.cursor() as cur:
for product in products:
if product.price is None:
continue
# Check if price changed
cur.execute(
"SELECT price FROM products WHERE url = %s",
(product.url,)
)
row = cur.fetchone()
# Upsert product
cur.execute("""
INSERT INTO products (url, title, price, currency, availability,
rating, review_count, last_updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())
ON CONFLICT (url) DO UPDATE SET
title = EXCLUDED.title,
price = EXCLUDED.price,
currency = EXCLUDED.currency,
availability = EXCLUDED.availability,
rating = EXCLUDED.rating,
review_count = EXCLUDED.review_count,
last_updated_at = NOW()
""", (
product.url, product.title, product.price,
product.currency, product.availability,
product.rating, product.review_count,
))
# Record price history if changed
if row is None or float(row[0]) != product.price:
cur.execute(
"INSERT INTO price_history (url, price) VALUES (%s, %s)",
(product.url, product.price)
)
self.conn.commit()
@property
def product_count(self) -> int:
with self.conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM products")
return cur.fetchone()[0]
# Usage
store = DataStore("postgresql://user:pass@localhost:5432/scraper")
store.upsert_products(parsed_products)
print(f"Total products: {store.product_count}")
Компонент 6: Мониторинг
Свяжите все компоненты воедино с помощью централизованного мониторинга. Подробные паттерны мониторинга прокси описаны в нашем руководстве Мониторинг производительности прокси.
import time
import json
import logging
from datetime import datetime
class PipelineMonitor:
"""Monitors the entire scraping pipeline."""
def __init__(self):
self.logger = logging.getLogger("pipeline")
self.stage_times = {}
self.stage_counts = {}
def start_stage(self, stage: str):
self.stage_times[stage] = time.time()
def end_stage(self, stage: str, item_count: int = 0):
elapsed = time.time() - self.stage_times.get(stage, time.time())
self.stage_counts[stage] = item_count
self.logger.info(json.dumps({
"timestamp": datetime.utcnow().isoformat(),
"stage": stage,
"duration_seconds": round(elapsed, 2),
"items_processed": item_count,
"items_per_second": round(item_count / elapsed, 1) if elapsed > 0 else 0,
}))
def report(self) -> dict:
return {
"stages": {
stage: {
"items": self.stage_counts.get(stage, 0),
}
for stage in self.stage_counts
},
"timestamp": datetime.utcnow().isoformat(),
}
# Usage within the full pipeline
monitor = PipelineMonitor()
# Stage 1: Schedule
monitor.start_stage("schedule")
urls = scheduler.dequeue(batch_size=500)
monitor.end_stage("schedule", len(urls))
# Stage 2: Crawl
monitor.start_stage("crawl")
results = asyncio.run(crawler.crawl([u["url"] for u in urls]))
monitor.end_stage("crawl", len(results))
# Stage 3: Parse
monitor.start_stage("parse")
products = [parser.parse(r["url"], r["body"]) for r in results if r["success"]]
monitor.end_stage("parse", len(products))
# Stage 4: Store
monitor.start_stage("store")
store.upsert_products(products)
monitor.end_stage("store", len(products))
print(json.dumps(monitor.report(), indent=2))
Собираем всё вместе
Вот полный конвейер, который связывает все шесть компонентов в единую, запускаемую систему скрапинга.
import asyncio
import logging
logging.basicConfig(level=logging.INFO)
async def run_pipeline():
# Initialize components
scheduler = CrawlScheduler()
crawler = CrawlerPool(concurrency=30, max_retries=3)
parser = ProductParser()
store = DataStore("postgresql://user:pass@localhost:5432/scraper")
monitor = PipelineMonitor()
# Schedule crawls
scheduler.add_schedule(
name="daily_products",
urls=[f"https://example.com/product/{i}" for i in range(1, 501)],
interval_hours=24,
priority=8,
)
# Main loop
while True:
# 1. Check schedules and enqueue URLs
monitor.start_stage("schedule")
scheduler.tick()
batch = scheduler.dequeue(batch_size=100)
monitor.end_stage("schedule", len(batch))
if not batch:
await asyncio.sleep(60)
continue
# 2. Crawl
monitor.start_stage("crawl")
urls = [item["url"] for item in batch]
results = await crawler.crawl(urls, country="us")
successful = [r for r in results if r.get("success")]
monitor.end_stage("crawl", len(successful))
# 3. Parse
monitor.start_stage("parse")
products = []
for result in successful:
try:
product = parser.parse(result["url"], result["body"])
products.append(product)
except Exception as e:
logging.error(f"Parse error: {e}")
monitor.end_stage("parse", len(products))
# 4. Store
monitor.start_stage("store")
store.upsert_products(products)
monitor.end_stage("store", len(products))
# 5. Report
logging.info(f"Pipeline: {monitor.report()}")
logging.info(f"Crawler stats: {crawler.stats}")
# Wait before next batch
await asyncio.sleep(5)
asyncio.run(run_pipeline())
Паттерны продакшн-деплоя
Горизонтальное масштабирование
Масштабируйте краулеры независимо от планировщиков и парсеров. Запускайте несколько экземпляров краулеров, потребляющих из одной очереди Redis.
| Компонент | Стратегия масштабирования | Типичный масштаб |
|---|---|---|
| Scheduler | Единственный экземпляр (leader election) | 1 экземпляр |
| URL Queue | Redis-кластер или управляемая очередь | 1 кластер |
| Crawler Pool | Горизонтальное масштабирование подов | 2-20 экземпляров |
| Proxy Layer | Управляется ProxyHat | Н/Д (внешний) |
| Parser | Совмещён с краулером или отдельно | 1:1 с краулерами |
| Storage | Репликация базы данных | 1 primary + реплики |
| Monitoring | Централизованная агрегация | 1 экземпляр |
Восстановление после ошибок
- Падение краулера: URL остаются в очереди Redis. Новый краулер подхватывает их автоматически.
- Сбои прокси: Промежуточный слой повторяет с новыми IP. Устойчивые сбои запускают алерты.
- Ошибки парсера: Сырой HTML сохраняется в dead-letter-очередь для ручной проверки и обновления парсеров.
- Сбои базы данных: Парсированные данные буферизуются в памяти/на диске с write-ahead логированием до восстановления БД.
Проверки качества данных
class DataQualityChecker:
"""Validates parsed data before storage."""
def check(self, product: ProductData) -> list[str]:
issues = []
if not product.title:
issues.append("missing_title")
if product.price is not None and product.price <= 0:
issues.append("invalid_price")
if product.price is not None and product.price > 100000:
issues.append("suspicious_price")
if product.rating is not None and (product.rating < 0 or product.rating > 5):
issues.append("invalid_rating")
return issues
def filter_valid(self, products: list[ProductData]) -> list[ProductData]:
valid = []
for product in products:
issues = self.check(product)
if issues:
logger.warning(f"Data quality issues for {product.url}: {issues}")
else:
valid.append(product)
return valid
# Integrate into pipeline
checker = DataQualityChecker()
valid_products = checker.filter_valid(products)
store.upsert_products(valid_products)
Лучшая архитектура скрапинга — та, о которой можно рассуждать. Каждый компонент должен отвечать на три вопроса: Что он делает? Как он отказывает? Как он восстанавливается?
Для компонента прокси-слоя смотрите Создание промежуточного слоя прокси. Для оптимизации пропускной способности краулера читайте Масштабирование прокси-запросов с контролем конкурентности. Для краулинга через браузер смотрите Ротация прокси с Playwright. Для стратегий защиты от обнаружения изучите как скрапить сайты без блокировки.
Начните с Python SDK, Node SDK или Go SDK для интеграции с прокси. Изучите тарифы ProxyHat и документацию для создания вашей архитектуры скрапинга.






