监控代理性能:延迟、成功率和告警

学习如何对代理性能进行埋点、监控和告警——跟踪延迟百分位数、成功率、错误模式和带宽。包含 Python、Node.js 和 Go 的代码示例。

监控代理性能:延迟、成功率和告警

为什么要监控代理性能

代理基础设施会悄无声息地出故障。你的爬虫可能已经运行了数小时,成功率跌到 40% 才有人注意到。响应时间逐渐变长,封禁率上升,数据质量下降——这一切都不会抛出明显的错误。监控能把这些看不见的问题变成可操作的告警。

本指南将介绍如何为代理请求添加埋点、收集有意义的指标、构建仪表板并设置告警,在性能退化影响你的数据管道之前就发现它。所有示例都使用 ProxyHat 的住宅代理,并且可直接用于生产环境。

如果你没有在度量代理性能,你就是在猜测。而大规模地靠猜测行事既费钱,又会产生不可靠的数据。

需要跟踪的关键指标

指标 | 说明 | 告警阈值
成功率 | 返回 2xx 状态码的请求百分比 | 低于 90%
响应时间(p50/p95/p99) | 代理请求完成的速度 | p95 超过 10 秒
按类型划分的错误率 | 哪些错误占主导(超时、403、429、连接错误) | 任一单一类型超过 15%
每秒请求数 | 管道的吞吐量 | 低于预期基线
带宽使用量 | 通过代理传输的数据量 | 接近套餐限额
按目标划分的封禁率 | 哪些目标封禁你最多 | 任一目标超过 20%
重试率 | 需要重试的请求比例 | 超过 30%
会话复用效率 | 粘性会话能存活多久 | 平均低于 5 个请求

Python:带埋点的代理客户端

这个客户端为每个请求加上计时、状态跟踪和结构化日志。

import time
import uuid
import logging
import statistics
from dataclasses import dataclass, field
from collections import defaultdict
from typing import Optional
import requests
# Module-level logger; MonitoredProxyClient writes its alert warnings to it.
logger = logging.getLogger("proxy_monitor")
@dataclass
class ProxyMetrics:
    """Running counters and latency samples for requests sent through a proxy.

    The fields are plain mutable counters that the client updates directly;
    the properties derive rates and latency percentiles from them on demand.
    """

    total_requests: int = 0
    successful: int = 0
    failed: int = 0
    retries: int = 0
    # One entry per recorded attempt, in seconds.
    latencies: list = field(default_factory=list)
    # HTTP status code -> number of responses seen with that code.
    status_codes: dict = field(default_factory=lambda: defaultdict(int))
    # Error label (e.g. "timeout", "HTTP_403") -> occurrence count.
    errors_by_type: dict = field(default_factory=lambda: defaultdict(int))
    bytes_transferred: int = 0
    # Target domain -> {"success": n, "failed": m}.
    requests_by_target: dict = field(default_factory=lambda: defaultdict(lambda: {"success": 0, "failed": 0}))

    def _latency_at(self, fraction: float) -> float:
        """Return the sample at ``int(len * fraction)`` of the sorted latencies.

        The index is clamped to the last element; an empty sample list
        yields ``0.0``.
        """
        if not self.latencies:
            return 0.0
        ordered = sorted(self.latencies)
        position = min(int(len(ordered) * fraction), len(ordered) - 1)
        return ordered[position]

    @property
    def success_rate(self) -> float:
        """Percentage of ``total_requests`` counted as successful (0.0 if none)."""
        if self.total_requests > 0:
            return self.successful / self.total_requests * 100
        return 0.0

    @property
    def p50_latency(self) -> float:
        """Median latency in seconds, or 0.0 when nothing has been recorded."""
        if not self.latencies:
            return 0.0
        return statistics.median(self.latencies)

    @property
    def p95_latency(self) -> float:
        """95th-percentile latency in seconds (0.0 when empty)."""
        return self._latency_at(0.95)

    @property
    def p99_latency(self) -> float:
        """99th-percentile latency in seconds (0.0 when empty)."""
        return self._latency_at(0.99)

    def summary(self) -> dict:
        """Return a printable snapshot of the metrics.

        Rates and latencies are pre-formatted strings; ``top_errors`` holds
        the five most frequent error labels in descending order of count.
        """
        ranked_errors = sorted(
            self.errors_by_type.items(),
            key=lambda pair: pair[1],
            reverse=True,
        )
        return {
            "total_requests": self.total_requests,
            "success_rate": f"{self.success_rate:.1f}%",
            "p50_latency": f"{self.p50_latency:.3f}s",
            "p95_latency": f"{self.p95_latency:.3f}s",
            "p99_latency": f"{self.p99_latency:.3f}s",
            "retries": self.retries,
            "bytes_transferred": self.bytes_transferred,
            "top_errors": dict(ranked_errors[:5]),
            "status_distribution": dict(self.status_codes),
        }
class MonitoredProxyClient:
    """HTTP GET client that sends requests through the proxy gateway and records metrics.

    Every attempt (including retries) increments ``metrics.total_requests``
    and contributes one latency sample. ``metrics.successful`` and
    ``metrics.failed`` are incremented once per ``fetch`` call, on its final
    outcome.
    """

    # Statuses that are retried with a fresh proxy session.
    _RETRYABLE_STATUSES = (403, 429, 503)

    def __init__(self, max_retries: int = 3):
        """Create a client that retries each URL up to ``max_retries`` extra times."""
        self.metrics = ProxyMetrics()
        self.max_retries = max_retries
        self._alert_callbacks = []

    def on_alert(self, callback):
        """Register a callback for metric alerts.

        The callback is called with the alert message string each time a
        threshold check fails.
        """
        self._alert_callbacks.append(callback)

    def _check_alerts(self):
        """Evaluate the alert thresholds and notify the logger and callbacks.

        Nothing is evaluated until at least 10 attempts have been recorded.
        """
        if self.metrics.total_requests < 10:
            return
        alerts = []
        if self.metrics.success_rate < 90:
            alerts.append(f"Low success rate: {self.metrics.success_rate:.1f}%")
        if self.metrics.p95_latency > 10:
            alerts.append(f"High p95 latency: {self.metrics.p95_latency:.1f}s")
        if self.metrics.retries / max(self.metrics.total_requests, 1) > 0.3:
            alerts.append(f"High retry rate: {self.metrics.retries}/{self.metrics.total_requests}")
        for alert in alerts:
            logger.warning("ALERT: %s", alert)
            for cb in self._alert_callbacks:
                cb(alert)

    def fetch(self, url: str, country: Optional[str] = None) -> Optional[requests.Response]:
        """GET ``url`` through the proxy, retrying with exponential backoff.

        Each attempt uses a new random session id in the proxy username, with
        ``-country-<country>`` appended when ``country`` is given.

        Returns:
            The response of the final attempt (which may still carry a 4xx/5xx
            status), or ``None`` when every attempt raised.

        Raises:
            Whatever an alert callback raises; request errors themselves are
            recorded in the metrics and never propagate.
        """
        from urllib.parse import urlparse

        target_domain = urlparse(url).netloc
        for attempt in range(self.max_retries + 1):
            session_id = uuid.uuid4().hex[:8]
            username = f"USERNAME-session-{session_id}"
            if country:
                username += f"-country-{country}"
            proxy = f"http://{username}:PASSWORD@gate.proxyhat.com:8080"
            self.metrics.total_requests += 1
            if attempt > 0:
                self.metrics.retries += 1

            # perf_counter is monotonic, so system clock adjustments cannot
            # produce negative or inflated durations.
            start = time.perf_counter()
            # Only the network call is guarded: bookkeeping or alert-callback
            # errors must not be miscounted as proxy failures and retried.
            try:
                response = requests.get(
                    url,
                    proxies={"http": proxy, "https": proxy},
                    timeout=30,
                )
            except requests.exceptions.Timeout:
                error_key = "timeout"
            except requests.exceptions.ConnectionError:
                error_key = "connection_error"
            except Exception as exc:
                error_key = type(exc).__name__
            else:
                self.metrics.latencies.append(time.perf_counter() - start)
                self.metrics.status_codes[response.status_code] += 1
                if response.status_code >= 400:
                    self.metrics.errors_by_type[f"HTTP_{response.status_code}"] += 1
                    self.metrics.requests_by_target[target_domain]["failed"] += 1
                    if response.status_code in self._RETRYABLE_STATUSES and attempt < self.max_retries:
                        time.sleep(2 ** attempt)
                        continue
                    self.metrics.failed += 1
                else:
                    self.metrics.successful += 1
                    self.metrics.bytes_transferred += len(response.content)
                    self.metrics.requests_by_target[target_domain]["success"] += 1
                self._check_alerts()
                return response

            # The attempt raised: record it the same way for every error type,
            # including the per-target failure count.
            self.metrics.latencies.append(time.perf_counter() - start)
            self.metrics.errors_by_type[error_key] += 1
            self.metrics.requests_by_target[target_domain]["failed"] += 1
            if attempt < self.max_retries:
                time.sleep(2 ** attempt)

        self.metrics.failed += 1
        self._check_alerts()
        return None
# Usage: fetch 100 product pages through the proxy, printing every alert,
# then print the aggregated metrics.
client = MonitoredProxyClient(max_retries=3)
client.on_alert(lambda msg: print(f"[ALERT] {msg}"))

product_ids = range(100)
urls = [f"https://example.com/product/{i}" for i in product_ids]
for url in urls:
    response = client.fetch(url)

summary = client.metrics.summary()
print(summary)

Node.js:带埋点的代理客户端

const crypto = require('crypto');
const { HttpsProxyAgent } = require('https-proxy-agent');
const { EventEmitter } = require('events');
/**
 * Running counters and latency samples for requests sent through a proxy.
 * Fields are plain mutable values that the client updates directly.
 */
class ProxyMetrics {
  constructor() {
    this.totalRequests = 0;
    this.successful = 0;
    this.failed = 0;
    this.retries = 0;
    this.latencies = []; // seconds, one entry per recorded attempt
    this.statusCodes = {}; // HTTP status -> count
    this.errorsByType = {}; // error label -> count
    this.bytesTransferred = 0;
    this.requestsByTarget = {};
  }

  /**
   * Success percentage formatted with one decimal place.
   * @returns {string} e.g. '97.5'; '0.0' when no requests were recorded.
   */
  get successRate() {
    if (!(this.totalRequests > 0)) {
      return '0.0';
    }
    const percent = (this.successful / this.totalRequests) * 100;
    return percent.toFixed(1);
  }

  /**
   * Latency at percentile `p` (0-100), taken at index floor(n * p / 100)
   * of the sorted samples and clamped to the last sample.
   * @param {number} p
   * @returns {number} seconds, or 0 when there are no samples.
   */
  percentile(p) {
    const count = this.latencies.length;
    if (count === 0) {
      return 0;
    }
    // Sort a copy so the recorded order is left untouched.
    const ascending = this.latencies.slice().sort((x, y) => x - y);
    const rank = Math.floor(count * (p / 100));
    return ascending[Math.min(rank, count - 1)];
  }

  /**
   * Printable snapshot; `topErrors` holds the five most frequent error
   * labels in descending order of count.
   * @returns {object}
   */
  summary() {
    const formatSeconds = (p) => `${this.percentile(p).toFixed(3)}s`;
    const rankedErrors = Object.entries(this.errorsByType)
      .sort((left, right) => right[1] - left[1])
      .slice(0, 5);

    return {
      totalRequests: this.totalRequests,
      successRate: `${this.successRate}%`,
      p50Latency: formatSeconds(50),
      p95Latency: formatSeconds(95),
      p99Latency: formatSeconds(99),
      retries: this.retries,
      bytesTransferred: this.bytesTransferred,
      statusDistribution: { ...this.statusCodes },
      topErrors: Object.fromEntries(rankedErrors),
    };
  }
}
/**
 * HTTP client that sends GET requests through the proxy gateway, records
 * per-attempt metrics and emits 'alert' events when thresholds are crossed.
 *
 * Every attempt (including retries) increments `metrics.totalRequests`;
 * `metrics.successful` / `metrics.failed` are incremented once per `fetch`
 * call, on its final outcome.
 */
class MonitoredProxyClient extends EventEmitter {
  /**
   * @param {{maxRetries?: number}} [options] extra attempts per URL (default 3).
   */
  constructor({ maxRetries = 3 } = {}) {
    super();
    this.metrics = new ProxyMetrics();
    this.maxRetries = maxRetries;
  }

  /**
   * Emit an 'alert' event for each threshold that is currently violated.
   * Nothing is evaluated until at least 10 attempts have been recorded.
   */
  _checkAlerts() {
    const { metrics } = this;
    if (metrics.totalRequests < 10) return;
    if (parseFloat(metrics.successRate) < 90) {
      this.emit('alert', `Low success rate: ${metrics.successRate}%`);
    }
    const p95 = metrics.percentile(95);
    if (p95 > 10) {
      this.emit('alert', `High p95 latency: ${p95.toFixed(1)}s`);
    }
    // Same retry-rate threshold as the Python client in this guide.
    if (metrics.retries / Math.max(metrics.totalRequests, 1) > 0.3) {
      this.emit('alert', `High retry rate: ${metrics.retries}/${metrics.totalRequests}`);
    }
  }

  /** Increment `counter[key]`, starting from zero when the key is new. */
  _bump(counter, key) {
    counter[key] = (counter[key] || 0) + 1;
  }

  /** Count one 'success' or 'failed' attempt against a target domain. */
  _recordTarget(domain, outcome) {
    const byTarget = this.metrics.requestsByTarget;
    if (!byTarget[domain]) {
      byTarget[domain] = { success: 0, failed: 0 };
    }
    byTarget[domain][outcome]++;
  }

  /** Resolve after 1s, 2s, 4s, ... depending on the zero-based attempt. */
  _backoff(attempt) {
    return new Promise(resolve => setTimeout(resolve, 1000 * Math.pow(2, attempt)));
  }

  /**
   * GET `url` through the proxy, retrying with exponential backoff.
   * Each attempt uses a new random session id in the proxy username.
   *
   * @param {string} url
   * @param {{country?: string}} [options] optional country suffix for the proxy username.
   * @returns {Promise<Response|null>} the final response (possibly 4xx/5xx,
   *   body still readable), or null when every attempt threw.
   */
  async fetch(url, { country } = {}) {
    const targetDomain = new URL(url).hostname;
    for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
      const sessionId = crypto.randomBytes(4).toString('hex');
      let username = `USERNAME-session-${sessionId}`;
      if (country) username += `-country-${country}`;
      const agent = new HttpsProxyAgent(
        `http://${username}:PASSWORD@gate.proxyhat.com:8080`
      );
      this.metrics.totalRequests++;
      if (attempt > 0) this.metrics.retries++;
      const startTime = Date.now();
      try {
        // NOTE(review): `agent` is honoured by node-fetch-style
        // implementations; Node's built-in (undici) fetch expects a
        // `dispatcher` instead and ignores `agent`, which would bypass the
        // proxy. Confirm which `fetch` this file runs against.
        const response = await fetch(url, {
          agent,
          signal: AbortSignal.timeout(30000),
        });
        const latency = (Date.now() - startTime) / 1000;
        this.metrics.latencies.push(latency);
        this._bump(this.metrics.statusCodes, response.status);
        if (response.status >= 400) {
          this._bump(this.metrics.errorsByType, `HTTP_${response.status}`);
          this._recordTarget(targetDomain, 'failed');
          if ([403, 429, 503].includes(response.status) && attempt < this.maxRetries) {
            await this._backoff(attempt);
            continue;
          }
          this.metrics.failed++;
        } else {
          this.metrics.successful++;
          // Measure a clone so the caller can still read the returned body,
          // and count bytes rather than UTF-16 code units.
          const payload = await response.clone().arrayBuffer();
          this.metrics.bytesTransferred += payload.byteLength;
          this._recordTarget(targetDomain, 'success');
        }
        this._checkAlerts();
        return response;
      } catch (err) {
        const latency = (Date.now() - startTime) / 1000;
        this.metrics.latencies.push(latency);
        this._bump(this.metrics.errorsByType, err.name);
        this._recordTarget(targetDomain, 'failed');
        if (attempt < this.maxRetries) {
          await this._backoff(attempt);
          continue;
        }
        this.metrics.failed++;
      }
    }
    this._checkAlerts();
    return null;
  }
}
// Usage
const client = new MonitoredProxyClient({ maxRetries: 3 });
client.on('alert', msg => console.warn(`[ALERT] ${msg}`));
const urls = Array.from({ length: 100 }, (_, i) =>
  `https://example.com/product/${i + 1}`
);

// This file uses require() (CommonJS), where top-level `await` is a syntax
// error, so the sequential fetch loop runs inside an async entry point.
async function main() {
  for (const url of urls) {
    await client.fetch(url);
  }
  console.log(client.metrics.summary());
}

main().catch(err => {
  console.error(err);
  process.exitCode = 1;
});

Go:带埋点的代理客户端

package main
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"math"
	"net/http"
	"net/url"
	"sort"
	"sync"
	"time"
)
// Metrics accumulates request counters and latency samples.
// All methods take mu, so a *Metrics may be shared between goroutines;
// code that touches the exported fields directly must hold mu itself.
type Metrics struct {
	mu               sync.Mutex
	TotalRequests    int
	Successful       int
	Failed           int
	Retries          int
	Latencies        []float64      // seconds, one entry per recorded attempt
	StatusCodes      map[int]int    // HTTP status -> count (successful attempts only)
	ErrorsByType     map[string]int // error label -> count
	BytesTransferred int64
}

// NewMetrics returns an empty Metrics with its maps initialised.
func NewMetrics() *Metrics {
	return &Metrics{
		StatusCodes:  make(map[int]int),
		ErrorsByType: make(map[string]int),
	}
}

// RecordSuccess counts one successful attempt with its latency (seconds),
// HTTP status and response size in bytes.
func (m *Metrics) RecordSuccess(latency float64, status int, bytes int) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.TotalRequests++
	m.Successful++
	m.Latencies = append(m.Latencies, latency)
	m.StatusCodes[status]++
	m.BytesTransferred += int64(bytes)
}

// RecordFailure counts one failed attempt with its latency (seconds) under
// the given error label.
func (m *Metrics) RecordFailure(latency float64, errType string) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.TotalRequests++
	m.Failed++
	m.Latencies = append(m.Latencies, latency)
	m.ErrorsByType[errType]++
}

// Percentile returns the latency at percentile p (0-100): the sample at
// index floor(n*p/100) of the sorted latencies, clamped to the valid range.
// It returns 0 when no latencies have been recorded.
func (m *Metrics) Percentile(p float64) float64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	n := len(m.Latencies)
	if n == 0 {
		return 0
	}
	// Sort a copy so the recorded order is left untouched.
	sorted := make([]float64, n)
	copy(sorted, m.Latencies)
	sort.Float64s(sorted)

	// Clamp before converting to int so negative, oversized or NaN
	// percentiles cannot produce an out-of-range index.
	pos := float64(n) * p / 100
	idx := 0
	switch {
	case pos >= float64(n-1):
		idx = n - 1
	case pos > 0:
		idx = int(pos)
	}
	return sorted[idx]
}

// SuccessRate returns the percentage of recorded attempts that succeeded,
// or 0 when nothing has been recorded.
func (m *Metrics) SuccessRate() float64 {
	m.mu.Lock()
	defer m.mu.Unlock()
	if m.TotalRequests == 0 {
		return 0
	}
	return float64(m.Successful) / float64(m.TotalRequests) * 100
}

// Summary returns a one-line, human-readable report of the metrics.
func (m *Metrics) Summary() string {
	// Copy the counters under the lock, then release it: SuccessRate and
	// Percentile lock mu themselves and sync.Mutex is not reentrant.
	m.mu.Lock()
	total, retries := m.TotalRequests, m.Retries
	m.mu.Unlock()

	return fmt.Sprintf(
		"Requests: %d | Success: %.1f%% | p50: %.3fs | p95: %.3fs | p99: %.3fs | Retries: %d",
		total, m.SuccessRate(),
		m.Percentile(50), m.Percentile(95), m.Percentile(99),
		retries,
	)
}
// MonitoredClient issues GET requests through the proxy gateway and records
// the outcome of every attempt in metrics.
type MonitoredClient struct {
	metrics    *Metrics
	maxRetries int
}

// NewMonitoredClient returns a client that retries each target up to
// maxRetries additional times.
func NewMonitoredClient(maxRetries int) *MonitoredClient {
	return &MonitoredClient{
		metrics:    NewMetrics(),
		maxRetries: maxRetries,
	}
}

// replayBody serves an already-downloaded response body, so a response can
// be handed back to the caller after its size has been measured. It is kept
// local and dependency-free so the file's import block stays unchanged.
type replayBody struct {
	data []byte
	pos  int
}

// Read copies the remaining bytes into p and reports io.EOF once exhausted.
func (b *replayBody) Read(p []byte) (int, error) {
	if b.pos >= len(b.data) {
		return 0, io.EOF
	}
	n := copy(p, b.data[b.pos:])
	b.pos += n
	return n, nil
}

// Close is a no-op; the underlying network body is already closed.
func (b *replayBody) Close() error { return nil }

// isRetryableStatus reports whether a status is worth retrying with a new
// proxy session (the same set the Python and Node clients retry).
func isRetryableStatus(code int) bool {
	return code == http.StatusForbidden ||
		code == http.StatusTooManyRequests ||
		code == http.StatusServiceUnavailable
}

// retryBackoff returns the exponential delay (1s, 2s, 4s, ...) to wait
// after the given zero-based attempt.
func retryBackoff(attempt int) time.Duration {
	return time.Duration(math.Pow(2, float64(attempt))) * time.Second
}

// errorLabel maps a request error to the metrics label it is counted under.
func errorLabel(err error) string {
	if ue, ok := err.(*url.Error); ok && ue.Timeout() {
		return "timeout"
	}
	return "connection_error"
}

// Fetch GETs target through the proxy, using a fresh random session id per
// attempt and exponential backoff between attempts.
//
// It returns the final response, which may still carry a 4xx/5xx status.
// The body has been fully read and can be read again by the caller. An
// error is returned when the last attempt failed at the transport level or
// when the proxy session could not be set up.
func (c *MonitoredClient) Fetch(target string) (*http.Response, error) {
	for attempt := 0; attempt <= c.maxRetries; attempt++ {
		sessionBytes := make([]byte, 4)
		if _, err := rand.Read(sessionBytes); err != nil {
			return nil, fmt.Errorf("generating session id: %w", err)
		}
		proxyStr := fmt.Sprintf(
			"http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080",
			hex.EncodeToString(sessionBytes),
		)
		proxyURL, err := url.Parse(proxyStr)
		if err != nil {
			return nil, fmt.Errorf("parsing proxy URL: %w", err)
		}
		client := &http.Client{
			Transport: &http.Transport{
				Proxy: http.ProxyURL(proxyURL),
				// A new Transport is built per attempt; without this its
				// idle keep-alive connection would linger after the call.
				DisableKeepAlives: true,
			},
			Timeout: 30 * time.Second,
		}
		if attempt > 0 {
			c.metrics.mu.Lock()
			c.metrics.Retries++
			c.metrics.mu.Unlock()
		}
		lastAttempt := attempt >= c.maxRetries

		start := time.Now()
		resp, err := client.Get(target)
		latency := time.Since(start).Seconds()
		if err != nil {
			c.metrics.RecordFailure(latency, errorLabel(err))
			if !lastAttempt {
				time.Sleep(retryBackoff(attempt))
				continue
			}
			return nil, err
		}

		body, readErr := io.ReadAll(resp.Body)
		resp.Body.Close()
		if readErr != nil {
			c.metrics.RecordFailure(latency, "read_error")
			if !lastAttempt {
				time.Sleep(retryBackoff(attempt))
				continue
			}
			return nil, readErr
		}
		// Give the caller a readable body in place of the closed one.
		resp.Body = &replayBody{data: body}

		if resp.StatusCode >= 400 {
			c.metrics.RecordFailure(latency, fmt.Sprintf("HTTP_%d", resp.StatusCode))
			if isRetryableStatus(resp.StatusCode) && !lastAttempt {
				time.Sleep(retryBackoff(attempt))
				continue
			}
		} else {
			c.metrics.RecordSuccess(latency, resp.StatusCode, len(body))
		}
		return resp, nil
	}
	// Only reachable when maxRetries is negative and no attempt was made.
	return nil, fmt.Errorf("all retries exhausted for %s", target)
}
// main fetches 50 product pages through the monitored client and prints the
// aggregated metrics. Responses and errors from Fetch are not inspected.
func main() {
	client := NewMonitoredClient(3)
	for n := 1; n <= 50; n++ {
		// Named target so the local does not shadow the net/url package.
		target := fmt.Sprintf("https://example.com/product/%d", n)
		client.Fetch(target)
	}
	fmt.Println(client.metrics.Summary())
}

代理请求的结构化日志

JSON 结构化的日志便于汇总和分析分布式爬虫的代理性能。

import json
import logging
import time
import uuid
import requests
class JSONProxyLogger:
    """Logs every proxy request as one JSON object per line (JSONL).

    All instances share the ``"proxy_json"`` logger, so an entry is written
    to every log file that has been attached to that logger.
    """

    def __init__(self, log_file: str = "proxy_requests.jsonl"):
        """Attach a file handler for ``log_file`` unless one already exists."""
        import os

        self.logger = logging.getLogger("proxy_json")
        self.logger.setLevel(logging.INFO)

        # The logger is process-wide: adding a handler on every instantiation
        # would write each entry once per instance created for the same file.
        log_path = os.path.abspath(log_file)
        already_attached = any(
            isinstance(existing, logging.FileHandler)
            and existing.baseFilename == log_path
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(log_file, encoding="utf-8")
            handler.setFormatter(logging.Formatter("%(message)s"))
            self.logger.addHandler(handler)

    def log_request(self, entry: dict):
        """Serialise ``entry`` to JSON and write it as a single log line."""
        self.logger.info(json.dumps(entry))

    def fetch(self, url: str, country: str = None) -> "requests.Response":
        """GET ``url`` through the proxy and log the outcome.

        A new random session id is placed in the proxy username, with
        ``-country-<country>`` appended when ``country`` is given.

        Returns:
            The response, whatever its status code.

        Raises:
            Any exception raised by the request, re-raised after it is logged.
        """
        session_id = uuid.uuid4().hex[:8]
        username = f"USERNAME-session-{session_id}"
        if country:
            username += f"-country-{country}"
        proxy = f"http://{username}:PASSWORD@gate.proxyhat.com:8080"

        # perf_counter is monotonic, unlike the wall clock.
        start = time.perf_counter()
        try:
            response = requests.get(
                url,
                proxies={"http": proxy, "https": proxy},
                timeout=30,
            )
        except Exception as e:
            latency = time.perf_counter() - start
            self.log_request({
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                "url": url,
                "error": str(e),
                "error_type": type(e).__name__,
                "latency_ms": round(latency * 1000),
                "session_id": session_id,
                "country": country,
                "success": False,
            })
            raise
        else:
            # Logged outside the try so a logging failure is not recorded as
            # a failed request.
            latency = time.perf_counter() - start
            self.log_request({
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
                "url": url,
                "status": response.status_code,
                "latency_ms": round(latency * 1000),
                "bytes": len(response.content),
                "session_id": session_id,
                "country": country,
                "success": response.status_code < 400,
            })
            return response
# Usage: each fetch appends one JSON object per line (JSONL), for example:
# {"timestamp":"2026-02-26T10:30:00Z","url":"https://...","status":200,"latency_ms":1234,...}
proxy_logger = JSONProxyLogger(log_file="proxy_requests.jsonl")
response = proxy_logger.fetch(url="https://example.com/data", country="us")

定期健康报告

对于长时间运行的爬虫,定期生成健康报告,汇总固定时间窗口内的性能。

import time
import threading
from datetime import datetime
class PeriodicReporter:
    """Prints a performance report for each interval from a background thread.

    Each report covers the requests recorded since the previous report and
    also includes the cumulative success rate and p95 latency.
    """

    def __init__(self, metrics: "ProxyMetrics", interval_seconds: int = 60):
        """Report on ``metrics`` every ``interval_seconds`` once started."""
        self.metrics = metrics
        self.interval = interval_seconds
        self._running = False
        self._thread = None
        self._last_snapshot = None
        # Lets stop() wake the loop immediately instead of waiting out a sleep.
        self._stop_event = threading.Event()

    def start(self):
        """Start the reporting thread; a no-op if it is already running."""
        if self._thread is not None and self._thread.is_alive():
            if not self._stop_event.is_set():
                return
            # A stop was requested; let that loop finish before restarting.
            self._thread.join()
        self._stop_event.clear()
        self._running = True
        self._last_snapshot = self._snapshot()
        self._thread = threading.Thread(target=self._report_loop, daemon=True)
        self._thread.start()

    def stop(self):
        """Ask the reporting thread to exit without printing another report."""
        self._running = False
        self._stop_event.set()

    def _snapshot(self) -> dict:
        """Capture the current counters together with the wall-clock time."""
        return {
            "total": self.metrics.total_requests,
            "success": self.metrics.successful,
            "failed": self.metrics.failed,
            "retries": self.metrics.retries,
            "time": time.time(),
        }

    def _build_report(self, prev: dict, current: dict) -> dict:
        """Compute the report for the window between two snapshots."""
        elapsed = current["time"] - prev["time"]
        requests_delta = current["total"] - prev["total"]
        success_delta = current["success"] - prev["success"]
        failed_delta = current["failed"] - prev["failed"]
        rps = requests_delta / elapsed if elapsed > 0 else 0
        window_success_rate = (
            (success_delta / requests_delta * 100)
            if requests_delta > 0 else 0
        )
        return {
            "window": f"{self.interval}s",
            # Naive datetime expressed in UTC.
            "timestamp": datetime.utcnow().isoformat(),
            "requests": requests_delta,
            "rps": round(rps, 1),
            "success_rate": f"{window_success_rate:.1f}%",
            "failed": failed_delta,
            "cumulative_success_rate": f"{self.metrics.success_rate:.1f}%",
            "p95_latency": f"{self.metrics.p95_latency:.3f}s",
        }

    def _report_loop(self):
        """Print one report per interval until stop() is called."""
        # wait() returns True as soon as stop() sets the event, ending the
        # loop; otherwise it times out after one interval and returns False.
        while not self._stop_event.wait(self.interval):
            current = self._snapshot()
            report = self._build_report(self._last_snapshot, current)
            print(f"[REPORT] {report}")
            self._last_snapshot = current
# Usage with MonitoredProxyClient
# (`urls` is the list built in the MonitoredProxyClient usage example.)
client = MonitoredProxyClient(max_retries=3)
reporter = PeriodicReporter(client.metrics, interval_seconds=30)
reporter.start()
# Scrape away — reports print every 30 seconds
try:
    for url in urls:
        client.fetch(url)
finally:
    # Stop reporting even if a fetch raises.
    reporter.stop()

告警规则与阈值

设置智能告警,避免在预热阶段和瞬时尖峰期间出现误报。

告警 | 条件 | 冷却时间 | 处理措施
低成功率 | 5 分钟窗口内低于 90% | 10 分钟 | 排查目标站点的封禁情况,检查代理池
高延迟 | 2 分钟窗口内 p95 超过 10 秒 | 5 分钟 | 降低并发,检查目标站点健康状况
错误激增 | 单一错误类型超过请求的 20% | 5 分钟 | 检查目标站点是否有变更,轮换地理位置
带宽激增 | 传输速率比基线增加一倍 | 15 分钟 | 确认是否为预期行为,检查重定向循环
零吞吐量 | 2 分钟内无成功请求 | 2 分钟 | 检查代理连接,验证凭据
良好的监控,是能连续数月可靠运行的爬虫管道与默默产出垃圾数据的爬虫管道之间的区别。请尽早投入埋点工作——在你第一次提前发现生产事故时,它就已经回本了。

如需构建一个为这些指标提供数据的中间件层,请参考《构建代理中间件层》。要在监控的同时优化吞吐量,请阅读《通过并发控制扩展代理请求》。关于完整的系统设计,见《设计可靠的爬虫架构》。

探索 Python SDK、Node SDK 和 Go SDK 以进行代理集成,或查看 ProxyHat 的定价与文档,即可开始使用。

常见问题

准备开始了吗?

访问覆盖 148 多个国家、经 AI 筛选的 5000 多万个住宅 IP。

查看价格 | 住宅代理
← 返回博客