Real- Time vs.
Większość systemów monitorowania cen działa w trybie wsadowym: sprawdzać wszystkie produkty co godzinę (lub co kilka godzin), przechowywać wyniki i wysyłać powiadomienia o zmianach. Działa to w wielu przypadkach zastosowania, ale na szybko zmieniających się rynkach - sprzedaży flash, dynamicznych cen, konkurencji rynkowej - monitorowanie partii pomija krytyczne zmiany cen, które zdarzają się między kontrolami.
Monitorowanie cen w czasie rzeczywistym zmniejsza opóźnienie wykrywania z godzin na minuty lub nawet sekundy. Zamiast sprawdzać każdy produkt w ustalonym harmonogramie, system real- time stale monitoruje cele o wysokim priorytecie i reaguje na zmiany w ich trakcie. Niniejszy przewodnik obejmuje architekturę, infrastrukturę proxy oraz szczegóły wdrażania niezbędne do budowy systemu monitorowania w czasie rzeczywistym. Podstawowe koncepcje monitorowania cen znajdują się w naszym przewodniku Automatyczne monitorowanie cen konkurencyjnych.
| Aspekt | Monitorowanie partii | Monitorowanie czasu rzeczywistego |
|---|---|---|
| Częstotliwość kontroli | Co 1- 24 godziny | Co 1- 5 minut dla elementów priorytetowych |
| Opóźnienie wykrywania | Do jednego pełnego odstępu czasu | Niecałe 5 minut |
| Wykorzystanie proxy | Skoncentrowane pęknięcia | Strumień stały, rozproszony |
| Infrastruktura | Proste zadania cron | Event- driven z basenów pracowników |
| Koszt | Niższe | Wyższe (więcej wniosków, więcej proxy) |
| Najlepsze dla | Dzienne raporty, analiza tendencji | Poprawianie, wykrywanie sprzedaży flash, przetargi konkurencyjne |
Architektura monitorowania rzeczywistego czasu
System monitorowania cen w czasie rzeczywistym posiada pięć podstawowych elementów, które współpracują jako ciągły rurociąg.
1. Kolejka priorytetowa
Produkty są przypisane do poziomów priorytetowych, które określają częstotliwość kontroli. Kolejka priorytetowa (Redis Sorted Sets działa dobrze) zapewnia, że produkty o wysokiej wartości są zawsze sprawdzane najpierw.
import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
"""Add a product to the monitoring queue."""
next_check = time.time() # Check immediately on first add
r.zadd("price_queue", {json.dumps({
"product_id": product_id,
"url": url,
"interval": priority_minutes * 60,
}): next_check})
def get_next_batch(batch_size=10):
"""Get the next batch of products due for checking."""
now = time.time()
items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
products = []
for item in items:
data = json.loads(item)
r.zadd("price_queue", {item: now + data["interval"]})
products.append(data)
return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)2. Pool robotniczy
Wiele procesów pracy wyciągnąć z kolejki priorytetowej, pobrać ceny poprzez proxy, i przenieść wyniki do rurociągu danych. Pracownicy działają niezależnie, każdy z własnym połączeniem proxy.
import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
"""Fetch the current price for a product."""
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}
proxies = {"http": PROXY_URL, "https": PROXY_URL}
try:
response = requests.get(
product["url"], headers=headers,
proxies=proxies, timeout=30
)
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")
price_el = soup.select_one("span.a-price-whole")
if price_el:
price = float(price_el.get_text(strip=True).replace(",", ""))
return {
"product_id": product["product_id"],
"price": price,
"timestamp": time.time(),
"status": "success",
}
except Exception as e:
pass
return {
"product_id": product["product_id"],
"price": None,
"timestamp": time.time(),
"status": "failed",
}
def run_workers(num_workers=10):
"""Run the monitoring worker pool."""
with ThreadPoolExecutor(max_workers=num_workers) as executor:
while True:
batch = get_next_batch(batch_size=num_workers)
if not batch:
time.sleep(1)
continue
futures = [executor.submit(fetch_price, product) for product in batch]
for future in futures:
result = future.result()
process_result(result)
time.sleep(random.uniform(0.5, 2))3. Zmień silnik detekcji
Zamiast zapisywać każdą kontrolę cen, silnik wykrywania zmian porównuje ceny bieżące z ostatnimi znanymi wartościami i uruchamia tylko zdarzenia dotyczące rzeczywistych zmian.
class ChangeDetector:
def __init__(self, redis_client):
self.redis = redis_client
def check_change(self, product_id, new_price):
"""Compare new price against last known and detect changes."""
key = f"last_price:{product_id}"
last_data = self.redis.get(key)
if last_data:
last = json.loads(last_data)
old_price = last["price"]
if old_price and new_price and old_price != new_price:
change_pct = ((new_price - old_price) / old_price) * 100
event = {
"product_id": product_id,
"old_price": old_price,
"new_price": new_price,
"change_pct": round(change_pct, 2),
"timestamp": time.time(),
}
# Publish change event
self.redis.publish("price_changes", json.dumps(event))
return event
# Update last known price
self.redis.set(key, json.dumps({
"price": new_price,
"timestamp": time.time(),
}))
return None4. Strumień zdarzeń
Zmiany cen są publikowane na kanale Redis Pub / Sub (lub temat Kafka dla większych systemów). Konsumenci niższego szczebla - ostrzegający o usługach, zmieniający ceny silników, deski rozdzielcze - subskrybują te wydarzenia i reagują niezależnie.
import redis
import json
def subscribe_to_changes():
"""Subscribe to price change events."""
r = redis.Redis(host="localhost", port=6379)
pubsub = r.pubsub()
pubsub.subscribe("price_changes")
for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
handle_price_change(event)
def handle_price_change(event):
"""Process a price change event."""
change = event["change_pct"]
product = event["product_id"]
if change < -10:
send_urgent_alert(event) # Major price drop
elif change < -5:
send_alert(event) # Moderate drop
elif change > 10:
send_alert(event) # Significant increase
# Always log to time-series database
store_price_change(event)5. Deska rozdzielcza i alarmy
Dane w czasie rzeczywistym wymagają wizualizacji w czasie rzeczywistym. Użyj połączeń WebSocket, aby natychmiast wcisnąć aktualizacje cen do desek rozdzielczych.
Wdrażanie Node.js
Wersja Node.js monitorującego w czasie rzeczywistym silnika Węzeł ProxyHat SDK.
const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
constructor(concurrency = 10) {
this.concurrency = concurrency;
this.running = false;
this.agent = new HttpsProxyAgent(PROXY_URL);
}
async fetchPrice(product) {
try {
const { data } = await axios.get(product.url, {
httpsAgent: new HttpsProxyAgent(PROXY_URL),
headers: {
"User-Agent":
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
},
timeout: 30000,
});
const $ = cheerio.load(data);
const priceText = $("span.a-price-whole").first().text().trim();
const price = parseFloat(priceText.replace(/,/g, "")) || null;
return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
} catch (err) {
return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
}
}
async checkChange(productId, newPrice) {
const key = `last_price:${productId}`;
const lastData = await redis.get(key);
if (lastData) {
const last = JSON.parse(lastData);
if (last.price && newPrice && last.price !== newPrice) {
const changePct = ((newPrice - last.price) / last.price) * 100;
const event = {
productId,
oldPrice: last.price,
newPrice,
changePct: Math.round(changePct * 100) / 100,
timestamp: Date.now(),
};
await redis.publish("price_changes", JSON.stringify(event));
return event;
}
}
await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
return null;
}
async processProduct(product) {
const result = await this.fetchPrice(product);
if (result.price) {
const change = await this.checkChange(result.productId, result.price);
if (change) {
console.log(
`Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
);
}
}
// Random delay
await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
}
async start() {
this.running = true;
console.log(`Starting monitor with ${this.concurrency} workers`);
while (this.running) {
const batch = await this.getNextBatch(this.concurrency);
if (batch.length === 0) {
await new Promise((r) => setTimeout(r, 1000));
continue;
}
await Promise.all(batch.map((p) => this.processProduct(p)));
}
}
async getNextBatch(size) {
const now = Date.now() / 1000;
const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
const products = [];
for (const item of items) {
const data = JSON.parse(item);
await redis.zadd("price_queue", now + data.interval, item);
products.push(data);
}
return products;
}
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();Zarządzanie proxy dla ciągłego monitorowania
Real- time monitoring stawia unikalne wymagania na Twojej infrastrukturze proxy w porównaniu do drapania partii.
Wzór wniosku Steady- State Request
W przeciwieństwie do zeskrobywania partii, które wysyła wybuchy żądań, monitorowanie czasu rzeczywistego tworzy stały strumień. Jest to rzeczywiście lepsze dla zdrowia proxy - stały przepływ 5- 10 wniosków na sekundę wygląda bardziej naturalne niż 1000 wniosków w 2-minutowym wybuchu.
Konfiguracja ProxyHat dla czasu rzeczywistego
# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080Monitorowanie zdrowia IP
Track wskaźnik sukcesu na miejscu docelowym i dostosować podejście dynamicznie. Jeśli wskaźniki sukcesu spadną na określonym rynku, zwiększą się opóźnienia lub przejdą do innej puli geodocelowej. Duży basen mieszkalny ProxyHat zapewnia zawsze świeże IP dostępne. Sprawdź nasze lokalizacje proxy dla pełnego pokrycia.
Kluczowe podejście: Monitorowanie czasu rzeczywistego wymaga stałej, zrównoważonej strategii proxy. Celem jest konsekwentny niski wolumen wniosków w wielu IP, a nie wysoki wolumen pęka z kilku IP.
Przechowywanie danych dla danych real- time
Dane ceny real- time potrzebują rozwiązania pamięci masowej zoptymalizowanego dla wkładek wysokiej częstotliwości i zapytań o zakres czasowy.
Schemat czasowy DB
-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
time TIMESTAMPTZ NOT NULL,
product_id TEXT NOT NULL,
price DECIMAL(10,2),
currency VARCHAR(3) DEFAULT 'USD',
source_url TEXT,
status VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
product_id,
AVG(price) AS avg_price,
MIN(price) AS min_price,
MAX(price) AS max_price,
COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');Rozważania skalujące
- Poziome skalowanie pracowników: Dodawanie pracowników do wielu maszyn, każde ciągnięcie z tej samej kolejki Redis. Brak koordynacji - kolejka obsługuje dystrybucję.
- Tłoczenie oparte na priorytetach: Gdy budżet proxy jest ograniczony, automatycznie zmniejszyć częstotliwość kontroli dla produktów o niskim priorytecie, przy jednoczesnym utrzymaniu real- time pokrycie dla pozycji krytycznych.
- Przedziały adaptacyjne: Jeżeli cena produktu jest stabilna przez 24 godziny, należy zwiększyć przedział kontrolny. Jeśli zmienił się dwa razy w ciągu godziny, zmniejsz go.
- Specyficzna dla danej pozycji zmienność: Różne miejsca docelowe mają różne tolerancje. Uruchom więcej współdziałających pracowników dla Shopify (bardziej liberalne) i mniej dla Amazon (bardziej agresywne wykrywanie).
Aby uzyskać więcej informacji na temat strategii proxy, które wspierają monitorowanie wysokiej częstotliwości, zapoznaj się z naszym przewodnikiem najlepsze proxy do skrobania stron internetowych oraz Plany cenowe ProxyHat dla dużych zastosowań.
Key Takeaways
- Monitorowanie czasu rzeczywistego zmniejsza wykrywanie zmian cen z godzin do minut, co ma kluczowe znaczenie dla przeszacowania cen i reakcji konkurencyjnej.
- Użyj kolejki priorytetowej, aby skoncentrować zasoby na produktach wysokiej wartości, jednocześnie pokrywając długi ogon.
- Basen pracowników z równoczesnymi połączeniami proxy zapewnia przepustowość bez pęknięć.
- Zmiana silników wykrywających hałas filtra - tylko proces i ostrzeżenie o rzeczywistych zmianach cen.
- Przechowywanie danych nieprzetworzonych w bazie danych szeregów czasowych (TimescoleDB) z zasadami retencji zarządzania kosztami.
- Proxy mieszkaniowe z trwałą rotacją są niezbędne do ciągłego monitorowania. Zacznij od ProxyHat za niezawodny dostęp.
Budowa infrastruktury czasu rzeczywistego? Przeczytaj nasz Przewodnik po e-handlu dla pełnej strategii i sprawdzić nasze przewodniki na przy użyciu proxy w Python oraz Node.js szczegóły wdrażania.






