Skalowanie żądań proxy za pomocą kontroli konkursowej

Master wzorce konkursowe do skrobania na bazie proksydowej: asyncio semafora, baseny obietnic, baseny robotnicze Go, ograniczniki stóp i ciśnienie wsteczne. Kod produkcji w Pythonie, Node.js i Go.

Skalowanie żądań proxy za pomocą kontroli konkursowej

Dlaczego Conkurse Control Matters for Proxy Scraping

Wysyłanie wniosków kolejno przez proxy odpady przepustowości i czasu. Wysyłanie ich wszystkich jednocześnie przytłacza bramkę proxy, serwer docelowy i własny system. Concurrency control uderza w saldo - maksymalizacja przepustowości podczas pozostawania w granicach puli proxy, tolerancja miejsca docelowego i dostępnych zasobów.

Przewodnik ten obejmuje wzorce współwalutowe klasy produkcyjnej w trzech językach: Python (asyncio), Node.js (pule obietnic) i Go (goroutynki z semaforami). Każdy przykład używa Rotacyjne proxy mieszkaniowe ProxyHat i jest gotowy do skopiowania do swoich projektów.

Celem concurrency control jest prosta: zmaksymalizować żądania na sekundę bez uruchamiania bloków, wyczerpującej pamięci lub awarii procesu. Właściwy wzór zależy od języka, docelowej strony i skali.

Wzory konkursowe w porównaniu

WzórJęzykNajlepsze dlaMax Konwaluta
Asyncio. SemaforaPythonI / O-bound scrating50- 200 na proces
Basen robotniczy (asyncio)PythonKolejki zadaniowe z przeciwciśnieniem10- 100 pracowników
Promise.all + batchingNode.jsProste równoległe pobieranie50- 500 na każdy proces
p- limit / p- kolejkaNode.jsKwartalna waluta10- 200 na kolejkę
Gorotyny + SemaforIdź.Wysokoprzepustowość drapania100- 1000 +
Worker Pool (Idź kanałów)Idź.Strukturalna dystrybucja zadań10- 500 pracowników

Python: Asyncio Semaphore

Najprostszy i najskuteczniejszy wzór współzależności w Pythonie. Semafora ogranicza liczbę korotynów, które mogą wykonywać jednocześnie, zapobiegając wyczerpaniu zasobów.

import asyncio
import aiohttp
import uuid
import time
PROXY_GATEWAY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
MAX_CONCURRENCY = 50
TIMEOUT = aiohttp.ClientTimeout(total=30)
async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        start = time.time()
        try:
            async with session.get(url, proxy=proxy, timeout=TIMEOUT) as response:
                body = await response.text()
                return {
                    "url": url,
                    "status": response.status,
                    "length": len(body),
                    "latency": round(time.time() - start, 3),
                }
        except Exception as e:
            return {"url": url, "error": str(e), "latency": round(time.time() - start, 3)}
async def scrape_all(urls: list[str]) -> list[dict]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
    return results
# Usage
urls = [f"https://example.com/product/{i}" for i in range(1000)]
results = asyncio.run(scrape_all(urls))
success = sum(1 for r in results if "error" not in r)
print(f"Completed: {success}/{len(results)} successful")
print(f"Avg latency: {sum(r['latency'] for r in results) / len(results):.3f}s")

Python: Worker Pool with Backpressure

Gdy potrzebujesz większej kontroli - ograniczenie prędkości, backpressure lub priorytetowy harmonogram - użyj puli pracowników z asyncio. Kolejka.

import asyncio
import aiohttp
import uuid
class WorkerPool:
    """Fixed-size worker pool with backpressure via bounded queue."""
    def __init__(self, num_workers: int = 20, queue_size: int = 100):
        self.num_workers = num_workers
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
        self.results: list = []
        self.stats = {"success": 0, "failed": 0, "total_latency": 0.0}
        self._stop = False
    async def worker(self, session: aiohttp.ClientSession, worker_id: int):
        while not self._stop:
            try:
                url = await asyncio.wait_for(self.queue.get(), timeout=5.0)
            except asyncio.TimeoutError:
                if self._stop:
                    break
                continue
            session_id = uuid.uuid4().hex[:8]
            proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
            import time
            start = time.time()
            try:
                async with session.get(
                    url, proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    body = await response.text()
                    latency = time.time() - start
                    self.stats["success"] += 1
                    self.stats["total_latency"] += latency
                    self.results.append({
                        "url": url, "status": response.status,
                        "length": len(body), "worker": worker_id,
                    })
            except Exception as e:
                self.stats["failed"] += 1
                self.results.append({"url": url, "error": str(e), "worker": worker_id})
            finally:
                self.queue.task_done()
    async def run(self, urls: list[str]) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            # Start workers
            workers = [
                asyncio.create_task(self.worker(session, i))
                for i in range(self.num_workers)
            ]
            # Feed URLs into the queue (backpressure: blocks when queue is full)
            for url in urls:
                await self.queue.put(url)
            # Wait for all tasks to complete
            await self.queue.join()
            self._stop = True
            # Cancel workers
            for w in workers:
                w.cancel()
        return self.results
# Usage
pool = WorkerPool(num_workers=30, queue_size=50)
urls = [f"https://example.com/item/{i}" for i in range(500)]
results = asyncio.run(pool.run(urls))
print(f"Success: {pool.stats['success']}, Failed: {pool.stats['failed']}")
avg_lat = pool.stats["total_latency"] / max(pool.stats["success"], 1)
print(f"Avg latency: {avg_lat:.3f}s")

Python: Oceń limit

Niektóre cele egzekwują surowe limity stawek. Ogranicznik kursu token-bucket integruje się ze wzorami współwalutowymi powyżej.

import asyncio
import time
class RateLimiter:
    """Token-bucket rate limiter for async operations."""
    def __init__(self, rate: float, burst: int = 1):
        """
        Args:
            rate: Requests per second
            burst: Maximum burst size
        """
        self.rate = rate
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()
    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens < 1:
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= 1
# Combined with semaphore
async def fetch_rate_limited(session, url, semaphore, limiter):
    async with semaphore:
        await limiter.acquire()
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        async with session.get(url, proxy=proxy, timeout=TIMEOUT) as resp:
            return await resp.text()
# 10 requests/second, max 30 concurrent
async def main():
    semaphore = asyncio.Semaphore(30)
    limiter = RateLimiter(rate=10.0, burst=5)
    urls = [f"https://example.com/page/{i}" for i in range(200)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_rate_limited(session, u, semaphore, limiter) for u in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    success = sum(1 for r in results if not isinstance(r, Exception))
    print(f"Done: {success}/{len(results)}")
asyncio.run(main())

Node.js: Łatanie obietnic

Najprostsze Node.js concurrency procesory wzorca URL w ustawieniach wielkości partii.

const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const BATCH_SIZE = 20;
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function fetchUrl(url) {
  const agent = createAgent();
  const start = Date.now();
  try {
    const response = await fetch(url, {
      agent,
      signal: AbortSignal.timeout(30000),
    });
    const text = await response.text();
    return {
      url,
      status: response.status,
      length: text.length,
      latency: Date.now() - start,
    };
  } catch (err) {
    return { url, error: err.message, latency: Date.now() - start };
  }
}
async function scrapeInBatches(urls) {
  const results = [];
  for (let i = 0; i < urls.length; i += BATCH_SIZE) {
    const batch = urls.slice(i, i + BATCH_SIZE);
    const batchResults = await Promise.all(batch.map(fetchUrl));
    results.push(...batchResults);
    const success = batchResults.filter(r => !r.error).length;
    console.log(`Batch ${Math.floor(i / BATCH_SIZE) + 1}: ${success}/${batch.length} OK`);
  }
  return results;
}
// Usage
const urls = Array.from({ length: 200 }, (_, i) =>
  `https://example.com/product/${i + 1}`
);
scrapeInBatches(urls).then(results => {
  const success = results.filter(r => !r.error).length;
  console.log(`Total: ${success}/${results.length} successful`);
});

Node.js: limit p- dla kontroli ziaren Fine-

W celu uzyskania dokładnych limitów wzajemnych bez ręcznego łączenia, należy użyć p-limit Biblioteka.

// npm install p-limit
const pLimit = require('p-limit');
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const limit = pLimit(30); // Max 30 concurrent requests
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function fetchWithLimit(url) {
  return limit(async () => {
    const agent = createAgent();
    const response = await fetch(url, {
      agent,
      signal: AbortSignal.timeout(30000),
    });
    return {
      url,
      status: response.status,
      body: await response.text(),
    };
  });
}
// All 500 URLs start immediately, but only 30 run concurrently
const urls = Array.from({ length: 500 }, (_, i) =>
  `https://example.com/item/${i + 1}`
);
Promise.all(urls.map(fetchWithLimit)).then(results => {
  const success = results.filter(r => r.status === 200).length;
  console.log(`Success: ${success}/${results.length}`);
});

Node.js: kolejka robotnicza z ciśnieniem wstecznym

// npm install p-queue
const PQueue = require('p-queue').default;
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const queue = new PQueue({
  concurrency: 25,
  intervalCap: 10,   // Max 10 requests...
  interval: 1000,    // ...per second (rate limiting)
});
queue.on('active', () => {
  console.log(`Active: ${queue.pending} pending, ${queue.size} queued`);
});
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function processUrl(url) {
  const agent = createAgent();
  const response = await fetch(url, { agent, signal: AbortSignal.timeout(30000) });
  return { url, status: response.status, body: await response.text() };
}
// Add URLs to the queue
const urls = Array.from({ length: 1000 }, (_, i) =>
  `https://example.com/page/${i + 1}`
);
const results = await Promise.all(
  urls.map(url => queue.add(() => processUrl(url)))
);
console.log(`Completed: ${results.filter(r => r.status === 200).length}/${results.length}`);

Go: Goroutines with Semaphore

Goroutines Go są lekkie, ale nadal trzeba ograniczyć współzależności, aby uniknąć przytłaczających połączeń proxy. Semafora oparta na kanałach to podejście idiomatyczne.

package main
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)
const maxConcurrency = 50
type Result struct {
	URL     string
	Status  int
	Length  int
	Latency time.Duration
	Error   error
}
func newProxyClient() *http.Client {
	b := make([]byte, 4)
	rand.Read(b)
	sessionID := hex.EncodeToString(b)
	proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
	proxyURL, _ := url.Parse(proxyStr)
	return &http.Client{
		Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
		Timeout:   30 * time.Second,
	}
}
func fetchURL(target string, sem chan struct{}, wg *sync.WaitGroup, results chan<- Result) {
	defer wg.Done()
	sem <- struct{}{}        // Acquire semaphore
	defer func() { <-sem }() // Release semaphore
	client := newProxyClient()
	start := time.Now()
	resp, err := client.Get(target)
	if err != nil {
		results <- Result{URL: target, Error: err, Latency: time.Since(start)}
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	results <- Result{
		URL:     target,
		Status:  resp.StatusCode,
		Length:  len(body),
		Latency: time.Since(start),
	}
}
func main() {
	urls := make([]string, 500)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/item/%d", i+1)
	}
	sem := make(chan struct{}, maxConcurrency)
	results := make(chan Result, len(urls))
	var wg sync.WaitGroup
	start := time.Now()
	for _, u := range urls {
		wg.Add(1)
		go fetchURL(u, sem, &wg, results)
	}
	// Close results channel when all goroutines finish
	go func() {
		wg.Wait()
		close(results)
	}()
	var success, failed int
	var totalLatency time.Duration
	for r := range results {
		if r.Error != nil {
			failed++
		} else {
			success++
			totalLatency += r.Latency
		}
	}
	elapsed := time.Since(start)
	fmt.Printf("Completed in %s\n", elapsed)
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
	fmt.Printf("Avg latency: %s\n", totalLatency/time.Duration(max(success, 1)))
	fmt.Printf("Throughput: %.1f req/s\n", float64(success+failed)/elapsed.Seconds())
}

Go: Worker Pool with Channels

Dla bardziej ustrukturyzowanego przetwarzania, używać stałej puli pracowników spożywających z kanału.

package main
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)
type Job struct {
	URL string
}
type JobResult struct {
	URL     string
	Status  int
	Body    string
	Latency time.Duration
	Err     error
}
func worker(id int, jobs <-chan Job, results chan<- JobResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		b := make([]byte, 4)
		rand.Read(b)
		sessionID := hex.EncodeToString(b)
		proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
		proxyURL, _ := url.Parse(proxyStr)
		client := &http.Client{
			Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
			Timeout:   30 * time.Second,
		}
		start := time.Now()
		resp, err := client.Get(job.URL)
		latency := time.Since(start)
		if err != nil {
			results <- JobResult{URL: job.URL, Err: err, Latency: latency}
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		results <- JobResult{
			URL:     job.URL,
			Status:  resp.StatusCode,
			Body:    string(body),
			Latency: latency,
		}
	}
}
func main() {
	numWorkers := 30
	urls := make([]string, 300)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/page/%d", i+1)
	}
	jobs := make(chan Job, len(urls))
	results := make(chan JobResult, len(urls))
	var wg sync.WaitGroup
	// Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}
	// Send jobs
	for _, u := range urls {
		jobs <- Job{URL: u}
	}
	close(jobs)
	// Collect results
	go func() {
		wg.Wait()
		close(results)
	}()
	var success, failed int
	for r := range results {
		if r.Err != nil {
			failed++
		} else {
			success++
		}
	}
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
}

Wybór odpowiedniego poziomu wymiany walut

Optymalna współzależność zależy od kilku czynników. Oto praktyczny przewodnik startowy:

Typ docelowyZalecane połączenieUzasadnienie
Lekkie API (JSON)50- 200Szybkie odpowiedzi, niska pamięć na żądanie
Standardowe strony internetowe20- 50Rozmiary odpowiedzi umiarkowanej, niektóre ograniczenia częstości
Ciężkie strony JS-renderowane5- 15Kontekst przeglądarki używa znaczącej pamięci
Agresywne strony anty-bot5- 10Potrzeba realistycznego harmonogramu pomiędzy wnioskami
Duże pliki do pobrania5- 20Związane z widmem, nie związane z CPU-
Zacznij od 10 równoległych wniosków i stopniowo wzrastać, monitorując wskaźniki sukcesu. Jeśli wskaźnik sukcesu spadnie poniżej 90%, zmniejszyć współwalutę lub dodać opóźnienia pomiędzy wnioskami. Więcej informacji na temat tych wskaźników, zobacz nasz Monitorowanie wydajności proxy Przewodnik.

Dla wielokrotnego wykorzystania abstrakcji proxy z built- in concurrency, patrz Building a Proxy Middleware Layer. Dla end- to- end scrating architecture, read Projektowanie niezawodnej architektury skraplaniaZbadaj Python SDK, Węzeł SDKoraz Go SDK do integracji proxy gotowej do produkcji lub sprawdzania Ceny proksyHat Na początek.

Często zadawane pytania

Gotowy, aby zacząć?

Dostęp do ponad 50 mln rezydencjalnych IP w ponad 148 krajach z filtrowaniem AI.

Zobacz cenyProxy rezydencjalne
← Powrót do Bloga