Масштабирование прокси-запросов с контролем конкурентности

Освойте паттерны конкурентности для скрапинга через прокси: семафоры asyncio, Promise-пулы, Go worker-пулы, rate limiter-ы и backpressure. Продакшн-код на Python, Node.js и Go.

Масштабирование прокси-запросов с контролем конкурентности

Почему контроль конкурентности важен для скрапинга через прокси

Отправка запросов последовательно через прокси тратит пропускную способность и время впустую. Отправка всех сразу перегружает прокси-шлюз, целевой сервер и вашу собственную систему. Контроль конкурентности находит баланс — максимизирует пропускную способность, оставаясь в пределах лимитов вашего прокси-пула, терпимости целевого сайта и доступных ресурсов.

Это руководство охватывает продакшн-паттерны конкурентности на трёх языках: Python (asyncio), Node.js (Promise-пулы) и Go (горутины с семафорами). Каждый пример использует ротационные резидентные прокси ProxyHat и готов к копированию в ваши проекты.

Цель контроля конкурентности проста: максимизировать количество запросов в секунду без блокировок, исчерпания памяти или падения процесса. Правильный паттерн зависит от языка, целевого сайта и масштаба.

Сравнение паттернов конкурентности

ПаттернЯзыкЛучше всего дляМакс. конкурентность
asyncio.SemaphorePythonI/O-ориентированный скрапинг50-200 на процесс
Worker Pool (asyncio)PythonОчереди задач с backpressure10-100 воркеров
Promise.all + батчингNode.jsПростая параллельная загрузка50-500 на процесс
p-limit / p-queueNode.jsТочный контроль конкурентности10-200 на очередь
Горутины + СемафорGoВысокопроизводительный скрапинг100-1000+
Worker Pool (Go-каналы)GoСтруктурированное распределение задач10-500 воркеров

Python: asyncio Semaphore

Самый простой и эффективный паттерн конкурентности в Python. Семафор ограничивает количество корутин, выполняемых одновременно, предотвращая исчерпание ресурсов.

import asyncio
import aiohttp
import uuid
import time
PROXY_GATEWAY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
MAX_CONCURRENCY = 50
TIMEOUT = aiohttp.ClientTimeout(total=30)
async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore) -> dict:
    async with semaphore:
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        start = time.time()
        try:
            async with session.get(url, proxy=proxy, timeout=TIMEOUT) as response:
                body = await response.text()
                return {
                    "url": url,
                    "status": response.status,
                    "length": len(body),
                    "latency": round(time.time() - start, 3),
                }
        except Exception as e:
            return {"url": url, "error": str(e), "latency": round(time.time() - start, 3)}
async def scrape_all(urls: list[str]) -> list[dict]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        results = await asyncio.gather(*tasks)
    return results
# Usage
urls = [f"https://example.com/product/{i}" for i in range(1000)]
results = asyncio.run(scrape_all(urls))
success = sum(1 for r in results if "error" not in r)
print(f"Completed: {success}/{len(results)} successful")
print(f"Avg latency: {sum(r['latency'] for r in results) / len(results):.3f}s")

Python: Worker Pool с Backpressure

Когда нужно больше контроля — ограничение скорости, backpressure или приоритетное планирование — используйте пул воркеров с asyncio.Queue.

import asyncio
import aiohttp
import uuid
class WorkerPool:
    """Fixed-size worker pool with backpressure via bounded queue."""
    def __init__(self, num_workers: int = 20, queue_size: int = 100):
        self.num_workers = num_workers
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
        self.results: list = []
        self.stats = {"success": 0, "failed": 0, "total_latency": 0.0}
        self._stop = False
    async def worker(self, session: aiohttp.ClientSession, worker_id: int):
        while not self._stop:
            try:
                url = await asyncio.wait_for(self.queue.get(), timeout=5.0)
            except asyncio.TimeoutError:
                if self._stop:
                    break
                continue
            session_id = uuid.uuid4().hex[:8]
            proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
            import time
            start = time.time()
            try:
                async with session.get(
                    url, proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    body = await response.text()
                    latency = time.time() - start
                    self.stats["success"] += 1
                    self.stats["total_latency"] += latency
                    self.results.append({
                        "url": url, "status": response.status,
                        "length": len(body), "worker": worker_id,
                    })
            except Exception as e:
                self.stats["failed"] += 1
                self.results.append({"url": url, "error": str(e), "worker": worker_id})
            finally:
                self.queue.task_done()
    async def run(self, urls: list[str]) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            # Start workers
            workers = [
                asyncio.create_task(self.worker(session, i))
                for i in range(self.num_workers)
            ]
            # Feed URLs into the queue (backpressure: blocks when queue is full)
            for url in urls:
                await self.queue.put(url)
            # Wait for all tasks to complete
            await self.queue.join()
            self._stop = True
            # Cancel workers
            for w in workers:
                w.cancel()
        return self.results
# Usage
pool = WorkerPool(num_workers=30, queue_size=50)
urls = [f"https://example.com/item/{i}" for i in range(500)]
results = asyncio.run(pool.run(urls))
print(f"Success: {pool.stats['success']}, Failed: {pool.stats['failed']}")
avg_lat = pool.stats["total_latency"] / max(pool.stats["success"], 1)
print(f"Avg latency: {avg_lat:.3f}s")

Python: Rate Limiter

Некоторые цели применяют строгие лимиты скорости. Этот ограничитель скорости на основе token-bucket интегрируется с паттернами конкурентности выше.

import asyncio
import time
class RateLimiter:
    """Token-bucket rate limiter for async operations."""
    def __init__(self, rate: float, burst: int = 1):
        """
        Args:
            rate: Requests per second
            burst: Maximum burst size
        """
        self.rate = rate
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()
    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens < 1:
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0
            else:
                self.tokens -= 1
# Combined with semaphore
async def fetch_rate_limited(session, url, semaphore, limiter):
    async with semaphore:
        await limiter.acquire()
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        async with session.get(url, proxy=proxy, timeout=TIMEOUT) as resp:
            return await resp.text()
# 10 requests/second, max 30 concurrent
async def main():
    semaphore = asyncio.Semaphore(30)
    limiter = RateLimiter(rate=10.0, burst=5)
    urls = [f"https://example.com/page/{i}" for i in range(200)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_rate_limited(session, u, semaphore, limiter) for u in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    success = sum(1 for r in results if not isinstance(r, Exception))
    print(f"Done: {success}/{len(results)}")
asyncio.run(main())

Node.js: Promise Batching

Простейший паттерн конкурентности Node.js обрабатывает URL пакетами фиксированного размера.

const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const BATCH_SIZE = 20;
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function fetchUrl(url) {
  const agent = createAgent();
  const start = Date.now();
  try {
    const response = await fetch(url, {
      agent,
      signal: AbortSignal.timeout(30000),
    });
    const text = await response.text();
    return {
      url,
      status: response.status,
      length: text.length,
      latency: Date.now() - start,
    };
  } catch (err) {
    return { url, error: err.message, latency: Date.now() - start };
  }
}
async function scrapeInBatches(urls) {
  const results = [];
  for (let i = 0; i < urls.length; i += BATCH_SIZE) {
    const batch = urls.slice(i, i + BATCH_SIZE);
    const batchResults = await Promise.all(batch.map(fetchUrl));
    results.push(...batchResults);
    const success = batchResults.filter(r => !r.error).length;
    console.log(`Batch ${Math.floor(i / BATCH_SIZE) + 1}: ${success}/${batch.length} OK`);
  }
  return results;
}
// Usage
const urls = Array.from({ length: 200 }, (_, i) =>
  `https://example.com/product/${i + 1}`
);
scrapeInBatches(urls).then(results => {
  const success = results.filter(r => !r.error).length;
  console.log(`Total: ${success}/${results.length} successful`);
});

Node.js: p-limit для точного контроля

Для точных лимитов конкурентности без ручного разбиения на пакеты используйте библиотеку p-limit.

// npm install p-limit
const pLimit = require('p-limit');
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const limit = pLimit(30); // Max 30 concurrent requests
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function fetchWithLimit(url) {
  return limit(async () => {
    const agent = createAgent();
    const response = await fetch(url, {
      agent,
      signal: AbortSignal.timeout(30000),
    });
    return {
      url,
      status: response.status,
      body: await response.text(),
    };
  });
}
// All 500 URLs start immediately, but only 30 run concurrently
const urls = Array.from({ length: 500 }, (_, i) =>
  `https://example.com/item/${i + 1}`
);
Promise.all(urls.map(fetchWithLimit)).then(results => {
  const success = results.filter(r => r.status === 200).length;
  console.log(`Success: ${success}/${results.length}`);
});

Node.js: Очередь воркеров с Backpressure

// npm install p-queue
const PQueue = require('p-queue').default;
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');
const queue = new PQueue({
  concurrency: 25,
  intervalCap: 10,   // Max 10 requests...
  interval: 1000,    // ...per second (rate limiting)
});
queue.on('active', () => {
  console.log(`Active: ${queue.pending} pending, ${queue.size} queued`);
});
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}
async function processUrl(url) {
  const agent = createAgent();
  const response = await fetch(url, { agent, signal: AbortSignal.timeout(30000) });
  return { url, status: response.status, body: await response.text() };
}
// Add URLs to the queue
const urls = Array.from({ length: 1000 }, (_, i) =>
  `https://example.com/page/${i + 1}`
);
const results = await Promise.all(
  urls.map(url => queue.add(() => processUrl(url)))
);
console.log(`Completed: ${results.filter(r => r.status === 200).length}/${results.length}`);

Go: Горутины с семафором

Горутины Go легковесны, но конкурентность всё равно нужно ограничивать, чтобы не перегрузить прокси-соединения. Семафор на основе канала — идиоматический подход.

package main
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)
const maxConcurrency = 50
type Result struct {
	URL     string
	Status  int
	Length  int
	Latency time.Duration
	Error   error
}
func newProxyClient() *http.Client {
	b := make([]byte, 4)
	rand.Read(b)
	sessionID := hex.EncodeToString(b)
	proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
	proxyURL, _ := url.Parse(proxyStr)
	return &http.Client{
		Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
		Timeout:   30 * time.Second,
	}
}
func fetchURL(target string, sem chan struct{}, wg *sync.WaitGroup, results chan<- Result) {
	defer wg.Done()
	sem <- struct{}{}        // Acquire semaphore
	defer func() { <-sem }() // Release semaphore
	client := newProxyClient()
	start := time.Now()
	resp, err := client.Get(target)
	if err != nil {
		results <- Result{URL: target, Error: err, Latency: time.Since(start)}
		return
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	results <- Result{
		URL:     target,
		Status:  resp.StatusCode,
		Length:  len(body),
		Latency: time.Since(start),
	}
}
func main() {
	urls := make([]string, 500)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/item/%d", i+1)
	}
	sem := make(chan struct{}, maxConcurrency)
	results := make(chan Result, len(urls))
	var wg sync.WaitGroup
	start := time.Now()
	for _, u := range urls {
		wg.Add(1)
		go fetchURL(u, sem, &wg, results)
	}
	// Close results channel when all goroutines finish
	go func() {
		wg.Wait()
		close(results)
	}()
	var success, failed int
	var totalLatency time.Duration
	for r := range results {
		if r.Error != nil {
			failed++
		} else {
			success++
			totalLatency += r.Latency
		}
	}
	elapsed := time.Since(start)
	fmt.Printf("Completed in %s\n", elapsed)
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
	fmt.Printf("Avg latency: %s\n", totalLatency/time.Duration(max(success, 1)))
	fmt.Printf("Throughput: %.1f req/s\n", float64(success+failed)/elapsed.Seconds())
}

Go: Worker Pool с каналами

Для более структурированной обработки используйте фиксированный пул воркеров, потребляющих из канала.

package main
import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)
type Job struct {
	URL string
}
type JobResult struct {
	URL     string
	Status  int
	Body    string
	Latency time.Duration
	Err     error
}
func worker(id int, jobs <-chan Job, results chan<- JobResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		b := make([]byte, 4)
		rand.Read(b)
		sessionID := hex.EncodeToString(b)
		proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
		proxyURL, _ := url.Parse(proxyStr)
		client := &http.Client{
			Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
			Timeout:   30 * time.Second,
		}
		start := time.Now()
		resp, err := client.Get(job.URL)
		latency := time.Since(start)
		if err != nil {
			results <- JobResult{URL: job.URL, Err: err, Latency: latency}
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		results <- JobResult{
			URL:     job.URL,
			Status:  resp.StatusCode,
			Body:    string(body),
			Latency: latency,
		}
	}
}
func main() {
	numWorkers := 30
	urls := make([]string, 300)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/page/%d", i+1)
	}
	jobs := make(chan Job, len(urls))
	results := make(chan JobResult, len(urls))
	var wg sync.WaitGroup
	// Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}
	// Send jobs
	for _, u := range urls {
		jobs <- Job{URL: u}
	}
	close(jobs)
	// Collect results
	go func() {
		wg.Wait()
		close(results)
	}()
	var success, failed int
	for r := range results {
		if r.Err != nil {
			failed++
		} else {
			success++
		}
	}
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
}

Выбор правильного уровня конкурентности

Оптимальная конкурентность зависит от нескольких факторов. Вот практическое руководство для начала:

Тип целиРекомендуемая конкурентностьПричина
Лёгкие API (JSON)50-200Быстрые ответы, мало памяти на запрос
Стандартные веб-страницы20-50Умеренный размер ответов, возможные ограничения
Тяжёлые JS-страницы5-15Контексты браузера используют много памяти
Сайты с агрессивным антиботом5-10Нужны реалистичные интервалы между запросами
Загрузка больших файлов5-20Ограничение по пропускной способности, не по CPU
Начните с 10 параллельных запросов и увеличивайте постепенно, мониторя процент успеха. Если процент успеха падает ниже 90%, уменьшите конкурентность или добавьте задержки между запросами. Подробнее об отслеживании этих метрик читайте в нашем руководстве Мониторинг производительности прокси.

Для переиспользуемой абстракции прокси со встроенной конкурентностью смотрите Создание промежуточного слоя прокси. Для сквозной архитектуры скрапинга читайте Проектирование надёжной архитектуры скрапинга. Изучите Python SDK, Node SDK и Go SDK для продакшн-интеграции с прокси, или ознакомьтесь с тарифами ProxyHat для начала работы.

Часто задаваемые вопросы

Готовы начать?

Доступ к более чем 50 млн резидентных IP в 148+ странах с AI-фильтрацией.

Смотреть ценыРезидентные прокси
← Вернуться в Блог