unit-of-work (1.1.0)
Installation
pip install --index-url unit-of-work
About this package
Unit of Work pattern implementation for Python. Works like Depends in FastAPI.
Unit of Work
A Python implementation of the Unit of Work pattern with built-in dependency injection inspired by FastAPI's Depends. Provides lifecycle management for repositories across SQL databases and REST APIs, with support for pipelines that coordinate multiple units of work as a single atomic operation.
Installation
pip install unit-of-work
With optional extras:
pip install unit-of-work[sql] # SQLAlchemy async support
pip install unit-of-work[rest] # HTTPX / CloudScraper support
pip install unit-of-work[all] # Everything
Requires Python 3.10+
Quick Start
SQL Unit of Work
from typing import Annotated

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
from repository import BaseAsyncSQLRepository
from unit_of_work import BaseSQLUnitOfWork, Depends

# 1. Define your repositories
class UserRepository(BaseAsyncSQLRepository):
    ...

class OrderRepository(BaseAsyncSQLRepository):
    ...

# 2. Group them in a Unit of Work
class OrderUoW(BaseSQLUnitOfWork):
    users: Annotated[UserRepository, Depends()]
    orders: Annotated[OrderRepository, Depends()]

# 3. Configure the session factory
engine = create_async_engine("postgresql+asyncpg://...")
session_factory = async_sessionmaker(engine)

# 4. Use it
async def create_order(user_id: int, items: list):
    async with OrderUoW(
        session_factory=session_factory,
        is_transaction=True,
    ) as uow:
        user = await uow.users.get(user_id)
        order = await uow.orders.create(user=user, items=items)
        # Auto-commits on exit when is_transaction=True
    return order
REST Unit of Work
from typing import Annotated

from repository import BaseRestApiRepository
from unit_of_work import BaseRestUnitOfWork, Depends

class GitHubApiRepository(BaseRestApiRepository):
    ...

class SlackApiRepository(BaseRestApiRepository):
    ...

class NotificationUoW(BaseRestUnitOfWork):
    github: Annotated[GitHubApiRepository, Depends()]
    slack: Annotated[SlackApiRepository, Depends()]

async def notify(message: str):
    async with NotificationUoW() as uow:
        await uow.github.create_issue(message)
        await uow.slack.send_message(message)
FastAPI Integration
from fastapi import FastAPI, Depends as FastApiDepends

app = FastAPI()

def get_order_uow():
    return OrderUoW(session_factory=session_factory, is_transaction=True)

@app.post("/orders")
async def create_order(
    uow: OrderUoW = FastApiDepends(get_order_uow),
):
    async with uow:
        order = await uow.orders.create(...)
    return order
Pipelines
Pipelines coordinate multiple units of work under a single async with block. All UoWs are initialized in order and cleaned up in reverse order.
Generic Pipeline
from unit_of_work import UoWPipeline

async with UoWPipeline(OrderUoW(...), NotificationUoW()) as (orders, notifications):
    order = await orders.orders.create(...)
    await notifications.slack.send_message(f"New order: {order.id}")
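The ordering described above (initialize in order, clean up in reverse) can be illustrated with the standard library alone. The sketch below is not the library's implementation: `RecordingUoW` and `SketchPipeline` are hypothetical stand-ins that only record lifecycle events, and `contextlib.AsyncExitStack` is assumed here as one plausible way to get the reverse-order cleanup.

```python
import asyncio
from contextlib import AsyncExitStack


class RecordingUoW:
    """Hypothetical stand-in for a unit of work; it only records lifecycle events."""

    def __init__(self, name: str, log: list[str]) -> None:
        self.name = name
        self.log = log

    async def __aenter__(self) -> "RecordingUoW":
        self.log.append(f"begin {self.name}")
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.log.append(f"close {self.name}")


class SketchPipeline:
    """Hypothetical pipeline: enters units in order, exits them in reverse."""

    def __init__(self, *uows) -> None:
        self._uows = uows
        self._stack = AsyncExitStack()

    async def __aenter__(self) -> tuple:
        async with AsyncExitStack() as stack:
            entered = tuple([await stack.enter_async_context(u) for u in self._uows])
            # Transfer ownership so the units stay open after this block;
            # if entering any unit fails, the ones already entered are closed.
            self._stack = stack.pop_all()
        return entered

    async def __aexit__(self, exc_type, exc, tb):
        # AsyncExitStack unwinds its contexts in last-in, first-out order.
        return await self._stack.__aexit__(exc_type, exc, tb)


async def demo() -> list[str]:
    log: list[str] = []
    first = RecordingUoW("orders", log)
    second = RecordingUoW("notifications", log)
    async with SketchPipeline(first, second) as (orders, notifications):
        log.append("work")
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Reverse-order cleanup matters when a later unit depends on resources held by an earlier one: the dependent unit is closed first.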
SQL Pipeline (Shared Session)
SQLUoWPipeline runs multiple SQL units of work on a single shared database session. The first UoW owns the session and manages the transaction — subsequent UoWs attach to it.
from unit_of_work import SQLUoWPipeline

async with SQLUoWPipeline(
    OrderUoW(session_factory=session_factory, is_transaction=True),
    InventoryUoW(),
) as (orders, inventory):
    await orders.orders.create(...)
    await inventory.stock.decrement(...)
    # Single transaction: both commit or both roll back
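To make the shared-session idea concrete, here is a minimal sketch under stated assumptions. It does not use the library: `FakeSession`, `SketchSQLUoW`, and `SketchSharedSessionPipeline` are hypothetical names, and the commit-on-success, rollback-on-error policy is an assumption about what "the first UoW manages the transaction" implies.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for a database session; records calls made on it."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def commit(self) -> None:
        self.events.append("commit")

    async def rollback(self) -> None:
        self.events.append("rollback")

    async def close(self) -> None:
        self.events.append("close")


class SketchSQLUoW:
    """Hypothetical unit of work that simply holds a session reference."""

    def __init__(self, session_factory=None) -> None:
        self.session_factory = session_factory
        self.session = None


class SketchSharedSessionPipeline:
    """The first unit creates the session; the others attach to the same object."""

    def __init__(self, owner: SketchSQLUoW, *followers: SketchSQLUoW) -> None:
        self.owner = owner
        self.followers = followers

    async def __aenter__(self) -> tuple:
        self.owner.session = self.owner.session_factory()
        for uow in self.followers:
            uow.session = self.owner.session  # attach, do not create
        return (self.owner, *self.followers)

    async def __aexit__(self, exc_type, exc, tb) -> bool:
        session = self.owner.session
        try:
            if exc_type is None:
                await session.commit()
            else:
                await session.rollback()
        finally:
            await session.close()
            for uow in (self.owner, *self.followers):
                uow.session = None
        return False  # never swallow the caller's exception


async def demo(fail: bool) -> list[str]:
    created: list[FakeSession] = []

    def factory() -> FakeSession:
        created.append(FakeSession())
        return created[-1]

    try:
        pipeline = SketchSharedSessionPipeline(SketchSQLUoW(factory), SketchSQLUoW())
        async with pipeline as (orders, inventory):
            assert orders.session is inventory.session
            if fail:
                raise RuntimeError("stock unavailable")
    except RuntimeError:
        pass
    return created[0].events
```

Because every unit writes through one session, a failure in any of them rolls back the work of all of them.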
REST Pipeline
from unit_of_work import RestUoWPipeline

async with RestUoWPipeline(
    NotificationUoW(),
    AnalyticsUoW(),
) as (notifications, analytics):
    await notifications.slack.send_message(...)
    await analytics.events.track(...)
Core Concepts
Depends()
Marks a repository attribute for automatic discovery and initialization. Uses Annotated type hints:
class MyUoW(BaseSQLUnitOfWork):
    repo: Annotated[MyRepository, Depends()]
If FastAPI is installed, Depends is FastAPI's own Depends class, which makes the UoW compatible with FastAPI's dependency injection.
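For readers curious how a marker inside `Annotated` can be discovered at runtime, the following sketch uses only the standard `typing` module. It is an illustration of the general technique, not the library's code: `Marker`, `SketchUoW`, and `discover_repositories` are hypothetical names, with `Marker` standing in for `Depends`.

```python
from typing import Annotated, get_args, get_origin, get_type_hints


class Marker:
    """Hypothetical stand-in for the Depends() marker."""


class UserRepository:
    """Placeholder repository type."""


class OrderRepository:
    """Placeholder repository type."""


class SketchUoW:
    users: Annotated[UserRepository, Marker()]
    orders: Annotated[OrderRepository, Marker()]
    label: str = "not a repository"  # plain annotation, must be ignored


def discover_repositories(uow_cls: type) -> dict[str, type]:
    """Return {attribute name: repository type} for every marked attribute."""
    # include_extras=True keeps the Annotated metadata instead of stripping it.
    hints = get_type_hints(uow_cls, include_extras=True)
    found: dict[str, type] = {}
    for name, hint in hints.items():
        if get_origin(hint) is Annotated:
            repo_type, *metadata = get_args(hint)
            if any(isinstance(item, Marker) for item in metadata):
                found[name] = repo_type
    return found


if __name__ == "__main__":
    print(discover_repositories(SketchUoW))
```

A unit of work built this way could then instantiate each discovered type and assign it to the attribute of the same name when it begins.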
Lifecycle
Every unit of work follows the same lifecycle:
- begin() / abegin(): discovers repository attributes, creates sessions, initializes repositories
- Use repositories: perform data operations
- close() / aclose(): commits transactions (if enabled), closes sessions, resets repositories
Using with / async with handles this automatically.
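As a rough illustration of what "handles this automatically" means, the sketch below compares a manual `abegin()` / `aclose()` sequence with the equivalent `async with` block. `SketchUoW` is a hypothetical class that only records calls; how the library wires its context manager methods internally is an assumption here.

```python
import asyncio


class SketchUoW:
    """Hypothetical unit of work that records its lifecycle calls."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def abegin(self) -> None:
        self.events.append("abegin")

    async def aclose(self) -> None:
        self.events.append("aclose")

    async def __aenter__(self) -> "SketchUoW":
        await self.abegin()
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.aclose()


async def manual() -> list[str]:
    uow = SketchUoW()
    await uow.abegin()
    try:
        uow.events.append("use repositories")
    finally:
        # Without try/finally, an exception would skip the cleanup step.
        await uow.aclose()
    return uow.events


async def automatic() -> list[str]:
    async with SketchUoW() as uow:
        uow.events.append("use repositories")
    return uow.events
```

Both functions produce the same sequence of events; the context manager form simply cannot forget the cleanup step.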
Transaction Management (SQL)
# Auto-commit on successful exit
async with MyUoW(session_factory=sf, is_transaction=True) as uow:
    await uow.repo.create(...)
    # Commits here

# Manual control
async with MyUoW(session_factory=sf) as uow:
    await uow.repo.create(...)
    await uow.acommit()  # Explicit commit
    # or
    await uow.arollback()  # Explicit rollback
On error, acommit() automatically rolls back before re-raising the exception.
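The rollback-then-re-raise behavior can be sketched as follows. This is an assumption about the shape of the logic, not the library's source: `FlakySession` is a hypothetical session whose commit can be made to fail, and the free function `acommit` stands in for the method of the same name.

```python
import asyncio


class FlakySession:
    """Hypothetical session whose commit can be made to fail."""

    def __init__(self, fail_on_commit: bool) -> None:
        self.fail_on_commit = fail_on_commit
        self.events: list[str] = []

    async def commit(self) -> None:
        if self.fail_on_commit:
            raise RuntimeError("commit failed")
        self.events.append("commit")

    async def rollback(self) -> None:
        self.events.append("rollback")


async def acommit(session: FlakySession) -> None:
    """Commit; on failure, roll back first and then re-raise the original error."""
    try:
        await session.commit()
    except Exception:
        await session.rollback()
        raise
```

The caller still sees the original exception, so it can decide whether to retry or report the failure, while the session is left in a clean state.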
API Reference
| Class | Description |
|---|---|
| `IUnitOfWork[T]` | Abstract interface for all units of work |
| `BaseUnitOfWork[T]` | Base implementation with repository discovery |
| `BaseSQLUnitOfWork` | SQL variant with AsyncSession management and transactions |
| `BaseRestUnitOfWork` | REST variant with HTTPX and CloudScraper client injection |
| `UoWPipeline` | Generic pipeline for multiple UoWs |
| `SQLUoWPipeline` | SQL pipeline with shared session across UoWs |
| `RestUoWPipeline` | REST pipeline with type validation |
| `Depends()` | Dependency marker for repository attributes |