Why Concurrency Control Matters for Proxy Scraping
Sending requests one at a time through a proxy leaves most of your bandwidth idle, because each request spends nearly all of its time waiting on the network. Sending them all at once overwhelms the proxy gateway, the target server, and your own system. Concurrency control strikes the balance: it maximizes throughput while staying within the limits of your proxy pool, the target site's tolerance, and your available resources.
This guide covers production-grade concurrency patterns in three languages: Python (asyncio), Node.js (Promise pools), and Go (goroutines with semaphores). Every example uses ProxyHat's rotating residential proxies, with USERNAME and PASSWORD standing in for your own credentials, and is meant to be copied into your projects once those placeholders are filled in.
The goal of concurrency control is simple: maximize requests per second without triggering blocks, exhausting memory, or crashing your process. The right pattern depends on your language, target site, and scale.
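As a rough way to reason about that goal, steady-state throughput is approximately the number of in-flight requests divided by the average request latency (Little's law). The sketch below is illustrative arithmetic only: the function names are hypothetical and the numbers are examples, not measurements.

```python
import math


def estimated_throughput(concurrency: int, avg_latency_s: float) -> float:
    """Approximate steady-state requests per second for a given in-flight count."""
    return concurrency / avg_latency_s


def concurrency_for(target_rps: float, avg_latency_s: float) -> int:
    """Smallest in-flight request count that can sustain target_rps."""
    return math.ceil(target_rps * avg_latency_s)


# 50 concurrent requests at 2 s average latency
print(estimated_throughput(50, 2.0))  # 25.0
# Concurrency needed for 40 req/s at 1.5 s average latency
print(concurrency_for(40, 1.5))       # 60
```

The estimate ignores retries, blocks, and proxy-side limits, so treat it as an upper bound when picking a starting concurrency.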
Concurrency Patterns Compared
| Pattern | Language | Best For | Typical Concurrency |
|---|---|---|---|
| asyncio.Semaphore | Python | I/O-bound scraping | 50-200 per process |
| Worker Pool (asyncio) | Python | Task queues with backpressure | 10-100 workers |
| Promise.all + batching | Node.js | Simple parallel fetching | 50-500 per process |
| p-limit / p-queue | Node.js | Fine-grained concurrency | 10-200 per queue |
| Goroutines + Semaphore | Go | High-throughput scraping | 100-1000+ |
| Worker Pool (Go channels) | Go | Structured task distribution | 10-500 workers |
Python: asyncio Semaphore
A semaphore is the simplest concurrency pattern in Python and is often all you need. It limits how many coroutines can run the guarded block at the same time, which prevents resource exhaustion.
import asyncio
import time
import uuid

import aiohttp

MAX_CONCURRENCY = 50
TIMEOUT = aiohttp.ClientTimeout(total=30)


async def fetch(session: aiohttp.ClientSession, url: str, semaphore: asyncio.Semaphore) -> dict:
    # At most MAX_CONCURRENCY coroutines can be inside this block at once.
    async with semaphore:
        # New random session ID for every request; USERNAME and PASSWORD are placeholders.
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        start = time.monotonic()
        try:
            async with session.get(url, proxy=proxy, timeout=TIMEOUT) as response:
                body = await response.text()
                return {
                    "url": url,
                    "status": response.status,
                    "length": len(body),
                    "latency": round(time.monotonic() - start, 3),
                }
        except Exception as e:
            # Return the error instead of raising so one failure does not abort the run.
            return {"url": url, "error": str(e), "latency": round(time.monotonic() - start, 3)}


async def scrape_all(urls: list[str]) -> list[dict]:
    semaphore = asyncio.Semaphore(MAX_CONCURRENCY)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, semaphore) for url in urls]
        return await asyncio.gather(*tasks)


# Usage
if __name__ == "__main__":
    urls = [f"https://example.com/product/{i}" for i in range(1000)]
    results = asyncio.run(scrape_all(urls))
    success = sum(1 for r in results if "error" not in r)
    print(f"Completed: {success}/{len(results)} successful")
    print(f"Avg latency: {sum(r['latency'] for r in results) / len(results):.3f}s")
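To check that a semaphore really caps the number of in-flight operations without touching the network, the following minimal sketch replaces the HTTP request with a short sleep and records the peak number of jobs running at once. The helper name `run_with_limit` is hypothetical and exists only for this illustration.

```python
import asyncio


async def run_with_limit(n_tasks: int, limit: int) -> int:
    """Run n_tasks dummy jobs under a semaphore and return the peak number in flight."""
    semaphore = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def job() -> None:
        nonlocal in_flight, peak
        async with semaphore:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stands in for a network request
            in_flight -= 1

    await asyncio.gather(*(job() for _ in range(n_tasks)))
    return peak


print(asyncio.run(run_with_limit(n_tasks=100, limit=5)))  # 5
```

All 100 coroutines are created up front, but only five are ever inside the guarded block, which is the same behavior the scraper relies on.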
Python: Worker Pool with Backpressure
When you need more control — rate limiting, backpressure, or priority scheduling — use a worker pool with an asyncio.Queue.
import asyncio
import time
import uuid

import aiohttp


class WorkerPool:
    """Fixed-size worker pool with backpressure via a bounded queue."""

    def __init__(self, num_workers: int = 20, queue_size: int = 100):
        self.num_workers = num_workers
        self.queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
        self.results: list = []
        self.stats = {"success": 0, "failed": 0, "total_latency": 0.0}

    async def worker(self, session: aiohttp.ClientSession, worker_id: int) -> None:
        # Runs until run() cancels it after the queue has been drained.
        while True:
            url = await self.queue.get()
            session_id = uuid.uuid4().hex[:8]
            proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
            start = time.monotonic()
            try:
                async with session.get(
                    url, proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=30)
                ) as response:
                    body = await response.text()
                    latency = time.monotonic() - start
                    self.stats["success"] += 1
                    self.stats["total_latency"] += latency
                    self.results.append({
                        "url": url, "status": response.status,
                        "length": len(body), "worker": worker_id,
                    })
            except Exception as e:
                self.stats["failed"] += 1
                self.results.append({"url": url, "error": str(e), "worker": worker_id})
            finally:
                # Always mark the item done so queue.join() can return.
                self.queue.task_done()

    async def run(self, urls: list[str]) -> list[dict]:
        async with aiohttp.ClientSession() as session:
            # Start workers
            workers = [
                asyncio.create_task(self.worker(session, i))
                for i in range(self.num_workers)
            ]
            # Feed URLs into the queue (backpressure: blocks when queue is full)
            for url in urls:
                await self.queue.put(url)
            # Wait until every queued URL has been processed
            await self.queue.join()
            # Workers are now idle on queue.get(); cancel them and wait for the
            # cancellations to finish before the session closes.
            for w in workers:
                w.cancel()
            await asyncio.gather(*workers, return_exceptions=True)
        return self.results


# Usage
if __name__ == "__main__":
    pool = WorkerPool(num_workers=30, queue_size=50)
    urls = [f"https://example.com/item/{i}" for i in range(500)]
    results = asyncio.run(pool.run(urls))
    print(f"Success: {pool.stats['success']}, Failed: {pool.stats['failed']}")
    avg_lat = pool.stats["total_latency"] / max(pool.stats["success"], 1)
    print(f"Avg latency: {avg_lat:.3f}s")
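The backpressure itself comes from the bounded queue: `put()` suspends the producer whenever the queue is full, so memory use stays flat no matter how many URLs are waiting. The sketch below isolates that behavior with sleeps in place of requests. The function name `demo_backpressure` and the sizes are assumptions chosen for illustration.

```python
import asyncio


async def demo_backpressure(n_items: int, queue_size: int, n_workers: int) -> int:
    """Feed n_items through a bounded queue and return the largest queue depth seen."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=queue_size)
    max_depth = 0

    async def worker() -> None:
        while True:
            await queue.get()
            await asyncio.sleep(0.001)  # stands in for a slow request
            queue.task_done()

    workers = [asyncio.create_task(worker()) for _ in range(n_workers)]
    for item in range(n_items):
        await queue.put(item)  # suspends here whenever the queue is full
        max_depth = max(max_depth, queue.qsize())

    await queue.join()
    for w in workers:
        w.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return max_depth


depth = asyncio.run(demo_backpressure(n_items=200, queue_size=10, n_workers=2))
print(f"Peak queue depth: {depth} (never more than 10)")
```

Even with 200 items and only two slow workers, the queue never holds more than `queue_size` entries, because the producer is paced by the consumers.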
Python: Rate Limiter
Some targets enforce strict rate limits. This token-bucket rate limiter integrates with the concurrency patterns above.
import asyncio
import time
import uuid

import aiohttp

TIMEOUT = aiohttp.ClientTimeout(total=30)


class RateLimiter:
    """Token-bucket rate limiter for async operations."""

    def __init__(self, rate: float, burst: int = 1):
        """
        Args:
            rate: Requests per second
            burst: Maximum burst size
        """
        self.rate = rate
        self.burst = burst
        self.tokens = float(burst)
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self) -> None:
        async with self._lock:
            # Refill in proportion to the time since the last refill, capped at burst.
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens < 1:
                # Sleep until one token has accrued; the lock keeps other callers queued.
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                # The token earned during the sleep is spent here, so restart the
                # refill clock to avoid counting that time twice.
                self.tokens = 0.0
                self.last_refill = time.monotonic()
            else:
                self.tokens -= 1


# Combined with semaphore
async def fetch_rate_limited(session, url, semaphore, limiter):
    async with semaphore:
        await limiter.acquire()
        session_id = uuid.uuid4().hex[:8]
        proxy = f"http://USERNAME-session-{session_id}:PASSWORD@gate.proxyhat.com:8080"
        async with session.get(url, proxy=proxy, timeout=TIMEOUT) as resp:
            return await resp.text()


# 10 requests/second, max 30 concurrent
async def main():
    semaphore = asyncio.Semaphore(30)
    limiter = RateLimiter(rate=10.0, burst=5)
    urls = [f"https://example.com/page/{i}" for i in range(200)]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_rate_limited(session, u, semaphore, limiter) for u in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    success = sum(1 for r in results if not isinstance(r, Exception))
    print(f"Done: {success}/{len(results)}")


if __name__ == "__main__":
    asyncio.run(main())
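It helps to know how long a rate-limited run must take before tuning anything else. For an idealized token bucket that starts full, the first `burst` requests pass immediately and the rest are admitted at `rate` per second. The helper below is a hypothetical back-of-the-envelope calculation under that assumption, not a measurement of the limiter above.

```python
def min_duration_s(n_requests: int, rate: float, burst: int) -> float:
    """Lower bound on the time an idealized, initially full token bucket needs
    to admit n_requests."""
    return max(0, n_requests - burst) / rate


# 200 URLs at 10 requests/second with a burst of 5
print(min_duration_s(200, rate=10.0, burst=5))  # 19.5
```

If a run finishes much faster than this bound, the limiter is not being applied; if it takes much longer, the concurrency limit or request latency is the bottleneck rather than the rate limit.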
Node.js: Promise Batching
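The simplest Node.js concurrency pattern processes URLs in fixed-size batches. Each batch waits for its slowest request before the next batch starts, so it trades some throughput for simplicity.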
// npm install node-fetch@2 https-proxy-agent
// Node's built-in fetch ignores the `agent` option, so this uses node-fetch v2,
// which honors it and works with require().
const fetch = require('node-fetch');
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');

const BATCH_SIZE = 20;

// New random session ID for every request; USERNAME and PASSWORD are placeholders.
function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}

async function fetchUrl(url) {
  const agent = createAgent();
  const start = Date.now();
  try {
    const response = await fetch(url, {
      agent,
      signal: AbortSignal.timeout(30000),
    });
    const text = await response.text();
    return {
      url,
      status: response.status,
      length: text.length,
      latency: Date.now() - start,
    };
  } catch (err) {
    // Return the error so one failure does not reject the whole batch.
    return { url, error: err.message, latency: Date.now() - start };
  }
}

async function scrapeInBatches(urls) {
  const results = [];
  for (let i = 0; i < urls.length; i += BATCH_SIZE) {
    const batch = urls.slice(i, i + BATCH_SIZE);
    // Wait for the whole batch before starting the next one.
    const batchResults = await Promise.all(batch.map(fetchUrl));
    results.push(...batchResults);
    const success = batchResults.filter(r => !r.error).length;
    console.log(`Batch ${Math.floor(i / BATCH_SIZE) + 1}: ${success}/${batch.length} OK`);
  }
  return results;
}

// Usage
const urls = Array.from({ length: 200 }, (_, i) =>
  `https://example.com/product/${i + 1}`
);
scrapeInBatches(urls).then(results => {
  const success = results.filter(r => !r.error).length;
  console.log(`Total: ${success}/${results.length} successful`);
});
Node.js: p-limit for Fine-Grained Control
For precise concurrency limits without manual batching, use the p-limit library.
// npm install p-limit@3 node-fetch@2 https-proxy-agent
// (p-limit 4+ is ESM-only; version 3 supports require)
const pLimit = require('p-limit');
const fetch = require('node-fetch'); // honors `agent`, unlike Node's built-in fetch
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');

const limit = pLimit(30); // Max 30 concurrent requests

function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}

async function fetchWithLimit(url) {
  return limit(async () => {
    try {
      const agent = createAgent();
      const response = await fetch(url, {
        agent,
        signal: AbortSignal.timeout(30000),
      });
      return {
        url,
        status: response.status,
        body: await response.text(),
      };
    } catch (err) {
      // Without this, the first failed request would reject Promise.all below.
      return { url, error: err.message };
    }
  });
}

// All 500 URLs are queued immediately, but only 30 run concurrently
const urls = Array.from({ length: 500 }, (_, i) =>
  `https://example.com/item/${i + 1}`
);
Promise.all(urls.map(fetchWithLimit)).then(results => {
  const success = results.filter(r => r.status === 200).length;
  console.log(`Success: ${success}/${results.length}`);
});
Node.js: Worker Queue with Backpressure
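p-queue combines a concurrency cap with interval-based rate limiting, so a single queue enforces both limits.
// npm install p-queue@6 node-fetch@2 https-proxy-agent
// (p-queue 7+ is ESM-only; version 6 supports require)
const PQueue = require('p-queue').default;
const fetch = require('node-fetch'); // honors `agent`, unlike Node's built-in fetch
const { HttpsProxyAgent } = require('https-proxy-agent');
const crypto = require('crypto');

const queue = new PQueue({
  concurrency: 25,
  intervalCap: 10, // Max 10 requests...
  interval: 1000,  // ...per second (rate limiting)
});

// queue.pending counts running tasks; queue.size counts tasks still waiting.
queue.on('active', () => {
  console.log(`Running: ${queue.pending}, queued: ${queue.size}`);
});

function createAgent() {
  const sessionId = crypto.randomBytes(4).toString('hex');
  return new HttpsProxyAgent(
    `http://USERNAME-session-${sessionId}:PASSWORD@gate.proxyhat.com:8080`
  );
}

async function processUrl(url) {
  try {
    const agent = createAgent();
    const response = await fetch(url, { agent, signal: AbortSignal.timeout(30000) });
    return { url, status: response.status, body: await response.text() };
  } catch (err) {
    // Return the error so one failure does not reject Promise.all in main().
    return { url, error: err.message };
  }
}

// Top-level await is not available in CommonJS, so wrap the run in a function.
async function main() {
  // Add URLs to the queue
  const urls = Array.from({ length: 1000 }, (_, i) =>
    `https://example.com/page/${i + 1}`
  );
  const results = await Promise.all(
    urls.map(url => queue.add(() => processUrl(url)))
  );
  console.log(`Completed: ${results.filter(r => r.status === 200).length}/${results.length}`);
}

main();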
Go: Goroutines with Semaphore
Go's goroutines are lightweight, but you still need to limit concurrency to avoid overwhelming proxy connections. A channel-based semaphore is the idiomatic approach.
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)

const maxConcurrency = 50

type Result struct {
	URL     string
	Status  int
	Length  int
	Latency time.Duration
	Error   error
}

// newProxyClient builds a client with a fresh random session ID in the proxy username.
// USERNAME and PASSWORD are placeholders.
func newProxyClient() *http.Client {
	b := make([]byte, 4)
	rand.Read(b)
	sessionID := hex.EncodeToString(b)
	proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
	proxyURL, _ := url.Parse(proxyStr)
	return &http.Client{
		Transport: &http.Transport{
			Proxy: http.ProxyURL(proxyURL),
			// Each client serves one request, so do not leave idle connections open.
			DisableKeepAlives: true,
		},
		Timeout: 30 * time.Second,
	}
}

func fetchURL(target string, sem chan struct{}, wg *sync.WaitGroup, results chan<- Result) {
	defer wg.Done()
	sem <- struct{}{}        // Acquire semaphore
	defer func() { <-sem }() // Release semaphore

	client := newProxyClient()
	start := time.Now()
	resp, err := client.Get(target)
	if err != nil {
		results <- Result{URL: target, Error: err, Latency: time.Since(start)}
		return
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		// A failed body read counts as a failed request.
		results <- Result{URL: target, Status: resp.StatusCode, Error: err, Latency: time.Since(start)}
		return
	}
	results <- Result{
		URL:     target,
		Status:  resp.StatusCode,
		Length:  len(body),
		Latency: time.Since(start),
	}
}

func main() {
	urls := make([]string, 500)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/item/%d", i+1)
	}

	sem := make(chan struct{}, maxConcurrency)
	results := make(chan Result, len(urls))
	var wg sync.WaitGroup

	start := time.Now()
	for _, u := range urls {
		wg.Add(1)
		go fetchURL(u, sem, &wg, results)
	}

	// Close results channel when all goroutines finish
	go func() {
		wg.Wait()
		close(results)
	}()

	var success, failed int
	var totalLatency time.Duration
	for r := range results {
		if r.Error != nil {
			failed++
		} else {
			success++
			totalLatency += r.Latency
		}
	}

	elapsed := time.Since(start)
	fmt.Printf("Completed in %s\n", elapsed)
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
	// The built-in max requires Go 1.21 or later.
	fmt.Printf("Avg latency: %s\n", totalLatency/time.Duration(max(success, 1)))
	fmt.Printf("Throughput: %.1f req/s\n", float64(success+failed)/elapsed.Seconds())
}
Go: Worker Pool with Channels
For more structured processing, use a fixed pool of workers consuming from a channel.
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"sync"
	"time"
)

type Job struct {
	URL string
}

type JobResult struct {
	URL     string
	Status  int
	Body    string
	Latency time.Duration
	Err     error
}

// worker pulls jobs until the jobs channel is closed and drained.
func worker(id int, jobs <-chan Job, results chan<- JobResult, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		// Fresh random session ID per job; USERNAME and PASSWORD are placeholders.
		b := make([]byte, 4)
		rand.Read(b)
		sessionID := hex.EncodeToString(b)
		proxyStr := fmt.Sprintf("http://USERNAME-session-%s:PASSWORD@gate.proxyhat.com:8080", sessionID)
		proxyURL, _ := url.Parse(proxyStr)
		client := &http.Client{
			Transport: &http.Transport{
				Proxy: http.ProxyURL(proxyURL),
				// Each client serves one request, so do not leave idle connections open.
				DisableKeepAlives: true,
			},
			Timeout: 30 * time.Second,
		}

		start := time.Now()
		resp, err := client.Get(job.URL)
		latency := time.Since(start) // time until response headers arrive
		if err != nil {
			results <- JobResult{URL: job.URL, Err: err, Latency: latency}
			continue
		}
		body, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			// A failed body read counts as a failed job.
			results <- JobResult{URL: job.URL, Status: resp.StatusCode, Err: err, Latency: latency}
			continue
		}
		results <- JobResult{
			URL:     job.URL,
			Status:  resp.StatusCode,
			Body:    string(body),
			Latency: latency,
		}
	}
}

func main() {
	numWorkers := 30
	urls := make([]string, 300)
	for i := range urls {
		urls[i] = fmt.Sprintf("https://example.com/page/%d", i+1)
	}

	jobs := make(chan Job, len(urls))
	results := make(chan JobResult, len(urls))
	var wg sync.WaitGroup

	// Start workers
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, results, &wg)
	}

	// Send jobs, then close the channel so workers exit when it is drained
	for _, u := range urls {
		jobs <- Job{URL: u}
	}
	close(jobs)

	// Close results once every worker has finished
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results
	var success, failed int
	for r := range results {
		if r.Err != nil {
			failed++
		} else {
			success++
		}
	}
	fmt.Printf("Success: %d, Failed: %d\n", success, failed)
}
Choosing the Right Concurrency Level
The optimal concurrency depends on several factors. Here is a practical starting-point guide:
| Target Type | Recommended Concurrency | Reason |
|---|---|---|
| Lightweight APIs (JSON) | 50-200 | Fast responses, low memory per request |
| Standard web pages | 20-50 | Moderate response sizes, some rate limiting |
| Heavy JS-rendered pages | 5-15 | Browser contexts use significant memory |
| Aggressive anti-bot sites | 5-10 | Need realistic timing between requests |
| Large file downloads | 5-20 | Bandwidth-bound, not CPU-bound |
Start with 10 concurrent requests and increase gradually while monitoring success rates. If your success rate drops below 90%, reduce concurrency or add delays between requests. For more on tracking these metrics, see our Monitoring Proxy Performance guide.
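One way to automate this tuning loop is an additive-increase, multiplicative-decrease controller: raise the limit by a small step while the success rate holds, and halve it when the rate drops below the threshold. The sketch below is a hypothetical illustration. The class name, the step of 5, and the halving factor are assumptions; only the starting value of 10 and the 90% threshold come from the guidance above.

```python
class AdaptiveConcurrency:
    """Additive-increase / multiplicative-decrease controller for a concurrency limit."""

    def __init__(self, start: int = 10, floor: int = 1, ceiling: int = 200,
                 target_success: float = 0.90, step: int = 5):
        self.limit = start
        self.floor = floor
        self.ceiling = ceiling
        self.target_success = target_success
        self.step = step  # assumed increment; tune for your target

    def update(self, successes: int, total: int) -> int:
        """Adjust the limit from one window of results and return the new value."""
        if total == 0:
            return self.limit  # nothing observed, keep the current limit
        if successes / total < self.target_success:
            self.limit = max(self.floor, self.limit // 2)           # back off quickly
        else:
            self.limit = min(self.ceiling, self.limit + self.step)  # probe upward slowly
        return self.limit


ctl = AdaptiveConcurrency()
print(ctl.update(successes=98, total=100))  # 15
print(ctl.update(successes=97, total=100))  # 20
print(ctl.update(successes=80, total=100))  # 10
```

The controller only computes the limit. Applying it, for example by sizing the next batch or recreating the semaphore between windows of requests, depends on which pattern you use and is left out here.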
For a reusable proxy abstraction with built-in concurrency, see Building a Proxy Middleware Layer. For end-to-end scraping architecture, read Designing a Reliable Scraping Architecture. Explore the Python SDK, Node SDK, and Go SDK for production-ready proxy integration, or check ProxyHat pricing to get started.






