Dlaczego Conkurse Control Matters for Proxy Scraping
Wysyłanie wniosków kolejno przez proxy odpady przepustowości i czasu. Wysyłanie ich wszystkich jednocześnie przytłacza bramkę proxy, serwer docelowy i własny system. Concurrency control uderza w saldo - maksymalizacja przepustowości podczas pozostawania w granicach puli proxy, tolerancja miejsca docelowego i dostępnych zasobów.
Przewodnik ten obejmuje wzorce współwalutowe klasy produkcyjnej w trzech językach: Python (asyncio), Node.js (pule obietnic) i Go (goroutynki z semaforami). Każdy przykład używa Rotacyjne proxy mieszkaniowe ProxyHat i jest gotowy do skopiowania do swoich projektów.
Celem concurrency control jest prosta: zmaksymalizować żądania na sekundę bez uruchamiania bloków, wyczerpującej pamięci lub awarii procesu. Właściwy wzór zależy od języka, docelowej strony i skali.
Wzory konkursowe w porównaniu
| Wzór | Język | Najlepsze dla | Max Konwaluta |
|---|---|---|---|
| Asyncio. Semafora | Python | I / O-bound scrating | 50- 200 na proces |
| Basen robotniczy (asyncio) | Python | Kolejki zadaniowe z przeciwciśnieniem | 10- 100 pracowników |
| Promise.all + batching | Node.js | Proste równoległe pobieranie | 50- 500 na każdy proces |
| p- limit / p- kolejka | Node.js | Kwartalna waluta | 10- 200 na kolejkę |
| Gorotyny + Semafor | Idź. | Wysokoprzepustowość drapania | 100- 1000 + |
| Worker Pool (Idź kanałów) | Idź. | Strukturalna dystrybucja zadań | 10- 500 pracowników |
Python: Asyncio Semaphore
Najprostszy i najskuteczniejszy wzór współzależności w Pythonie. Semafora ogranicza liczbę korotynów, które mogą wykonywać jednocześnie, zapobiegając wyczerpaniu zasobów.
import asyncio
import aiohttp
import uuid
import time
PROXY_GATEWAY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
MAX_CONCURRENCY = 50
TIMEOUT = aiohttp.ClientTimeout(total=30)
async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore) -> dict:
async with semaphore:
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
start = time.time()
try:
async with session.get(url, proxy=proxy, timeout=TIMEOUT) as response:
body = await response.text()
return {
"url": url,
"status": response.status,
"length": len(body),
"latency": round(time.time() - start, 3),
}
except Exception as e:
return {"url": url, "error": str(e), "latency": round(time.time() - start, 3)}
async def scrape_all(urls: list[str]) -> list[dict]:
semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url, semaphore) for url in urls]
results = await asyncio.gather(*tasks)
return results
# Usage
urls = [f"https://example.com/product/{i}" for i in range(1000)]
results = asyncio.run(scrape_all(urls))
success = sum(1 for r in results if "error" not in r)
print(f"Completed: {success}/{len(results)} successful")
print(f"Avg latency: {sum(r['latency'] for r in results) / len(results):.3f}s")
Python: Worker Pool with Backpressure
Gdy potrzebujesz większej kontroli - ograniczenie prędkości, backpressure lub priorytetowy harmonogram - użyj puli pracowników z asyncio. Kolejka.
import asyncio
import aiohttp
import uuid
class WorkerPool:
"""Fixed-size worker pool with backpressure via bounded queue."""
def __init__(self, num_workers: int = 20, queue_size: int = 100):
self.num_workers = num_workers
self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
self.results: list = []
self.stats = {"success": 0, "failed": 0, "total_latency": 0.0}
self._stop = False
async def worker(self, session: aiohttp.ClientSession, worker_id: int):
while not self._stop:
try:
url = await asyncio.wait_for(self.queue.get(), timeout=5.0)
except asyncio.TimeoutError:
if self._stop:
break
continue
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
import time
start = time.time()
try:
async with session.get(
url, proxy=proxy,
timeout=aiohttp.ClientTimeout(total=30)
) as response:
body = await response.text()
latency = time.time() - start
self.stats["success"] += 1
self.stats["total_latency"] += latency
self.results.append({
"url": url, "status": response.status,
"length": len(body), "worker": worker_id,
})
except Exception as e:
self.stats["failed"] += 1
self.results.append({"url": url, "error": str(e), "worker": worker_id})
finally:
self.queue.task_done()
async def run(self, urls: list[str]) -> list[dict]:
async with aiohttp.ClientSession() as session:
# Start workers
workers = [
asyncio.create_task(self.worker(session, i))
for i in range(self.num_workers)
]
# Feed URLs into the queue (backpressure: blocks when queue is full)
for url in urls:
await self.queue.put(url)
# Wait for all tasks to complete
await self.queue.join()
self._stop = True
# Cancel workers
for w in workers:
w.cancel()
return self.results
# Usage
pool = WorkerPool(num_workers=30, queue_size=50)
urls = [f"https://example.com/item/{i}" for i in range(500)]
results = asyncio.run(pool.run(urls))
print(f"Success: {pool.stats['success']}, Failed: {pool.stats['failed']}")
avg_lat = pool.stats["total_latency"] / max(pool.stats["success"], 1)
print(f"Avg latency: {avg_lat:.3f}s")
Python: Oceń limit
Niektóre cele egzekwują surowe limity stawek. Ogranicznik kursu token-bucket integruje się ze wzorami współwalutowymi powyżej.
import asyncio
import time
class RateLimiter:
"""Token-bucket rate limiter for async operations."""
def __init__(self, rate: float, burst: int = 1):
"""
Args:
rate: Requests per second
burst: Maximum burst size
"""
self.rate = rate
self.burst = burst
self.tokens = burst
self.last_refill = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = time.monotonic()
elapsed = now - self.last_refill
self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens < 1:
wait_time = (1 - self.tokens) / self.rate
await asyncio.sleep(wait_time)
self.tokens = 0
else:
self.tokens -= 1
# Combined with semaphore
async def fetch_rate_limited(session, url, semaphore, limiter):
async with semaphore:
await limiter.acquire()
session_id = uuid.uuid4().hex[:8]
proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
async with session.get(url, proxy=proxy, timeout=TIMEOUT) as resp:
return await resp.text()
# 10 requests/second, max 30 concurrent
async def main():
semaphore = asyncio.Semaphore(30)
limiter = RateLimiter(rate=10.0, burst=5)
urls = [f"https://example.com/page/{i}" for i in range(200)]
async with aiohttp.ClientSession() as session:
tasks = [fetch_rate_limited(session, u, semaphore, limiter) for u in urls]
results = await asyncio.gather(*tasks, return_exceptions=True)
success = sum(1 for r in results if not isinstance(r, Exception))
print(f"Done: {success}/{len(results)}")
asyncio.run(main())
Node.js: Łatanie obietnic
Najprostsze Node.js concurrency procesory wzorca URL w ustawieniach wielkości partii.
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const BATCH_SIZE = 20;
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function fetchUrl(url) {
const agent = createAgent();
const start = Date.now();
try {
const response = await fetch(url, {
agent,
signal: AbortSignal.timeout(30000),
});
const text = await response.text();
return {
url,
status: response.status,
length: text.length,
latency: Date.now() - start,
};
} catch (err) {
return { url, error: err.message, latency: Date.now() - start };
}
}
async function scrapeInBatches(urls) {
const results = [];
for (let i = 0; i < urls.length; i += BATCH_SIZE) {
const batch = urls.slice(i, i + BATCH_SIZE);
const batchResults = await Promise.all(batch.map(fetchUrl));
results.push(...batchResults);
const success = batchResults.filter(r => !r.error).length;
console.log(`Batch ${Math.floor(i / BATCH_SIZE) + 1}: ${success}/${batch.length} OK`);
}
return results;
}
// Usage
const urls = Array.from({ length: 200 }, (_, i) =>
`https://example.com/product/${i + 1}`
);
scrapeInBatches(urls).then(results => {
const success = results.filter(r => !r.error).length;
console.log(`Total: ${success}/${results.length} successful`);
});
Node.js: limit p- dla kontroli ziaren Fine-
W celu uzyskania dokładnych limitów wzajemnych bez ręcznego łączenia, należy użyć p-limit Biblioteka.
// npm install p-limit
const pLimit = require('p-limit');
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const limit = pLimit(30); // Max 30 concurrent requests
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function fetchWithLimit(url) {
return limit(async () => {
const agent = createAgent();
const response = await fetch(url, {
agent,
signal: AbortSignal.timeout(30000),
});
return {
url,
status: response.status,
body: await response.text(),
};
});
}
// All 500 URLs start immediately, but only 30 run concurrently
const urls = Array.from({ length: 500 }, (_, i) =>
`https://example.com/item/${i + 1}`
);
Promise.all(urls.map(fetchWithLimit)).then(results => {
const success = results.filter(r => r.status === 200).length;
console.log(`Success: ${success}/${results.length}`);
});
Node.js: kolejka robotnicza z ciśnieniem wstecznym
// npm install p-queue
const PQueue = require('p-queue').default;
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const queue = new PQueue({
concurrency: 25,
intervalCap: 10, // Max 10 requests...
interval: 1000, // ...per second (rate limiting)
});
queue.on('active', () => {
console.log(`Active: ${queue.pending} pending, ${queue.size} queued`);
});
function createAgent() {
const sessionId = crypto.randomBytes(4).toString('hex');
return new HttpsProxyAgent(
`http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
);
}
async function processUrl(url) {
const agent = createAgent();
const response = await fetch(url, { agent, signal: AbortSignal.timeout(30000) });
return { url, status: response.status, body: await response.text() };
}
// Add URLs to the queue
const urls = Array.from({ length: 1000 }, (_, i) =>
`https://example.com/page/${i + 1}`
);
const results = await Promise.all(
urls.map(url => queue.add(() => processUrl(url)))
);
console.log(`Completed: ${results.filter(r => r.status === 200).length}/${results.length}`);
Go: Goroutines with Semaphore
Goroutines Go są lekkie, ale nadal trzeba ograniczyć współzależności, aby uniknąć przytłaczających połączeń proxy. Semafora oparta na kanałach to podejście idiomatyczne.
package main
import (
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
const maxConcurrency = 50
type Result struct {
URL string
Status int
Length int
Latency time.Duration
Error error
}
func newProxyClient() *http.Client {
b := make([]byte, 4)
rand.Read(b)
sessionID := hex.EncodeToString(b)
proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
proxyURL, _ := url.Parse(proxyStr)
return &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
}
func fetchURL(target string, sem chan struct{}, wg *sync.WaitGroup, results chan<- Result) {
defer wg.Done()
sem <- struct{}{} // Acquire semaphore
defer func() { <-sem }() // Release semaphore
client := newProxyClient()
start := time.Now()
resp, err := client.Get(target)
if err != nil {
results <- Result{URL: target, Error: err, Latency: time.Since(start)}
return
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
results <- Result{
URL: target,
Status: resp.StatusCode,
Length: len(body),
Latency: time.Since(start),
}
}
func main() {
urls := make([]string, 500)
for i := range urls {
urls[i] = fmt.Sprintf("https://example.com/item/%d", i+1)
}
sem := make(chan struct{}, maxConcurrency)
results := make(chan Result, len(urls))
var wg sync.WaitGroup
start := time.Now()
for _, u := range urls {
wg.Add(1)
go fetchURL(u, sem, &wg, results)
}
// Close results channel when all goroutines finish
go func() {
wg.Wait()
close(results)
}()
var success, failed int
var totalLatency time.Duration
for r := range results {
if r.Error != nil {
failed++
} else {
success++
totalLatency += r.Latency
}
}
elapsed := time.Since(start)
fmt.Printf("Completed in %s\n", elapsed)
fmt.Printf("Success: %d, Failed: %d\n", success, failed)
fmt.Printf("Avg latency: %s\n", totalLatency/time.Duration(max(success, 1)))
fmt.Printf("Throughput: %.1f req/s\n", float64(success+failed)/elapsed.Seconds())
}
Go: Worker Pool with Channels
Dla bardziej ustrukturyzowanego przetwarzania, używać stałej puli pracowników spożywających z kanału.
package main
import (
"crypto/rand"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type Job struct {
URL string
}
type JobResult struct {
URL string
Status int
Body string
Latency time.Duration
Err error
}
func worker(id int, jobs <-chan Job, results chan<- JobResult, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
b := make([]byte, 4)
rand.Read(b)
sessionID := hex.EncodeToString(b)
proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
proxyURL, _ := url.Parse(proxyStr)
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
start := time.Now()
resp, err := client.Get(job.URL)
latency := time.Since(start)
if err != nil {
results <- JobResult{URL: job.URL, Err: err, Latency: latency}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- JobResult{
URL: job.URL,
Status: resp.StatusCode,
Body: string(body),
Latency: latency,
}
}
}
func main() {
numWorkers := 30
urls := make([]string, 300)
for i := range urls {
urls[i] = fmt.Sprintf("https://example.com/page/%d", i+1)
}
jobs := make(chan Job, len(urls))
results := make(chan JobResult, len(urls))
var wg sync.WaitGroup
// Start workers
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// Send jobs
for _, u := range urls {
jobs <- Job{URL: u}
}
close(jobs)
// Collect results
go func() {
wg.Wait()
close(results)
}()
var success, failed int
for r := range results {
if r.Err != nil {
failed++
} else {
success++
}
}
fmt.Printf("Success: %d, Failed: %d\n", success, failed)
}
Wybór odpowiedniego poziomu wymiany walut
Optymalna współzależność zależy od kilku czynników. Oto praktyczny przewodnik startowy:
| Typ docelowy | Zalecane połączenie | Uzasadnienie |
|---|---|---|
| Lekkie API (JSON) | 50- 200 | Szybkie odpowiedzi, niska pamięć na żądanie |
| Standardowe strony internetowe | 20- 50 | Rozmiary odpowiedzi umiarkowanej, niektóre ograniczenia częstości |
| Ciężkie strony JS-renderowane | 5- 15 | Kontekst przeglądarki używa znaczącej pamięci |
| Agresywne strony anty-bot | 5- 10 | Potrzeba realistycznego harmonogramu pomiędzy wnioskami |
| Duże pliki do pobrania | 5- 20 | Związane z widmem, nie związane z CPU- |
Zacznij od 10 równoległych wniosków i stopniowo wzrastać, monitorując wskaźniki sukcesu. Jeśli wskaźnik sukcesu spadnie poniżej 90%, zmniejszyć współwalutę lub dodać opóźnienia pomiędzy wnioskami. Więcej informacji na temat tych wskaźników, zobacz nasz Monitorowanie wydajności proxy Przewodnik.
Dla wielokrotnego wykorzystania abstrakcji proxy z built- in concurrency, patrz Building a Proxy Middleware Layer. Dla end- to- end scrating architecture, read Projektowanie niezawodnej architektury skraplaniaZbadaj Python SDK, Węzeł SDKoraz Go SDK do integracji proxy gotowej do produkcji lub sprawdzania Ceny proksyHat Na początek.






