Мониторинг в реальном времени vs пакетный мониторинг
Большинство систем мониторинга цен работают в пакетном режиме: проверяют все товары каждый час (или каждые несколько часов), сохраняют результаты и отправляют оповещения об изменениях. Это работает для многих задач, но на быстро меняющихся рынках — флеш-распродажи, динамическое ценообразование, конкуренция на маркетплейсах — пакетный мониторинг пропускает критические изменения цен между проверками.
Мониторинг в реальном времени сокращает задержку обнаружения с часов до минут или даже секунд. Вместо проверки каждого товара по фиксированному расписанию система реального времени непрерывно мониторит высокоприоритетные цели и реагирует на изменения по мере их возникновения. В этом руководстве рассматриваются архитектура, прокси-инфраструктура и детали реализации, необходимые для построения системы мониторинга в реальном времени. Для базовых концепций мониторинга цен смотрите наше руководство по автоматическому мониторингу цен конкурентов.
| Аспект | Пакетный мониторинг | Мониторинг в реальном времени |
|---|---|---|
| Частота проверок | Каждые 1-24 часа | Каждые 1-5 минут для приоритетных позиций |
| Задержка обнаружения | До одного полного интервала | Менее 5 минут |
| Использование прокси | Концентрированные пакеты | Равномерный распределённый поток |
| Инфраструктура | Простые cron-задачи | Событийная архитектура с пулом воркеров |
| Стоимость | Ниже | Выше (больше запросов, больше прокси) |
| Лучше для | Ежедневные отчёты, анализ трендов | Репрайсинг, обнаружение флеш-распродаж, конкурентные торги |
Архитектура мониторинга в реальном времени
Система мониторинга цен в реальном времени состоит из пяти основных компонентов, работающих как непрерывный конвейер.
1. Приоритетная очередь
Товарам назначаются приоритетные уровни, определяющие частоту проверок. Приоритетная очередь (отлично подходят Redis Sorted Sets) обеспечивает проверку высокоценных товаров в первую очередь.
import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
"""Add a product to the monitoring queue."""
next_check = time.time()
r.zadd("price_queue", {json.dumps({
"product_id": product_id,
"url": url,
"interval": priority_minutes * 60,
}): next_check})
def get_next_batch(batch_size=10):
"""Get the next batch of products due for checking."""
now = time.time()
items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
products = []
for item in items:
data = json.loads(item)
r.zadd("price_queue", {item: now + data["interval"]})
products.append(data)
return products
# Пример: добавление товаров с разными приоритетами
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)
2. Пул воркеров
Несколько рабочих процессов извлекают задачи из приоритетной очереди, получают цены через прокси и передают результаты в конвейер данных. Воркеры работают независимо, каждый со своим подключением к прокси.
import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
"""Fetch the current price for a product."""
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}
proxies = {"http": PROXY_URL, "https": PROXY_URL}
try:
response = requests.get(
product["url"], headers=headers,
proxies=proxies, timeout=30
)
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")
price_el = soup.select_one("span.a-price-whole")
if price_el:
price = float(price_el.get_text(strip=True).replace(",", ""))
return {
"product_id": product["product_id"],
"price": price,
"timestamp": time.time(),
"status": "success",
}
except Exception as e:
pass
return {
"product_id": product["product_id"],
"price": None,
"timestamp": time.time(),
"status": "failed",
}
def run_workers(num_workers=10):
"""Run the monitoring worker pool."""
with ThreadPoolExecutor(max_workers=num_workers) as executor:
while True:
batch = get_next_batch(batch_size=num_workers)
if not batch:
time.sleep(1)
continue
futures = [executor.submit(fetch_price, product) for product in batch]
for future in futures:
result = future.result()
process_result(result)
time.sleep(random.uniform(0.5, 2))
3. Движок обнаружения изменений
Вместо сохранения каждой проверки движок обнаружения изменений сравнивает текущие цены с последними известными значениями и генерирует события только при фактических изменениях.
class ChangeDetector:
def __init__(self, redis_client):
self.redis = redis_client
def check_change(self, product_id, new_price):
"""Compare new price against last known and detect changes."""
key = f"last_price:{product_id}"
last_data = self.redis.get(key)
if last_data:
last = json.loads(last_data)
old_price = last["price"]
if old_price and new_price and old_price != new_price:
change_pct = ((new_price - old_price) / old_price) * 100
event = {
"product_id": product_id,
"old_price": old_price,
"new_price": new_price,
"change_pct": round(change_pct, 2),
"timestamp": time.time(),
}
self.redis.publish("price_changes", json.dumps(event))
return event
self.redis.set(key, json.dumps({
"price": new_price,
"timestamp": time.time(),
}))
return None
4. Поток событий
Изменения цен публикуются в канал Redis Pub/Sub (или топик Kafka для более крупных систем). Нижестоящие потребители — сервисы оповещений, движки репрайсинга, дашборды — подписываются на эти события и реагируют независимо.
import redis
import json
def subscribe_to_changes():
"""Subscribe to price change events."""
r = redis.Redis(host="localhost", port=6379)
pubsub = r.pubsub()
pubsub.subscribe("price_changes")
for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
handle_price_change(event)
def handle_price_change(event):
"""Process a price change event."""
change = event["change_pct"]
product = event["product_id"]
if change < -10:
send_urgent_alert(event)
elif change < -5:
send_alert(event)
elif change > 10:
send_alert(event)
store_price_change(event)
5. Дашборд и оповещения
Данные в реальном времени требуют визуализации в реальном времени. Используйте WebSocket-подключения для мгновенной отправки обновлений цен на дашборды.
Реализация на Node.js
Версия движка мониторинга в реальном времени на Node.js с использованием Node SDK от ProxyHat.
const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
constructor(concurrency = 10) {
this.concurrency = concurrency;
this.running = false;
this.agent = new HttpsProxyAgent(PROXY_URL);
}
async fetchPrice(product) {
try {
const { data } = await axios.get(product.url, {
httpsAgent: new HttpsProxyAgent(PROXY_URL),
headers: {
"User-Agent":
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
},
timeout: 30000,
});
const $ = cheerio.load(data);
const priceText = $("span.a-price-whole").first().text().trim();
const price = parseFloat(priceText.replace(/,/g, "")) || null;
return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
} catch (err) {
return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
}
}
async checkChange(productId, newPrice) {
const key = `last_price:${productId}`;
const lastData = await redis.get(key);
if (lastData) {
const last = JSON.parse(lastData);
if (last.price && newPrice && last.price !== newPrice) {
const changePct = ((newPrice - last.price) / last.price) * 100;
const event = {
productId,
oldPrice: last.price,
newPrice,
changePct: Math.round(changePct * 100) / 100,
timestamp: Date.now(),
};
await redis.publish("price_changes", JSON.stringify(event));
return event;
}
}
await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
return null;
}
async processProduct(product) {
const result = await this.fetchPrice(product);
if (result.price) {
const change = await this.checkChange(result.productId, result.price);
if (change) {
console.log(
`Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
);
}
}
await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
}
async start() {
this.running = true;
console.log(`Starting monitor with ${this.concurrency} workers`);
while (this.running) {
const batch = await this.getNextBatch(this.concurrency);
if (batch.length === 0) {
await new Promise((r) => setTimeout(r, 1000));
continue;
}
await Promise.all(batch.map((p) => this.processProduct(p)));
}
}
async getNextBatch(size) {
const now = Date.now() / 1000;
const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
const products = [];
for (const item of items) {
const data = JSON.parse(item);
await redis.zadd("price_queue", now + data.interval, item);
products.push(data);
}
return products;
}
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();
Управление прокси для непрерывного мониторинга
Мониторинг в реальном времени предъявляет уникальные требования к прокси-инфраструктуре по сравнению с пакетным скрапингом.
Стабильный паттерн запросов
В отличие от пакетного скрапинга, отправляющего пакеты запросов, мониторинг в реальном времени создаёт постоянный поток. Это на самом деле лучше для здоровья прокси — равномерный поток в 5-10 запросов в секунду выглядит естественнее, чем 1000 запросов за 2 минуты.
Конфигурация ProxyHat для реального времени
# Ротация при каждом запросе (по умолчанию, рекомендуется для большинства проверок)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Геотаргетинг для мониторинга конкретных маркетплейсов
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 для контроля на уровне протокола
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080
Мониторинг здоровья IP
Отслеживайте процент успешных запросов по каждому целевому сайту и динамически корректируйте подход. Если процент успеха падает на конкретном маркетплейсе, увеличьте задержки или переключитесь на другой геотаргетированный пул. Большой резидентный пул ProxyHat гарантирует наличие свежих IP. Полный охват смотрите в разделе локации прокси.
Ключевой вывод: мониторинг в реальном времени требует стабильной, устойчивой стратегии прокси. Цель — постоянные низкообъёмные запросы через множество IP, а не высокообъёмные пакеты с немногих.
Хранение данных реального времени
Данные о ценах в реальном времени требуют решения для хранения, оптимизированного для частых вставок и запросов по временным диапазонам.
Схема TimescaleDB
-- TimescaleDB hypertable для данных о ценах
CREATE TABLE price_ticks (
time TIMESTAMPTZ NOT NULL,
product_id TEXT NOT NULL,
price DECIMAL(10,2),
currency VARCHAR(3) DEFAULT 'USD',
source_url TEXT,
status VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Непрерывный агрегат для часовых сводок
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
product_id,
AVG(price) AS avg_price,
MIN(price) AS min_price,
MAX(price) AS max_price,
COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Политика хранения: сырые данные — 30 дней
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Часовые агрегаты — 1 год
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');
Вопросы масштабирования
- Горизонтальное масштабирование воркеров: добавляйте воркеры на разных машинах, все извлекающие из одной очереди Redis. Координация не нужна — очередь сама распределяет задачи.
- Ограничение на основе приоритетов: при ограниченном бюджете на прокси автоматически снижайте частоту проверок низкоприоритетных товаров, сохраняя мониторинг в реальном времени для критических позиций.
- Адаптивные интервалы: если цена товара стабильна 24 часа — увеличьте интервал проверки. Если менялась дважды за час — уменьшите.
- Параллельность по сайтам: разные целевые сайты имеют разную толерантность. Запускайте больше параллельных воркеров для Shopify (более лояльный) и меньше для Amazon (более агрессивное обнаружение).
Подробнее о стратегиях прокси для высокочастотного мониторинга читайте в нашем руководстве лучшие прокси для веб-скрапинга и ознакомьтесь с тарифными планами ProxyHat для больших объёмов.
Ключевые выводы
- Мониторинг в реальном времени сокращает обнаружение изменений цен с часов до минут — критично для репрайсинга и конкурентного реагирования.
- Используйте приоритетную очередь для концентрации ресурсов на высокоценных товарах при охвате длинного хвоста.
- Пул воркеров с параллельными прокси-подключениями обеспечивает пропускную способность без пиковых паттернов.
- Движки обнаружения изменений фильтруют шум — обрабатывайте и оповещайте только о фактических изменениях цен.
- Храните сырые данные в базе временных рядов (TimescaleDB) с политиками хранения для управления стоимостью.
- Резидентные прокси со стабильной ротацией необходимы для непрерывного мониторинга. Начните с ProxyHat для надёжного доступа.
Строите инфраструктуру реального времени? Прочитайте наше руководство по скрапингу e-commerce для полной стратегии и ознакомьтесь с руководствами по использованию прокси в Python и Node.js для деталей реализации.






