构建实时价格监控基础设施

设计和建设具有重点排队,工人池,变化检测,住宅代理轮换等功能的实时价格监测系统. 完成Python和Node.js执行指南.

构建实时价格监控基础设施

实时对批量价格监测

大部分价格监控系统都以批量模式运行:每小时(或每几个小时)检查所有产品,存储结果,并发送变化警报. 这对于许多使用案例是有效的,但在快速移动的市场——闪存销售、动态定价、市场竞争——批量监测却错过了在检查之间发生的重大价格变化。

实时价格监测将检测滞后时间从小时降至分钟甚至秒. 实时系统不是在固定的时间内检查每个产品,而是不断监测高优先级目标,并随时对变化作出反应. 本指南涵盖构建实时监测系统所需的架构、代理基础设施和实施细节。 关于基本价格监测概念,见我们关于 自动监测竞争者价格。 。 。

侧面批量监测实时监测
检查频率每1-24小时优先项目每1-5分钟
检测滞后最多一个完整的间隔5分钟以下
代理用户集中连发稳定、分布式溪流
基础设施简单的 cron 工作由工人集体参与的活动驱动
费用下调更高( 更多请求, 更多代理)
最佳办法每日报告、趋势分析翻新、闪售检测、竞标

实时监测架构

实时价格监测系统有五个核心组成部分,作为连续的管道共同发挥作用。

1. 优先排队

产品被指定优先等级,以确定检查频率。 优先排队( 重新排序的集) 确保高值产品总是先检查 。

import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
    """Add a product to the monitoring queue."""
    next_check = time.time()  # Check immediately on first add
    r.zadd("price_queue", {json.dumps({
        "product_id": product_id,
        "url": url,
        "interval": priority_minutes * 60,
    }): next_check})
def get_next_batch(batch_size=10):
    """Get the next batch of products due for checking."""
    now = time.time()
    items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
    products = []
    for item in items:
        data = json.loads(item)
        r.zadd("price_queue", {item: now + data["interval"]})
        products.append(data)
    return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)

2. 工人人才库

多个工人流程从优先排队中拉动,通过代理获取价格,并将结果推向数据管道. 工人独立运作,每个人都有自己的代理联系。

import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
    """Fetch the current price for a product."""
    headers = {
        "User-Agent": random.choice(USER_AGENTS),
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Language": "en-US,en;q=0.9",
    }
    proxies = {"http": PROXY_URL, "https": PROXY_URL}
    try:
        response = requests.get(
            product["url"], headers=headers,
            proxies=proxies, timeout=30
        )
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, "html.parser")
            price_el = soup.select_one("span.a-price-whole")
            if price_el:
                price = float(price_el.get_text(strip=True).replace(",", ""))
                return {
                    "product_id": product["product_id"],
                    "price": price,
                    "timestamp": time.time(),
                    "status": "success",
                }
    except Exception as e:
        pass
    return {
        "product_id": product["product_id"],
        "price": None,
        "timestamp": time.time(),
        "status": "failed",
    }
def run_workers(num_workers=10):
    """Run the monitoring worker pool."""
    with ThreadPoolExecutor(max_workers=num_workers) as executor:
        while True:
            batch = get_next_batch(batch_size=num_workers)
            if not batch:
                time.sleep(1)
                continue
            futures = [executor.submit(fetch_price, product) for product in batch]
            for future in futures:
                result = future.result()
                process_result(result)
            time.sleep(random.uniform(0.5, 2))

3. 改变探测引擎

更改检测引擎不是存储每一次价格检查,而是将当前价格与最后已知值进行比较,只触发实际变化的事件.

class ChangeDetector:
    def __init__(self, redis_client):
        self.redis = redis_client
    def check_change(self, product_id, new_price):
        """Compare new price against last known and detect changes."""
        key = f"last_price:{product_id}"
        last_data = self.redis.get(key)
        if last_data:
            last = json.loads(last_data)
            old_price = last["price"]
            if old_price and new_price and old_price != new_price:
                change_pct = ((new_price - old_price) / old_price) * 100
                event = {
                    "product_id": product_id,
                    "old_price": old_price,
                    "new_price": new_price,
                    "change_pct": round(change_pct, 2),
                    "timestamp": time.time(),
                }
                # Publish change event
                self.redis.publish("price_changes", json.dumps(event))
                return event
        # Update last known price
        self.redis.set(key, json.dumps({
            "price": new_price,
            "timestamp": time.time(),
        }))
        return None

4. 活动流

价格变动被公布到一个Redis Pub/Sub频道(或大系统的Kafka主题). 下游消费者——提醒服务,重塑引擎,仪表板——订阅这些事件并独立反应.

import redis
import json
def subscribe_to_changes():
    """Subscribe to price change events."""
    r = redis.Redis(host="localhost", port=6379)
    pubsub = r.pubsub()
    pubsub.subscribe("price_changes")
    for message in pubsub.listen():
        if message["type"] == "message":
            event = json.loads(message["data"])
            handle_price_change(event)
def handle_price_change(event):
    """Process a price change event."""
    change = event["change_pct"]
    product = event["product_id"]
    if change < -10:
        send_urgent_alert(event)  # Major price drop
    elif change < -5:
        send_alert(event)         # Moderate drop
    elif change > 10:
        send_alert(event)         # Significant increase
    # Always log to time-series database
    store_price_change(event)

5. 挂板和警报

实时数据需要实时可视化. 使用WebSocket连接将价格更新立即推向仪表板.

节点.js 执行

实时监控引擎的节点版 代理哈特节点 SDK。 。 。

const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
  constructor(concurrency = 10) {
    this.concurrency = concurrency;
    this.running = false;
    this.agent = new HttpsProxyAgent(PROXY_URL);
  }
  async fetchPrice(product) {
    try {
      const { data } = await axios.get(product.url, {
        httpsAgent: new HttpsProxyAgent(PROXY_URL),
        headers: {
          "User-Agent":
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
          "Accept-Language": "en-US,en;q=0.9",
        },
        timeout: 30000,
      });
      const $ = cheerio.load(data);
      const priceText = $("span.a-price-whole").first().text().trim();
      const price = parseFloat(priceText.replace(/,/g, "")) || null;
      return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
    } catch (err) {
      return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
    }
  }
  async checkChange(productId, newPrice) {
    const key = `last_price:${productId}`;
    const lastData = await redis.get(key);
    if (lastData) {
      const last = JSON.parse(lastData);
      if (last.price && newPrice && last.price !== newPrice) {
        const changePct = ((newPrice - last.price) / last.price) * 100;
        const event = {
          productId,
          oldPrice: last.price,
          newPrice,
          changePct: Math.round(changePct * 100) / 100,
          timestamp: Date.now(),
        };
        await redis.publish("price_changes", JSON.stringify(event));
        return event;
      }
    }
    await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
    return null;
  }
  async processProduct(product) {
    const result = await this.fetchPrice(product);
    if (result.price) {
      const change = await this.checkChange(result.productId, result.price);
      if (change) {
        console.log(
          `Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
        );
      }
    }
    // Random delay
    await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
  }
  async start() {
    this.running = true;
    console.log(`Starting monitor with ${this.concurrency} workers`);
    while (this.running) {
      const batch = await this.getNextBatch(this.concurrency);
      if (batch.length === 0) {
        await new Promise((r) => setTimeout(r, 1000));
        continue;
      }
      await Promise.all(batch.map((p) => this.processProduct(p)));
    }
  }
  async getNextBatch(size) {
    const now = Date.now() / 1000;
    const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
    const products = [];
    for (const item of items) {
      const data = JSON.parse(item);
      await redis.zadd("price_queue", now + data.interval, item);
      products.push(data);
    }
    return products;
  }
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();

用于持续监测的代理管理

实时监控对您的代理基础设施提出了独特的要求, 相对于批量的报废。

稳态请求模式

与发送连续请求的批量刮刮不同,实时监测会产生恒流. 这实际上对代理健康来说更好——每秒5-10个请求的稳步流动,在2分钟的爆破中看起来比1000个请求更自然.

实时代理哈特配置

# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080

IP 健康监测

追踪每个目标地点的成功率,并动态调整您的方法。 如果某一具体市场的成功率下降,就会增加延误或转换到不同的地理目标池。 代理Hat的大型住宅池确保你总是有新鲜的IP可用. 检查我们 代理地址 为全覆盖。

关键外卖:实时监测需要一个稳定、可持续的代理战略。 目标是在许多实施伙伴中持续提出低量请求,而不是少数实施伙伴提出的高量请求。

实时数据存储

实时价格数据需要优化高频插入和时程查询的存储解决方案.

时间尺度DB Schema

-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
    time        TIMESTAMPTZ NOT NULL,
    product_id  TEXT NOT NULL,
    price       DECIMAL(10,2),
    currency    VARCHAR(3) DEFAULT 'USD',
    source_url  TEXT,
    status      VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
    time_bucket('1 hour', time) AS hour,
    product_id,
    AVG(price) AS avg_price,
    MIN(price) AS min_price,
    MAX(price) AS max_price,
    COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');

扩大考虑

  • 水平工人规模 : 在多台机器上增加工人,每人从相同的雷迪斯队列中抽出. 不需要协调——队列处理分发.
  • 优先节制: 当代理预算有限时,会自动减少低优先级产品的检查频率,同时保持关键项目的实时覆盖.
  • 适应间隔 : 如果某一产品的价格稳定了24小时,则增加检查间隔。 如果一个小时内改变两次,减少.
  • 特定地点的货币: 不同的目标网站具有不同的耐受性. 为Shopify(更宽容)和Amazon(更积极的检测)经营更多的同时工人。

更多关于支持高频监测的代理战略,请探索我们的指南。 网络刮刮的最佳代理代理哈特的定价计划 用于大量使用。

关键外卖

  • 实时监测将价格变化的检测从小时降至分钟,对重新评价及竞争反应至关重要.
  • 使用优先排队将资源集中到高值产品上,同时仍覆盖长尾.
  • 具有并行代理连接的工人池提供不破裂模式的吞吐量。
  • 改变检测引擎过滤噪声——只处理和警惕实际价格变化.
  • 将原始数据储存在时间序列数据库(TimescaleDB)中,并保留成本管理政策。
  • 具有稳定状态旋转的住宅代用品对持续监测至关重要。 开始 代理汉特 为可靠的访问。

建设实时基础设施? 读我们 电子商务刮刮指南 并检查我们的指南 使用 Python 中的代理节点.js 详细执行情况。

准备开始了吗?

通过AI过滤访问148多个国家的5000多万个住宅IP。

查看价格住宅代理
← 返回博客