Infrastruktura monitorowania cen nieruchomości - czas

Projektowanie i budowa systemu monitorowania cen w czasie rzeczywistym z kolejkami priorytetowymi, pulami pracowników, wykrywaniem zmian oraz rotacją proxy mieszkaniowej. Kompletny przewodnik implementacji Pythona i Node.js.

Infrastruktura monitorowania cen nieruchomości - czas

Real- Time vs.

Większość systemów monitorowania cen działa w trybie wsadowym: sprawdzać wszystkie produkty co godzinę (lub co kilka godzin), przechowywać wyniki i wysyłać powiadomienia o zmianach. Działa to w wielu przypadkach zastosowania, ale na szybko zmieniających się rynkach - sprzedaży flash, dynamicznych cen, konkurencji rynkowej - monitorowanie partii pomija krytyczne zmiany cen, które zdarzają się między kontrolami.

Monitorowanie cen w czasie rzeczywistym zmniejsza opóźnienie wykrywania z godzin na minuty lub nawet sekundy. Zamiast sprawdzać każdy produkt w ustalonym harmonogramie, system real- time stale monitoruje cele o wysokim priorytecie i reaguje na zmiany w ich trakcie. Niniejszy przewodnik obejmuje architekturę, infrastrukturę proxy oraz szczegóły wdrażania niezbędne do budowy systemu monitorowania w czasie rzeczywistym. Podstawowe koncepcje monitorowania cen znajdują się w naszym przewodniku Automatyczne monitorowanie cen konkurencyjnych.

AspektMonitorowanie partiiMonitorowanie czasu rzeczywistego
Częstotliwość kontroliCo 1- 24 godzinyCo 1- 5 minut dla elementów priorytetowych
Opóźnienie wykrywaniaDo jednego pełnego odstępu czasuNiecałe 5 minut
Wykorzystanie proxySkoncentrowane pęknięciaStrumień stały, rozproszony
InfrastrukturaProste zadania cronEvent- driven z basenów pracowników
KosztNiższeWyższe (więcej wniosków, więcej proxy)
Najlepsze dlaDzienne raporty, analiza tendencjiPoprawianie, wykrywanie sprzedaży flash, przetargi konkurencyjne

Architektura monitorowania rzeczywistego czasu

System monitorowania cen w czasie rzeczywistym posiada pięć podstawowych elementów, które współpracują jako ciągły rurociąg.

1. Kolejka priorytetowa

Produkty są przypisane do poziomów priorytetowych, które określają częstotliwość kontroli. Kolejka priorytetowa (Redis Sorted Sets działa dobrze) zapewnia, że produkty o wysokiej wartości są zawsze sprawdzane najpierw.

import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
    """Add a product to the monitoring queue."""
    next_check = time.time()  # Check immediately on first add
    r.zadd("price_queue", {json.dumps({
        "product_id": product_id,
        "url": url,
        "interval": priority_minutes * 60,
    }): next_check})
def get_next_batch(batch_size=10):
    """Get the next batch of products due for checking."""
    now = time.time()
    items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
    products = []
    for item in items:
        data = json.loads(item)
        r.zadd("price_queue", {item: now + data["interval"]})
        products.append(data)
    return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)

2. Pool robotniczy

Wiele procesów pracy wyciągnąć z kolejki priorytetowej, pobrać ceny poprzez proxy, i przenieść wyniki do rurociągu danych. Pracownicy działają niezależnie, każdy z własnym połączeniem proxy.

import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
    """Fetch the current price for a product."""
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    }
    proxies = {"http": PROXY_URL, "https": PROXY_URL}
    try:
        response = requests.get(
            product["url"], headers=headers,
            proxies=proxies, timeout=30
        )
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, "html.parser")
            price_el = soup.select_one("span.a-price-whole")
            if price_el:
                price = float(price_el.get_text(strip=True).replace(",", ""))
                return {
                    "product_id": product["product_id"],
                    "price": price,
                    "timestamp": time.time(),
                    "status": "success",
                }
    except Exception as e:
        pass
    return {
        "product_id": product["product_id"],
        "price": None,
        "timestamp": time.time(),
        "status": "failed",
    }
def run_workers(num_workers=10):
    """Run the monitoring worker pool."""
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        while True:
            batch = get_next_batch(batch_size=num_workers)
            if not batch:
                time.sleep(1)
                continue
            futures = [executor.submit(fetch_price, product) for product in batch]
            for future in futures:
                result = future.result()
                process_result(result)
            time.sleep(random.uniform(0.5, 2))

3. Zmień silnik detekcji

Zamiast zapisywać każdą kontrolę cen, silnik wykrywania zmian porównuje ceny bieżące z ostatnimi znanymi wartościami i uruchamia tylko zdarzenia dotyczące rzeczywistych zmian.

class ChangeDetector:
    def __init__(self, redis_client):
        self.redis = redis_client
    def check_change(self, product_id, new_price):
        """Compare new price against last known and detect changes."""
        key = f"last_price:{product_id}"
        last_data = self.redis.get(key)
        if last_data:
            last = json.loads(last_data)
            old_price = last["price"]
            if old_price and new_price and old_price != new_price:
                change_pct = ((new_price - old_price) / old_price) * 100
                event = {
                    "product_id": product_id,
                    "old_price": old_price,
                    "new_price": new_price,
                    "change_pct": round(change_pct, 2),
                    "timestamp": time.time(),
                }
                # Publish change event
                self.redis.publish("price_changes", json.dumps(event))
                return event
        # Update last known price
        self.redis.set(key, json.dumps({
            "price": new_price,
            "timestamp": time.time(),
        }))
        return None

4. Strumień zdarzeń

Zmiany cen są publikowane na kanale Redis Pub / Sub (lub temat Kafka dla większych systemów). Konsumenci niższego szczebla - ostrzegający o usługach, zmieniający ceny silników, deski rozdzielcze - subskrybują te wydarzenia i reagują niezależnie.

import redis
import json
def subscribe_to_changes():
    """Subscribe to price change events."""
    r = redis.Redis(host="localhost", port=6379)
    pubsub = r.pubsub()
    pubsub.subscribe("price_changes")
    for message in pubsub.listen():
        if message["type"] == "message":
            event = json.loads(message["data"])
            handle_price_change(event)
def handle_price_change(event):
    """Process a price change event."""
    change = event["change_pct"]
    product = event["product_id"]
    if change < -10:
        send_urgent_alert(event)  # Major price drop
    elif change < -5:
        send_alert(event)         # Moderate drop
    elif change > 10:
        send_alert(event)         # Significant increase
    # Always log to time-series database
    store_price_change(event)

5. Deska rozdzielcza i alarmy

Dane w czasie rzeczywistym wymagają wizualizacji w czasie rzeczywistym. Użyj połączeń WebSocket, aby natychmiast wcisnąć aktualizacje cen do desek rozdzielczych.

Wdrażanie Node.js

Wersja Node.js monitorującego w czasie rzeczywistym silnika Węzeł ProxyHat SDK.

const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
  constructor(concurrency = 10) {
    this.concurrency = concurrency;
    this.running = false;
    this.agent = new HttpsProxyAgent(PROXY_URL);
  }
  async fetchPrice(product) {
    try {
      const { data } = await axios.get(product.url, {
        httpsAgent: new HttpsProxyAgent(PROXY_URL),
        headers: {
          "User-Agent":
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
          "Accept-Language": "en-US,en;q=0.9",
        },
        timeout: 30000,
      });
      const $ = cheerio.load(data);
      const priceText = $("span.a-price-whole").first().text().trim();
      const price = parseFloat(priceText.replace(/,/g, "")) || null;
      return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
    } catch (err) {
      return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
    }
  }
  async checkChange(productId, newPrice) {
    const key = `last_price:${productId}`;
    const lastData = await redis.get(key);
    if (lastData) {
      const last = JSON.parse(lastData);
      if (last.price && newPrice && last.price !== newPrice) {
        const changePct = ((newPrice - last.price) / last.price) * 100;
        const event = {
          productId,
          oldPrice: last.price,
          newPrice,
          changePct: Math.round(changePct * 100) / 100,
          timestamp: Date.now(),
        };
        await redis.publish("price_changes", JSON.stringify(event));
        return event;
      }
    }
    await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
    return null;
  }
  async processProduct(product) {
    const result = await this.fetchPrice(product);
    if (result.price) {
      const change = await this.checkChange(result.productId, result.price);
      if (change) {
        console.log(
          `Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
        );
      }
    }
    // Random delay
    await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
  }
  async start() {
    this.running = true;
    console.log(`Starting monitor with ${this.concurrency} workers`);
    while (this.running) {
      const batch = await this.getNextBatch(this.concurrency);
      if (batch.length === 0) {
        await new Promise((r) => setTimeout(r, 1000));
        continue;
      }
      await Promise.all(batch.map((p) => this.processProduct(p)));
    }
  }
  async getNextBatch(size) {
    const now = Date.now() / 1000;
    const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
    const products = [];
    for (const item of items) {
      const data = JSON.parse(item);
      await redis.zadd("price_queue", now + data.interval, item);
      products.push(data);
    }
    return products;
  }
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();

Zarządzanie proxy dla ciągłego monitorowania

Real- time monitoring stawia unikalne wymagania na Twojej infrastrukturze proxy w porównaniu do drapania partii.

Wzór wniosku Steady- State Request

W przeciwieństwie do zeskrobywania partii, które wysyła wybuchy żądań, monitorowanie czasu rzeczywistego tworzy stały strumień. Jest to rzeczywiście lepsze dla zdrowia proxy - stały przepływ 5- 10 wniosków na sekundę wygląda bardziej naturalne niż 1000 wniosków w 2-minutowym wybuchu.

Konfiguracja ProxyHat dla czasu rzeczywistego

# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080

Monitorowanie zdrowia IP

Track wskaźnik sukcesu na miejscu docelowym i dostosować podejście dynamicznie. Jeśli wskaźniki sukcesu spadną na określonym rynku, zwiększą się opóźnienia lub przejdą do innej puli geodocelowej. Duży basen mieszkalny ProxyHat zapewnia zawsze świeże IP dostępne. Sprawdź nasze lokalizacje proxy dla pełnego pokrycia.

Kluczowe podejście: Monitorowanie czasu rzeczywistego wymaga stałej, zrównoważonej strategii proxy. Celem jest konsekwentny niski wolumen wniosków w wielu IP, a nie wysoki wolumen pęka z kilku IP.

Przechowywanie danych dla danych real- time

Dane ceny real- time potrzebują rozwiązania pamięci masowej zoptymalizowanego dla wkładek wysokiej częstotliwości i zapytań o zakres czasowy.

Schemat czasowy DB

-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
    time        TIMESTAMPTZ NOT NULL,
    product_id  TEXT NOT NULL,
    price       DECIMAL(10,2),
    currency    VARCHAR(3) DEFAULT 'USD',
    source_url  TEXT,
    status      VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS hour,
    product_id,
    AVG(price) AS avg_price,
    MIN(price) AS min_price,
    MAX(price) AS max_price,
    COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');

Rozważania skalujące

  • Poziome skalowanie pracowników: Dodawanie pracowników do wielu maszyn, każde ciągnięcie z tej samej kolejki Redis. Brak koordynacji - kolejka obsługuje dystrybucję.
  • Tłoczenie oparte na priorytetach: Gdy budżet proxy jest ograniczony, automatycznie zmniejszyć częstotliwość kontroli dla produktów o niskim priorytecie, przy jednoczesnym utrzymaniu real- time pokrycie dla pozycji krytycznych.
  • Przedziały adaptacyjne: Jeżeli cena produktu jest stabilna przez 24 godziny, należy zwiększyć przedział kontrolny. Jeśli zmienił się dwa razy w ciągu godziny, zmniejsz go.
  • Specyficzna dla danej pozycji zmienność: Różne miejsca docelowe mają różne tolerancje. Uruchom więcej współdziałających pracowników dla Shopify (bardziej liberalne) i mniej dla Amazon (bardziej agresywne wykrywanie).

Aby uzyskać więcej informacji na temat strategii proxy, które wspierają monitorowanie wysokiej częstotliwości, zapoznaj się z naszym przewodnikiem najlepsze proxy do skrobania stron internetowych oraz Plany cenowe ProxyHat dla dużych zastosowań.

Key Takeaways

  • Monitorowanie czasu rzeczywistego zmniejsza wykrywanie zmian cen z godzin do minut, co ma kluczowe znaczenie dla przeszacowania cen i reakcji konkurencyjnej.
  • Użyj kolejki priorytetowej, aby skoncentrować zasoby na produktach wysokiej wartości, jednocześnie pokrywając długi ogon.
  • Basen pracowników z równoczesnymi połączeniami proxy zapewnia przepustowość bez pęknięć.
  • Zmiana silników wykrywających hałas filtra - tylko proces i ostrzeżenie o rzeczywistych zmianach cen.
  • Przechowywanie danych nieprzetworzonych w bazie danych szeregów czasowych (TimescoleDB) z zasadami retencji zarządzania kosztami.
  • Proxy mieszkaniowe z trwałą rotacją są niezbędne do ciągłego monitorowania. Zacznij od ProxyHat za niezawodny dostęp.

Budowa infrastruktury czasu rzeczywistego? Przeczytaj nasz Przewodnik po e-handlu dla pełnej strategii i sprawdzić nasze przewodniki na przy użyciu proxy w Python oraz Node.js szczegóły wdrażania.

Gotowy, aby zacząć?

Dostęp do ponad 50 mln rezydencjalnych IP w ponad 148 krajach z filtrowaniem AI.

Zobacz cenyProxy rezydencjalne
← Powrót do Bloga