Perché costruire uno strato di middleware proxy
Quando il tuo progetto di demolizione cresce oltre un singolo script, hai bisogno di uno strato di astrazione tra il tuo codice di applicazione e il provider proxy. Uno strato middleware gestisce la selezione del proxy, l'autenticazione, la logica di riprovazione, il failover e la registrazione - in modo che i raschiatori si preoccupino solo di estrarre i dati.
Senza middleware, ogni scraper nella base di codice duplica la configurazione del proxy, la gestione degli errori e la logica di rotazione. Questa guida ti mostra come progettare e costruire un middleware proxy di livello di produzione sia in Python che in Node.js, utilizzando Prossi residenziali rotanti di ProxyHat.
Un middleware proxy si trova tra la vostra applicazione e Internet. Intercetta le richieste in uscita, le indirizza attraverso i proxy, gestisce i guasti e restituisce risposte pulite al vostro codice di applicazione.
Panoramica sull'architettura
Un middleware proxy ben progettato ha quattro componenti principali:
| Componenti | Responsabilità | Perché Matters |
|---|---|---|
| Gestione proxy | Gestione delle credenziali, sessioni e geo-targeting dei proxy | Centralizza la configurazione del proxy |
| Richiesta Interceptor | Avvolgi richieste in uscita con impostazioni proxy e intestazioni | Elimina la caldaia in raschietti |
| Motore di recupero | Retries richieste fallite con backoff e rotazione proxy | Aumenta il tasso di successo dal 70% al 95%+ |
| Raccoglitore di metri | Traccia la latenza, i tassi di successo e i modelli di errore | Consente l'ottimizzazione data-driven |
Python Middleware Attuazione
Core Proxy Manager
import uuid
import time
import logging
from dataclasses import dataclass, field
from typing import Optional
logger = logging.getLogger(__name__)
@dataclass
class ProxyConfig:
gateway: str = "gate.proxyhat.com"
http_port: int = 8080
socks5_port: int = 1080
username: str = "USERNAME"
password: str = "PASSWORD"
@dataclass
class RequestMetrics:
total: int = 0
success: int = 0
failed: int = 0
retries: int = 0
total_latency: float = 0.0
@property
def success_rate(self) -> float:
return (self.success / self.total * 100) if self.total > 0 else 0.0
@property
def avg_latency(self) -> float:
return (self.total_latency / self.success) if self.success > 0 else 0.0
class ProxyManager:
"""Manages proxy sessions, geo-targeting, and protocol selection."""
def __init__(self, config: Optional[ProxyConfig] = None):
self.config = config or ProxyConfig()
self.metrics = RequestMetrics()
def get_proxy_url(
self,
protocol: str = "http",
country: Optional[str] = None,
session_id: Optional[str] = None,
sticky: bool = False,
) -> str:
username = self.config.username
# Add session for sticky IPs
if sticky and not session_id:
session_id = uuid.uuid4().hex[:8]
if session_id:
username = f"{username}-session-{session_id}"
# Add country targeting
if country:
username = f"{username}-country-{country}"
if protocol == "socks5":
port = self.config.socks5_port
scheme = "socks5h"
else:
port = self.config.http_port
scheme = "http"
return f"{scheme}://{username}:{self.config.password}@{self.config.gateway}:{port}"
def get_requests_proxies(self, **kwargs) -> dict:
"""Return a proxies dict compatible with the requests library."""
url = self.get_proxy_url(**kwargs)
return {"http": url, "https": url}
Retry Engine con Backoff
import requests
from requests.exceptions import RequestException
RETRYABLE_STATUS_CODES = {403, 429, 500, 502, 503, 504}
class RetryEngine:
"""Retries failed requests with exponential backoff and proxy rotation."""
def __init__(
self,
proxy_manager: ProxyManager,
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 30.0,
rotate_on_failure: bool = True,
):
self.proxy_manager = proxy_manager
self.max_retries = max_retries
self.base_delay = base_delay
self.max_delay = max_delay
self.rotate_on_failure = rotate_on_failure
def _should_retry(self, response: Optional[requests.Response], error: Optional[Exception]) -> bool:
if error:
return True
if response and response.status_code in RETRYABLE_STATUS_CODES:
return True
return False
def _get_delay(self, attempt: int) -> float:
delay = self.base_delay * (2 ** attempt)
return min(delay, self.max_delay)
def execute(
self,
method: str,
url: str,
country: Optional[str] = None,
sticky: bool = False,
**request_kwargs,
) -> requests.Response:
session_id = uuid.uuid4().hex[:8] if sticky else None
last_error = None
for attempt in range(self.max_retries + 1):
# Rotate proxy on retry if enabled
if attempt > 0 and self.rotate_on_failure:
session_id = uuid.uuid4().hex[:8]
self.proxy_manager.metrics.retries += 1
proxies = self.proxy_manager.get_requests_proxies(
country=country, session_id=session_id
)
self.proxy_manager.metrics.total += 1
start_time = time.time()
try:
response = requests.request(
method, url,
proxies=proxies,
timeout=request_kwargs.pop("timeout", 30),
**request_kwargs,
)
latency = time.time() - start_time
self.proxy_manager.metrics.total_latency += latency
if self._should_retry(response, None) and attempt < self.max_retries:
logger.warning(
f"Retryable status {response.status_code} for {url} "
f"(attempt {attempt + 1}/{self.max_retries + 1})"
)
time.sleep(self._get_delay(attempt))
continue
self.proxy_manager.metrics.success += 1
return response
except RequestException as e:
last_error = e
latency = time.time() - start_time
logger.error(f"Request failed for {url}: {e} (attempt {attempt + 1})")
if attempt < self.max_retries:
time.sleep(self._get_delay(attempt))
continue
self.proxy_manager.metrics.failed += 1
raise last_error or Exception(f"All retries exhausted for {url}")
Classe completa di Middleware
class ProxyMiddleware:
"""High-level middleware that wraps all proxy logic."""
def __init__(
self,
config: Optional[ProxyConfig] = None,
max_retries: int = 3,
default_country: Optional[str] = None,
):
self.proxy_manager = ProxyManager(config)
self.retry_engine = RetryEngine(
proxy_manager=self.proxy_manager,
max_retries=max_retries,
)
self.default_country = default_country
self.default_headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.9",
"Accept-Encoding": "gzip, deflate, br",
}
def get(self, url: str, **kwargs) -> requests.Response:
kwargs.setdefault("headers", self.default_headers)
kwargs.setdefault("country", self.default_country)
return self.retry_engine.execute("GET", url, **kwargs)
def post(self, url: str, **kwargs) -> requests.Response:
kwargs.setdefault("headers", self.default_headers)
kwargs.setdefault("country", self.default_country)
return self.retry_engine.execute("POST", url, **kwargs)
@property
def stats(self) -> dict:
m = self.proxy_manager.metrics
return {
"total_requests": m.total,
"successful": m.success,
"failed": m.failed,
"retries": m.retries,
"success_rate": f"{m.success_rate:.1f}%",
"avg_latency": f"{m.avg_latency:.3f}s",
}
# Usage
middleware = ProxyMiddleware(max_retries=3, default_country="us")
# Simple GET — all proxy logic is hidden
response = middleware.get("https://httpbin.org/ip")
print(response.json())
# Geo-targeted request
response = middleware.get("https://example.com/pricing", country="de")
print(response.text[:200])
# Check performance
print(middleware.stats)
Node.js Middleware Attuazione
Core Proxy Manager
const crypto = require('crypto');
const { HttpsProxyAgent } = require('https-proxy-agent');
const { SocksProxyAgent } = require('socks-proxy-agent');
class ProxyManager {
constructor(config = {}) {
this.gateway = config.gateway || 'gate.proxyhat.com';
this.httpPort = config.httpPort || 8080;
this.socks5Port = config.socks5Port || 1080;
this.username = config.username || 'USERNAME';
this.password = config.password || 'PASSWORD';
this.metrics = {
total: 0,
success: 0,
failed: 0,
retries: 0,
totalLatency: 0,
};
}
getProxyUrl({ protocol = 'http', country, sessionId, sticky = false } = {}) {
let username = this.username;
if (sticky && !sessionId) {
sessionId = crypto.randomBytes(4).toString('hex');
}
if (sessionId) username += `-session-${sessionId}`;
if (country) username += `-country-${country}`;
const port = protocol === 'socks5' ? this.socks5Port : this.httpPort;
const scheme = protocol === 'socks5' ? 'socks5h' : 'http';
return `${scheme}://${username}:${this.password}@${this.gateway}:${port}`;
}
createAgent(options = {}) {
const url = this.getProxyUrl(options);
if (options.protocol === 'socks5') {
return new SocksProxyAgent(url);
}
return new HttpsProxyAgent(url);
}
get stats() {
const m = this.metrics;
return {
total: m.total,
success: m.success,
failed: m.failed,
retries: m.retries,
successRate: m.total > 0 ? ((m.success / m.total) * 100).toFixed(1) + '%' : '0%',
avgLatency: m.success > 0 ? (m.totalLatency / m.success).toFixed(3) + 's' : '0s',
};
}
}
Motore di recupero
const RETRYABLE_STATUS_CODES = new Set([403, 429, 500, 502, 503, 504]);
class RetryEngine {
constructor(proxyManager, options = {}) {
this.proxyManager = proxyManager;
this.maxRetries = options.maxRetries || 3;
this.baseDelay = options.baseDelay || 1000;
this.maxDelay = options.maxDelay || 30000;
this.rotateOnFailure = options.rotateOnFailure !== false;
}
_getDelay(attempt) {
return Math.min(this.baseDelay * Math.pow(2, attempt), this.maxDelay);
}
_sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
async execute(url, options = {}) {
const { country, sticky, ...fetchOptions } = options;
let sessionId = sticky ? crypto.randomBytes(4).toString('hex') : undefined;
let lastError;
for (let attempt = 0; attempt <= this.maxRetries; attempt++) {
if (attempt > 0 && this.rotateOnFailure) {
sessionId = crypto.randomBytes(4).toString('hex');
this.proxyManager.metrics.retries++;
}
const agent = this.proxyManager.createAgent({ country, sessionId });
this.proxyManager.metrics.total++;
const startTime = Date.now();
try {
const response = await fetch(url, {
...fetchOptions,
agent,
signal: AbortSignal.timeout(fetchOptions.timeout || 30000),
});
const latency = (Date.now() - startTime) / 1000;
this.proxyManager.metrics.totalLatency += latency;
if (RETRYABLE_STATUS_CODES.has(response.status) && attempt < this.maxRetries) {
console.warn(
`Retryable status ${response.status} for ${url} ` +
`(attempt ${attempt + 1}/${this.maxRetries + 1})`
);
await this._sleep(this._getDelay(attempt));
continue;
}
this.proxyManager.metrics.success++;
return response;
} catch (err) {
lastError = err;
console.error(`Request failed for ${url}: ${err.message} (attempt ${attempt + 1})`);
if (attempt < this.maxRetries) {
await this._sleep(this._getDelay(attempt));
continue;
}
this.proxyManager.metrics.failed++;
}
}
throw lastError || new Error(`All retries exhausted for ${url}`);
}
}
Classe completa di Middleware
class ProxyMiddleware {
constructor(options = {}) {
this.proxyManager = new ProxyManager(options.proxy);
this.retryEngine = new RetryEngine(this.proxyManager, {
maxRetries: options.maxRetries || 3,
});
this.defaultCountry = options.defaultCountry || null;
this.defaultHeaders = {
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.9',
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
};
}
async get(url, options = {}) {
return this.retryEngine.execute(url, {
method: 'GET',
headers: { ...this.defaultHeaders, ...options.headers },
country: options.country || this.defaultCountry,
...options,
});
}
async post(url, body, options = {}) {
return this.retryEngine.execute(url, {
method: 'POST',
headers: { ...this.defaultHeaders, ...options.headers },
body: typeof body === 'string' ? body : JSON.stringify(body),
country: options.country || this.defaultCountry,
...options,
});
}
get stats() {
return this.proxyManager.stats;
}
}
// Usage
const middleware = new ProxyMiddleware({
maxRetries: 3,
defaultCountry: 'us',
});
// Simple GET
const response = await middleware.get('https://httpbin.org/ip');
const data = await response.json();
console.log(data);
// Check stats
console.log(middleware.stats);
Aggiungi Provider Failover
Per la demolizione mission-critical, è possibile estendere il middleware per supportare più provider proxy con failover automatico.
class MultiProviderMiddleware:
"""Falls back to secondary proxy providers on failure."""
def __init__(self, providers: list[ProxyConfig], max_retries: int = 3):
self.providers = [
ProxyMiddleware(config=cfg, max_retries=max_retries)
for cfg in providers
]
def get(self, url: str, **kwargs) -> requests.Response:
last_error = None
for i, provider in enumerate(self.providers):
try:
response = provider.get(url, **kwargs)
if response.status_code < 400:
return response
except Exception as e:
last_error = e
logger.warning(f"Provider {i} failed for {url}: {e}")
continue
raise last_error or Exception(f"All providers failed for {url}")
# Primary: ProxyHat | Fallback: backup provider
providers = [
ProxyConfig(
gateway="gate.proxyhat.com",
username="USERNAME",
password="PASSWORD",
),
ProxyConfig(
gateway="backup-proxy.example.com",
username="BACKUP_USER",
password="BACKUP_PASS",
),
]
middleware = MultiProviderMiddleware(providers)
response = middleware.get("https://example.com/data")
Async Middleware (Python)
Per la raschiatura ad alto rendimento, utilizzare la versione asincrona con aiohttp. Per più modelli di convalutazione, vedere la nostra guida su richiesta del proxy di scaling con controllo di convalutazione.
import aiohttp
import asyncio
class AsyncProxyMiddleware:
def __init__(self, config: Optional[ProxyConfig] = None, max_retries: int = 3):
self.proxy_manager = ProxyManager(config)
self.max_retries = max_retries
async def get(self, url: str, country: Optional[str] = None) -> str:
for attempt in range(self.max_retries + 1):
session_id = uuid.uuid4().hex[:8]
proxy_url = self.proxy_manager.get_proxy_url(
country=country, session_id=session_id
)
try:
async with aiohttp.ClientSession() as session:
async with session.get(
url, proxy=proxy_url, timeout=aiohttp.ClientTimeout(total=30)
) as response:
if response.status in RETRYABLE_STATUS_CODES and attempt < self.max_retries:
await asyncio.sleep(2 ** attempt)
continue
self.proxy_manager.metrics.success += 1
return await response.text()
except Exception as e:
if attempt == self.max_retries:
self.proxy_manager.metrics.failed += 1
raise
await asyncio.sleep(2 ** attempt)
# Usage
async def main():
middleware = AsyncProxyMiddleware(max_retries=3)
urls = [f"https://example.com/page/{i}" for i in range(100)]
tasks = [middleware.get(url) for url in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
success = sum(1 for r in results if not isinstance(r, Exception))
print(f"Success: {success}/{len(results)}")
print(middleware.proxy_manager.metrics)
asyncio.run(main())
Integrazione con i Quadri di Scraping
Il middleware si integra perfettamente con gli schemi di raschiatura comuni. Ecco come usarlo con Playwright e con lo standard Richieste di Python.
# With Playwright (Node.js)
const { chromium } = require('playwright');
class PlaywrightProxyMiddleware {
constructor(proxyManager) {
this.proxyManager = proxyManager;
this.browser = null;
}
async init() {
this.browser = await chromium.launch();
}
async scrape(url, options = {}) {
const proxyUrl = this.proxyManager.getProxyUrl({
sessionId: crypto.randomBytes(4).toString('hex'),
country: options.country,
});
const parsed = new URL(proxyUrl);
const context = await this.browser.newContext({
proxy: {
server: `${parsed.protocol}//${parsed.hostname}:${parsed.port}`,
username: decodeURIComponent(parsed.username),
password: decodeURIComponent(parsed.password),
},
});
const page = await context.newPage();
try {
await page.goto(url, { timeout: 30000 });
return await page.content();
} finally {
await context.close();
}
}
async close() {
if (this.browser) await this.browser.close();
}
}
Per i modelli di proxy specifici di Playwright, vedere il nostro Guida di rotazione del proxy Playwright. Per il monitoraggio delle metriche il tuo middleware raccoglie, vedi Monitoraggio delle prestazioni del proxy.
Esplorare la Python SDK♪ Node SDKo Vai SDK per funzioni middleware integrate e controllo Prezzo di ProxyHat per iniziare. Per la documentazione completa di integrazione, visita Docs.proxyhat.com.






