网络爬虫速率限制详解

费率如何限制工作,场地如何检测刮刮机,以及维持在限额内的实际战略。 包括适应性节流码和分布率限制模式.

网络爬虫速率限制详解

什么是速率限制?

速率限制是网站用来控制任何单一客户端能够提出多快请求的无形墙壁. 当你冲破一个网站时,你会撞到这些墙壁,其后果从暂时的减速到永久的知识产权禁令。 了解速率如何限制工作,他们如何检测你,如何在速率下停留,对于建造可靠地提供数据的刮刮机至关重要.

这个指南解释了限制速率背后的力学, 检测信号网站的使用, 以及适应性节流的实际策略,

关于用代理人刮刮的更广泛的概述,见我们 完整网页搜索代理指南中,将 " 避免 " 改为 " 避免 " 。 如何在不受阻的情况下搜索网站。 。 。

如何限制利率

网站在多个层执行速率限制,每个层都有不同的检测颗粒性:

第1层:基于IP的费率限制

最常见的办法。 服务器在一个时间窗口内跟踪每个IP地址的请求. 超过阈值后, 您收到 HTTP 429( 太多请求) 或 503 回复 。

# Typical rate limit behavior
Request 1-50:    HTTP 200 (normal)
Request 51:      HTTP 429 (rate limited)
Wait 60 seconds...
Request 52:      HTTP 200 (reset)

第2层:会话/基于 Cookie 的限制

音轨要求每个会话的频率或浏览器 cookie. 即使你旋转了IP,同一会话符会快速击中服务器会触发限制.

第3层:基于账户的限制

对于需要登录的站点,无论IP,限制都会与用户账户绑定. 在APIs和SaaS平台上常见.

第4层:行为分析

Cloudflare,PerimeterX,Akamai等高级系统分析行为模式:请求时间,导航流量,鼠标移动(在浏览器背景中). 这个层最难绕过,因为它不依赖简单的计数器.

通用速率限制检测信号

网站同时使用多个信号检测自动刮刮:

信号它检测到什么难以撤离
每分钟一次原始速度简单( 使用代理)
每小时每个IP的要求持续数量中度(旋转式实施伙伴)
要求时间符合规定类似机器的间隔介质( 加热)
缺少/错误头非浏览客户端简单( 设置正则信头)
顺序 URL 模式系统爬行中 (随机排序)
TLS 指纹库对浏览器硬度( 使用真实浏览器)
JavaScript 执行无头浏览器硬度( 高级配置)
鼠标/键盘事件瓶装行为很难

在我们的指南中更多地了解检测机制 反毒系统如何检测代理。 。 。

HTTP 响应代码 限制信号速率

知道哪个HTTP代码表示速率限制帮助您建立正确的重试逻辑 :

代码含义行动
200(与CAPTCHA) (中文(简体) ).软块——挑战页面已服务旋转 IP, 慢点
403 禁止IP 或会话被屏蔽立即旋转 IP
429个过多的请求明确的费率限额等待并重试后退
503 服务不可用服务器超载或屏蔽后退, 检查是否被屏蔽
302/307改为CAPTCHA URL重定向挑战旋转 IP, 降低速度

战略1:尊重呼啸

最简单的方法——将你的请求率远远低于目标允许的程度. 这意味着故障减少、带宽浪费减少以及更可持续的刮刮。

import requests
import time
import random
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
def respectful_scrape(urls: list[str], rpm_limit: int = 10) -> list[str]:
    """Scrape URLs while respecting a requests-per-minute limit."""
    delay = 60.0 / rpm_limit
    results = []
    for url in urls:
        try:
            resp = requests.get(
                url,
                proxies={"http": PROXY, "https": PROXY},
                timeout=30
            )
            results.append(resp.text if resp.status_code == 200 else None)
        except requests.RequestException:
            results.append(None)
        # Add delay with random jitter (±30%) to look less robotic
        jitter = delay * random.uniform(0.7, 1.3)
        time.sleep(jitter)
    return results

战略2:适应性调压

而不是固定的速率,根据收到的响应来动态调整速度. 一切正常时加快速度,看到警示标志时放慢速度.

Python 执行

import requests
import time
import random
from dataclasses import dataclass, field
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
@dataclass
class AdaptiveThrottle:
    """Automatically adjusts request rate based on server responses."""
    base_delay: float = 2.0      # seconds between requests
    min_delay: float = 0.5
    max_delay: float = 30.0
    current_delay: float = 2.0
    success_streak: int = 0
    warning_codes: set = field(default_factory=lambda: {429, 403, 503})
    def on_success(self):
        self.success_streak += 1
        # Speed up after 10 consecutive successes
        if self.success_streak >= 10:
            self.current_delay = max(self.current_delay * 0.85, self.min_delay)
            self.success_streak = 0
    def on_rate_limit(self):
        self.success_streak = 0
        # Double the delay on rate limit
        self.current_delay = min(self.current_delay * 2.0, self.max_delay)
    def on_block(self):
        self.success_streak = 0
        # Aggressive backoff on block
        self.current_delay = min(self.current_delay * 3.0, self.max_delay)
    def wait(self):
        jitter = self.current_delay * random.uniform(0.7, 1.3)
        time.sleep(jitter)
def scrape_adaptive(urls: list[str]) -> list[dict]:
    throttle = AdaptiveThrottle()
    results = []
    for url in urls:
        try:
            resp = requests.get(
                url,
                proxies={"http": PROXY, "https": PROXY},
                timeout=30
            )
            if resp.status_code == 200:
                throttle.on_success()
                results.append({"url": url, "status": 200, "body": resp.text})
            elif resp.status_code == 429:
                throttle.on_rate_limit()
                # Check Retry-After header
                retry_after = int(resp.headers.get("Retry-After", 0))
                if retry_after:
                    time.sleep(retry_after)
                results.append({"url": url, "status": 429, "body": None})
            elif resp.status_code == 403:
                throttle.on_block()
                results.append({"url": url, "status": 403, "body": None})
            else:
                results.append({"url": url, "status": resp.status_code, "body": resp.text})
        except requests.RequestException as e:
            throttle.on_block()
            results.append({"url": url, "status": 0, "error": str(e)})
        throttle.wait()
        print(f"Current delay: {throttle.current_delay:.1f}s")
    return results

节点.js 执行

const HttpsProxyAgent = require('https-proxy-agent');
const fetch = require('node-fetch');
class AdaptiveThrottle {
  constructor() {
    this.currentDelay = 2000; // ms
    this.minDelay = 500;
    this.maxDelay = 30000;
    this.successStreak = 0;
  }
  onSuccess() {
    this.successStreak++;
    if (this.successStreak >= 10) {
      this.currentDelay = Math.max(this.currentDelay * 0.85, this.minDelay);
      this.successStreak = 0;
    }
  }
  onRateLimit() {
    this.successStreak = 0;
    this.currentDelay = Math.min(this.currentDelay * 2, this.maxDelay);
  }
  onBlock() {
    this.successStreak = 0;
    this.currentDelay = Math.min(this.currentDelay * 3, this.maxDelay);
  }
  async wait() {
    const jitter = this.currentDelay * (0.7 + Math.random() * 0.6);
    return new Promise(resolve => setTimeout(resolve, jitter));
  }
}
async function scrapeAdaptive(urls) {
  const throttle = new AdaptiveThrottle();
  const agent = new HttpsProxyAgent('http://USERNAME:PASSWORD@gate.proxyhat.com:8080');
  const results = [];
  for (const url of urls) {
    try {
      const res = await fetch(url, { agent, timeout: 30000 });
      if (res.ok) {
        throttle.onSuccess();
        results.push({ url, status: res.status, body: await res.text() });
      } else if (res.status === 429) {
        throttle.onRateLimit();
        const retryAfter = parseInt(res.headers.get('retry-after') || '0');
        if (retryAfter) await new Promise(r => setTimeout(r, retryAfter * 1000));
        results.push({ url, status: 429, body: null });
      } else if (res.status === 403) {
        throttle.onBlock();
        results.push({ url, status: 403, body: null });
      }
    } catch (err) {
      throttle.onBlock();
      results.push({ url, status: 0, error: err.message });
    }
    await throttle.wait();
    console.log(`Current delay: ${throttle.currentDelay.toFixed(0)}ms`);
  }
  return results;
}

战略3:分配比率限制

在平行运行多个刮刮机时,协调所有工人的费率限制。 由于没有协调,每个工人都遵守自己的限制,但总的交通量仍然超过目标。

import requests
import time
import threading
class DistributedRateLimiter:
    """Thread-safe rate limiter for multiple scraper workers."""
    def __init__(self, max_rpm: int):
        self.min_interval = 60.0 / max_rpm
        self.lock = threading.Lock()
        self.last_request_time = 0.0
    def acquire(self):
        """Block until it is safe to make the next request."""
        with self.lock:
            now = time.time()
            elapsed = now - self.last_request_time
            if elapsed < self.min_interval:
                time.sleep(self.min_interval - elapsed)
            self.last_request_time = time.time()
# Shared limiter across all threads
limiter = DistributedRateLimiter(max_rpm=30)
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
def worker(urls: list[str], results: list):
    for url in urls:
        limiter.acquire()
        try:
            resp = requests.get(
                url,
                proxies={"http": PROXY, "https": PROXY},
                timeout=30
            )
            results.append({"url": url, "status": resp.status_code})
        except Exception as e:
            results.append({"url": url, "error": str(e)})

战略4:要求优先排队

对于复杂的刮切项目,使用管理每个目标域的速率限制的优先排队:

import requests
import time
import heapq
import threading
from collections import defaultdict
PROXY = "http://USERNAME:PASSWORD@gate.proxyhat.com:8080"
class DomainRateLimiter:
    """Per-domain rate limiting with priority queue."""
    def __init__(self, default_rpm: int = 10):
        self.default_rpm = default_rpm
        self.domain_limits = {}          # domain -> max RPM
        self.domain_last = defaultdict(float)  # domain -> last request time
        self.lock = threading.Lock()
    def set_limit(self, domain: str, rpm: int):
        self.domain_limits[domain] = rpm
    def wait_for_domain(self, domain: str):
        rpm = self.domain_limits.get(domain, self.default_rpm)
        min_interval = 60.0 / rpm
        with self.lock:
            now = time.time()
            elapsed = now - self.domain_last[domain]
            if elapsed < min_interval:
                time.sleep(min_interval - elapsed)
            self.domain_last[domain] = time.time()
# Configure per-domain limits
limiter = DomainRateLimiter(default_rpm=10)
limiter.set_limit("amazon.com", 3)        # Very conservative for Amazon
limiter.set_limit("example.com", 30)      # Lenient for simple sites
limiter.set_limit("google.com", 5)        # Moderate for Google

读取速率提示的 Robots.txt

许多网站在机器人.txt中发布他们的爬行喜好. 那个 Crawl-delay 指令告诉你请求之间的最小秒 :

import requests
from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser
def get_crawl_delay(base_url: str, user_agent: str = "*") -> float | None:
    """Extract Crawl-delay from robots.txt."""
    parsed = urlparse(base_url)
    robots_url = f"{parsed.scheme}://{parsed.netloc}/robots.txt"
    try:
        resp = requests.get(robots_url, timeout=10)
        if resp.status_code != 200:
            return None
        rp = RobotFileParser()
        rp.parse(resp.text.splitlines())
        delay = rp.crawl_delay(user_agent)
        return delay
    except Exception:
        return None
# Check before scraping
delay = get_crawl_delay("https://example.com")
if delay:
    print(f"Site requests {delay}s between requests")
else:
    print("No crawl-delay specified")

共同费率限制错误

  • 忽略429个回应. 许多刮刮机对所有非200个响应的处理相同. 一张429 告诉你到底发生了什么——使用"Retrich-After Header"并退后.
  • 固定的延迟而不紧张。 每2000秒就有一个请求 看上去像机器人 在您的延迟中添加随机变化( 抖动) 。
  • 不协调平行工人。 5名工人每人做10个RPM等于总共50个RPM. 使用共享费率限制符 。
  • 旋转IP而不减速. IP旋转会给你争取时间,但如果每个新的IP立即敲击网站,高级检测仍然会抓住你. 将旋转与适当的节奏相结合。
  • 高峰时段打扫. 在高流量期间,网站的收费率受到限制,因而更加活跃。 目标时区在非高峰时段的重爬行

要计算您需要多少代理来支持您限定费率的刮切,请查看 需要多少代理吗?中,将“补充限制费率的代理轮调战略”改为“补充限制费率的代理轮调战略”。 大型碎屑代用旋转策略。 。 。

开始使用 : 代理汉字 Python SDK 或探索 定价计划 你的项目。

经常被问到的问题

我超过标准会怎么样?

回应取决于网站. 大部分回HTTP 429带有一个重试后头. 有人为CAPTCHA服务. 攻击性网站立即以403响应屏蔽IP. 在最坏的情况下,一再违反导致永久的知识产权禁令。

我怎么才能找到一个网站的利率上限?

在监测响应代码的同时,开始缓慢并逐步增加. 请检查access-date=中的日期值 (帮助) Robots.txt发布Crawl-delay指令. 观测X-RateLimit-Limit和X-RateLimit-保留字段的响应头. 一些API在文档中公布其限制.

是否使用绕行率限制 ?

代理在多个IP间分配请求,因此每个IP都停留在每IP的限制下. 然而,精密的网站也跟踪会话,指纹和行为模式. 代理是必要的,但还不够——将它们与适当的节奏和现实的要求模式结合起来。

刮刮最安全的要求率是多少?

没有普遍答案。 对于像Google或亚马逊这样的攻击性目标,每个IP每分钟1-5个请求是安全的. 对于受轻度保护的场所,每个IP20-60 RPM可能可行. 总是根据观察到的成功率开始保守和增加。

准备开始了吗?

通过AI过滤访问148多个国家的5000多万个住宅IP。

查看价格住宅代理
← 返回博客