Проектирование надёжной архитектуры скрапинга

Спроектируйте сквозную систему скрапинга: планировщик, очередь URL, пул краулеров, слой прокси, парсер, хранилище и мониторинг. Готовый к продакшну код на Python с диаграммами архитектуры.

Проектирование надёжной архитектуры скрапинга

За пределами скриптов: почему важна архитектура

Скрапер из одного файла отлично работает для сотни страниц. Но когда вам нужно собирать миллионы точек данных с десятков целевых сайтов по регулярному расписанию, вам нужна система, а не скрипт. Надёжная архитектура скрапинга разделяет задачи на независимые компоненты, которые можно масштабировать, мониторить и восстанавливать независимо.

Это руководство проведёт вас через проектирование продакшн-системы скрапинга — от планирования URL до управления прокси и хранения данных. Каждый компонент проиллюстрирован кодом и подключён к прокси-инфраструктуре ProxyHat.

Хорошо спроектированный скрапер рассматривает сбор данных как инженерную задачу, а не как хакинг. Каждый компонент имеет единственную ответственность, чёткие интерфейсы и наблюдаемое поведение.

Обзор архитектуры системы

Продакшн-система скрапинга состоит из шести основных компонентов:

┌─────────────┐     ┌──────────────┐     ┌─────────────────┐
│  Scheduler   │────▶│  URL Queue   │────▶│  Crawler Pool   │
│  (cron/API)  │     │  (Redis)     │     │  (workers)      │
└─────────────┘     └──────────────┘     └────────┬────────┘
                                                   │
                                          ┌────────▼────────┐
                                          │  Proxy Layer     │
                                          │  (ProxyHat)      │
                                          └────────┬────────┘
                                                   │
                                          ┌────────▼────────┐
                                          │  Parser          │
                                          │  (extract data)  │
                                          └────────┬────────┘
                                                   │
                    ┌──────────────┐      ┌────────▼────────┐
                    │  Monitoring  │◀─────│  Storage         │
                    │  (metrics)   │      │  (DB / files)    │
                    └──────────────┘      └─────────────────┘
КомпонентОтветственностьТехнология
SchedulerРешает, что и когда скрапитьCron, Celery Beat, Bull
URL QueueБуферизация URL с приоритетом и дедупликациейRedis, RabbitMQ, SQS
Crawler PoolПараллельная загрузка страницasyncio, горутины, worker-потоки
Proxy LayerМаршрутизация запросов через ротационные проксиШлюз ProxyHat
ParserИзвлечение структурированных данных из HTML/JSONBeautifulSoup, cheerio, goquery
StorageСохранение извлечённых данныхPostgreSQL, MongoDB, S3
MonitoringОтслеживание здоровья и производительностиPrometheus, логирование, алерты

Компонент 1: Scheduler

Планировщик решает, какие URL сканировать и когда. Он управляет частотой сканирования, уровнями приоритета и гарантирует, что ни один URL не сканируется чаще необходимого.

import redis
import json
import time
from datetime import datetime, timedelta
class CrawlScheduler:
    """Manages crawl schedules and feeds URLs to the queue."""
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scraper:url_queue"
        self.schedule_key = "scraper:schedules"
        self.seen_key = "scraper:seen_urls"
    def add_schedule(self, name: str, urls: list[str], interval_hours: int, priority: int = 5):
        """Register a recurring crawl job."""
        self.redis.hset(self.schedule_key, name, json.dumps({
            "urls": urls,
            "interval_hours": interval_hours,
            "priority": priority,
            "last_run": None,
        }))
    def tick(self):
        """Check all schedules and enqueue URLs that are due."""
        schedules = self.redis.hgetall(self.schedule_key)
        enqueued = 0
        for name, data in schedules.items():
            schedule = json.loads(data)
            last_run = schedule.get("last_run")
            interval = timedelta(hours=schedule["interval_hours"])
            if last_run and datetime.fromisoformat(last_run) + interval > datetime.utcnow():
                continue
            for url in schedule["urls"]:
                self.enqueue(url, priority=schedule["priority"])
                enqueued += 1
            schedule["last_run"] = datetime.utcnow().isoformat()
            self.redis.hset(self.schedule_key, name, json.dumps(schedule))
        return enqueued
    def enqueue(self, url: str, priority: int = 5):
        """Add a URL to the crawl queue with deduplication."""
        # Skip if recently seen (within 1 hour)
        if self.redis.sismember(self.seen_key, url):
            return False
        self.redis.zadd(self.queue_key, {
            json.dumps({"url": url, "enqueued_at": time.time()}): priority
        })
        self.redis.sadd(self.seen_key, url)
        self.redis.expire(self.seen_key, 3600)  # 1-hour dedup window
        return True
    def dequeue(self, batch_size: int = 10) -> list[dict]:
        """Pull the highest-priority URLs from the queue."""
        items = self.redis.zpopmax(self.queue_key, batch_size)
        return [json.loads(item) for item, score in items]
    @property
    def queue_size(self) -> int:
        return self.redis.zcard(self.queue_key)
# Usage
scheduler = CrawlScheduler()
scheduler.add_schedule(
    name="product_prices",
    urls=[f"https://example.com/product/{i}" for i in range(1, 1001)],
    interval_hours=6,
    priority=8,
)
scheduler.add_schedule(
    name="competitor_pages",
    urls=["https://competitor.com/pricing", "https://competitor.com/features"],
    interval_hours=24,
    priority=5,
)
# Run every minute via cron
enqueued = scheduler.tick()
print(f"Enqueued {enqueued} URLs, queue size: {scheduler.queue_size}")

Компонент 2: Очередь URL

Очередь разделяет планирование и краулинг. Она обеспечивает приоритетный порядок, backpressure и персистентность — чтобы никакие URL не потерялись при падении краулера.

// Node.js queue with Bull
const Queue = require('bull');
const crawlQueue = new Queue('crawl', 'redis://localhost:6379', {
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
    removeOnComplete: 100,
    removeOnFail: 500,
  },
});
// Add URLs with priority (lower number = higher priority)
async function enqueueUrls(urls, priority = 5) {
  const jobs = urls.map(url =>
    crawlQueue.add(
      { url, enqueuedAt: Date.now() },
      { priority, jobId: url } // jobId for deduplication
    )
  );
  await Promise.all(jobs);
  console.log(`Enqueued ${urls.length} URLs`);
}
// Worker processes URLs
crawlQueue.process(10, async (job) => {
  const { url } = job.data;
  // Crawl logic here (see Crawler Pool below)
  return { url, status: 'completed' };
});
crawlQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});

Компонент 3: Пул краулеров

Пул краулеров загружает страницы параллельно через промежуточный слой прокси. Он управляет лимитами конкурентности, обрабатывает повторные попытки и передаёт сырые ответы парсеру.

import asyncio
import aiohttp
import uuid
import time
from typing import Optional
class CrawlerPool:
    """Concurrent crawler with proxy rotation and retry logic."""
    def __init__(
        self,
        concurrency: int = 30,
        max_retries: int = 3,
        timeout: int = 30,
    ):
        self.concurrency = concurrency
        self.max_retries = max_retries
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(concurrency)
        self.stats = {"success": 0, "failed": 0, "retries": 0}
    def _get_proxy(self, country: Optional[str] = None) -> str:
        session_id = uuid.uuid4().hex[:8]
        username = f"USERNAME-session-{session_id}"
        if country:
            username += f"-country-{country}"
        return f"http://{username}:PASSWORD@gate.proxyhat.com:8080"
    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str,
        country: Optional[str] = None,
    ) -> dict:
        async with self.semaphore:
            for attempt in range(self.max_retries + 1):
                proxy = self._get_proxy(country)
                start = time.time()
                try:
                    async with session.get(
                        url, proxy=proxy, timeout=self.timeout,
                        headers={
                            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                                          "AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36",
                            "Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
                        },
                    ) as response:
                        body = await response.text()
                        latency = time.time() - start
                        if response.status in (403, 429, 503) and attempt < self.max_retries:
                            self.stats["retries"] += 1
                            await asyncio.sleep(2 ** attempt)
                            continue
                        if response.status < 400:
                            self.stats["success"] += 1
                        else:
                            self.stats["failed"] += 1
                        return {
                            "url": url,
                            "status": response.status,
                            "body": body,
                            "latency": latency,
                            "success": response.status < 400,
                        }
                except Exception as e:
                    if attempt < self.max_retries:
                        self.stats["retries"] += 1
                        await asyncio.sleep(2 ** attempt)
                        continue
                    self.stats["failed"] += 1
                    return {
                        "url": url,
                        "error": str(e),
                        "latency": time.time() - start,
                        "success": False,
                    }
    async def crawl(self, urls: list[str], country: Optional[str] = None) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url, country) for url in urls]
            return await asyncio.gather(*tasks)
# Usage
crawler = CrawlerPool(concurrency=30, max_retries=3)
urls = [f"https://example.com/product/{i}" for i in range(500)]
results = asyncio.run(crawler.crawl(urls, country="us"))
print(f"Success: {crawler.stats['success']}, Failed: {crawler.stats['failed']}")

Компонент 4: Парсер

Парсер преобразует сырой HTML в структурированные данные. Держите логику парсинга отдельно от краулинга — это делает её тестируемой, переиспользуемой и простой для обновления при изменении целевых сайтов.

from dataclasses import dataclass
from typing import Optional
from bs4 import BeautifulSoup
import json
@dataclass
class ProductData:
    url: str
    title: Optional[str] = None
    price: Optional[float] = None
    currency: Optional[str] = None
    availability: Optional[str] = None
    rating: Optional[float] = None
    review_count: Optional[int] = None
    parsed_at: Optional[str] = None
class ProductParser:
    """Extracts structured product data from HTML."""
    def parse(self, url: str, html: str) -> ProductData:
        soup = BeautifulSoup(html, "html.parser")
        data = ProductData(url=url)
        try:
            data.title = self._extract_text(soup, "h1.product-title")
            data.price = self._extract_price(soup)
            data.currency = self._extract_currency(soup)
            data.availability = self._extract_text(soup, ".availability-status")
            data.rating = self._extract_rating(soup)
            data.review_count = self._extract_review_count(soup)
            data.parsed_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        except Exception as e:
            logger.error(f"Parse error for {url}: {e}")
        return data
    def _extract_text(self, soup, selector: str) -> Optional[str]:
        el = soup.select_one(selector)
        return el.get_text(strip=True) if el else None
    def _extract_price(self, soup) -> Optional[float]:
        el = soup.select_one("[data-price], .price, .product-price")
        if not el:
            return None
        price_text = el.get("data-price") or el.get_text(strip=True)
        # Remove currency symbols and parse
        cleaned = "".join(c for c in price_text if c.isdigit() or c == ".")
        return float(cleaned) if cleaned else None
    def _extract_currency(self, soup) -> Optional[str]:
        el = soup.select_one("[data-currency], .currency")
        return el.get("data-currency") or el.get_text(strip=True) if el else None
    def _extract_rating(self, soup) -> Optional[float]:
        el = soup.select_one("[data-rating], .rating-value")
        if el:
            val = el.get("data-rating") or el.get_text(strip=True)
            try:
                return float(val)
            except ValueError:
                return None
        return None
    def _extract_review_count(self, soup) -> Optional[int]:
        el = soup.select_one(".review-count, [data-reviews]")
        if el:
            text = el.get("data-reviews") or el.get_text(strip=True)
            digits = "".join(c for c in text if c.isdigit())
            return int(digits) if digits else None
        return None
# Usage
parser = ProductParser()
for result in results:
    if result["success"]:
        product = parser.parse(result["url"], result["body"])
        print(f"{product.title}: ${product.price}")

Компонент 5: Хранилище

Сохраняйте парсированные данные с версионированием и дедупликацией. Используйте upsert-ы для корректной обработки повторных сканирований.

import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime
class DataStore:
    """Persists scraped data with upsert and versioning."""
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self._ensure_tables()
    def _ensure_tables(self):
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    url TEXT PRIMARY KEY,
                    title TEXT,
                    price NUMERIC(10,2),
                    currency VARCHAR(3),
                    availability TEXT,
                    rating NUMERIC(3,2),
                    review_count INTEGER,
                    first_seen_at TIMESTAMPTZ DEFAULT NOW(),
                    last_updated_at TIMESTAMPTZ DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS price_history (
                    id SERIAL PRIMARY KEY,
                    url TEXT REFERENCES products(url),
                    price NUMERIC(10,2),
                    recorded_at TIMESTAMPTZ DEFAULT NOW()
                );
            """)
            self.conn.commit()
    def upsert_products(self, products: list[ProductData]):
        """Insert or update products, recording price changes."""
        with self.conn.cursor() as cur:
            for product in products:
                if product.price is None:
                    continue
                # Check if price changed
                cur.execute(
                    "SELECT price FROM products WHERE url = %s",
                    (product.url,)
                )
                row = cur.fetchone()
                # Upsert product
                cur.execute("""
                    INSERT INTO products (url, title, price, currency, availability,
                                          rating, review_count, last_updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())
                    ON CONFLICT (url) DO UPDATE SET
                        title = EXCLUDED.title,
                        price = EXCLUDED.price,
                        currency = EXCLUDED.currency,
                        availability = EXCLUDED.availability,
                        rating = EXCLUDED.rating,
                        review_count = EXCLUDED.review_count,
                        last_updated_at = NOW()
                """, (
                    product.url, product.title, product.price,
                    product.currency, product.availability,
                    product.rating, product.review_count,
                ))
                # Record price history if changed
                if row is None or float(row[0]) != product.price:
                    cur.execute(
                        "INSERT INTO price_history (url, price) VALUES (%s, %s)",
                        (product.url, product.price)
                    )
            self.conn.commit()
    @property
    def product_count(self) -> int:
        with self.conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM products")
            return cur.fetchone()[0]
# Usage
store = DataStore("postgresql://user:pass@localhost:5432/scraper")
store.upsert_products(parsed_products)
print(f"Total products: {store.product_count}")

Компонент 6: Мониторинг

Свяжите все компоненты воедино с помощью централизованного мониторинга. Подробные паттерны мониторинга прокси описаны в нашем руководстве Мониторинг производительности прокси.

import time
import json
import logging
from datetime import datetime
class PipelineMonitor:
    """Monitors the entire scraping pipeline."""
    def __init__(self):
        self.logger = logging.getLogger("pipeline")
        self.stage_times = {}
        self.stage_counts = {}
    def start_stage(self, stage: str):
        self.stage_times[stage] = time.time()
    def end_stage(self, stage: str, item_count: int = 0):
        elapsed = time.time() - self.stage_times.get(stage, time.time())
        self.stage_counts[stage] = item_count
        self.logger.info(json.dumps({
            "timestamp": datetime.utcnow().isoformat(),
            "stage": stage,
            "duration_seconds": round(elapsed, 2),
            "items_processed": item_count,
            "items_per_second": round(item_count / elapsed, 1) if elapsed > 0 else 0,
        }))
    def report(self) -> dict:
        return {
            "stages": {
                stage: {
                    "items": self.stage_counts.get(stage, 0),
                }
                for stage in self.stage_counts
            },
            "timestamp": datetime.utcnow().isoformat(),
        }
# Usage within the full pipeline
monitor = PipelineMonitor()
# Stage 1: Schedule
monitor.start_stage("schedule")
urls = scheduler.dequeue(batch_size=500)
monitor.end_stage("schedule", len(urls))
# Stage 2: Crawl
monitor.start_stage("crawl")
results = asyncio.run(crawler.crawl([u["url"] for u in urls]))
monitor.end_stage("crawl", len(results))
# Stage 3: Parse
monitor.start_stage("parse")
products = [parser.parse(r["url"], r["body"]) for r in results if r["success"]]
monitor.end_stage("parse", len(products))
# Stage 4: Store
monitor.start_stage("store")
store.upsert_products(products)
monitor.end_stage("store", len(products))
print(json.dumps(monitor.report(), indent=2))

Собираем всё вместе

Вот полный конвейер, который связывает все шесть компонентов в единую, запускаемую систему скрапинга.

import asyncio
import logging
logging.basicConfig(level=logging.INFO)
async def run_pipeline():
    # Initialize components
    scheduler = CrawlScheduler()
    crawler = CrawlerPool(concurrency=30, max_retries=3)
    parser = ProductParser()
    store = DataStore("postgresql://user:pass@localhost:5432/scraper")
    monitor = PipelineMonitor()
    # Schedule crawls
    scheduler.add_schedule(
        name="daily_products",
        urls=[f"https://example.com/product/{i}" for i in range(1, 501)],
        interval_hours=24,
        priority=8,
    )
    # Main loop
    while True:
        # 1. Check schedules and enqueue URLs
        monitor.start_stage("schedule")
        scheduler.tick()
        batch = scheduler.dequeue(batch_size=100)
        monitor.end_stage("schedule", len(batch))
        if not batch:
            await asyncio.sleep(60)
            continue
        # 2. Crawl
        monitor.start_stage("crawl")
        urls = [item["url"] for item in batch]
        results = await crawler.crawl(urls, country="us")
        successful = [r for r in results if r.get("success")]
        monitor.end_stage("crawl", len(successful))
        # 3. Parse
        monitor.start_stage("parse")
        products = []
        for result in successful:
            try:
                product = parser.parse(result["url"], result["body"])
                products.append(product)
            except Exception as e:
                logging.error(f"Parse error: {e}")
        monitor.end_stage("parse", len(products))
        # 4. Store
        monitor.start_stage("store")
        store.upsert_products(products)
        monitor.end_stage("store", len(products))
        # 5. Report
        logging.info(f"Pipeline: {monitor.report()}")
        logging.info(f"Crawler stats: {crawler.stats}")
        # Wait before next batch
        await asyncio.sleep(5)
asyncio.run(run_pipeline())

Паттерны продакшн-деплоя

Горизонтальное масштабирование

Масштабируйте краулеры независимо от планировщиков и парсеров. Запускайте несколько экземпляров краулеров, потребляющих из одной очереди Redis.

КомпонентСтратегия масштабированияТипичный масштаб
SchedulerЕдинственный экземпляр (leader election)1 экземпляр
URL QueueRedis-кластер или управляемая очередь1 кластер
Crawler PoolГоризонтальное масштабирование подов2-20 экземпляров
Proxy LayerУправляется ProxyHatН/Д (внешний)
ParserСовмещён с краулером или отдельно1:1 с краулерами
StorageРепликация базы данных1 primary + реплики
MonitoringЦентрализованная агрегация1 экземпляр

Восстановление после ошибок

  • Падение краулера: URL остаются в очереди Redis. Новый краулер подхватывает их автоматически.
  • Сбои прокси: Промежуточный слой повторяет с новыми IP. Устойчивые сбои запускают алерты.
  • Ошибки парсера: Сырой HTML сохраняется в dead-letter-очередь для ручной проверки и обновления парсеров.
  • Сбои базы данных: Парсированные данные буферизуются в памяти/на диске с write-ahead логированием до восстановления БД.

Проверки качества данных

class DataQualityChecker:
    """Validates parsed data before storage."""
    def check(self, product: ProductData) -> list[str]:
        issues = []
        if not product.title:
            issues.append("missing_title")
        if product.price is not None and product.price <= 0:
            issues.append("invalid_price")
        if product.price is not None and product.price > 100000:
            issues.append("suspicious_price")
        if product.rating is not None and (product.rating < 0 or product.rating > 5):
            issues.append("invalid_rating")
        return issues
    def filter_valid(self, products: list[ProductData]) -> list[ProductData]:
        valid = []
        for product in products:
            issues = self.check(product)
            if issues:
                logger.warning(f"Data quality issues for {product.url}: {issues}")
            else:
                valid.append(product)
        return valid
# Integrate into pipeline
checker = DataQualityChecker()
valid_products = checker.filter_valid(products)
store.upsert_products(valid_products)
Лучшая архитектура скрапинга — та, о которой можно рассуждать. Каждый компонент должен отвечать на три вопроса: Что он делает? Как он отказывает? Как он восстанавливается?

Для компонента прокси-слоя смотрите Создание промежуточного слоя прокси. Для оптимизации пропускной способности краулера читайте Масштабирование прокси-запросов с контролем конкурентности. Для краулинга через браузер смотрите Ротация прокси с Playwright. Для стратегий защиты от обнаружения изучите как скрапить сайты без блокировки.

Начните с Python SDK, Node SDK или Go SDK для интеграции с прокси. Изучите тарифы ProxyHat и документацию для создания вашей архитектуры скрапинга.

Часто задаваемые вопросы

Готовы начать?

Доступ к более чем 50 млн резидентных IP в 148+ странах с AI-фильтрацией.

Смотреть ценыРезидентные прокси
← Вернуться в Блог