Why Concurrency Control Matters for Proxy Scraping
Sending requests through a proxy one at a time wastes bandwidth and time. Sending them all at once overwhelms the proxy gateway, the target server, and your own system. Concurrency control strikes the balance: it maximizes throughput while staying within the limits of your proxy pool, the target site's tolerance, and your available resources.
This guide covers production-ready concurrency patterns in three languages: Python (asyncio), Node.js (Promise pools), and Go (goroutines with semaphores). Every example uses ProxyHat's rotating residential proxies and is ready to copy into your projects.
The goal of concurrency control is simple: maximize requests per second without triggering blocks, exhausting memory, or crashing the process. The right pattern depends on your language, your target site, and your scale.
Concurrency Models Compared
| Model | Language | Best for | Max concurrency |
|---|---|---|---|
| asyncio.Semaphore | Python | I/O-bound scraping | 50-200 per process |
| Worker pool (async) | Python | Task queues with backpressure | 10-100 workers |
| Promise.all + batching | Node.js | Simple parallel scraping | 50-500 per process |
| p-limit / p-queue | Node.js | Fine-grained concurrency control | 10-200 per queue |
| Goroutines + semaphore | Go | High-throughput scraping | 100-1000+ |
| Worker pool (Go channels) | Go | Structured task distribution | 10-500 workers |
Python: asyncio Semaphore
The simplest and most effective concurrency pattern in Python. A semaphore limits how many coroutines can run at once, preventing resource exhaustion.
import asyncio
import aiohttp
import uuid
import time
PROXY_GATEWAY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
MAX_CONCURRENCY = 50
TIMEOUT = aiohttp.ClientTimeout(total=30)
async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore) -> dict:
async with semaphore:
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
start = time.time()
try:
async with session.get(url, proxy=proxy, timeout=TIMEOUT) as response:
body = await response.text()
return {
"url": url,
"status": response.status,
"length": len(body),
"latency": round(time.time() - start, 3),
}
except Exception as e:
return {"url": url, "error": str(e), "latency": round(time.time() - start, 3)}
async def scrape_all(urls: list[str]) -> list[dict]:
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Usage
urls = [f"https://example.com/product/{i}" for i in range(1000)]
results = asyncio.run(scrape_all(urls))
success = sum(1 for r in results if "error" not in r)
print(f"Completed: {success}/{len(results)} successful")
print(f"Avg latency: {sum(r['latency'] for r in results) / len(results):.3f}s")
Python: Worker Pool with Backpressure
When you need more control, such as rate limiting, backpressure, or priority scheduling, use a worker pool fed by an asyncio.Queue.
import asyncio
import aiohttp
import time
import uuid
class WorkerPool:
"""Fixed-size worker pool with backpressure via bounded queue."""
def __init__(self, num_workers: int = 20, queue_size: int = 100):
self.num_workers = num_workers
self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
self.results: list = []
self.stats = {"success": 0, "failed": 0, "total_latency": 0.0}
self._stop = False
async def worker(self, session: aiohttp.ClientSession, worker_id: int):
while not self._stop:
try:
url = await asyncio.wait_for(self.queue.get(), timeout=5.0)
except asyncio.TimeoutError:
if self._stop:
break
continue
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
            start = time.time()
try:
async with session.get(
url, proxy=proxy,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
body = await response.text()
latency = time.time() - start
self.stats["success"] += 1
self.stats["total_latency"] += latency
self.results.append({
"url": url, "status": response.status,
"length": len(body), "worker": worker_id,
})
except Exception as e:
self.stats["failed"] += 1
self.results.append({"url": url, "error": str(e), "worker": worker_id})
finally:
self.queue.task_done()
async def run(self, urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession() as session:
# Start workers
workers = [
asyncio.create_task(self.worker(session, i))
for i in range(self.num_workers)
]
# Feed URLs into the queue (backpressure: blocks when queue is full)
for url in urls:
await self.queue.put(url)
# Wait for all tasks to complete
await self.queue.join()
self._stop = True
# Cancel workers
for w in workers:
w.cancel()
return self.results
# Usage
pool = WorkerPool(num_workers=30, queue_size=50)
urls = [f"https://example.com/item/{i}" for i in range(500)]
results = asyncio.run(pool.run(urls))
print(f"Success: {pool.stats['success']}, Failed: {pool.stats['failed']}")
avg_lat = pool.stats["total_latency"] / max(pool.stats["success"], 1)
print(f"Avg latency: {avg_lat:.3f}s")
Python: Rate Limiter
Some targets enforce strict rate limits. This token-bucket rate limiter integrates with the concurrency patterns above.
import asyncio
import time
class RateLimiter:
"""Token-bucket rate limiter for async operations."""
def __init__(self, rate: float, burst: int = 1):
"""
Args:
rate: Requests per second
burst: Maximum burst size
"""
self.rate = rate
self.burst = burst
self.tokens = burst
self.last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
self.last_refill = now
            if self.tokens < 1:
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                # Restart the refill clock so the sleep interval isn't credited twice
                self.last_refill = time.monotonic()
                self.tokens = 0
            else:
                self.tokens -= 1
# Combined with semaphore
async def fetch_rate_limited(session, url, semaphore, limiter):
async with semaphore:
await limiter.acquire()
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
async with session.get(url, proxy=proxy, timeout=TIMEOUT) as resp:
return await resp.text()
# 10 requests/second, max 30 concurrent
async def main():
semaphore = asyncio.Semaphore(30)
limiter = RateLimiter(rate=10.0, burst=5)
urls = [f"https://example.com/page/{i}" for i in range(200)]
async with aiohttp.ClientSession() as session:
tasks = [fetch_rate_limited(session, u, semaphore, limiter) for u in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
success = sum(1 for r in results if not isinstance(r, Exception))
print(f"Done: {success}/{len(results)}")
asyncio.run(main())
Node.js: Promise Batching
The simplest Node.js concurrency pattern processes URLs in fixed-size batches.
// npm install node-fetch@2 https-proxy-agent
// Note: Node's built-in fetch (undici) ignores the `agent` option,
// so these examples assume node-fetch v2, which supports proxy agents.
const fetch = require('node-fetch');
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const BATCH_SIZE = 20;
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function fetchUrl(url) {
const agent = createAgent();
const start = Date.now();
try {
const response = await fetch(url, {
agent,
signal: AbortSignal.timeout(30000),
});
const text = await response.text();
return {
url,
status: response.status,
length: text.length,
latency: Date.now() - start,
};
} catch (err) {
return { url, error: err.message, latency: Date.now() - start };
}
}
async function scrapeInBatches(urls) {
const results = [];
for (let i = 0; i < urls.length; i += BATCH_SIZE) {
const batch = urls.slice(i, i + BATCH_SIZE);
const batchResults = await Promise.all(batch.map(fetchUrl));
results.push(...batchResults);
const success = batchResults.filter(r => !r.error).length;
console.log(`Batch ${Math.floor(i / BATCH_SIZE) + 1}: ${success}/${batch.length} OK`);
}
return results;
}
// Usage
const urls = Array.from({ length: 200 }, (_, i) =>
`https://example.com/product/${i + 1}`
);
scrapeInBatches(urls).then(results => {
const success = results.filter(r => !r.error).length;
console.log(`Total: ${success}/${results.length} successful`);
});
Node.js: p-limit for Fine-Grained Control
For precise concurrency limits without manual batching (where each batch waits on its slowest request), use the p-limit library.
// npm install p-limit@3 node-fetch@2 https-proxy-agent  (p-limit 4+ is ESM-only)
const pLimit = require('p-limit');
const fetch = require('node-fetch'); // built-in fetch ignores `agent`
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const limit = pLimit(30); // Max 30 concurrent requests
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function fetchWithLimit(url) {
  return limit(async () => {
    const agent = createAgent();
    try {
      const response = await fetch(url, {
        agent,
        signal: AbortSignal.timeout(30000),
      });
      return {
        url,
        status: response.status,
        body: await response.text(),
      };
    } catch (err) {
      // Without this, a single network error rejects the whole Promise.all below
      return { url, error: err.message };
    }
  });
}
// All 500 URLs start immediately, but only 30 run concurrently
const urls = Array.from({ length: 500 }, (_, i) =>
`https://example.com/item/${i + 1}`
);
Promise.all(urls.map(fetchWithLimit)).then(results => {
const success = results.filter(r => r.status === 200).length;
console.log(`Success: ${success}/${results.length}`);
});
Node.js: Worker Queue with Backpressure
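p-queue combines a concurrency cap with interval-based rate limiting, so a single queue enforces both a maximum number of in-flight requests and a requests-per-second ceiling: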
// npm install p-queue@6 node-fetch@2 https-proxy-agent  (p-queue 7+ is ESM-only)
const PQueue = require('p-queue').default;
const fetch = require('node-fetch'); // built-in fetch ignores `agent`
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const queue = new PQueue({
concurrency: 25,
intervalCap: 10, // Max 10 requests...
interval: 1000, // ...per second (rate limiting)
});
queue.on('active', () => {
console.log(`Active: ${queue.pending} pending, ${queue.size} queued`);
});
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function processUrl(url) {
const agent = createAgent();
const response = await fetch(url, { agent, signal: AbortSignal.timeout(30000) });
return { url, status: response.status, body: await response.text() };
}
// Add URLs to the queue (wrapped in an async IIFE: CommonJS has no top-level await)
const urls = Array.from({ length: 1000 }, (_, i) =>
  `https://example.com/page/${i + 1}`
);
(async () => {
  const results = await Promise.all(
    urls.map(url =>
      queue.add(() => processUrl(url)).catch(err => ({ url, error: err.message }))
    )
  );
  console.log(`Completed: ${results.filter(r => r.status === 200).length}/${results.length}`);
})();
Go: Goroutines with a Semaphore
Goroutines are lightweight, but you still need to cap concurrency to avoid overwhelming your proxy connections. A channel-based semaphore is the idiomatic approach.
package main
import (
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
const maxConcurrency = 50
type Result struct {
URL string
Status int
Length int
Latency time.Duration
Error error
}
func newProxyClient() *http.Client {
b := make([]byte, 4)
rand.Read(b)
sessionID := hex.EncodeToString(b)
proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
proxyURL, _ := url.Parse(proxyStr)
return &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
}
func fetchURL(target string, sem chan struct{}, wg *sync.WaitGroup, results chan<- Result) {
defer wg.Done()
sem <- struct{}{} // Acquire semaphore
defer func() { <-sem }() // Release semaphore
client := newProxyClient()
start := time.Now()
resp, err := client.Get(target)
if err != nil {
results <- Result{URL: target, Error: err, Latency: time.Since(start)}
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
results <- Result{
URL: target,
Status: resp.StatusCode,
Length: len(body),
Latency: time.Since(start),
}
}
func main() {
urls := make([]string, 500)
for i := range urls {
urls[i] = fmt.Sprintf("https://example.com/item/%d", i+1)
}
sem := make(chan struct{}, maxConcurrency)
results := make(chan Result, len(urls))
var wg sync.WaitGroup
start := time.Now()
for _, u := range urls {
wg.Add(1)
go fetchURL(u, sem, &wg, results)
}
// Close results channel when all goroutines finish
go func() {
wg.Wait()
close(results)
}()
var success, failed int
var totalLatency time.Duration
for r := range results {
if r.Error != nil {
failed++
} else {
success++
totalLatency += r.Latency
}
}
elapsed := time.Since(start)
fmt.Printf("Completed in %s\n", elapsed)
fmt.Printf("Success: %d, Failed: %d\n", success, failed)
fmt.Printf("Avg latency: %s\n", totalLatency/time.Duration(max(success, 1)))
fmt.Printf("Throughput: %.1f req/s\n", float64(success+failed)/elapsed.Seconds())
}
Go: Worker Pool with Channels
For more structured processing, use a fixed pool of workers consuming from a channel.
package main
import (
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type Job struct {
URL string
}
type JobResult struct {
URL string
Status int
Body string
Latency time.Duration
Err error
}
func worker(id int, jobs <-chan Job, results chan<- JobResult, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
b := make([]byte, 4)
rand.Read(b)
sessionID := hex.EncodeToString(b)
proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
proxyURL, _ := url.Parse(proxyStr)
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
start := time.Now()
resp, err := client.Get(job.URL)
latency := time.Since(start)
if err != nil {
results <- JobResult{URL: job.URL, Err: err, Latency: latency}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- JobResult{
URL: job.URL,
Status: resp.StatusCode,
Body: string(body),
Latency: latency,
}
}
}
func main() {
numWorkers := 30
urls := make([]string, 300)
for i := range urls {
urls[i] = fmt.Sprintf("https://example.com/page/%d", i+1)
}
jobs := make(chan Job, len(urls))
results := make(chan JobResult, len(urls))
var wg sync.WaitGroup
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// Send jobs
for _, u := range urls {
jobs <- Job{URL: u}
}
close(jobs)
// Collect results
go func() {
wg.Wait()
close(results)
}()
var success, failed int
for r := range results {
if r.Err != nil {
failed++
} else {
success++
}
}
fmt.Printf("Success: %d, Failed: %d\n", success, failed)
}
Choosing the Right Concurrency Level
The optimal concurrency depends on several factors. Here is a practical starting-point guide:
| Target type | Recommended concurrency | Reasoning |
|---|---|---|
| Lightweight APIs (JSON) | 50-200 | Fast responses, low memory per request |
| Standard web pages | 20-50 | Moderate response sizes, some rate limiting |
| JS-heavy pages | 5-15 | Browser contexts consume significant memory |
| Aggressive anti-bot sites | 5-10 | Need realistic timing between requests |
| Large file downloads | 5-20 | Bandwidth-bound, not CPU-bound |
Start with 10 concurrent requests and scale up gradually while monitoring success rates, as sketched below. If the success rate drops below 90%, reduce concurrency or add delays between requests. For more on tracking these metrics, see our Proxy Performance Monitoring guide.
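As a concrete illustration of that ramp-up loop, here is a minimal Python sketch reusing fetch from the asyncio.Semaphore example. It scrapes in waves, doubles concurrency after a wave whose success rate stays at or above 90%, and halves it otherwise; the wave size and step policy are illustrative assumptions, not ProxyHat recommendations.
import asyncio
import aiohttp
# Illustrative tuning constants; adjust for your target
START_CONCURRENCY = 10
CONCURRENCY_CEILING = 200
SUCCESS_THRESHOLD = 0.90
async def adaptive_scrape(urls: list[str], wave_size: int = 100) -> list[dict]:
    """Scrape in waves, adjusting concurrency from each wave's success rate."""
    concurrency = START_CONCURRENCY
    all_results: list[dict] = []
    async with aiohttp.ClientSession() as session:
        for i in range(0, len(urls), wave_size):
            wave = urls[i:i + wave_size]
            semaphore = asyncio.Semaphore(concurrency)  # fresh bound per wave
            results = await asyncio.gather(*(fetch(session, u, semaphore) for u in wave))
            all_results.extend(results)
            success_rate = sum(1 for r in results if "error" not in r) / len(results)
            if success_rate >= SUCCESS_THRESHOLD:
                concurrency = min(concurrency * 2, CONCURRENCY_CEILING)  # ramp up
            else:
                concurrency = max(concurrency // 2, 1)  # back off
            print(f"Wave {i // wave_size + 1}: {success_rate:.0%} OK, next concurrency: {concurrency}")
    return all_results
Multiplicative increase and decrease is just one reasonable policy; additive steps are gentler. The important part is that concurrency only changes between waves, when a fresh success-rate measurement is available.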
For a reusable proxy abstraction with built-in concurrency control, see Building a Proxy Middleware Layer. For end-to-end scraping architecture, read Designing a Reliable Scraping Architecture. Explore the Python SDK, Node SDK, and Go SDK for production-ready proxy integration, or check ProxyHat's pricing to get started.