为何需要专门建筑
一个单行脚本击中一个网站,对于小任务来说效果很好. 但是,当你需要每天刮去数百万页 跨越数十个目标, 这个脚本成为一个瓶颈。 扩大废旧基础设施 需要从线性脚本向分布式,基于队列的架构移动,这些架构优雅地处理失败,管理代理旋转,并最大限度地实现吞吐量.
本指南涵盖了架构模式,队列系统,水平缩放策略,以及电动生产级刮刮规模的代理管理技术.
这篇文章以我们从 完整网页搜索代理指南。关于代理池大小,参见 需要多少代理吗?
可缩放划痕的架构模式
模式1:基于队列的拼写
可缩放刮刮的基础是一个 消息队列 从数据采集中解开URL发现. 工人从队列中提取任务,通过代理获取页面,并将结果推向存储.
# Architecture overview:
#
# URL Source → [Message Queue] → Worker 1 → [Results Store]
# → Worker 2 →
# → Worker N →
# ↓
# [Dead Letter Queue]
# (failed requests)这种模式的好处:
- 水平缩放 : 在不改变系统的情况下添加或删除工人
- 过失容忍度: 失败的任务返回队列进行重试
- 费率控制 : 调整工人人数以控制总体吞吐量
- 可见度 : 队列深度显示积压;完成率显示健康
用 Redis 队列执行 Python
import redis
import requests
import json
import time
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
REDIS_URL = "redis://localhost:6379/0"
@dataclass
class ScrapeTask:
url: str
target: str
priority: int = 0
retries: int = 0
max_retries: int = 3
class ScrapingQueue:
def __init__(self, redis_url: str):
self.redis = redis.from_url(redis_url)
self.queue_key = "scrape:tasks"
self.results_key = "scrape:results"
self.dlq_key = "scrape:dead_letter"
def enqueue(self, task: ScrapeTask):
self.redis.lpush(self.queue_key, json.dumps(task.__dict__))
def dequeue(self, timeout: int = 5) -> ScrapeTask | None:
result = self.redis.brpop(self.queue_key, timeout=timeout)
if result:
data = json.loads(result[1])
return ScrapeTask(**data)
return None
def store_result(self, url: str, data: dict):
self.redis.hset(self.results_key, url, json.dumps(data))
def send_to_dlq(self, task: ScrapeTask, error: str):
task_data = task.__dict__
task_data["error"] = error
self.redis.lpush(self.dlq_key, json.dumps(task_data))
@property
def pending_count(self) -> int:
return self.redis.llen(self.queue_key)
class ScrapingWorker:
def __init__(self, queue: ScrapingQueue, worker_id: int):
self.queue = queue
self.worker_id = worker_id
self.session = requests.Session()
self.session.proxies = {"http": PROXY, "https": PROXY}
def process_task(self, task: ScrapeTask) -> bool:
try:
resp = self.session.get(task.url, timeout=30)
if resp.status_code == 200:
self.queue.store_result(task.url, {
"status": 200,
"body": resp.text[:10000], # Truncate for storage
"target": task.target,
})
return True
elif resp.status_code in [429, 503]:
# Retry with backoff
if task.retries < task.max_retries:
task.retries += 1
time.sleep(2 ** task.retries)
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
else:
self.queue.send_to_dlq(task, f"HTTP {resp.status_code}")
except Exception as e:
if task.retries < task.max_retries:
task.retries += 1
self.queue.enqueue(task)
else:
self.queue.send_to_dlq(task, str(e))
return False
def run(self):
print(f"Worker {self.worker_id} started")
while True:
task = self.queue.dequeue(timeout=5)
if task:
self.process_task(task)
# Launch multiple workers
queue = ScrapingQueue(REDIS_URL)
# Enqueue tasks
for i in range(10000):
queue.enqueue(ScrapeTask(
url=f"https://example.com/product/{i}",
target="example.com"
))
# Start 10 workers
with ThreadPoolExecutor(max_workers=10) as executor:
workers = [ScrapingWorker(queue, i) for i in range(10)]
for worker in workers:
executor.submit(worker.run)节点.js 用牛排执行
const Queue = require('bull');
const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
// Create queue with Redis backend
const scrapeQueue = new Queue('scraping', 'redis://localhost:6379');
// Define the worker processor
scrapeQueue.process(10, async (job) => { // 10 concurrent workers
const { url, target } = job.data;
try {
const res = await fetch(url, { agent, timeout: 30000 });
if (res.ok) {
const body = await res.text();
return { url, status: res.status, body: body.slice(0, 10000) };
}
if (res.status === 429 || res.status === 503) {
throw new Error(`Rate limited: HTTP ${res.status}`);
}
return { url, status: res.status, body: null };
} catch (err) {
throw err; // Bull will retry based on job options
}
});
// Enqueue tasks with retry options
async function enqueueTasks(urls, target) {
for (const url of urls) {
await scrapeQueue.add(
{ url, target },
{
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 1000,
removeOnFail: false,
}
);
}
}
// Monitor progress
scrapeQueue.on('completed', (job, result) => {
console.log(`Done: ${result.url} (${result.status})`);
});
scrapeQueue.on('failed', (job, err) => {
console.error(`Failed: ${job.data.url} - ${err.message}`);
});图案2:管道结构
对于复杂的刮碎工作流程,使用 管道 每个阶段处理不同的问题:
# Pipeline stages:
#
# Stage 1: URL Discovery → finds pages to scrape
# Stage 2: Content Fetching → downloads pages via proxies
# Stage 3: Data Extraction → parses HTML, extracts data
# Stage 4: Data Validation → checks quality, deduplicates
# Stage 5: Storage → saves to database/warehouse执行
package main
import (
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
)
type ScrapeResult struct {
URL string `json:"url"`
Status int `json:"status"`
Body string `json:"body"`
}
func fetcher(urls <-chan string, results chan<- ScrapeResult, wg *sync.WaitGroup) {
defer wg.Done()
proxyURL, _ := url.Parse("http://USERNAME:PASSWORD@gate.proxyhat.com:8080")
client := &http.Client{
Transport: &http.Transport{Proxy: http.ProxyURL(proxyURL)},
Timeout: 30 * time.Second,
}
for u := range urls {
resp, err := client.Get(u)
if err != nil {
results <- ScrapeResult{URL: u, Status: 0}
continue
}
body, _ := io.ReadAll(resp.Body)
resp.Body.Close()
results <- ScrapeResult{URL: u, Status: resp.StatusCode, Body: string(body)}
}
}
func processor(results <-chan ScrapeResult, done chan<- bool) {
for result := range results {
if result.Status == 200 {
// Extract and store data
data, _ := json.Marshal(result)
fmt.Printf("Processed: %s (%d bytes)\n", result.URL, len(data))
}
}
done <- true
}
func main() {
urls := make(chan string, 1000)
results := make(chan ScrapeResult, 1000)
done := make(chan bool)
// Start 20 fetcher workers
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
wg.Add(1)
go fetcher(urls, results, &wg)
}
// Start processor
go processor(results, done)
// Feed URLs
go func() {
for i := 0; i < 10000; i++ {
urls <- fmt.Sprintf("https://example.com/product/%d", i)
}
close(urls)
}()
// Wait for all fetchers, then close results
wg.Wait()
close(results)
<-done
}水平缩放策略
战略1:多机制部署
将工人分散在多个机器上. 队列充当协调点 :
| 构成部分 | 部署 | 缩放 |
|---|---|---|
| 队列( Redis/ RabbitMQ) | 专用服务器或管理服务 | 垂直( 更多内存) |
| 工人 | 多个机器或容器 | 水平( 添加实例) |
| 成果储存 | 数据库或对象存储 | 垂直加硬化 |
| 监测 | 集中式仪表板 | 单一实例 |
战略2:基于集装箱的扩大
使用Docker和Kubernetes进行弹性缩放. 每个工人在一个可以复制的容器中运行:
# docker-compose.yml for scraping workers
version: '3.8'
services:
redis:
image: redis:7-alpine
ports:
- "6379:6379"
scraper-worker:
build: .
environment:
- PROXY_URL=http://USERNAME:PASSWORD@gate.proxyhat.com:8080
- REDIS_URL=redis://redis:6379/0
- WORKER_CONCURRENCY=10
deploy:
replicas: 5 # 5 containers × 10 concurrency = 50 parallel requests
resources:
limits:
memory: 512M
cpus: '0.5'
depends_on:
- redis规模代理管理
在规模上,代理管理成为一个关键的系统组成部分. 主要考虑因素:
连接集合
重用代理网关的连接, 而不是每个请求创建新网关 。 这减少了延迟和连接间接费用:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session() -> requests.Session:
"""Create a session with connection pooling and retry logic."""
session = requests.Session()
# Connection pool: keep 20 connections, max 50
adapter = HTTPAdapter(
pool_connections=20,
pool_maxsize=50,
max_retries=Retry(
total=3,
backoff_factor=1,
status_forcelist=[500, 502, 503],
)
)
session.mount("http://", adapter)
session.mount("https://", adapter)
session.proxies = {
"http": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
"https": "http://USERNAME:PASSWORD@gate.proxyhat.com:8080",
}
return session
# Reuse across many requests
session = create_optimized_session()
for url in urls:
resp = session.get(url, timeout=30)卫生监测
实时监视您的代理性能, 以便及早发现问题 :
import time
from collections import defaultdict
from dataclasses import dataclass, field
@dataclass
class ProxyMetrics:
"""Track proxy health metrics for monitoring."""
requests_total: int = 0
requests_success: int = 0
requests_blocked: int = 0
requests_timeout: int = 0
latency_samples: list = field(default_factory=list)
status_codes: dict = field(default_factory=lambda: defaultdict(int))
def record_request(self, status_code: int, latency_ms: float):
self.requests_total += 1
self.status_codes[status_code] += 1
self.latency_samples.append(latency_ms)
if status_code == 200:
self.requests_success += 1
elif status_code in [403, 429]:
self.requests_blocked += 1
# Keep only last 1000 samples
if len(self.latency_samples) > 1000:
self.latency_samples = self.latency_samples[-1000:]
@property
def success_rate(self) -> float:
return self.requests_success / self.requests_total if self.requests_total else 0
@property
def avg_latency(self) -> float:
return sum(self.latency_samples) / len(self.latency_samples) if self.latency_samples else 0
def report(self) -> str:
return (
f"Total: {self.requests_total}, "
f"Success: {self.success_rate:.1%}, "
f"Blocked: {self.requests_blocked}, "
f"Avg Latency: {self.avg_latency:.0f}ms"
)数据按比例存储
| 存储类型 | 最佳时间 | 缩放 |
|---|---|---|
| 邮递SQL | 结构产品/定价数据 | 百万行数 |
| 蒙哥达 | 半结构/可变计划 | 10亿份文件 |
| S3/ 对象存储 | Raw HTML 归档 | 字节 |
| 弹性搜索 | 对已删除的数据进行全文搜索 | 10亿份文件 |
| 点击之家 | 对大数据集的分析 | 数以千计的行数 |
缩放检查列表
- 从获取中解析 URL 发现 。 在阶段之间使用信件队列 。
- 执行正确的重试逻辑 。 以死信队列表示持续失败的反向指示.
- 监视一切 排队深度,成功率,耐久性,每个目标域的错误率.
- 使用连接集合 。 重复使用代理连接, 而不是每个请求创建新的连接 。
- 计划失败。 工人坠机 代理被封锁 目标改变结构 建立每个层次的复原力。
- 发射前进行规模测试. 一个工作在100 RPM的系统可能因为内存,连接限制,或队列瓶颈而以10,000 RPM失败.
将辅助您缩放架构的代理旋转策略改为 大型碎屑代用旋转策略。要处理比例限制,请参见 速率限制解释。 。 。
使用 Python SDK 键盘, (中文). 节点 SDK,或 冲啊 SDK 用于生产代理集成,并探索 代理哈特计划 用于大容量刮刮。
经常被问到的问题
哪个排队系统最适合按级刮?
Redis with Bull (Node.js) 或 RQ (Python) 工作良好,每天可完成数百万项任务. 对于更大的规模,Apache Kafka或RabbitMQ提供更好的耐久性和吞吐量. 根据您现有的基础设施和团队专长来选择 。
我该跑多少个同时工作的工人?
从10到20名工人开始,并根据你的代理能力和目标站点耐受性进行规模化. 监测成功率——如果低于90%,在增加更多工人之前减少货币。 每个工人通过ProxyHat获得自动IP旋转.
我该给工人用电动或线织吗?
对于I/O式刮刮(大多数案例),ASync(Python asyncio,Node.js)比线程提供更好的资源效率. 仅在需要CPU重度解析并同时获取时使用线程或多处理. 去Goroutines 优异的两种模式。
我如何处理目标地点结构的变化?
在您的管道中执行数据验证 。 当解析数据失败验证( 丢失字段, 错误类型) 时, 提醒您的团队和队列会影响 URL , 以便使用更新的解析器进行再处理 。 版本您的解析器, 以便在需要时可以滚回 。






