Illia Bahlai 42efc61405
2026-04-29 18:06:25 +00:00

more-asyncio

Async utilities the standard library forgot: bounded gather, retry with backoff, async caching, rate limiting, async itertools, and stream multiplexing, all in stdlib-only Python with zero dependencies.

pip install more-asyncio

Requires Python 3.10+.

Why

asyncio ships with the bare essentials. Everything else — bounded concurrency, retry policies, async-aware caching, file-backed TTL caches, multi-stream selectors — gets re-invented in every codebase. more-asyncio collects those patterns into a coherent, fully-typed library, with no dependencies beyond stdlib.

Quick start

import asyncio
from more_asyncio import (
    gather_per, map_per, retry, with_timeout,
    achunked, acached, Throttler,
)

async def fetch(url: str) -> str: ...

# 1. Bounded gather — process 1000 URLs, 25 at a time
results = await gather_per(*(fetch(u) for u in urls), per=25)

# Same thing, more readable:
results = await map_per(fetch, urls, per=25)

# 2. Retry with exponential backoff + jitter
data = await retry(lambda: fetch(url), attempts=5, base_delay=0.5)

# 3. Timeout with fallback
data = await with_timeout(fetch(url), timeout=2.0, default=None)

# 4. Cache async results (LRU + optional TTL)
@acached(maxsize=128, ttl=60)
async def get_user(uid: int) -> dict: ...

# 5. Rate-limit external API calls (token bucket)
api = Throttler(rate=10, burst=20)  # 10 req/s, burst 20
async with api:
    await fetch(url)
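
For readers who want to see what a token bucket does under the hood, here is a minimal stdlib-only sketch. It is not more-asyncio's Throttler implementation: the class name TokenBucket, the reserve method, and the injectable clock parameter are assumptions made for illustration.

```python
import asyncio
import time
from typing import Callable


class TokenBucket:
    """Hypothetical token bucket: `rate` tokens per second, at most `burst` stored."""

    def __init__(
        self, rate: float, burst: int, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self.rate = rate
        self.burst = burst
        self._clock = clock
        self._tokens = float(burst)  # start full so an initial burst is allowed
        self._updated = clock()

    def reserve(self) -> float:
        """Take a token if one is available; otherwise return seconds to wait."""
        now = self._clock()
        # Refill in proportion to elapsed time, capped at the burst size.
        self._tokens = min(self.burst, self._tokens + (now - self._updated) * self.rate)
        self._updated = now
        if self._tokens >= 1:
            self._tokens -= 1
            return 0.0
        return (1 - self._tokens) / self.rate

    async def acquire(self) -> None:
        while True:
            wait = self.reserve()
            if wait <= 0:
                return
            await asyncio.sleep(wait)  # yield to the event loop until a token is due

    async def __aenter__(self) -> "TokenBucket":
        await self.acquire()
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        return None
```

The clock is injectable only so the refill arithmetic can be checked without real sleeping; in normal use the default time.monotonic is enough.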

What's inside

The library is organized into focused modules; eight are documented below. Everything is also re-exported from the top level, so from more_asyncio import X always works.

concurrency — bounded concurrency primitives

| Symbol | What it does |
| --- | --- |
| `gather_per(*coros, per=10, mode="gather" \| "queue")` | Bounded gather: awaits all coroutines with at most `per` running at a time. |
| `map_per(func, items, per=10)` | `map`-style wrapper over `gather_per`. |
| `as_completed_per(*coros, per=10)` | Streams results as they finish (not in input order), with bounded concurrency. |
| `concurrency_limit(n)` | Decorator: cap concurrent calls of an async function to `n`. |
| `limited(semaphore)` | Async context manager: `async with limited(sem): ...`. |
| `Throttler(rate, burst)` | Token-bucket rate limiter. Works as `async with throttler:` or `await throttler.acquire()`. |

# Consuming results in input order can stall on a single slow call;
# stream them in completion order with a 10-way cap:
async for result in as_completed_per(*[fetch(u) for u in urls], per=10):
    await save(result)  # don't wait for the slow ones
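
The core idea behind a bounded gather can be sketched with nothing but asyncio.Semaphore. This is an illustration of the technique, not the library's code: bounded_gather and demo are hypothetical names, and the sketch ignores the mode option entirely.

```python
import asyncio
from collections.abc import Awaitable, Iterable
from typing import TypeVar

T = TypeVar("T")


async def bounded_gather(coros: Iterable[Awaitable[T]], limit: int) -> list[T]:
    """Await every coroutine, at most `limit` at a time; results keep input order."""
    sem = asyncio.Semaphore(limit)

    async def run(coro: Awaitable[T]) -> T:
        async with sem:  # waits here while `limit` coroutines are already running
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))


async def demo() -> tuple[list[int], int]:
    """Run 10 jobs with a cap of 3 and record the peak concurrency observed."""
    running = 0
    peak = 0

    async def job(i: int) -> int:
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return i * 2

    results = await bounded_gather((job(i) for i in range(10)), limit=3)
    return results, peak
```

Note that all coroutine objects are created up front and only their execution is gated, which is fine for thousands of items but wasteful for unbounded inputs; a queue of workers is the usual alternative in that case.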

resilience — retry, timeout, circuit breaker

| Symbol | What it does |
| --- | --- |
| `retry(coro_factory, *, attempts, base_delay, max_delay, exponential, jitter, retry_on, on_retry)` | Run a coroutine factory with exponential backoff + jitter. |
| `retryable(...)` | Decorator form of `retry`. |
| `with_timeout(coro, timeout, *, default=...)` | `asyncio.wait_for` with optional fallback value. |
| `CircuitBreaker(failure_threshold, recovery_timeout)` | Three-state breaker (closed / open / half-open). |
| `CircuitOpenError` | Raised by `CircuitBreaker.call` when the circuit is open. |

# Retry transient errors with backoff, give up after 5 tries
data = await retry(
    lambda: client.get(url),
    attempts=5,
    base_delay=0.5,
    retry_on=(ConnectionError, TimeoutError),
)

# Stop hammering a flaky API after 5 failures; cool off for 30 seconds
breaker = CircuitBreaker(failure_threshold=5, recovery_timeout=30)
result = await breaker.call(lambda: client.get(url))
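
Why does retry take a factory (a lambda) rather than a coroutine? A coroutine object can only be awaited once, so each attempt needs a fresh one. The sketch below shows that, together with one common backoff scheme ("full jitter": sleep a random time up to the exponential cap). It is an assumption-laden illustration, not the library's implementation: retry_sketch is a hypothetical name and the exact jitter formula more-asyncio uses is not stated in this README.

```python
import asyncio
import random
from collections.abc import Awaitable, Callable
from typing import TypeVar

T = TypeVar("T")


async def retry_sketch(
    factory: Callable[[], Awaitable[T]],
    *,
    attempts: int = 3,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
    retry_on: tuple[type[BaseException], ...] = (Exception,),
) -> T:
    if attempts < 1:
        raise ValueError("attempts must be >= 1")
    for attempt in range(1, attempts + 1):
        try:
            return await factory()  # a fresh coroutine for every attempt
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: surface the last error unchanged
            # Exponential cap: base, 2*base, 4*base, ... limited by max_delay.
            cap = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(random.uniform(0, cap))  # assumed "full jitter"
    raise RuntimeError("unreachable")
```

Exceptions not listed in retry_on propagate immediately, which keeps programming errors from being retried as if they were transient.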

iteration — async itertools

Lazy operators (yield):

| Symbol | Stdlib counterpart |
| --- | --- |
| `achunked(iterable, size)` | none; chunks any iterable (sync or async) into `list[T]` |
| `arebatch(async_iter, size, flatten=False)` | none; re-chunks an async iterable; O(N) buffer |
| `amap(func, async_iter)` | `map` (sync or async function) |
| `afilter(pred, async_iter)` | `filter` |
| `aenumerate(async_iter, start=0)` | `enumerate` |
| `achain(*async_iters)` | `itertools.chain` |
| `azip(*async_iters)` | `zip` (stops at shortest) |
| `azip_longest(*async_iters, fillvalue)` | `itertools.zip_longest` |
| `aslice(async_iter, start, stop, step)` | `itertools.islice` |
| `apairwise(async_iter)` | `itertools.pairwise` |
| `astarmap(func, async_iter)` | `itertools.starmap` |
| `atakewhile(pred, async_iter)` | `itertools.takewhile` |
| `adropwhile(pred, async_iter)` | `itertools.dropwhile` |
| `agroupby(async_iter, key=None)` | `itertools.groupby` (group materialized as list) |

Terminal aggregations (drain to value):

| Symbol | Behavior |
| --- | --- |
| `asum(async_iter, start=0)` | `sum` |
| `amin(async_iter, *, key, default)` | `min` |
| `amax(async_iter, *, key, default)` | `max` |
| `aany(async_iter)` | `any` (short-circuits) |
| `aall(async_iter)` | `all` (short-circuits) |
| `asorted(async_iter, *, key, reverse)` | `sorted` |
| `collect(async_iter)` | drain to list |
| `collect_chunks(async_iter)` | drain `AsyncIterable[Iterable[T]]` to flat `list[T]` |
| `collect_first(async_iter, default=None)` | first item or default |
| `collect_one(async_iter)` | first item or raise `NotFoundError` |

# Replace this:
for i in range(0, len(items), 100):
    chunk = items[i:i + 100]
    await session.add_all(chunk)
    await session.flush()

# With this:
async for chunk in achunked(items, 100):
    await session.add_all(chunk)
    await session.flush()
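
A chunker that accepts both sync and async iterables can be sketched as an async generator that first normalizes its input. This is a hypothetical illustration of the idea (chunked_sketch and _as_async are made-up names), not the source of achunked.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator, Iterable
from typing import TypeVar, Union

T = TypeVar("T")


async def _as_async(source: Union[Iterable[T], AsyncIterable[T]]) -> AsyncIterator[T]:
    """Expose a sync or async iterable through one async-iteration interface."""
    if isinstance(source, AsyncIterable):
        async for item in source:
            yield item
    else:
        for item in source:
            yield item


async def chunked_sketch(
    source: Union[Iterable[T], AsyncIterable[T]], size: int
) -> AsyncIterator[list[T]]:
    """Yield lists of up to `size` items; the last chunk may be shorter."""
    if size < 1:
        raise ValueError("size must be >= 1")
    chunk: list[T] = []
    async for item in _as_async(source):
        chunk.append(item)
        if len(chunk) == size:
            yield chunk
            chunk = []  # start a new list so yielded chunks are never mutated
    if chunk:
        yield chunk
```

Because only one chunk is held at a time, memory use stays proportional to size rather than to the length of the input.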

streams — multiplexing & cleanup

| Symbol | What it does |
| --- | --- |
| `merge_iters(*async_iters, timeout_per_chunk=None, ignore_exceptions=())` | Multiplex N async iterables; yields each item in completion order. |
| `cancel_pending_safely(*tasks, timeout=None)` | Cancel + drain pending tasks, swallowing exceptions during shutdown. |

# Merge results from multiple scrapers / consumers / WebSockets
async for job in merge_iters(
    pracuj.iter_jobs(),
    nofluff.iter_jobs(),
    justjoin.iter_jobs(),
    timeout_per_chunk=30.0,
    ignore_exceptions=(RateLimitError,),
):
    await db.save(job)
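
One common way to multiplex async iterables is to give each source its own task that pumps items into a shared queue. The sketch below assumes that design; it is not necessarily how merge_iters works, it omits timeout_per_chunk and ignore_exceptions, and merge_sketch and ticker are hypothetical names.

```python
import asyncio
from collections.abc import AsyncIterable, AsyncIterator
from typing import Any, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel: "this source is finished"


async def merge_sketch(*sources: AsyncIterable[T]) -> AsyncIterator[T]:
    """Yield items from all sources in the order they become available."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=1)  # small buffer gives backpressure

    async def pump(source: AsyncIterable[Any]) -> None:
        try:
            async for item in source:
                await queue.put((item, None))
        except Exception as exc:  # forward the failure to the consumer
            await queue.put((_DONE, exc))
        else:
            await queue.put((_DONE, None))

    tasks = [asyncio.create_task(pump(s)) for s in sources]
    remaining = len(tasks)
    try:
        while remaining:
            item, exc = await queue.get()
            if item is _DONE:
                if exc is not None:
                    raise exc
                remaining -= 1
            else:
                yield item
    finally:
        # Stop any pumps still running, then wait for them to unwind.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def ticker(start: int, count: int, delay: float) -> AsyncIterator[int]:
    """Demo source: yields `count` consecutive integers, `delay` seconds apart."""
    for i in range(count):
        await asyncio.sleep(delay)
        yield start + i
```

Items from a single source keep their relative order; only the interleaving between sources depends on timing.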

functools — async-aware function tools

Mirrors stdlib functools for async def.

| Symbol | What it does |
| --- | --- |
| `awraps(wrapped)` | `functools.wraps` that keeps `asyncio.iscoroutinefunction` truthy on `def` wrappers. |
| `apartial(func, *args, **kwargs)` | `functools.partial` that returns an `async def` (FastAPI/TaskIQ detect it correctly). |
| `acached(maxsize=128, ttl=None, key=None)` | Async LRU cache, optional TTL. |
| `acached_property` | Async `cached_property` with single-flight on first await. |
| `single_flight(key=None)` | Deduplicate concurrent calls: N inflight calls of the same key share one execution. |
| `areduce(func, async_iter, initial)` | `functools.reduce` for async iterables. |
| `AsyncLazyValue(factory)` | Compute-once async value (`is_set()`, `reset()`). |
| `syncable` | Decorator that adds a synchronous entry-point `func.sync(...)` to an async function. |

@acached(maxsize=256, ttl=60)
async def fetch_user(uid: int) -> User: ...

# 50 concurrent calls to fetch_config() — only one underlying request fires
@single_flight()
async def fetch_config() -> Config: ...

# Run the same function from sync or async code
@syncable
async def fetch(url: str) -> str: ...

# async context:
data = await fetch("https://...")
# sync context (CLI scripts, pytest fixtures, ...):
data = fetch.sync("https://...")
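
Single-flight deduplication boils down to "the first caller starts a task, everyone else awaits the same task". A minimal sketch under that assumption follows; SingleFlight, do, and demo are hypothetical names, not the library's API.

```python
import asyncio
from collections.abc import Awaitable, Callable, Hashable
from typing import Any, TypeVar

T = TypeVar("T")


class SingleFlight:
    """Share one in-flight execution among concurrent callers with the same key."""

    def __init__(self) -> None:
        self._inflight: dict[Hashable, asyncio.Future] = {}

    async def do(self, key: Hashable, factory: Callable[[], Awaitable[T]]) -> T:
        task = self._inflight.get(key)
        if task is None:
            # First caller for this key: start the work and publish the task.
            task = asyncio.ensure_future(factory())
            self._inflight[key] = task
            # Forget the key once finished so later calls run again.
            task.add_done_callback(lambda _: self._inflight.pop(key, None))
        # shield: cancelling one waiter must not cancel the shared task.
        return await asyncio.shield(task)


async def demo() -> tuple[int, list[Any]]:
    """Fire 50 concurrent calls and count how often the loader really ran."""
    executions = 0

    async def load_config() -> dict:
        nonlocal executions
        executions += 1
        await asyncio.sleep(0.01)
        return {"debug": False}

    flight = SingleFlight()
    results = await asyncio.gather(*(flight.do("config", load_config) for _ in range(50)))
    return executions, results
```

Unlike a cache, nothing is retained after the call completes: deduplication applies only while a call is in flight.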

threading — bridge to blocking I/O

| Symbol | What it does |
| --- | --- |
| `run_in_thread(func, *args, executor=None, **kwargs)` | `asyncio.to_thread` with optional dedicated executor. |
| `ThreadPool(max_workers, name_prefix)` | Async context manager around a dedicated `ThreadPoolExecutor`. |
| `thread_executor(*, executor=None, max_workers=None)` | Decorator wrapping a sync function as async. |
| `asyncable` | Bare-decorator (`@asyncable`) form of `thread_executor()`. |

# Wrap a sync HTTP client to run off the event loop
@asyncable
def get_sync(url: str) -> str:
    return requests.get(url).text

# Now use as async:
text = await get_sync("https://...")

# Isolate workloads — heavy parsing in its own pool, won't starve HTTP
async with ThreadPool(max_workers=4, name_prefix="parser") as pool:
    docs = await asyncio.gather(*(pool.run(parse, raw) for raw in payloads))

scheduling — periodic, debounce, throttle

| Symbol | What it does |
| --- | --- |
| `every(interval, *, jitter=0.0, immediate=False)` | Decorator that turns an async function into a background `asyncio.Task` loop. |
| `debounce(delay)` | Coalesce rapid calls: only the last fires after `delay` seconds of silence. |
| `throttle(rate)` | Cap firing rate; calls within the cooldown window return `None`. |

@every(60.0, jitter=0.1)
async def refresh_tokens():
    await store.rotate()

task = refresh_tokens()  # returns asyncio.Task
# task.cancel() to stop
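
A periodic decorator like this is essentially a sleep loop wrapped in a task. The following sketch shows the shape of it; every_sketch is a hypothetical name, jitter is left out, and the meaning given to immediate (run once before the first sleep) is an assumption rather than documented behavior.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


def every_sketch(interval: float, *, immediate: bool = False):
    """Decorator: calling the function starts a background loop and returns its Task."""

    def decorator(func: Callable[..., Awaitable[Any]]) -> Callable[..., "asyncio.Task[None]"]:
        @functools.wraps(func)
        def start(*args: Any, **kwargs: Any) -> "asyncio.Task[None]":
            async def loop() -> None:
                if immediate:  # assumed semantics: fire once right away
                    await func(*args, **kwargs)
                while True:
                    await asyncio.sleep(interval)
                    await func(*args, **kwargs)

            # Must be called from inside a running event loop.
            return asyncio.create_task(loop())

        return start

    return decorator


async def demo() -> int:
    """Let a 10 ms ticker run for about 100 ms, then cancel it."""
    ticks = 0

    @every_sketch(0.01, immediate=True)
    async def tick() -> None:
        nonlocal ticks
        ticks += 1

    task = tick()
    await asyncio.sleep(0.1)
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)  # wait for the cancel to land
    return ticks
```

In this sketch an exception inside the function ends the loop and is stored on the task, so long-running services usually catch and log errors inside the decorated function.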

cache — persistent file-backed TTL cache

| Symbol | What it does |
| --- | --- |
| `AsyncFileCache(root, *, default_ttl=None)` | JSON-backed cache, namespaced, SHA-256 keys, atomic writes, TTL expiry, corruption recovery. |

cache = AsyncFileCache(Path("./.cache"), default_ttl=3600)

async def fetch_expensive(slug: str) -> str:
    return await cache.get_or_set(
        "scraped",
        slug,
        lambda: scrape(slug),  # only called on miss
        ttl=86400,
    )

Submodule imports

For smaller import surfaces or thematic grouping:

from more_asyncio.concurrency import gather_per, map_per, Throttler
from more_asyncio.resilience import retry, with_timeout, CircuitBreaker
from more_asyncio.iteration import achunked, asorted, collect, agroupby
from more_asyncio.functools import acached, single_flight, syncable
from more_asyncio.streams import merge_iters, cancel_pending_safely
from more_asyncio.threading import ThreadPool, asyncable, run_in_thread
from more_asyncio.scheduling import every, debounce, throttle
from more_asyncio.cache import AsyncFileCache

Everything is also at top level (from more_asyncio import gather_per, retry, ...).

Design principles

  • Zero dependencies. Pure stdlib (Python 3.10+: asyncio, functools, pathlib, hashlib, json, time, concurrent.futures).
  • Fully typed. ParamSpec, Concatenate, TypeVar, overload where it matters. Ships with py.typed.
  • Composable. Decorators stack (@retryable() @acached() async def f(): ...), Throttler works inside gather_per, acached_property shares single-flight semantics with single_flight.
  • Predictable failure modes. return_exceptions=True propagates through gather_per/map_per. cancel_pending_safely handles the cleanup edge cases stdlib asyncio.gather won't.

Versioning

Semver. Top-level (from more_asyncio import X) is the stable API. Submodule paths may be reorganized in minor releases, so pin to a minor version if you import from submodules.

License

MIT. See LICENSE.