为什么监视代理性能
代理基础设施往往无声地失败。你的爬虫可能已经运行了数小时、成功率跌到40%,才有人注意到。响应时间逐渐攀升,封锁率上升,数据质量下降——这一切都不会抛出明显的错误。监控能把这些看不见的问题变成可操作的警报。
本指南向您介绍如何记录您的代理请求, 收集有意义的度量衡, 构建仪表板, 并设置提醒, 捕获退化 之前影响您的数据管道。 所有实例都使用 代理哈特的住宅代理 并做好了生产准备。
如果你不是在测量你的代理性能,你是在猜测。 估计规模需要钱,产生不可靠的数据。
跟踪关键指标
| 度量衡 | 它告诉你什么 | 警戒阈值 |
|---|---|---|
| 成功率 | 回复2xx状态的申请百分比 | 低于90% |
| 延迟(p50/p95/p99) | 代理请求完成的速度 | p95 超过 10 秒 |
| 按类型分列的错误率 | 哪些错误占主导地位( 超时, 403, 429, 连接) | 15%以上的任何单一类型 |
| 每秒请求数 | 管道的流量 | 低于预期基线 |
| 带宽使用 | 通过代理传输的数据 | 接近计划限额 |
| 按目标分列的区块率 | 哪个目标最能挡住你 | 目标超过20% |
| 重试率 | 需要重审的请求数 | 超过30% |
| 会话复用效率 | 粘性会话(sticky session)的平均存活时长 | 平均少于 5 个请求 |
Python:带埋点的代理客户端
这个客户端为每个请求包上计时、状态跟踪和结构化日志。
import time
import uuid
import logging
import statistics
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Optional
import requests
logger = logging.getLogger("proxy_monitor")
@dataclass
class ProxyMetrics:
    """Collects and aggregates proxy performance metrics.

    Counters are plain attributes updated by the client; the properties
    derive success rate and latency percentiles on demand.
    """
    total_requests: int = 0
    successful: int = 0
    failed: int = 0
    retries: int = 0
    # Per-request latencies in seconds.
    latencies: list = field(default_factory=list)
    # Count per HTTP status code.
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    # Count per error category, e.g. "timeout" or "HTTP_503".
    errors_by_type: dict = field(default_factory=lambda: defaultdict(int))
    bytes_transferred: int = 0
    # Per-domain {"success": n, "failed": n} tallies.
    requests_by_target: dict = field(default_factory=lambda: defaultdict(lambda: {"success": 0, "failed": 0}))

    def _percentile(self, fraction: float) -> float:
        """Nearest-rank percentile of recorded latencies (0.0 when empty).

        Shared by p95/p99 so the index arithmetic lives in one place.
        """
        if not self.latencies:
            return 0.0
        ordered = sorted(self.latencies)
        idx = int(len(ordered) * fraction)
        return ordered[min(idx, len(ordered) - 1)]

    @property
    def success_rate(self) -> float:
        """Percentage of successful requests; 0.0 before any request."""
        if self.total_requests == 0:
            return 0.0
        return self.successful / self.total_requests * 100

    @property
    def p50_latency(self) -> float:
        """Median latency in seconds."""
        return statistics.median(self.latencies) if self.latencies else 0.0

    @property
    def p95_latency(self) -> float:
        """95th-percentile latency in seconds."""
        return self._percentile(0.95)

    @property
    def p99_latency(self) -> float:
        """99th-percentile latency in seconds."""
        return self._percentile(0.99)

    def summary(self) -> dict:
        """Return a JSON-friendly snapshot of the aggregated metrics."""
        return {
            "total_requests": self.total_requests,
            "success_rate": f"{self.success_rate:.1f}%",
            "p50_latency": f"{self.p50_latency:.3f}s",
            "p95_latency": f"{self.p95_latency:.3f}s",
            "p99_latency": f"{self.p99_latency:.3f}s",
            "retries": self.retries,
            "bytes_transferred": self.bytes_transferred,
            "top_errors": dict(sorted(
                self.errors_by_type.items(),
                key=lambda x: x[1], reverse=True
            )[:5]),
            "status_distribution": dict(self.status_codes),
        }
class MonitoredProxyClient:
    """HTTP client with built-in proxy monitoring.

    Every request is timed and recorded into ``self.metrics``. Callbacks
    registered via :meth:`on_alert` fire when a threshold is crossed
    (success rate < 90%, p95 latency > 10s, retry rate > 30%).
    """
    def __init__(self, max_retries: int = 3):
        self.metrics = ProxyMetrics()
        self.max_retries = max_retries
        self._alert_callbacks = []

    def on_alert(self, callback):
        """Register a callback invoked with each alert message string."""
        self._alert_callbacks.append(callback)

    def _check_alerts(self):
        # Warm-up guard: thresholds on fewer than 10 samples are noise.
        if self.metrics.total_requests < 10:
            return
        alerts = []
        if self.metrics.success_rate < 90:
            alerts.append(f"Low success rate: {self.metrics.success_rate:.1f}%")
        if self.metrics.p95_latency > 10:
            alerts.append(f"High p95 latency: {self.metrics.p95_latency:.1f}s")
        if self.metrics.retries / max(self.metrics.total_requests, 1) > 0.3:
            alerts.append(f"High retry rate: {self.metrics.retries}/{self.metrics.total_requests}")
        for alert in alerts:
            logger.warning(f"ALERT: {alert}")
            # Fan every alert out to every callback (not just the last one).
            for cb in self._alert_callbacks:
                cb(alert)

    def _record_exception(self, error_type: str, start: float, target_domain: str):
        """Record one failed attempt caused by a raised exception."""
        self.metrics.errors_by_type[error_type] += 1
        self.metrics.latencies.append(time.time() - start)
        self.metrics.requests_by_target[target_domain]["failed"] += 1

    def fetch(self, url: str, country: Optional[str] = None) -> Optional[requests.Response]:
        """Fetch *url* through a fresh proxy session per attempt.

        Retries with exponential backoff on transport exceptions and on
        403/429/503 responses. Returns the final Response (which may carry
        an error status) or None when every attempt raised.
        """
        from urllib.parse import urlparse
        target_domain = urlparse(url).netloc
        for attempt in range(self.max_retries + 1):
            # A fresh session id per attempt rotates the proxy exit IP.
            session_id = uuid.uuid4().hex[:8]
            username = f"USERNAME-session-{session_id}"
            if country:
                username += f"-country-{country}"
            proxy = f"http://{username}:PASSWORD@gate.proxyhat.com:8080"
            self.metrics.total_requests += 1
            if attempt > 0:
                self.metrics.retries += 1
            start = time.time()
            try:
                response = requests.get(
                    url,
                    proxies={"http": proxy, "https": proxy},
                    timeout=30,
                )
                latency = time.time() - start
                self.metrics.latencies.append(latency)
                self.metrics.status_codes[response.status_code] += 1
                if response.status_code >= 400:
                    self.metrics.errors_by_type[f"HTTP_{response.status_code}"] += 1
                    self.metrics.requests_by_target[target_domain]["failed"] += 1
                    # Block/rate-limit statuses are retryable; other 4xx/5xx are not.
                    if response.status_code in (403, 429, 503) and attempt < self.max_retries:
                        time.sleep(2 ** attempt)
                        continue
                    self.metrics.failed += 1
                else:
                    self.metrics.successful += 1
                    self.metrics.bytes_transferred += len(response.content)
                    self.metrics.requests_by_target[target_domain]["success"] += 1
                self._check_alerts()
                return response
            except requests.exceptions.Timeout:
                self._record_exception("timeout", start, target_domain)
            except requests.exceptions.ConnectionError:
                self._record_exception("connection_error", start, target_domain)
            except Exception as e:
                # Unlike before, unexpected exceptions now also count toward
                # the per-target failure tally, consistent with the other paths.
                self._record_exception(type(e).__name__, start, target_domain)
            if attempt < self.max_retries:
                time.sleep(2 ** attempt)
        # Every attempt raised: count one logical failure for the whole fetch.
        self.metrics.failed += 1
        self._check_alerts()
        return None
# Usage example: crawl 100 product pages and print aggregate metrics.
client = MonitoredProxyClient(max_retries=3)
client.on_alert(lambda msg: print(f"[ALERT] {msg}"))
urls = ["https://example.com/product/" + str(page) for page in range(100)]
for target_url in urls:
    response = client.fetch(target_url)
print(client.metrics.summary())
Node.js:带埋点的代理客户端
const crypto = require('crypto');
const { HttpsProxyAgent } = require('https-proxy-agent');
const { EventEmitter } = require('events');
/**
 * Collects and aggregates proxy performance metrics.
 * Counters are mutated by the client; accessors derive rates/percentiles.
 */
class ProxyMetrics {
  constructor() {
    this.totalRequests = 0;
    this.successful = 0;
    this.failed = 0;
    this.retries = 0;
    this.latencies = [];          // per-request latency, seconds
    this.statusCodes = {};        // count per HTTP status
    this.errorsByType = {};       // count per error category
    this.bytesTransferred = 0;
    this.requestsByTarget = {};
  }

  /** Success percentage as a fixed-point string, e.g. "97.3". */
  get successRate() {
    return this.totalRequests > 0
      ? ((this.successful / this.totalRequests) * 100).toFixed(1)
      : '0.0';
  }

  /** Nearest-rank percentile (p in 0..100) of recorded latencies, seconds. */
  percentile(p) {
    if (this.latencies.length === 0) return 0;
    const sorted = [...this.latencies].sort((a, b) => a - b);
    const idx = Math.min(
      Math.floor(sorted.length * (p / 100)),
      sorted.length - 1
    );
    return sorted[idx];
  }

  /** JSON-friendly snapshot of all aggregated metrics. */
  summary() {
    // Object.fromEntries replaces the previous reduce-with-spread
    // accumulator, which rebuilt the object on every step (O(n^2)).
    const topErrors = Object.fromEntries(
      Object.entries(this.errorsByType)
        .sort(([, a], [, b]) => b - a)
        .slice(0, 5)
    );
    return {
      totalRequests: this.totalRequests,
      successRate: `${this.successRate}%`,
      p50Latency: `${this.percentile(50).toFixed(3)}s`,
      p95Latency: `${this.percentile(95).toFixed(3)}s`,
      p99Latency: `${this.percentile(99).toFixed(3)}s`,
      retries: this.retries,
      bytesTransferred: this.bytesTransferred,
      statusDistribution: { ...this.statusCodes },
      topErrors,
    };
  }
}
/**
 * HTTP client with built-in proxy monitoring.
 *
 * Each fetch() goes through a fresh proxy session, records latency,
 * status and error counters into `this.metrics`, and retries with
 * exponential backoff on 403/429/503 responses and thrown errors.
 * Threshold breaches are surfaced via the 'alert' event.
 *
 * NOTE(review): `agent` is a node-fetch-style option; Node's built-in
 * (undici-based) global fetch ignores it, which would silently bypass
 * the proxy — confirm which fetch implementation is in scope here.
 */
class MonitoredProxyClient extends EventEmitter {
  constructor({ maxRetries = 3 } = {}) {
    super();
    this.metrics = new ProxyMetrics();
    this.maxRetries = maxRetries;
  }

  // Emit 'alert' events for any crossed thresholds.
  _checkAlerts() {
    // Warm-up guard: thresholds on fewer than 10 samples are noise.
    if (this.metrics.totalRequests < 10) return;
    if (parseFloat(this.metrics.successRate) < 90) {
      this.emit('alert', `Low success rate: ${this.metrics.successRate}%`);
    }
    if (this.metrics.percentile(95) > 10) {
      this.emit('alert', `High p95 latency: ${this.metrics.percentile(95).toFixed(1)}s`);
    }
  }

  /**
   * Fetch `url` through the proxy, retrying up to `maxRetries` extra times.
   * Resolves with the Response (possibly an error status) or null when
   * every attempt threw.
   */
  async fetch(url, { country } = {}) {
    // NOTE(review): targetDomain is computed but never recorded
    // (requestsByTarget stays empty) — cf. the Python client's per-target tally.
    const targetDomain = new URL(url).hostname;
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      // A fresh session id per attempt rotates the proxy exit IP.
      const sessionId = crypto.randomBytes(4).toString('hex');
      let username = `USERNAME-session-${sessionId}`;
      if (country) username += `-country-${country}`;
      const agent = new HttpsProxyAgent(
        `http://${username}:PASSWORD@gate.proxyhat.com:8080`
      );
      this.metrics.totalRequests++;
      if (attempt > 0) this.metrics.retries++;
      const startTime = Date.now();
      try {
        const response = await fetch(url, {
          agent,
          signal: AbortSignal.timeout(30000), // hard 30s cap per attempt
        });
        const latency = (Date.now() - startTime) / 1000;
        this.metrics.latencies.push(latency);
        this.metrics.statusCodes[response.status] =
          (this.metrics.statusCodes[response.status] || 0) + 1;
        if (response.status >= 400) {
          this.metrics.errorsByType[`HTTP_${response.status}`] =
            (this.metrics.errorsByType[`HTTP_${response.status}`] || 0) + 1;
          // Block/rate-limit statuses get exponential backoff then retry.
          if ([403, 429, 503].includes(response.status) && attempt < this.maxRetries) {
            await new Promise(r => setTimeout(r, 1000 * Math.pow(2, attempt)));
            continue;
          }
          this.metrics.failed++;
        } else {
          this.metrics.successful++;
          // Body is consumed here for byte accounting; callers receive an
          // already-read Response.
          const body = await response.text();
          this.metrics.bytesTransferred += body.length;
        }
        this._checkAlerts();
        return response;
      } catch (err) {
        const latency = (Date.now() - startTime) / 1000;
        this.metrics.latencies.push(latency);
        this.metrics.errorsByType[err.name] =
          (this.metrics.errorsByType[err.name] || 0) + 1;
        if (attempt < this.maxRetries) {
          await new Promise(r => setTimeout(r, 1000 * Math.pow(2, attempt)));
          continue;
        }
        this.metrics.failed++;
      }
    }
    this._checkAlerts();
    return null;
  }
}
// Usage example: crawl 100 product pages and print aggregate metrics.
const client = new MonitoredProxyClient({ maxRetries: 3 });
client.on('alert', msg => console.warn(`[ALERT] ${msg}`));
const urls = Array.from({ length: 100 }, (_, i) =>
  `https://example.com/product/${i + 1}`
);
// Top-level await is not available in CommonJS (this file uses require()),
// so drive the crawl from an async IIFE.
(async () => {
  for (const url of urls) {
    await client.fetch(url);
  }
  console.log(client.metrics.summary());
})();
Go:带埋点的代理客户端
package main
import (
	"bytes"
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"math"
	"net/http"
	"net/url"
	"sort"
	"sync"
	"time"
)
// Metrics aggregates proxy request outcomes. mu guards every field, so
// reads and writes should go through the methods below (or hold mu).
type Metrics struct {
	mu sync.Mutex // protects all fields below
	TotalRequests int
	Successful int
	Failed int
	Retries int
	Latencies []float64 // per-request latency in seconds
	StatusCodes map[int]int // count per HTTP status code
	ErrorsByType map[string]int // count per error category, e.g. "HTTP_503"
	BytesTransferred int64 // total response bytes recorded on success
}
// NewMetrics returns a Metrics with its maps initialised, so the record
// methods can increment entries without nil-map checks.
func NewMetrics() *Metrics {
	m := &Metrics{}
	m.StatusCodes = map[int]int{}
	m.ErrorsByType = map[string]int{}
	return m
}
// RecordSuccess logs one successful request: its latency in seconds, the
// HTTP status, and the response size in bytes.
func (m *Metrics) RecordSuccess(latency float64, status int, bytes int) {
	m.mu.Lock()
	m.TotalRequests++
	m.Successful++
	m.Latencies = append(m.Latencies, latency)
	m.StatusCodes[status]++
	m.BytesTransferred += int64(bytes)
	m.mu.Unlock()
}
// RecordFailure logs one failed request with its latency in seconds and
// an error category such as "connection_error" or "HTTP_503".
func (m *Metrics) RecordFailure(latency float64, errType string) {
	m.mu.Lock()
	m.TotalRequests++
	m.Failed++
	m.Latencies = append(m.Latencies, latency)
	m.ErrorsByType[errType]++
	m.mu.Unlock()
}
// Percentile returns the p-th percentile (p in 0..100) of recorded
// latencies using the nearest-rank method; zero when nothing is recorded.
func (m *Metrics) Percentile(p float64) float64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	n := len(m.Latencies)
	if n == 0 {
		return 0
	}
	// Sort a copy so the recorded order is left untouched.
	ordered := append([]float64(nil), m.Latencies...)
	sort.Float64s(ordered)
	rank := int(float64(n) * p / 100)
	if rank > n-1 {
		rank = n - 1
	}
	return ordered[rank]
}
// SuccessRate returns the percentage of recorded requests that succeeded,
// or zero before any request has been recorded.
func (m *Metrics) SuccessRate() float64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.TotalRequests > 0 {
		return 100 * float64(m.Successful) / float64(m.TotalRequests)
	}
	return 0
}
// Summary returns a one-line human-readable snapshot of the metrics.
func (m *Metrics) Summary() string {
	// Snapshot the plain counters under the lock: reading TotalRequests
	// and Retries unlocked races with the Record* methods. SuccessRate
	// and Percentile take the (non-reentrant) mutex themselves, so they
	// must be called only after unlocking.
	m.mu.Lock()
	total := m.TotalRequests
	retries := m.Retries
	m.mu.Unlock()
	return fmt.Sprintf(
		"Requests: %d | Success: %.1f%% | p50: %.3fs | p95: %.3fs | p99: %.3fs | Retries: %d",
		total, m.SuccessRate(),
		m.Percentile(50), m.Percentile(95), m.Percentile(99),
		retries,
	)
}
// MonitoredClient issues HTTP requests through rotating proxy sessions
// and records the outcome of every attempt into metrics.
type MonitoredClient struct {
	metrics *Metrics // aggregated counters shared by all fetches
	maxRetries int // extra attempts allowed after the first request
}
// NewMonitoredClient builds a client that records metrics for every fetch
// and retries failed requests up to maxRetries extra times.
func NewMonitoredClient(maxRetries int) *MonitoredClient {
	c := &MonitoredClient{metrics: NewMetrics()}
	c.maxRetries = maxRetries
	return c
}
// Fetch retrieves target through a fresh proxy session per attempt,
// retrying with exponential backoff on transport errors and on any
// HTTP status >= 400. Metrics are recorded for every attempt.
//
// The response body is read for byte accounting and then re-wrapped,
// so the returned *http.Response still carries a readable Body.
func (c *MonitoredClient) Fetch(target string) (*http.Response, error) {
	for attempt := 0; attempt <= c.maxRetries; attempt++ {
		// A fresh session id per attempt rotates the proxy exit IP.
		b := make([]byte, 4)
		if _, err := rand.Read(b); err != nil {
			return nil, fmt.Errorf("generating session id: %v", err)
		}
		sessionID := hex.EncodeToString(b)
		proxyStr := fmt.Sprintf(
			"http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080",
			sessionID,
		)
		proxyURL, err := url.Parse(proxyStr)
		if err != nil {
			return nil, fmt.Errorf("invalid proxy URL: %v", err)
		}
		client := &http.Client{
			Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
			Timeout:   30 * time.Second,
		}
		if attempt > 0 {
			c.metrics.mu.Lock()
			c.metrics.Retries++
			c.metrics.mu.Unlock()
		}
		start := time.Now()
		resp, err := client.Get(target)
		latency := time.Since(start).Seconds()
		if err != nil {
			c.metrics.RecordFailure(latency, "connection_error")
			if attempt < c.maxRetries {
				time.Sleep(time.Duration(math.Pow(2, float64(attempt))) * time.Second)
				continue
			}
			return nil, err
		}
		body, readErr := io.ReadAll(resp.Body)
		resp.Body.Close()
		if readErr != nil {
			// A truncated body is a failure too; previously this error was dropped.
			c.metrics.RecordFailure(latency, "read_error")
			if attempt < c.maxRetries {
				time.Sleep(time.Duration(math.Pow(2, float64(attempt))) * time.Second)
				continue
			}
			return nil, readErr
		}
		// Re-wrap the consumed body so callers can still read the response.
		resp.Body = io.NopCloser(bytes.NewReader(body))
		if resp.StatusCode >= 400 {
			c.metrics.RecordFailure(latency, fmt.Sprintf("HTTP_%d", resp.StatusCode))
			if attempt < c.maxRetries {
				time.Sleep(time.Duration(math.Pow(2, float64(attempt))) * time.Second)
				continue
			}
		} else {
			c.metrics.RecordSuccess(latency, resp.StatusCode, len(body))
		}
		return resp, nil
	}
	return nil, fmt.Errorf("all retries exhausted for %s", target)
}
// main crawls 50 product pages through the monitored client and prints
// a one-line metrics summary.
func main() {
	client := NewMonitoredClient(3)
	// "target" also avoids shadowing the imported net/url package name.
	for i := 1; i <= 50; i++ {
		target := fmt.Sprintf("https://example.com/product/%d", i)
		client.Fetch(target)
	}
	fmt.Println(client.metrics.Summary())
}
代理请求的结构化日志
JSON 结构化日志便于汇总和分析分布式爬虫的代理性能。
import json
import logging
import time
import uuid
import requests
class JSONProxyLogger:
    """Logs every proxy request as one structured-JSON line (JSONL).

    Each call to :meth:`fetch` appends one record — success or failure —
    to *log_file* so downstream tools can aggregate proxy performance.
    """
    def __init__(self, log_file: str = "proxy_requests.jsonl"):
        self.logger = logging.getLogger("proxy_json")
        # The named logger is a process-wide singleton: without this guard,
        # constructing JSONProxyLogger twice would attach a second handler
        # and duplicate every log line.
        if not self.logger.handlers:
            handler = logging.FileHandler(log_file)
            handler.setFormatter(logging.Formatter("%(message)s"))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_request(self, entry: dict):
        """Write one request record as a single JSON line."""
        self.logger.info(json.dumps(entry))

    def fetch(self, url: str, country: "str | None" = None) -> "requests.Response":
        """Fetch *url* through a fresh sticky proxy session, logging the
        outcome (status, latency, bytes) as JSON.

        Re-raises any request exception after logging it, so callers keep
        their own error handling. Annotations are strings so this class
        can be defined before `requests` is imported.
        """
        session_id = uuid.uuid4().hex[:8]
        username = f"USERNAME-session-{session_id}"
        if country:
            username += f"-country-{country}"
        proxy = f"http://{username}:PASSWORD@gate.proxyhat.com:8080"
        start = time.time()
        try:
            response = requests.get(
                url,
                proxies={"http": proxy, "https": proxy},
                timeout=30,
            )
            latency = time.time() - start
            self.log_request({
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                "url": url,
                "status": response.status_code,
                "latency_ms": round(latency * 1000),
                "bytes": len(response.content),
                "session_id": session_id,
                "country": country,
                "success": response.status_code < 400,
            })
            return response
        except Exception as e:
            latency = time.time() - start
            self.log_request({
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                "url": url,
                "error": str(e),
                "error_type": type(e).__name__,
                "latency_ms": round(latency * 1000),
                "session_id": session_id,
                "country": country,
                "success": False,
            })
            raise
# Usage — each request appends one JSONL record, e.g.:
# {"timestamp":"2026-02-26T10:30:00Z","url":"https://...","status":200,"latency_ms":1234,...}
proxy_logger = JSONProxyLogger(log_file="proxy_requests.jsonl")
response = proxy_logger.fetch("https://example.com/data", country="us")
定期健康报告
对于长时间运行的爬虫,可编写定期健康报告,总结固定时间窗口内的性能。
import time
import threading
from datetime import datetime
class PeriodicReporter:
    """Generates periodic performance reports from proxy metrics.

    A daemon thread wakes every ``interval_seconds``, diffs the current
    counters against the previous snapshot, and prints a windowed report
    (RPS, window success rate) alongside cumulative figures.
    """
    def __init__(self, metrics: "ProxyMetrics", interval_seconds: int = 60):
        self.metrics = metrics
        self.interval = interval_seconds
        # An Event instead of a bare bool lets stop() interrupt the wait
        # immediately, rather than letting the thread sleep out a full
        # interval (and possibly print one more report) after stop().
        self._stop_event = threading.Event()
        self._thread = None
        self._last_snapshot = None

    def start(self):
        """Start the background reporting thread."""
        self._stop_event.clear()
        self._last_snapshot = self._snapshot()
        self._thread = threading.Thread(target=self._report_loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Signal the reporter to stop and wait for the thread to exit."""
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=self.interval + 1)

    def _snapshot(self) -> dict:
        """Capture the cumulative counters plus a timestamp."""
        return {
            "total": self.metrics.total_requests,
            "success": self.metrics.successful,
            "failed": self.metrics.failed,
            "retries": self.metrics.retries,
            "time": time.time(),
        }

    def _report_loop(self):
        # wait() returns False on timeout (time to report) and True as
        # soon as stop() sets the event, ending the loop promptly.
        while not self._stop_event.wait(self.interval):
            current = self._snapshot()
            prev = self._last_snapshot
            elapsed = current["time"] - prev["time"]
            requests_delta = current["total"] - prev["total"]
            success_delta = current["success"] - prev["success"]
            failed_delta = current["failed"] - prev["failed"]
            rps = requests_delta / elapsed if elapsed > 0 else 0
            window_success_rate = (
                (success_delta / requests_delta * 100)
                if requests_delta > 0 else 0
            )
            report = {
                "window": f"{self.interval}s",
                # NOTE: datetime.utcnow() is deprecated in Python 3.12+;
                # kept to match this file's imports.
                "timestamp": datetime.utcnow().isoformat(),
                "requests": requests_delta,
                "rps": round(rps, 1),
                "success_rate": f"{window_success_rate:.1f}%",
                "failed": failed_delta,
                "cumulative_success_rate": f"{self.metrics.success_rate:.1f}%",
                "p95_latency": f"{self.metrics.p95_latency:.3f}s",
            }
            print(f"[REPORT] {report}")
            self._last_snapshot = current
# Usage with MonitoredProxyClient: attach a reporter to the client's metrics.
monitored = MonitoredProxyClient(max_retries=3)
health_reporter = PeriodicReporter(monitored.metrics, interval_seconds=30)
health_reporter.start()
# Reports print every 30 seconds while the scrape runs.
for page_url in urls:
    monitored.fetch(page_url)
health_reporter.stop()
警报规则与阈值
设置智能警报,以避免预热期的误报和瞬时毛刺触发的虚假警报。
| 警报 | 条件 | 冷却 | 行动 |
|---|---|---|---|
| 低成功率 | 5分钟的窗口低于90% | 10分钟 | 调查目标块, 检查代理池 |
| 高延迟 | p95 在 2 分钟窗口内超过 10 秒 | 5分钟 | 降低并发,检查目标站点健康状况 |
| 错误 Spike | 单一错误类型超过请求的20% | 5分钟 | 检查目标是否更改, 旋转地理位置 |
| 带宽尖峰 | 传输速率比基线翻倍 | 15分钟 | 确认是否为预期行为,检查重定向循环 |
| 零通量 | 2分钟内无成功请求 | 2分钟 | 检查代理连接, 验证证书 |
良好的监控,是连续数月稳定运行的抓取管道与默默产出垃圾数据的管道之间的区别。请提前投入埋点建设——在你第一次提早发现生产事故时,这笔投入就已经回本了。
在构建使用这些指标的中间件时,可参考《构建代理中间件》了解如何在监控的同时,通过并发控制来优化代理请求的吞吐量。关于完整的系统设计,参见《设计可靠的抓取架构》。
探索用于代理集成的 Python SDK、Node SDK 和 Go SDK,或查看 ProxyHat 的定价与文档,立即开始。






