Beta notice: APIs and behavior may change; expect sharp edges while things settle.
- What is aioscraper?
- Key Features
- Installation
- Quick Start
- Examples
- Why aioscraper?
- Use Cases
- Performance
- Documentation
- Changelog
- Contributing
aioscraper is an async Python framework designed for mass data collection from APIs and external services at scale.
Built for:
- Fetching data from hundreds/thousands of REST API endpoints concurrently
- Integrating multiple external services (payment gateways, analytics APIs, etc.)
- Building data aggregation pipelines from heterogeneous API sources
- Queue-based scraping workers consuming tasks from Redis/RabbitMQ
- Microservice fan-out requests with automatic rate limiting and retries
NOT built for:
- Parsing HTML/CSS (but nothing stops you from using BeautifulSoup if you want - see examples/quotes.py)
- Single API requests (use httpx or aiohttp directly)
- GraphQL or WebSocket scraping (different paradigm)
Think: "I need to fetch data from 10,000 product API endpoints" or "I need to poll 50 microservices every minute" → aioscraper is for you.
- Async-first core with pluggable HTTP backends (`aiohttp`/`httpx`) and `aiojobs` scheduling
- Declarative flow: requests → callbacks → pipelines, with middleware hooks at each stage
- Priority queueing plus configurable concurrency limits per group
- Adaptive rate limiting (EWMA + AIMD) that automatically backs off on server overload
- Small, explicit API that is easy to test and compose with existing async applications
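To make the "EWMA + AIMD" feature concrete, here is a minimal, self-contained sketch of the general idea (this is an illustration of the algorithm, not aioscraper's actual internals; all names are hypothetical): latency is smoothed with an exponentially weighted moving average, the request rate grows additively while the server is healthy, and it is cut multiplicatively on overload signals such as HTTP 429/503.

```python
class AimdRateLimiter:
    """Toy EWMA + AIMD rate controller (illustration only, not aioscraper's code)."""

    def __init__(self, rate=10.0, min_rate=1.0, max_rate=100.0,
                 increase=1.0, backoff=0.5, alpha=0.2):
        self.rate = rate            # current requests/second budget
        self.min_rate = min_rate
        self.max_rate = max_rate
        self.increase = increase    # additive step on success
        self.backoff = backoff      # multiplicative factor on overload
        self.alpha = alpha          # EWMA smoothing factor
        self.ewma_latency = None    # smoothed latency estimate, seconds

    def record(self, latency: float, overloaded: bool) -> None:
        # Smooth the observed latency with an EWMA
        if self.ewma_latency is None:
            self.ewma_latency = latency
        else:
            self.ewma_latency = self.alpha * latency + (1 - self.alpha) * self.ewma_latency
        if overloaded:
            # Multiplicative decrease: back off hard on server overload
            self.rate = max(self.min_rate, self.rate * self.backoff)
        else:
            # Additive increase: slowly probe for more capacity
            self.rate = min(self.max_rate, self.rate + self.increase)

    def delay(self) -> float:
        """Seconds to wait before the next request at the current rate."""
        return 1.0 / self.rate


limiter = AimdRateLimiter(rate=10.0)
limiter.record(latency=0.12, overloaded=False)  # healthy response -> rate rises to 11.0
limiter.record(latency=2.50, overloaded=True)   # 429/503 -> rate is halved to 5.5
```

The payoff of this scheme is that a fleet of workers converges toward each server's capacity without a hard-coded requests-per-second setting.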
Choose your HTTP backend:
```shell
# Option 1: Use aiohttp (recommended for most cases)
pip install "aioscraper[aiohttp]"

# Option 2: Use httpx (if you prefer the httpx ecosystem)
pip install "aioscraper[httpx]"

# Option 3: Install both backends for flexibility
pip install "aioscraper[aiohttp,httpx]"
```

Create `scraper.py`:
```python
import logging
from dataclasses import dataclass

from aioscraper import AIOScraper, Request, Response, SendRequest, Pipeline

logger = logging.getLogger("github_repos")

scraper = AIOScraper()


@dataclass(slots=True)
class RepoStats:
    """Data model for extracted repository stats."""

    name: str
    stars: int
    language: str


# This decorator registers the pipeline to handle RepoStats items
@scraper.pipeline(RepoStats)
class StatsPipeline:
    """Pipeline for processing extracted repository data."""

    def __init__(self):
        self.total_stars = 0

    async def put_item(self, item: RepoStats) -> RepoStats:
        """
        Called for each extracted item.

        This is where you'd:
        - Save to database
        - Send to message queue
        - Perform validation/transformation
        - Aggregate statistics
        """
        self.total_stars += item.stars
        logger.info("✓ %s: ⭐ %s (%s)", item.name, item.stars, item.language)
        return item

    async def close(self):
        """
        Called when the scraper shuts down.

        Use for:
        - Final aggregations
        - Closing database connections
        - Cleanup operations
        """
        logger.info("Total stars collected: %s", self.total_stars)


# This decorator marks the scraper's entry point
@scraper
async def get_repos(send_request: SendRequest):
    """
    Entry point: defines what to scrape.

    Receives send_request - a function to schedule HTTP requests.
    """
    repos = (
        "django/django",
        "fastapi/fastapi",
        "pallets/flask",
        "encode/httpx",
        "aio-libs/aiohttp",
    )
    for repo in repos:
        await send_request(
            Request(
                url=f"https://api.github.com/repos/{repo}",  # API endpoint
                callback=parse_repo,  # Success handler
                errback=on_failure,  # Error handler (network failures, timeouts)
                cb_kwargs={"repo": repo},  # Additional arguments passed to callbacks
                headers={"Accept": "application/vnd.github+json"},  # Required by GitHub API
            )
        )


async def parse_repo(response: Response, pipeline: Pipeline):
    """
    Success callback: parse the response and extract data.

    The `pipeline` dependency is automatically injected by aioscraper.
    """
    data = await response.json()  # Parse the JSON response from the API
    await pipeline(  # Send the extracted item to the pipeline
        RepoStats(
            name=data["full_name"],
            stars=data["stargazers_count"],
            # GitHub returns null for repos without a language, so `or` is
            # used instead of a .get() default
            language=data.get("language") or "Unknown",
        )
    )


async def on_failure(exc: Exception, repo: str):
    """
    Error callback: handle request/processing failures.

    Use for:
    - Logging errors
    - Sending alerts
    - Custom retry logic
    """
    logger.error("%s: request failed: %s", repo, exc)
```

Run it:
```shell
aioscraper scraper
```

What's happening?
- `@scraper` registers your entry point
- `@scraper.pipeline` registers a pipeline for processing extracted data
- `send_request()` schedules multiple API requests concurrently with automatic queuing
- `callback=parse_repo` processes successful responses, `errback=on_failure` handles errors
Recommendation:
Configure retries, rate limiting, and concurrency via environment variables for production use.
See the examples/ directory for fully commented code demonstrating these features.
vs Scrapy:
- Scrapy is built for HTML scraping with CSS/XPath selectors and website crawling
- aioscraper is optimized for API data collection (JSON, REST, microservices)
- Native asyncio (no Twisted), modern type hints, minimal footprint
- Easily embeds into existing async applications
vs httpx/aiohttp directly:
- Manual approach: you handle rate limiting, retries, queuing, concurrency, backpressure
- aioscraper: adaptive rate limits, priority queues, pipelines, middleware out of the box
- Declarative Request → callback → pipeline instead of imperative control flow
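The "manual approach" can be made concrete with a stdlib-only sketch: with bare asyncio you hand-wire the semaphore for concurrency, the retry loop with backoff, and the fan-out yourself. Here `fetch` is a stub standing in for a real aiohttp/httpx call, and all names are illustrative.

```python
import asyncio


async def fetch(url: str) -> dict:
    """Stub for a real aiohttp/httpx request."""
    await asyncio.sleep(0)  # pretend network I/O
    return {"url": url, "ok": True}


async def fetch_with_retry(url, sem, retries=3, backoff=0.1):
    async with sem:  # hand-rolled concurrency limit
        for attempt in range(retries):
            try:
                return await fetch(url)
            except OSError:
                # hand-rolled exponential backoff between attempts
                await asyncio.sleep(backoff * 2 ** attempt)
        return None


async def main(urls):
    sem = asyncio.Semaphore(10)  # at most 10 requests in flight
    return await asyncio.gather(*(fetch_with_retry(u, sem) for u in urls))


results = asyncio.run(main([f"https://api.example.com/items/{i}" for i in range(25)]))
print(len(results))  # 25
```

Every piece of this scaffolding (plus adaptive rate limiting, priority queues, and pipelines) is what the declarative Request → callback → pipeline flow replaces.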
vs building custom async workers:
- Less boilerplate: focus on business logic, not infrastructure
- Production-ready components: EWMA+AIMD rate limiting, graceful shutdown, dependency injection
- Testable: explicit dependencies, no global state, easy mocking
When to use aioscraper:
- Collecting data from 100+ API endpoints
- Fan-out calls to microservices for data enrichment
- Queue consumers processing API scraping tasks
- API aggregation/monitoring pipelines
- High-throughput data collection jobs
Poll 10,000 product API endpoints across multiple marketplaces:
- Adaptive rate limiting prevents bans
- Priority queue for trending products
- Pipeline aggregates prices → saves to DB → sends alerts on changes
Collect real-time prices from 20+ exchange APIs:
- Concurrent requests with per-exchange rate limits
- Built-in retry for transient failures
- Pipeline normalizes data formats → writes to time-series DB
Your FastAPI app needs data from 50 internal services:
- Embed aioscraper in your async application
- Fan-out concurrent requests with backpressure control
- Middleware for auth, logging, circuit breaking
Distributed architecture with Redis/RabbitMQ/SQS:
- Message queue publishes scraping tasks (URLs + params)
- aioscraper workers consume queue → fetch data → process
- Pipeline acknowledges messages after successful processing
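The consume → fetch → ack loop above might look like the sketch below. To keep it runnable standalone, `asyncio.Queue` stands in for the real broker (a production worker would pull tasks via e.g. `redis.asyncio`), and `send_request` is a stub passed in explicitly rather than injected by aioscraper; all names are illustrative.

```python
import asyncio
import json


async def worker(queue: asyncio.Queue, send_request):
    """Queue-consuming entry point: drain JSON tasks and schedule requests.

    With aioscraper this coroutine would be the @scraper entry point with
    send_request injected by the framework.
    """
    while not queue.empty():
        task = json.loads(await queue.get())   # e.g. {"url": ..., "params": ...}
        await send_request(task["url"], task.get("params"))
        queue.task_done()                      # ack only after scheduling succeeds


async def main():
    queue = asyncio.Queue()  # stand-in for Redis/RabbitMQ/SQS
    for i in range(3):
        await queue.put(json.dumps({"url": f"https://api.example.com/items/{i}"}))

    scheduled = []

    async def send_request(url, params=None):  # stub for aioscraper's SendRequest
        scheduled.append(url)

    await worker(queue, send_request)
    return scheduled


scheduled = asyncio.run(main())
print(scheduled)  # the three task URLs, in queue order
```

Acknowledging only after the task has been handed off (or, in a stricter setup, after the pipeline has stored the result) is what gives the at-least-once delivery guarantee mentioned above.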
Aggregate user stats from Twitter, LinkedIn, GitHub APIs:
- Different rate limits per platform (adaptive throttling)
- Error callbacks for quota exceeded / auth failures
- Pipeline deduplicates → enriches → stores to database
Collect point-in-time data from 500+ API sources simultaneously:
- Health monitoring: poll status endpoints of distributed services every minute
- Market data: snapshot prices from 200+ suppliers at exact intervals
- Analytics aggregation: fetch metrics from dozens of analytics APIs on schedule
- Concurrent execution with precise timing and automatic retries for failed sources
Benchmarks show stable throughput across CPython 3.11–3.14 (see benchmarks).
Full documentation at aioscraper.readthedocs.io
See CHANGELOG.md for version history and release notes.
Please see the Contributing guide for workflow, tooling, and review expectations.
MIT License
Copyright (c) 2025 darkstussy
