实时对批量价格监测
大部分价格监控系统都以批量模式运行:每小时(或每几个小时)检查所有产品,存储结果,并发送变化警报. 这对于许多使用案例是有效的,但在快速移动的市场——闪存销售、动态定价、市场竞争——批量监测却错过了在检查之间发生的重大价格变化。
实时价格监测将检测滞后时间从小时降至分钟甚至秒. 实时系统不是在固定的时间内检查每个产品,而是不断监测高优先级目标,并随时对变化作出反应. 本指南涵盖构建实时监测系统所需的架构、代理基础设施和实施细节。 关于基本价格监测概念,见我们关于 自动监测竞争者价格。 。 。
| 侧面 | 批量监测 | 实时监测 |
|---|---|---|
| 检查频率 | 每1-24小时 | 优先项目每1-5分钟 |
| 检测滞后 | 最多一个完整的间隔 | 5分钟以下 |
| 代理用户 | 集中连发 | 稳定、分布式溪流 |
| 基础设施 | 简单的 cron 工作 | 由工人集体参与的活动驱动 |
| 费用 | 下调 | 更高( 更多请求, 更多代理) |
| 最佳办法 | 每日报告、趋势分析 | 翻新、闪售检测、竞标 |
实时监测架构
实时价格监测系统有五个核心组成部分,作为连续的管道共同发挥作用。
1. 优先排队
产品被指定优先等级,以确定检查频率。 优先排队( 重新排序的集) 确保高值产品总是先检查 。
import redis
import time
import json
r = redis.Redis(host="localhost", port=6379)
def add_product(product_id, url, priority_minutes):
"""Add a product to the monitoring queue."""
next_check = time.time() # Check immediately on first add
r.zadd("price_queue", {json.dumps({
"product_id": product_id,
"url": url,
"interval": priority_minutes * 60,
}): next_check})
def get_next_batch(batch_size=10):
"""Get the next batch of products due for checking."""
now = time.time()
items = r.zrangebyscore("price_queue", 0, now, start=0, num=batch_size)
products = []
for item in items:
data = json.loads(item)
r.zadd("price_queue", {item: now + data["interval"]})
products.append(data)
return products
# Example: Add products with different priorities
add_product("SKU001", "https://www.amazon.com/dp/B0CHX3QBCH", priority_minutes=2)
add_product("SKU002", "https://www.amazon.com/dp/B0D5BKRY4R", priority_minutes=5)
add_product("SKU003", "https://www.amazon.com/dp/B0CRMZHDG7", priority_minutes=15)2. 工人人才库
多个工人流程从优先排队中拉动,通过代理获取价格,并将结果推向数据管道. 工人独立运作,每个人都有自己的代理联系。
import requests
import random
from concurrent.futures import ThreadPoolExecutor
from bs4 import BeautifulSoup
PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
]
def fetch_price(product):
"""Fetch the current price for a product."""
headers = {
"User-Agent": random.choice(USER_AGENTS),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
}
proxies = {"http": PROXY_URL, "https": PROXY_URL}
try:
response = requests.get(
product["url"], headers=headers,
proxies=proxies, timeout=30
)
if response.status_code == 200:
soup = BeautifulSoup(response.text, "html.parser")
price_el = soup.select_one("span.a-price-whole")
if price_el:
price = float(price_el.get_text(strip=True).replace(",", ""))
return {
"product_id": product["product_id"],
"price": price,
"timestamp": time.time(),
"status": "success",
}
except Exception as e:
pass
return {
"product_id": product["product_id"],
"price": None,
"timestamp": time.time(),
"status": "failed",
}
def run_workers(num_workers=10):
"""Run the monitoring worker pool."""
with ThreadPoolExecutor(max_workers=num_workers) as executor:
while True:
batch = get_next_batch(batch_size=num_workers)
if not batch:
time.sleep(1)
continue
futures = [executor.submit(fetch_price, product) for product in batch]
for future in futures:
result = future.result()
process_result(result)
time.sleep(random.uniform(0.5, 2))3. 改变探测引擎
更改检测引擎不是存储每一次价格检查,而是将当前价格与最后已知值进行比较,只触发实际变化的事件.
class ChangeDetector:
def __init__(self, redis_client):
self.redis = redis_client
def check_change(self, product_id, new_price):
"""Compare new price against last known and detect changes."""
key = f"last_price:{product_id}"
last_data = self.redis.get(key)
if last_data:
last = json.loads(last_data)
old_price = last["price"]
if old_price and new_price and old_price != new_price:
change_pct = ((new_price - old_price) / old_price) * 100
event = {
"product_id": product_id,
"old_price": old_price,
"new_price": new_price,
"change_pct": round(change_pct, 2),
"timestamp": time.time(),
}
# Publish change event
self.redis.publish("price_changes", json.dumps(event))
return event
# Update last known price
self.redis.set(key, json.dumps({
"price": new_price,
"timestamp": time.time(),
}))
return None4. 活动流
价格变动被公布到一个Redis Pub/Sub频道(或大系统的Kafka主题). 下游消费者——提醒服务,重塑引擎,仪表板——订阅这些事件并独立反应.
import redis
import json
def subscribe_to_changes():
"""Subscribe to price change events."""
r = redis.Redis(host="localhost", port=6379)
pubsub = r.pubsub()
pubsub.subscribe("price_changes")
for message in pubsub.listen():
if message["type"] == "message":
event = json.loads(message["data"])
handle_price_change(event)
def handle_price_change(event):
"""Process a price change event."""
change = event["change_pct"]
product = event["product_id"]
if change < -10:
send_urgent_alert(event) # Major price drop
elif change < -5:
send_alert(event) # Moderate drop
elif change > 10:
send_alert(event) # Significant increase
# Always log to time-series database
store_price_change(event)5. 挂板和警报
实时数据需要实时可视化. 使用WebSocket连接将价格更新立即推向仪表板.
节点.js 执行
实时监控引擎的节点版 代理哈特节点 SDK。 。 。
const axios = require("axios");
const { HttpsProxyAgent } = require("https-proxy-agent");
const Redis = require("ioredis");
const cheerio = require("cheerio");
const PROXY_URL = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080";
const redis = new Redis();
class RealTimePriceMonitor {
constructor(concurrency = 10) {
this.concurrency = concurrency;
this.running = false;
this.agent = new HttpsProxyAgent(PROXY_URL);
}
async fetchPrice(product) {
try {
const { data } = await axios.get(product.url, {
httpsAgent: new HttpsProxyAgent(PROXY_URL),
headers: {
"User-Agent":
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/124.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
},
timeout: 30000,
});
const $ = cheerio.load(data);
const priceText = $("span.a-price-whole").first().text().trim();
const price = parseFloat(priceText.replace(/,/g, "")) || null;
return { productId: product.productId, price, timestamp: Date.now(), status: "success" };
} catch (err) {
return { productId: product.productId, price: null, timestamp: Date.now(), status: "failed" };
}
}
async checkChange(productId, newPrice) {
const key = `last_price:${productId}`;
const lastData = await redis.get(key);
if (lastData) {
const last = JSON.parse(lastData);
if (last.price && newPrice && last.price !== newPrice) {
const changePct = ((newPrice - last.price) / last.price) * 100;
const event = {
productId,
oldPrice: last.price,
newPrice,
changePct: Math.round(changePct * 100) / 100,
timestamp: Date.now(),
};
await redis.publish("price_changes", JSON.stringify(event));
return event;
}
}
await redis.set(key, JSON.stringify({ price: newPrice, timestamp: Date.now() }));
return null;
}
async processProduct(product) {
const result = await this.fetchPrice(product);
if (result.price) {
const change = await this.checkChange(result.productId, result.price);
if (change) {
console.log(
`Price change: ${change.productId} $${change.oldPrice} -> $${change.newPrice} (${change.changePct}%)`
);
}
}
// Random delay
await new Promise((r) => setTimeout(r, 500 + Math.random() * 1500));
}
async start() {
this.running = true;
console.log(`Starting monitor with ${this.concurrency} workers`);
while (this.running) {
const batch = await this.getNextBatch(this.concurrency);
if (batch.length === 0) {
await new Promise((r) => setTimeout(r, 1000));
continue;
}
await Promise.all(batch.map((p) => this.processProduct(p)));
}
}
async getNextBatch(size) {
const now = Date.now() / 1000;
const items = await redis.zrangebyscore("price_queue", 0, now, "LIMIT", 0, size);
const products = [];
for (const item of items) {
const data = JSON.parse(item);
await redis.zadd("price_queue", now + data.interval, item);
products.push(data);
}
return products;
}
}
const monitor = new RealTimePriceMonitor(10);
monitor.start();用于持续监测的代理管理
实时监控对您的代理基础设施提出了独特的要求, 相对于批量的报废。
稳态请求模式
与发送连续请求的批量刮刮不同,实时监测会产生恒流. 这实际上对代理健康来说更好——每秒5-10个请求的稳步流动,在2分钟的爆破中看起来比1000个请求更自然.
实时代理哈特配置
# Per-request rotation (default, recommended for most checks)
http://USERNAME:PASSWORD@gate.proxyhat.com:8080
# Geo-targeted for marketplace-specific monitoring
http://USERNAME-country-US:PASSWORD@gate.proxyhat.com:8080
http://USERNAME-country-DE:PASSWORD@gate.proxyhat.com:8080
# SOCKS5 for lower-level protocol control
socks5://USERNAME:PASSWORD@gate.proxyhat.com:1080IP 健康监测
追踪每个目标地点的成功率,并动态调整您的方法。 如果某一具体市场的成功率下降,就会增加延误或转换到不同的地理目标池。 代理Hat的大型住宅池确保你总是有新鲜的IP可用. 检查我们 代理地址 为全覆盖。
关键外卖:实时监测需要一个稳定、可持续的代理战略。 目标是在许多实施伙伴中持续提出低量请求,而不是少数实施伙伴提出的高量请求。
实时数据存储
实时价格数据需要优化高频插入和时程查询的存储解决方案.
时间尺度DB Schema
-- TimescaleDB hypertable for price data
CREATE TABLE price_ticks (
time TIMESTAMPTZ NOT NULL,
product_id TEXT NOT NULL,
price DECIMAL(10,2),
currency VARCHAR(3) DEFAULT 'USD',
source_url TEXT,
status VARCHAR(20)
);
SELECT create_hypertable('price_ticks', 'time');
-- Continuous aggregate for hourly summaries
CREATE MATERIALIZED VIEW price_hourly
WITH (timescaledb.continuous) AS
SELECT
time_bucket('1 hour', time) AS hour,
product_id,
AVG(price) AS avg_price,
MIN(price) AS min_price,
MAX(price) AS max_price,
COUNT(*) AS check_count
FROM price_ticks
WHERE status = 'success'
GROUP BY hour, product_id;
-- Retention policy: keep raw ticks for 30 days
SELECT add_retention_policy('price_ticks', INTERVAL '30 days');
-- Keep hourly aggregates for 1 year
SELECT add_retention_policy('price_hourly', INTERVAL '365 days');扩大考虑
- 水平工人规模 : 在多台机器上增加工人,每人从相同的雷迪斯队列中抽出. 不需要协调——队列处理分发.
- 优先节制: 当代理预算有限时,会自动减少低优先级产品的检查频率,同时保持关键项目的实时覆盖.
- 适应间隔 : 如果某一产品的价格稳定了24小时,则增加检查间隔。 如果一个小时内改变两次,减少.
- 特定地点的货币: 不同的目标网站具有不同的耐受性. 为Shopify(更宽容)和Amazon(更积极的检测)经营更多的同时工人。
更多关于支持高频监测的代理战略,请探索我们的指南。 网络刮刮的最佳代理 和 代理哈特的定价计划 用于大量使用。
关键外卖
- 实时监测将价格变化的检测从小时降至分钟,对重新评价及竞争反应至关重要.
- 使用优先排队将资源集中到高值产品上,同时仍覆盖长尾.
- 具有并行代理连接的工人池提供不破裂模式的吞吐量。
- 改变检测引擎过滤噪声——只处理和警惕实际价格变化.
- 将原始数据储存在时间序列数据库(TimescaleDB)中,并保留成本管理政策。
- 具有稳定状态旋转的住宅代用品对持续监测至关重要。 开始 代理汉特 为可靠的访问。
建设实时基础设施? 读我们 电子商务刮刮指南 并检查我们的指南 使用 Python 中的代理 和 节点.js 详细执行情况。






