Costruzione Infrastrutture di monitoraggio dei prezzi in tempo reale

Progettare e costruire un sistema di monitoraggio dei prezzi in tempo reale con code prioritarie, pool di lavoratori, rilevamento dei cambiamenti e rotazione dei proxy residenziali. Guida completa di implementazione Python e Node.js.

Costruzione Infrastrutture di monitoraggio dei prezzi in tempo reale

Monitoraggio dei prezzi in tempo reale e batch

La maggior parte dei sistemi di monitoraggio dei prezzi operano in modalità batch: controlla tutti i prodotti ogni ora (o ogni poche ore), memorizza i risultati e invia avvisi sulle modifiche. Questo funziona per molti casi di utilizzo, ma nei mercati in rapida evoluzione — vendite flash, prezzi dinamici, concorrenza di mercato — monitoraggio batch manca i cambiamenti di prezzo critici che avvengono tra i controlli.

Il monitoraggio dei prezzi in tempo reale riduce il ritardo di rilevamento da ore a minuti o anche secondi. Invece di controllare ogni prodotto su un programma fisso, un sistema in tempo reale monitora continuamente gli obiettivi di alta priorità e reagisce ai cambiamenti che si verificano. Questa guida copre l'architettura, l'infrastruttura proxy e i dettagli di implementazione necessari per costruire un sistema di monitoraggio in tempo reale. Per i concetti fondamentali di monitoraggio dei prezzi, vedere la nostra guida monitoraggio dei prezzi dei concorrenti automaticamente.

AspettiMonitoraggio di BatchMonitoraggio in tempo reale
Controllare la frequenzaOgni 1-24 oreOgni 1-5 minuti per oggetti prioritari
Lag di rilevamentoFino a un intervallo completoMeno di 5 minuti
Uso del proxyColpi concentratiCorrente, flusso distribuito
InfrastruttureLavori sempliciOrganizzato con piscine di lavoro
CostoIn bassoPiù alto (più richieste, più proxy)
Il meglio perReport giornalieri, analisi della tendenzaRimborso, rilevamento di vendita flash, offerta competitiva

Architettura per il monitoraggio in tempo reale

Un sistema di monitoraggio dei prezzi in tempo reale ha cinque componenti principali che lavorano insieme come pipeline continua.

1. Priorità

I prodotti sono assegnati livelli prioritari che determinano la frequenza di controllo. Una coda prioritaria (Redis Sorted Sets funziona bene) assicura che i prodotti ad alto valore siano sempre controllati prima.

import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
    """Add a product to the monitoring queue."""
    next_check = time.time()  # Check immediately on first add
    r.zadd("price_queue", {json.dumps({
        "product_id": product_id,
        "url": url,
        "interval": priority_minutes * 60,
    }): next_check})
def get_next_batch(batch_size=10):
    """Get the next batch of products due for checking."""
    now = time.time()
    items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
    products = []
    for item in items:
        data = json.loads(item)
        r.zadd("price_queue", {item: now + data["interval"]})
        products.append(data)
    return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)

2. Piscina di lavoro

Processi di lavoro multipli tirano dalla coda prioritaria, mettono i prezzi attraverso i proxy e spingono i risultati al data pipeline. I lavoratori operano in modo indipendente, ciascuno con il proprio collegamento proxy.

import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
    """Fetch the current price for a product."""
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    }
    proxies = {"http": PROXY_URL, "https": PROXY_URL}
    try:
        response = requests.get(
            product["url"], headers=headers,
            proxies=proxies, timeout=30
        )
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, "html.parser")
            price_el = soup.select_one("span.a-price-whole")
            if price_el:
                price = float(price_el.get_text(strip=True).replace(",", ""))
                return {
                    "product_id": product["product_id"],
                    "price": price,
                    "timestamp": time.time(),
                    "status": "success",
                }
    except Exception as e:
        pass
    return {
        "product_id": product["product_id"],
        "price": None,
        "timestamp": time.time(),
        "status": "failed",
    }
def run_workers(num_workers=10):
    """Run the monitoring worker pool."""
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        while True:
            batch = get_next_batch(batch_size=num_workers)
            if not batch:
                time.sleep(1)
                continue
            futures = [executor.submit(fetch_price, product) for product in batch]
            for future in futures:
                result = future.result()
                process_result(result)
            time.sleep(random.uniform(0.5, 2))

3. Cambia motore di rilevamento

Invece di memorizzare ogni controllo dei prezzi, il motore di rilevamento dei cambiamenti confronta i prezzi correnti contro gli ultimi valori conosciuti e innesca solo eventi sui cambiamenti effettivi.

class ChangeDetector:
    def __init__(self, redis_client):
        self.redis = redis_client
    def check_change(self, product_id, new_price):
        """Compare new price against last known and detect changes."""
        key = f"last_price:{product_id}"
        last_data = self.redis.get(key)
        if last_data:
            last = json.loads(last_data)
            old_price = last["price"]
            if old_price and new_price and old_price != new_price:
                change_pct = ((new_price - old_price) / old_price) * 100
                event = {
                    "product_id": product_id,
                    "old_price": old_price,
                    "new_price": new_price,
                    "change_pct": round(change_pct, 2),
                    "timestamp": time.time(),
                }
                # Publish change event
                self.redis.publish("price_changes", json.dumps(event))
                return event
        # Update last known price
        self.redis.set(key, json.dumps({
            "price": new_price,
            "timestamp": time.time(),
        }))
        return None

4. Stream eventi

I cambiamenti di prezzo sono pubblicati su un canale Redis Pub/Sub (o argomento Kafka per sistemi più grandi). I consumatori a valle — servizi di avviso, motori di ripetizione, dashboard — abbonarsi a questi eventi e reagire in modo indipendente.

import redis
import json
def subscribe_to_changes():
    """Subscribe to price change events."""
    r = redis.Redis(host="localhost", port=6379)
    pubsub = r.pubsub()
    pubsub.subscribe("price_changes")
    for message in pubsub.listen():
        if message["type"] == "message":
            event = json.loads(message["data"])
            handle_price_change(event)
def handle_price_change(event):
    """Process a price change event."""
    change = event["change_pct"]
    product = event["product_id"]
    if change < -10:
        send_urgent_alert(event)  # Major price drop
    elif change < -5:
        send_alert(event)         # Moderate drop
    elif change > 10:
        send_alert(event)         # Significant increase
    # Always log to time-series database
    store_price_change(event)

5. Dashboard e avvisi

I dati in tempo reale hanno bisogno di una visualizzazione in tempo reale. Utilizzare connessioni WebSocket per spingere gli aggiornamenti dei prezzi alle dashboard istantaneamente.

Node.js Attuazione

Una versione Node.js del motore di monitoraggio in tempo reale utilizzando SDK del nodo di ProxyHat.

const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
  constructor(concurrency = 10) {
    this.concurrency = concurrency;
    this.running = false;
    this.agent = new HttpsProxyAgent(PROXY_URL);
  }
  async fetchPrice(product) {
    try {
      const { data } = await axios.get(product.url, {
        httpsAgent: new HttpsProxyAgent(PROXY_URL),
        headers: {
          "User-Agent":
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
          "Accept-Language": "en-US,en;q=0.9",
        },
        timeout: 30000,
      });
      const $ = cheerio.load(data);
      const priceText = $("span.a-price-whole").first().text().trim();
      const price = parseFloat(priceText.replace(/,/g, "")) || null;
      return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
    } catch (err) {
      return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
    }
  }
  async checkChange(productId, newPrice) {
    const key = `last_price:${productId}`;
    const lastData = await redis.get(key);
    if (lastData) {
      const last = JSON.parse(lastData);
      if (last.price && newPrice && last.price !== newPrice) {
        const changePct = ((newPrice - last.price) / last.price) * 100;
        const event = {
          productId,
          oldPrice: last.price,
          newPrice,
          changePct: Math.round(changePct * 100) / 100,
          timestamp: Date.now(),
        };
        await redis.publish("price_changes", JSON.stringify(event));
        return event;
      }
    }
    await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
    return null;
  }
  async processProduct(product) {
    const result = await this.fetchPrice(product);
    if (result.price) {
      const change = await this.checkChange(result.productId, result.price);
      if (change) {
        console.log(
          `Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
        );
      }
    }
    // Random delay
    await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
  }
  async start() {
    this.running = true;
    console.log(`Starting monitor with ${this.concurrency} workers`);
    while (this.running) {
      const batch = await this.getNextBatch(this.concurrency);
      if (batch.length === 0) {
        await new Promise((r) => setTimeout(r, 1000));
        continue;
      }
      await Promise.all(batch.map((p) => this.processProduct(p)));
    }
  }
  async getNextBatch(size) {
    const now = Date.now() / 1000;
    const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
    const products = [];
    for (const item of items) {
      const data = JSON.parse(item);
      await redis.zadd("price_queue", now + data.interval, item);
      products.push(data);
    }
    return products;
  }
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();

Gestione del proxy per il monitoraggio continuo

Il monitoraggio in tempo reale pone richieste uniche sulla vostra infrastruttura proxy rispetto alla demolizione batch.

Modello di richiesta stabile

A differenza del raschio di lotto che invia esplosioni di richieste, il monitoraggio in tempo reale crea un flusso costante. Questo è effettivamente migliore per la salute del proxy — un flusso costante di 5-10 richieste al secondo sembra più naturale di 1.000 richieste in un esplosione di 2 minuti.

Configurazione ProxyHat per il tempo reale

# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080

Monitoraggio della salute IP

Traccia i tassi di successo per il sito di destinazione e regola il tuo approccio dinamicamente. Se i tassi di successo scendono su un mercato specifico, aumentano i ritardi o passano a un diverso pool geo-targeted. La grande piscina residenziale di ProxyHat garantisce sempre la disponibilità di IP freschi. Controlla il nostro posizioni proxy per la copertura completa.

Asporto chiave: il monitoraggio in tempo reale richiede una strategia proxy costante e sostenibile. L'obiettivo è coerente richieste a basso volume su molti IP, non esplosioni ad alto volume da pochi IP.

Conservazione dei dati per i dati in tempo reale

I dati dei prezzi in tempo reale necessitano di una soluzione di archiviazione ottimizzata per gli inserti ad alta frequenza e le query a fascia di tempo.

TimescaleDB Schema

-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
    time        TIMESTAMPTZ NOT NULL,
    product_id  TEXT NOT NULL,
    price       DECIMAL(10,2),
    currency    VARCHAR(3) DEFAULT 'USD',
    source_url  TEXT,
    status      VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS hour,
    product_id,
    AVG(price) AS avg_price,
    MIN(price) AS min_price,
    MAX(price) AS max_price,
    COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');

Considerazioni di scala

  • Ridimensionamento del lavoratore orizzontale: Aggiungere i lavoratori attraverso più macchine, ciascuno tirando dalla stessa coda Redis. Nessun coordinamento necessario — la coda gestisce la distribuzione.
  • Ripartizione prioritaria: Quando il budget proxy è limitato, ridurre automaticamente la frequenza di controllo per i prodotti a bassa priorità, mantenendo la copertura in tempo reale per gli elementi critici.
  • Intervalli adattivi: Se il prezzo di un prodotto è stabile per 24 ore, aumenta l'intervallo di controllo. Se è cambiato due volte in un'ora, diminuirlo.
  • Convalutazione site-specific: Diversi siti di destinazione hanno tolleranze diverse. Eseguire lavoratori più contemporaneamente per Shopify (più permissivi) e meno per Amazon (rilevamento più aggressivo).

Per ulteriori informazioni sulle strategie proxy che supportano il monitoraggio ad alta frequenza, esplorare la nostra guida su migliori proxy per la raschiatura web e Piani di prezzi di ProxyHat per uso ad alto volume.

Asporto chiave

  • Il monitoraggio in tempo reale riduce il rilevamento dei cambiamenti di prezzo da ore a minuti, critico per la ripetizione e la risposta competitiva.
  • Utilizzare una coda prioritaria per concentrare le risorse sui prodotti ad alto valore, coprendo ancora la coda lunga.
  • Un pool di lavoratori con connessioni proxy concorrenti fornisce throughput senza schemi di scoppio.
  • Cambiare il rumore del filtro dei motori di rilevamento — solo il processo e l'allarme sui cambiamenti di prezzo reali.
  • Memorizza i dati grezzi in un database di serie temporali (TimescaleDB) con le politiche di conservazione per la gestione dei costi.
  • I proxy residenziali con rotazione costante dello stato sono essenziali per il monitoraggio continuo. Inizia con ProxyHat per un accesso affidabile.

Costruzione di infrastrutture in tempo reale? Leggi il nostro e-commerce scraping guida per la strategia completa e controllare le nostre guide usando i proxy in Python e Node.js per dettagli di implementazione.

Pronto per iniziare?

Accedi a oltre 50M di IP residenziali in oltre 148 paesi con filtraggio AI.

Vedi i prezziProxy residenziali
← Torna al Blog