# more-asyncio

Async utilities the standard library forgot — bounded gather, retry with backoff,
async caching, rate limiting, async itertools, stream multiplexing — all in
zero-dependency, stdlib-only Python.

```
pip install more-asyncio
```

Requires Python 3.10+.
## Why

`asyncio` ships with the bare essentials. Everything else — bounded concurrency,
retry policies, async-aware caching, file-backed TTL caches, multi-stream
selectors — gets re-invented in every codebase. more-asyncio collects those
patterns into a coherent, fully typed library with no dependencies beyond the stdlib.
## Quick start

```python
from more_asyncio import (
    gather_per, map_per, retry, with_timeout,
    achunked, acached, Throttler,
)

async def fetch(url: str) -> str: ...

# 1. Bounded gather — process 1000 URLs, 25 at a time
results = await gather_per(*(fetch(u) for u in urls), per=25)

# Same thing, more readable:
results = await map_per(fetch, urls, per=25)

# 2. Retry with exponential backoff + jitter
data = await retry(lambda: fetch(url), attempts=5, base_delay=0.5)

# 3. Timeout with fallback
data = await with_timeout(fetch(url), timeout=2.0, default=None)

# 4. Cache async results (LRU + optional TTL)
@acached(maxsize=128, ttl=60)
async def get_user(uid: int) -> dict: ...

# 5. Rate-limit external API calls (token bucket)
api = Throttler(rate=10, burst=20)  # 10 req/s, burst of 20

async with api:
    await fetch(url)
```
## What's inside

The library is organized into focused modules. Everything is also re-exported
from the top level — `from more_asyncio import X` always works.
### concurrency — bounded concurrency primitives

| Symbol | What it does |
|---|---|
| `gather_per(*coros, per=10, mode="gather" \| "queue")` | Bounded `asyncio.gather` — runs at most `per` coroutines at a time. |
| `map_per(func, items, per=10)` | map-style wrapper over `gather_per`. |
| `as_completed_per(*coros, per=10)` | Streams results as they finish (not in input order), with bounded concurrency. |
| `concurrency_limit(n)` | Decorator: cap concurrent calls of an async function to `n`. |
| `limited(semaphore)` | Async context manager: `async with limited(sem): ...`. |
| `Throttler(rate, burst)` | Token-bucket rate limiter. Works as `async with throttler:` or `await throttler.acquire()`. |
```python
# Stream results in completion order with a 10-way cap,
# so fast results aren't stuck behind slow ones:
async for result in as_completed_per(*[fetch(u) for u in urls], per=10):
    await save(result)  # don't wait for the slow ones
```
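For context, the bounded-gather pattern behind `gather_per` can be sketched in stdlib asyncio with a semaphore. This is an illustrative sketch, not the library's actual implementation:

```python
import asyncio

async def bounded_gather(*coros, per=10):
    # Cap concurrency: at most `per` coroutines run at any moment.
    sem = asyncio.Semaphore(per)

    async def run(coro):
        async with sem:
            return await coro

    # Results come back in input order, like asyncio.gather.
    return await asyncio.gather(*(run(c) for c in coros))

async def main():
    async def square(x):
        await asyncio.sleep(0)
        return x * x

    return await bounded_gather(*(square(i) for i in range(5)), per=2)

print(asyncio.run(main()))  # [0, 1, 4, 9, 16]
```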
### resilience — retry, timeout, circuit breaker

| Symbol | What it does |
|---|---|
| `retry(coro_factory, *, attempts, base_delay, max_delay, exponential, jitter, retry_on, on_retry)` | Run a coroutine factory with exponential backoff + jitter. |
| `retryable(...)` | Decorator form of `retry`. |
| `with_timeout(coro, timeout, *, default=...)` | `asyncio.wait_for` with optional fallback value. |
| `CircuitBreaker(failure_threshold, recovery_timeout)` | Three-state breaker (closed / open / half-open). |
| `CircuitOpenError` | Raised by `CircuitBreaker.call` when the circuit is open. |
```python
# Retry transient errors with backoff; give up after 5 tries
data = await retry(
    lambda: client.get(url),
    attempts=5,
    base_delay=0.5,
    retry_on=(ConnectionError, TimeoutError),
)

# Stop hammering a flaky API after 5 failures; cool off for 30 seconds
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
result = await breaker.call(lambda: client.get(url))
```
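The backoff-with-jitter pattern `retry` describes can be sketched with stdlib asyncio alone. A minimal illustration, not the library's code (`retry_sketch` is a hypothetical name):

```python
import asyncio
import random

async def retry_sketch(factory, *, attempts=5, base_delay=0.5,
                       max_delay=30.0, retry_on=(Exception,)):
    # Call `factory` until it succeeds; sleep with exponential
    # backoff plus full jitter between attempts.
    for attempt in range(attempts):
        try:
            return await factory()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: propagate the last error
            delay = min(max_delay, base_delay * 2 ** attempt)
            await asyncio.sleep(delay * random.random())

async def main():
    calls = 0

    async def flaky():
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectionError("transient")
        return "ok"

    result = await retry_sketch(flaky, base_delay=0.001,
                                retry_on=(ConnectionError,))
    return result, calls

print(asyncio.run(main()))  # ('ok', 3)
```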
### iteration — async itertools

Lazy operators (yield):

| Symbol | Stdlib counterpart |
|---|---|
| `achunked(iterable, size)` | — chunks any iterable (sync or async) into `list[T]` |
| `arebatch(async_iter, size, flatten=False)` | — re-chunks an async iterable; O(N) buffer |
| `amap(func, async_iter)` | `map` (sync or async function) |
| `afilter(pred, async_iter)` | `filter` |
| `aenumerate(async_iter, start=0)` | `enumerate` |
| `achain(*async_iters)` | `itertools.chain` |
| `azip(*async_iters)` | `zip` (stops at shortest) |
| `azip_longest(*async_iters, fillvalue)` | `itertools.zip_longest` |
| `aslice(async_iter, start, stop, step)` | `itertools.islice` |
| `apairwise(async_iter)` | `itertools.pairwise` |
| `astarmap(func, async_iter)` | `itertools.starmap` |
| `atakewhile(pred, async_iter)` | `itertools.takewhile` |
| `adropwhile(pred, async_iter)` | `itertools.dropwhile` |
| `agroupby(async_iter, key=None)` | `itertools.groupby` (each group materialized as a list) |
Terminal aggregations (drain to a value):

| Symbol | Behavior |
|---|---|
| `asum(async_iter, start=0)` | `sum` |
| `amin(async_iter, *, key, default)` | `min` |
| `amax(async_iter, *, key, default)` | `max` |
| `aany(async_iter)` | `any` (short-circuits) |
| `aall(async_iter)` | `all` (short-circuits) |
| `asorted(async_iter, *, key, reverse)` | `sorted` |
| `collect(async_iter)` | drain to `list` |
| `collect_chunks(async_iter)` | drain `AsyncIterable[Iterable[T]]` to a flat `list[T]` |
| `collect_first(async_iter, default=None)` | first item or `default` |
| `collect_one(async_iter)` | first item or raise `NotFoundError` |
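The short-circuit behavior noted for `aany`/`aall` can be illustrated in pure stdlib asyncio (a sketch, not the library's implementation): the source generator stops being consumed as soon as the answer is known.

```python
import asyncio

async def aany_sketch(aiter):
    # Drain until the first truthy item, then stop consuming.
    async for item in aiter:
        if item:
            return True
    return False

seen = []

async def numbers():
    for n in [0, 0, 3, 5]:
        seen.append(n)  # track how far the source was consumed
        yield n

print(asyncio.run(aany_sketch(numbers())))  # True
# The 5 was never pulled from the generator: seen == [0, 0, 3]
```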
```python
# Replace this:
for i in range(0, len(items), 100):
    chunk = items[i:i + 100]
    await session.add_all(chunk)
    await session.flush()

# With this:
async for chunk in achunked(items, 100):
    await session.add_all(chunk)
    await session.flush()
```
### streams — multiplexing & cleanup

| Symbol | What it does |
|---|---|
| `merge_iters(*async_iters, timeout_per_chunk=None, ignore_exceptions=())` | Multiplex N async iterables; yields each item in completion order. |
| `cancel_pending_safely(*tasks, timeout=None)` | Cancel + drain pending tasks, swallowing exceptions during shutdown. |
```python
# Merge results from multiple scrapers / consumers / WebSockets
async for job in merge_iters(
    pracuj.iter_jobs(),
    nofluff.iter_jobs(),
    justjoin.iter_jobs(),
    timeout_per_chunk=30.0,
    ignore_exceptions=(RateLimitError,),
):
    await db.save(job)
```
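The multiplexing pattern behind `merge_iters` can be sketched with stdlib `asyncio.wait`: race one `__anext__` task per source, yield whichever finishes first, and re-arm that source. An illustrative sketch only; it omits `timeout_per_chunk` and `ignore_exceptions`:

```python
import asyncio

async def merge_sketch(*aiters):
    # One in-flight __anext__ task per source.
    pending = {asyncio.ensure_future(it.__anext__()): it for it in aiters}
    while pending:
        done, _ = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            it = pending.pop(task)
            try:
                yield task.result()  # items arrive in completion order
            except StopAsyncIteration:
                continue  # this source is exhausted; don't re-arm it
            pending[asyncio.ensure_future(it.__anext__())] = it

async def ticker(name, delay, n):
    for i in range(n):
        await asyncio.sleep(delay)
        yield f"{name}{i}"

async def main():
    return [x async for x in merge_sketch(ticker("a", 0.01, 2),
                                          ticker("b", 0.025, 2))]

print(asyncio.run(main()))
```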
### functools — async-aware function tools

Mirrors stdlib `functools` for `async def`.

| Symbol | What it does |
|---|---|
| `awraps(wrapped)` | `functools.wraps` that keeps `asyncio.iscoroutinefunction` truthy on `def` wrappers. |
| `apartial(func, *args, **kwargs)` | `functools.partial` that returns an `async def` (FastAPI/TaskIQ detect it correctly). |
| `acached(maxsize=128, ttl=None, key=None)` | Async LRU cache with optional TTL. |
| `acached_property` | Async `cached_property` with single-flight on first await. |
| `single_flight(key=None)` | Deduplicate concurrent calls — N in-flight calls with the same key share one execution. |
| `areduce(func, async_iter, initial)` | `functools.reduce` for async iterables. |
| `AsyncLazyValue(factory)` | Compute-once async value (`is_set()`, `reset()`). |
| `syncable` | Decorator that adds a synchronous entry point `func.sync(...)` to an async function. |
```python
@acached(maxsize=256, ttl=60)
async def fetch_user(uid: int) -> User: ...

# 50 concurrent calls to fetch_config() — only one underlying request fires
@single_flight()
async def fetch_config() -> Config: ...

# Run the same function from sync or async code
@syncable
async def fetch(url: str) -> str: ...

# async context:
data = await fetch("https://...")
# sync context (CLI scripts, pytest fixtures, ...):
data = fetch.sync("https://...")
```
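The single-flight idea is small enough to sketch with a dict of in-flight tasks (a minimal illustration keyed on positional args; the library's `single_flight` is more general):

```python
import asyncio
import functools

def single_flight_sketch(func):
    # Concurrent calls with the same args share one in-flight execution.
    inflight = {}

    @functools.wraps(func)
    async def wrapper(*args):
        key = args
        if key not in inflight:
            inflight[key] = asyncio.ensure_future(func(*args))
            # Drop the entry once the shared call settles.
            inflight[key].add_done_callback(lambda _: inflight.pop(key, None))
        return await inflight[key]

    return wrapper

calls = 0

@single_flight_sketch
async def fetch_config():
    global calls
    calls += 1
    await asyncio.sleep(0.01)
    return {"env": "prod"}

async def main():
    # 50 concurrent awaits collapse into a single execution.
    results = await asyncio.gather(*(fetch_config() for _ in range(50)))
    return results[0], calls

print(asyncio.run(main()))  # ({'env': 'prod'}, 1)
```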
### threading — bridge to blocking I/O

| Symbol | What it does |
|---|---|
| `run_in_thread(func, *args, executor=None, **kwargs)` | `asyncio.to_thread` with an optional dedicated executor. |
| `ThreadPool(max_workers, name_prefix)` | Async context manager around a dedicated `ThreadPoolExecutor`. |
| `thread_executor(*, executor=None, max_workers=None)` | Decorator wrapping a sync function as async. |
| `asyncable` | Bare-decorator (`@asyncable`) form of `thread_executor()`. |
```python
# Wrap a sync HTTP client to run off the event loop
@asyncable
def get_sync(url: str) -> str:
    return requests.get(url).text

# Now use as async:
text = await get_sync("https://...")

# Isolate workloads — heavy parsing in its own pool won't starve HTTP
async with ThreadPool(max_workers=4, name_prefix="parser") as pool:
    docs = await asyncio.gather(*(pool.run(parse, raw) for raw in payloads))
```
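The sync-to-async bridge this module provides can be approximated with stdlib `asyncio.to_thread` alone (a sketch of the pattern, not the library's `asyncable`):

```python
import asyncio
import functools

def asyncable_sketch(func):
    # Run a blocking function in the default thread pool so it
    # doesn't stall the event loop.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        return await asyncio.to_thread(func, *args, **kwargs)
    return wrapper

@asyncable_sketch
def slow_sum(xs):
    return sum(xs)  # stands in for blocking I/O

print(asyncio.run(slow_sum(range(10))))  # 45
```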
### scheduling — periodic, debounce, throttle

| Symbol | What it does |
|---|---|
| `every(interval, *, jitter=0.0, immediate=False)` | Decorator that turns an async function into a background `asyncio.Task` loop. |
| `debounce(delay)` | Coalesce rapid calls — only the last fires, after `delay` seconds of silence. |
| `throttle(rate)` | Cap the firing rate; calls within the cooldown window return `None`. |
```python
@every(60.0, jitter=0.1)
async def refresh_tokens():
    await store.rotate()

task = refresh_tokens()  # returns an asyncio.Task
# task.cancel() to stop
```
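The debounce semantics ("only the last call fires, after a quiet period") can be sketched in stdlib asyncio by cancelling the previously scheduled call on each new one. Illustrative only; `debounce_sketch` is not the library API:

```python
import asyncio

def debounce_sketch(delay):
    def decorator(func):
        task = None

        async def wrapper(*args):
            nonlocal task
            if task is not None:
                task.cancel()  # a newer call supersedes the pending one

            async def later():
                await asyncio.sleep(delay)  # wait out the quiet period
                await func(*args)

            task = asyncio.ensure_future(later())
        return wrapper
    return decorator

fired = []

@debounce_sketch(0.02)
async def save(value):
    fired.append(value)

async def main():
    for v in (1, 2, 3):        # rapid burst; the first two get cancelled
        await save(v)
        await asyncio.sleep(0.001)
    await asyncio.sleep(0.05)  # silence: the last call fires
    return fired

print(asyncio.run(main()))  # [3]
```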
### cache — persistent file-backed TTL cache

| Symbol | What it does |
|---|---|
| `AsyncFileCache(root, *, default_ttl=None)` | JSON-backed cache: namespaced, SHA-256 keys, atomic writes, TTL expiry, corruption recovery. |
```python
cache = AsyncFileCache(Path("./.cache"), default_ttl=3600)

async def fetch_expensive(slug: str) -> str:
    return await cache.get_or_set(
        "scraped",
        slug,
        lambda: scrape(slug),  # only called on a miss
        ttl=86400,
    )
```
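The storage layout the table describes (namespaces, SHA-256 keys, TTL stamps, atomic rename) can be sketched synchronously; the real `AsyncFileCache` is async and this is only an illustration of the on-disk pattern:

```python
import hashlib
import json
import tempfile
import time
from pathlib import Path

class FileCacheSketch:
    def __init__(self, root: Path, default_ttl=None):
        self.root = root
        self.root.mkdir(parents=True, exist_ok=True)
        self.default_ttl = default_ttl

    def _path(self, namespace, key):
        # SHA-256 of the key keeps filenames safe and fixed-length.
        digest = hashlib.sha256(key.encode()).hexdigest()
        return self.root / namespace / f"{digest}.json"

    def get(self, namespace, key):
        try:
            entry = json.loads(self._path(namespace, key).read_text())
        except (OSError, json.JSONDecodeError):
            return None  # missing or corrupt file: treat as a miss
        if entry["expires"] is not None and entry["expires"] < time.time():
            return None  # TTL expired
        return entry["value"]

    def set(self, namespace, key, value, ttl=None):
        ttl = ttl if ttl is not None else self.default_ttl
        path = self._path(namespace, key)
        path.parent.mkdir(parents=True, exist_ok=True)
        expires = time.time() + ttl if ttl is not None else None
        tmp = path.with_suffix(".tmp")
        tmp.write_text(json.dumps({"expires": expires, "value": value}))
        tmp.replace(path)  # atomic rename on the same filesystem

cache = FileCacheSketch(Path(tempfile.mkdtemp()), default_ttl=3600)
cache.set("scraped", "python-dev", "<html>...</html>")
print(cache.get("scraped", "python-dev"))  # <html>...</html>
```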
## Submodule imports

For smaller import surfaces or thematic grouping:

```python
from more_asyncio.concurrency import gather_per, map_per, Throttler
from more_asyncio.resilience import retry, with_timeout, CircuitBreaker
from more_asyncio.iteration import achunked, asorted, collect, agroupby
from more_asyncio.functools import acached, single_flight, syncable
from more_asyncio.streams import merge_iters, cancel_pending_safely
from more_asyncio.threading import ThreadPool, asyncable, run_in_thread
from more_asyncio.scheduling import every, debounce, throttle
from more_asyncio.cache import AsyncFileCache
```

Everything is also available at the top level (`from more_asyncio import gather_per, retry, ...`).
## Design principles

- **Zero dependencies.** Pure stdlib (Python 3.10+): `asyncio`, `functools`, `pathlib`, `hashlib`, `json`, `time`, `concurrent.futures`.
- **Fully typed.** `ParamSpec`, `Concatenate`, `TypeVar`, and `overload` where it matters. Ships with `py.typed`.
- **Composable.** Decorators stack (`@retryable() @acached() async def f(): ...`), `Throttler` works inside `gather_per`, and `acached_property` shares single-flight semantics with `single_flight`.
- **Predictable failure modes.** `return_exceptions=True` propagates through `gather_per`/`map_per`, and `cancel_pending_safely` handles the cleanup edge cases stdlib `asyncio.gather` won't.
## Versioning

Semver. The top level (`from more_asyncio import X`) is the stable API.
Submodule paths may be reorganized in minor releases — pin to a major version if you import from submodules.

## License

MIT. See LICENSE.