Projektowanie niezawodnej architektury skraplania

Zaprojektuj system scrating end-to@-@ end: terminarz, kolejka URL, pula pełzaczy, warstwa proxy, parser, przechowywanie i monitorowanie. Production- ready Python kod z diagramami architektury.

Projektowanie niezawodnej architektury skraplania

Poza skryptami: Dlaczego architektura ma znaczenie

Rzepak pojedynczych plików działa dobrze na sto stron. Ale kiedy musisz zebrać miliony punktów danych w kilkudziesięciu miejscach docelowych na powtarzającym się harmonogramie, potrzebujesz systemu - nie skryptu. Niezawodna architektura skrawania rozdziela obawy na niezależne komponenty, które mogą być skalowane, monitorowane i odzyskiwane niezależnie.

Ten przewodnik przechodzi przez projekt systemu scrating produkcji, od harmonogramu URL poprzez zarządzanie proxy do przechowywania danych. Każdy element jest ilustrowany kodem i podłączony do Infrastruktura proksyHat proxy.

Dobrze dopasowany scraper traktuje zbieranie danych jako problem inżynieryjny, a nie hakerski. Każdy składnik ma jedną odpowiedzialność, jasne interfejsy i obserwowalne zachowanie.

Przegląd architektury systemu

Produkcyjny system skrobania składa się z sześciu podstawowych elementów:

┌─────────────┐     ┌──────────────┐     ┌─────────────────┐
│  Scheduler   │────▶│  URL Queue   │────▶│  Crawler Pool   │
│  (cron/API)  │     │  (Redis)     │     │  (workers)      │
└─────────────┘     └──────────────┘     └────────┬────────┘
                                                   │
                                          ┌────────▼────────┐
                                          │  Proxy Layer     │
                                          │  (ProxyHat)      │
                                          └────────┬────────┘
                                                   │
                                          ┌────────▼────────┐
                                          │  Parser          │
                                          │  (extract data)  │
                                          └────────┬────────┘
                                                   │
                    ┌──────────────┐      ┌────────▼────────┐
                    │  Monitoring  │◀─────│  Storage         │
                    │  (metrics)   │      │  (DB / files)    │
                    └──────────────┘      └─────────────────┘
SkładnikOdpowiedzialnośćTechnologia
SchedulerDecyduje, co zeskrobać i kiedyCron, Celery Beat, Bull
Kolejka URLBufory URL z priorytetem i deduplikacjąRedis, RabbitMQ, SQS
Crawler PoolRównocześnie fetches stronyAsyncio, gorotyny, wątki robocze
Proxy LayerProśby o trasach poprzez rotacyjne proxyBrama ProxyHat
ParserEkstrakty uporządkowanych danych z HTML / JSONBeautifulSoup, cheerio, goquery
PrzechowywanieDane ekstrahowane przez ciągnikiPostgreSQL, MongoB, S3
MonitorowanieŚlady zdrowia i wydajnościPrometeusz, logowanie, alarmy

Składnik 1: Scheduler

Harmonogram decyduje, które adresy URL do pełzania i kiedy. Zarządza częstotliwością pełzania, poziomami priorytetowymi i zapewnia, że żaden URL nie jest szorowany częściej niż jest to konieczne.

import redis
import json
import time
from datetime import datetime, timedelta
class CrawlScheduler:
    """Manages crawl schedules and feeds URLs to the queue."""
    def __init__(self, redis_url: str = "redis://localhost:6379"):
        self.redis = redis.from_url(redis_url)
        self.queue_key = "scraper:url_queue"
        self.schedule_key = "scraper:schedules"
        self.seen_key = "scraper:seen_urls"
    def add_schedule(self, name: str, urls: list[str], interval_hours: int, priority: int = 5):
        """Register a recurring crawl job."""
        self.redis.hset(self.schedule_key, name, json.dumps({
            "urls": urls,
            "interval_hours": interval_hours,
            "priority": priority,
            "last_run": None,
        }))
    def tick(self):
        """Check all schedules and enqueue URLs that are due."""
        schedules = self.redis.hgetall(self.schedule_key)
        enqueued = 0
        for name, data in schedules.items():
            schedule = json.loads(data)
            last_run = schedule.get("last_run")
            interval = timedelta(hours=schedule["interval_hours"])
            if last_run and datetime.fromisoformat(last_run) + interval > datetime.utcnow():
                continue
            for url in schedule["urls"]:
                self.enqueue(url, priority=schedule["priority"])
                enqueued += 1
            schedule["last_run"] = datetime.utcnow().isoformat()
            self.redis.hset(self.schedule_key, name, json.dumps(schedule))
        return enqueued
    def enqueue(self, url: str, priority: int = 5):
        """Add a URL to the crawl queue with deduplication."""
        # Skip if recently seen (within 1 hour)
        if self.redis.sismember(self.seen_key, url):
            return False
        self.redis.zadd(self.queue_key, {
            json.dumps({"url": url, "enqueued_at": time.time()}): priority
        })
        self.redis.sadd(self.seen_key, url)
        self.redis.expire(self.seen_key, 3600)  # 1-hour dedup window
        return True
    def dequeue(self, batch_size: int = 10) -> list[dict]:
        """Pull the highest-priority URLs from the queue."""
        items = self.redis.zpopmax(self.queue_key, batch_size)
        return [json.loads(item) for item, score in items]
    @property
    def queue_size(self) -> int:
        return self.redis.zcard(self.queue_key)
# Usage
scheduler = CrawlScheduler()
scheduler.add_schedule(
    name="product_prices",
    urls=[f"https://example.com/product/{i}" for i in range(1, 1001)],
    interval_hours=6,
    priority=8,
)
scheduler.add_schedule(
    name="competitor_pages",
    urls=["https://competitor.com/pricing", "https://competitor.com/features"],
    interval_hours=24,
    priority=5,
)
# Run every minute via cron
enqueued = scheduler.tick()
print(f"Enqueued {enqueued} URLs, queue size: {scheduler.queue_size}")

Komponent 2: kolejka URL

Kolejka oddziela planowanie od pełzania. Zapewnia pierwszeństwo zamawiania, backpressure i wytrwałość - więc żadne adresy URL nie są stracone, jeśli Crawler się rozbije.

// Node.js queue with Bull
const Queue = require('bull');
const crawlQueue = new Queue('crawl', 'redis://localhost:6379', {
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 },
    removeOnComplete: 100,
    removeOnFail: 500,
  },
});
// Add URLs with priority (lower number = higher priority)
async function enqueueUrls(urls, priority = 5) {
  const jobs = urls.map(url =>
    crawlQueue.add(
      { url, enqueuedAt: Date.now() },
      { priority, jobId: url } // jobId for deduplication
    )
  );
  await Promise.all(jobs);
  console.log(`Enqueued ${urls.length} URLs`);
}
// Worker processes URLs
crawlQueue.process(10, async (job) => {
  const { url } = job.data;
  // Crawl logic here (see Crawler Pool below)
  return { url, status: 'completed' };
});
crawlQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});

Składnik 3: Crawler Pool

Pula Crawler pobiera strony jednocześnie poprzez warstwa pośrednia proxyZarządza. Limity walutowe, obsługuje powtórki, i przekazuje surowe odpowiedzi do parsera.

import asyncio
import aiohttp
import uuid
import time
from typing import Optional
class CrawlerPool:
    """Concurrent crawler with proxy rotation and retry logic."""
    def __init__(
        self,
        concurrency: int = 30,
        max_retries: int = 3,
        timeout: int = 30,
    ):
        self.concurrency = concurrency
        self.max_retries = max_retries
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(concurrency)
        self.stats = {"success": 0, "failed": 0, "retries": 0}
    def _get_proxy(self, country: Optional[str] = None) -> str:
        session_id = uuid.uuid4().hex[:8]
        username = f"USERNAME-session-{session_id}"
        if country:
            username += f"-country-{country}"
        return f"http://{username}:PASSWORD@gate.proxyhat.com:8080"
    async def fetch(
        self,
        session: aiohttp.ClientSession,
        url: str,
        country: Optional[str] = None,
    ) -> dict:
        async with self.semaphore:
            for attempt in range(self.max_retries + 1):
                proxy = self._get_proxy(country)
                start = time.time()
                try:
                    async with session.get(
                        url, proxy=proxy, timeout=self.timeout,
                        headers={
                            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                                          "AppleWebKit/537.36 Chrome/131.0.0.0 Safari/537.36",
                            "Accept": "text/html,application/xhtml+xml,*/*;q=0.8",
                        },
                    ) as response:
                        body = await response.text()
                        latency = time.time() - start
                        if response.status in (403, 429, 503) and attempt < self.max_retries:
                            self.stats["retries"] += 1
                            await asyncio.sleep(2 ** attempt)
                            continue
                        if response.status < 400:
                            self.stats["success"] += 1
                        else:
                            self.stats["failed"] += 1
                        return {
                            "url": url,
                            "status": response.status,
                            "body": body,
                            "latency": latency,
                            "success": response.status < 400,
                        }
                except Exception as e:
                    if attempt < self.max_retries:
                        self.stats["retries"] += 1
                        await asyncio.sleep(2 ** attempt)
                        continue
                    self.stats["failed"] += 1
                    return {
                        "url": url,
                        "error": str(e),
                        "latency": time.time() - start,
                        "success": False,
                    }
    async def crawl(self, urls: list[str], country: Optional[str] = None) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            tasks = [self.fetch(session, url, country) for url in urls]
            return await asyncio.gather(*tasks)
# Usage
crawler = CrawlerPool(concurrency=30, max_retries=3)
urls = [f"https://example.com/product/{i}" for i in range(500)]
results = asyncio.run(crawler.crawl(urls, country="us"))
print(f"Success: {crawler.stats['success']}, Failed: {crawler.stats['failed']}")

Komponent 4: Parser

Parser przekształca surowy HTML w uporządkowane dane. Utrzymuj logikę oddzielną od czołgania się - to sprawia, że testable, wielokrotnego użytku i łatwe do aktualizacji, gdy miejsca docelowe się zmieniają.

from dataclasses import dataclass
from typing import Optional
from bs4 import BeautifulSoup
import json
@dataclass
class ProductData:
    url: str
    title: Optional[str] = None
    price: Optional[float] = None
    currency: Optional[str] = None
    availability: Optional[str] = None
    rating: Optional[float] = None
    review_count: Optional[int] = None
    parsed_at: Optional[str] = None
class ProductParser:
    """Extracts structured product data from HTML."""
    def parse(self, url: str, html: str) -> ProductData:
        soup = BeautifulSoup(html, "html.parser")
        data = ProductData(url=url)
        try:
            data.title = self._extract_text(soup, "h1.product-title")
            data.price = self._extract_price(soup)
            data.currency = self._extract_currency(soup)
            data.availability = self._extract_text(soup, ".availability-status")
            data.rating = self._extract_rating(soup)
            data.review_count = self._extract_review_count(soup)
            data.parsed_at = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
        except Exception as e:
            logger.error(f"Parse error for {url}: {e}")
        return data
    def _extract_text(self, soup, selector: str) -> Optional[str]:
        el = soup.select_one(selector)
        return el.get_text(strip=True) if el else None
    def _extract_price(self, soup) -> Optional[float]:
        el = soup.select_one("[data-price], .price, .product-price")
        if not el:
            return None
        price_text = el.get("data-price") or el.get_text(strip=True)
        # Remove currency symbols and parse
        cleaned = "".join(c for c in price_text if c.isdigit() or c == ".")
        return float(cleaned) if cleaned else None
    def _extract_currency(self, soup) -> Optional[str]:
        el = soup.select_one("[data-currency], .currency")
        return el.get("data-currency") or el.get_text(strip=True) if el else None
    def _extract_rating(self, soup) -> Optional[float]:
        el = soup.select_one("[data-rating], .rating-value")
        if el:
            val = el.get("data-rating") or el.get_text(strip=True)
            try:
                return float(val)
            except ValueError:
                return None
        return None
    def _extract_review_count(self, soup) -> Optional[int]:
        el = soup.select_one(".review-count, [data-reviews]")
        if el:
            text = el.get("data-reviews") or el.get_text(strip=True)
            digits = "".join(c for c in text if c.isdigit())
            return int(digits) if digits else None
        return None
# Usage
parser = ProductParser()
for result in results:
    if result["success"]:
        product = parser.parse(result["url"], result["body"])
        print(f"{product.title}: ${product.price}")

Komponent 5: Przechowywanie

Persist parsował dane z wersją i dedukcją. Używaj gorset do radzenia sobie z pełzaniem z wdziękiem.

import psycopg2
from psycopg2.extras import execute_values
from datetime import datetime
class DataStore:
    """Persists scraped data with upsert and versioning."""
    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)
        self._ensure_tables()
    def _ensure_tables(self):
        with self.conn.cursor() as cur:
            cur.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    url TEXT PRIMARY KEY,
                    title TEXT,
                    price NUMERIC(10,2),
                    currency VARCHAR(3),
                    availability TEXT,
                    rating NUMERIC(3,2),
                    review_count INTEGER,
                    first_seen_at TIMESTAMPTZ DEFAULT NOW(),
                    last_updated_at TIMESTAMPTZ DEFAULT NOW()
                );
                CREATE TABLE IF NOT EXISTS price_history (
                    id SERIAL PRIMARY KEY,
                    url TEXT REFERENCES products(url),
                    price NUMERIC(10,2),
                    recorded_at TIMESTAMPTZ DEFAULT NOW()
                );
            """)
            self.conn.commit()
    def upsert_products(self, products: list[ProductData]):
        """Insert or update products, recording price changes."""
        with self.conn.cursor() as cur:
            for product in products:
                if product.price is None:
                    continue
                # Check if price changed
                cur.execute(
                    "SELECT price FROM products WHERE url = %s",
                    (product.url,)
                )
                row = cur.fetchone()
                # Upsert product
                cur.execute("""
                    INSERT INTO products (url, title, price, currency, availability,
                                          rating, review_count, last_updated_at)
                    VALUES (%s, %s, %s, %s, %s, %s, %s, NOW())
                    ON CONFLICT (url) DO UPDATE SET
                        title = EXCLUDED.title,
                        price = EXCLUDED.price,
                        currency = EXCLUDED.currency,
                        availability = EXCLUDED.availability,
                        rating = EXCLUDED.rating,
                        review_count = EXCLUDED.review_count,
                        last_updated_at = NOW()
                """, (
                    product.url, product.title, product.price,
                    product.currency, product.availability,
                    product.rating, product.review_count,
                ))
                # Record price history if changed
                if row is None or float(row[0]) != product.price:
                    cur.execute(
                        "INSERT INTO price_history (url, price) VALUES (%s, %s)",
                        (product.url, product.price)
                    )
            self.conn.commit()
    @property
    def product_count(self) -> int:
        with self.conn.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM products")
            return cur.fetchone()[0]
# Usage
store = DataStore("postgresql://user:pass@localhost:5432/scraper")
store.upsert_products(parsed_products)
print(f"Total products: {store.product_count}")

Komponent 6: Monitorowanie

Związać wszystkie komponenty ze scentralizowanym monitorowaniem. Szczegółowe schematy monitorowania proksy- specyficzne, zobacz nasz Monitorowanie wydajności proxy Przewodnik.

import time
import json
import logging
from datetime import datetime
class PipelineMonitor:
    """Monitors the entire scraping pipeline."""
    def __init__(self):
        self.logger = logging.getLogger("pipeline")
        self.stage_times = {}
        self.stage_counts = {}
    def start_stage(self, stage: str):
        self.stage_times[stage] = time.time()
    def end_stage(self, stage: str, item_count: int = 0):
        elapsed = time.time() - self.stage_times.get(stage, time.time())
        self.stage_counts[stage] = item_count
        self.logger.info(json.dumps({
            "timestamp": datetime.utcnow().isoformat(),
            "stage": stage,
            "duration_seconds": round(elapsed, 2),
            "items_processed": item_count,
            "items_per_second": round(item_count / elapsed, 1) if elapsed > 0 else 0,
        }))
    def report(self) -> dict:
        return {
            "stages": {
                stage: {
                    "items": self.stage_counts.get(stage, 0),
                }
                for stage in self.stage_counts
            },
            "timestamp": datetime.utcnow().isoformat(),
        }
# Usage within the full pipeline
monitor = PipelineMonitor()
# Stage 1: Schedule
monitor.start_stage("schedule")
urls = scheduler.dequeue(batch_size=500)
monitor.end_stage("schedule", len(urls))
# Stage 2: Crawl
monitor.start_stage("crawl")
results = asyncio.run(crawler.crawl([u["url"] for u in urls]))
monitor.end_stage("crawl", len(results))
# Stage 3: Parse
monitor.start_stage("parse")
products = [parser.parse(r["url"], r["body"]) for r in results if r["success"]]
monitor.end_stage("parse", len(products))
# Stage 4: Store
monitor.start_stage("store")
store.upsert_products(products)
monitor.end_stage("store", len(products))
print(json.dumps(monitor.report(), indent=2))

Wszystko razem

Oto kompletny rurociąg, który łączy wszystkie sześć komponentów w jeden system skrawania.

import asyncio
import logging
logging.basicConfig(level=logging.INFO)
async def run_pipeline():
    # Initialize components
    scheduler = CrawlScheduler()
    crawler = CrawlerPool(concurrency=30, max_retries=3)
    parser = ProductParser()
    store = DataStore("postgresql://user:pass@localhost:5432/scraper")
    monitor = PipelineMonitor()
    # Schedule crawls
    scheduler.add_schedule(
        name="daily_products",
        urls=[f"https://example.com/product/{i}" for i in range(1, 501)],
        interval_hours=24,
        priority=8,
    )
    # Main loop
    while True:
        # 1. Check schedules and enqueue URLs
        monitor.start_stage("schedule")
        scheduler.tick()
        batch = scheduler.dequeue(batch_size=100)
        monitor.end_stage("schedule", len(batch))
        if not batch:
            await asyncio.sleep(60)
            continue
        # 2. Crawl
        monitor.start_stage("crawl")
        urls = [item["url"] for item in batch]
        results = await crawler.crawl(urls, country="us")
        successful = [r for r in results if r.get("success")]
        monitor.end_stage("crawl", len(successful))
        # 3. Parse
        monitor.start_stage("parse")
        products = []
        for result in successful:
            try:
                product = parser.parse(result["url"], result["body"])
                products.append(product)
            except Exception as e:
                logging.error(f"Parse error: {e}")
        monitor.end_stage("parse", len(products))
        # 4. Store
        monitor.start_stage("store")
        store.upsert_products(products)
        monitor.end_stage("store", len(products))
        # 5. Report
        logging.info(f"Pipeline: {monitor.report()}")
        logging.info(f"Crawler stats: {crawler.stats}")
        # Wait before next batch
        await asyncio.sleep(5)
asyncio.run(run_pipeline())

Wzory wdrażania produkcji

Poziome skalowanie

Scale crawlers niezależnie od harmonogramów i parserów. Uruchom wiele przypadków Crawler pochłaniających z tej samej kolejki Redis.

SkładnikStrategia skalowaniaTypowa skala
SchedulerPojedyncza instancja (wybory liderów)1 przypadek
Kolejka URLKlaster Redis lub zarządzana kolejka1 klaster
Crawler PoolPoziome skalowanie pokosu2-20 instancji
Proxy LayerZarządzany przez ProxyHatN / A (zewnętrzne)
ParserKolokacja z pełzaniem lub oddzielnym1: 1 z pełzaczami
PrzechowywanieReplikacja bazy danych1 podstawowe + repliki
MonitorowanieScentralizowana agregacja1 przypadek

Błąd odzyskiwania

  • Crawler crash: URL pozostają w kolejce Redis. Nowy Crawler odbiera je automatycznie.
  • Niesprawność proxy: W warstwa middleware ponownie próbuje ze świeżymi IP. Utrzymane awarie uruchamiają alarmy.
  • Awarie parserów: Surowy HTML jest przechowywany w kolejce liter do ręcznej kontroli i aktualizacji parsera.
  • Niepowodzenie bazy danych: Przekazywane bufory danych w pamięci / dysku z logowaniem przed zapisem do czasu odzyskania DB.

Kontrole jakości danych

class DataQualityChecker:
    """Validates parsed data before storage."""
    def check(self, product: ProductData) -> list[str]:
        issues = []
        if not product.title:
            issues.append("missing_title")
        if product.price is not None and product.price <= 0:
            issues.append("invalid_price")
        if product.price is not None and product.price > 100000:
            issues.append("suspicious_price")
        if product.rating is not None and (product.rating < 0 or product.rating > 5):
            issues.append("invalid_rating")
        return issues
    def filter_valid(self, products: list[ProductData]) -> list[ProductData]:
        valid = []
        for product in products:
            issues = self.check(product)
            if issues:
                logger.warning(f"Data quality issues for {product.url}: {issues}")
            else:
                valid.append(product)
        return valid
# Integrate into pipeline
checker = DataQualityChecker()
valid_products = checker.filter_valid(products)
store.upsert_products(valid_products)
Najlepszą architekturą scrating jest jeden można rozumować. Każdy składnik powinien odpowiedzieć na trzy pytania: Co to robi? Jak to może się nie udać? Jak się odzyskuje?

Dla komponentu warstwy pośredniczącej patrz: Building a Proxy Middleware LayerDo optymalizacji przepustowości pełzania, czytaj Skalowanie żądań proxy za pomocą kontroli konkursowej. For browser- based crawling, see Rotacyjne efekty z PlaywrightDla strategii antydetekcji, odkryj jak skrobać bez blokowania.

Zacznij od Python SDK, Węzeł SDKlub Go SDK dla integracji proxy. Poznaj Ceny proksyHat oraz dokumentacja by zasilić twoją rzeźbiącą architekturę.

Często zadawane pytania

Gotowy, aby zacząć?

Dostęp do ponad 50 mln rezydencjalnych IP w ponad 148 krajach z filtrowaniem AI.

Zobacz cenyProxy rezydencjalne
← Powrót do Bloga