Skip to content

core

Async Leaky Bucket Rate Limiter (predictable, queue-based version)

AsyncLeakyBucket

Async Leaky Bucket Rate Limiter - Queue-based implementation

Parameters:

Name Type Description Default
leaky_bucket_config LeakyBucketConfig | None

Configuration for the leaky bucket with the max capacity and time period in seconds

None
Note

This implementation processes requests sequentially, in the order they were queued, and supports bursts up to the capacity within the specified time period

Source code in limitor/extra/leaky_bucket/core.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class AsyncLeakyBucket:
    """Async Leaky Bucket Rate Limiter - Queue-based implementation

    Requests are put on an ``asyncio.Queue`` and served one at a time, in arrival order, by a single
    background worker task.

    Args:
        leaky_bucket_config: Configuration for the leaky bucket with the max capacity and time period in seconds

    Note:
        Requests are processed sequentially (FIFO) and bursts up to the capacity are supported within the
        specified time period.

        The worker task is started in ``__init__`` with ``asyncio.create_task``, so an instance has to be
        created while an event loop is running.

        Once ``shutdown`` has completed (which also happens when leaving ``async with``), the worker no longer
        reads the queue, so a later ``acquire`` call never completes.
    """

    def __init__(self, leaky_bucket_config: LeakyBucketConfig | None = None):
        """Copy the configuration onto the instance and start the background worker.

        Args:
            leaky_bucket_config: Bucket configuration; a default ``LeakyBucketConfig()`` is used when omitted
        """
        config = leaky_bucket_config or LeakyBucketConfig()
        # every attribute of the config (``capacity`` and ``seconds`` are the ones used below) becomes
        # an attribute of the limiter
        for key, value in vars(config).items():
            setattr(self, key, value)

        self.leak_rate = self.capacity / self.seconds  # capacity units drained per second
        self._bucket_level = 0.0
        self._last_leak = time.monotonic()
        self._queue = asyncio.Queue()
        self._worker_task = asyncio.create_task(self._worker())

    def _leak(self) -> None:
        """Leak the bucket based on the elapsed time since the last leak"""
        now = time.monotonic()
        elapsed = now - self._last_leak
        # the level never drops below empty, however long the bucket has been idle
        self._bucket_level = max(0.0, self._bucket_level - elapsed * self.leak_rate)
        self._last_leak = now

    def capacity_info(self, amount: float = 1) -> Capacity:
        """Get the current capacity information of the leaky bucket

        The bucket is leaked first, so the answer reflects the level at the time of the call.

        Args:
            amount: The amount of capacity to check for, defaults to 1

        Returns:
            A named tuple indicating if the bucket has enough capacity and how much more is needed
        """
        self._leak()
        # positive: the request would overflow the bucket by this much; zero or negative: it fits
        needed = self._bucket_level + amount - self.capacity
        return Capacity(has_capacity=needed <= 0, needed_capacity=needed)

    async def _worker(self) -> None:  # single-worker coroutine
        """Worker coroutine that processes requests from the queue.

        Each queue item is either ``None`` (the shutdown sentinel, which ends the loop) or an
        ``(amount, future, timeout)`` tuple. The future is resolved with ``True`` once the capacity has been
        acquired, or with the exception raised while acquiring.
        """
        while True:
            item = await self._queue.get()
            if item is None:
                self._queue.task_done()
                break

            amount, future, timeout = item
            try:
                # a caller that was cancelled before its turn has a finished future: do not spend
                # capacity on its behalf
                if not future.done():
                    await self._timeout_acquire(amount, timeout)
            except Exception as error:
                # the caller may have been cancelled while we were waiting; resolving a finished
                # future raises InvalidStateError, which would kill the only worker
                if not future.done():
                    future.set_exception(error)
            else:
                if not future.done():
                    future.set_result(True)  # note: this can be set to anything
            finally:
                self._queue.task_done()

    async def _acquire_logic(self, amount: float) -> None:
        """Core logic for acquiring capacity from the leaky bucket.

        Waits until the bucket has leaked enough for ``amount`` to fit, then adds ``amount`` to the level.

        Args:
            amount: The amount of capacity to acquire

        Notes:
            This coroutine is only driven by the single worker, one request at a time, so requests are
                served in the order they were received i.e. not out-of-order like most async programs.
            The benefit is that with multiple concurrent requests the bucket level is never updated by
                two requests at the same time, which could lead to an inconsistent state i.e. a race condition.
        """
        # check for the requested amount, not the default of 1
        capacity_info = self.capacity_info(amount)
        while not capacity_info.has_capacity:
            # time for the missing capacity to leak out; needed_capacity is positive inside this loop
            wait_time = capacity_info.needed_capacity / self.leak_rate
            if wait_time > 0:
                await asyncio.sleep(wait_time)

            capacity_info = self.capacity_info(amount)

        self._bucket_level += amount

    async def _timeout_acquire(self, amount: float, timeout: float | None) -> None:
        """Acquire capacity from the leaky bucket, waiting asynchronously until allowed.

        Supports timeout and cancellation.

        Args:
            amount: The amount of capacity to acquire
            timeout: Optional timeout in seconds for the acquire operation

        Raises:
            ValueError: If the requested amount exceeds the bucket's capacity
            TimeoutError: If the acquire operation times out after the specified timeout period
        """
        if amount > self.capacity:
            raise ValueError(f"Cannot acquire more than the bucket's capacity: {self.capacity}")

        if timeout is None:
            await self._acquire_logic(amount)
            return

        try:
            await asyncio.wait_for(self._acquire_logic(amount), timeout=timeout)
        except TimeoutError as error:
            # NOTE: asyncio.wait_for raises the builtin TimeoutError on Python 3.11+; on older versions
            # it raises the separate asyncio.TimeoutError, which this clause does not match
            raise TimeoutError(f"Acquire timed out after {timeout} seconds for amount={amount}") from error

    async def acquire(self, amount: float = 1.0, timeout: float | None = None) -> None:
        """Acquire capacity from the leaky bucket, waiting asynchronously until allowed.

        The request is queued and this coroutine waits until the worker has served it.

        Args:
            amount: The amount of capacity to acquire, defaults to 1
            timeout: Optional timeout in seconds for the acquire operation

        Raises:
            ValueError: If the requested amount exceeds the bucket's capacity
            TimeoutError: If the acquire operation times out after the specified timeout period
        """
        # we are inside a coroutine, so a loop is guaranteed to be running
        future = asyncio.get_running_loop().create_future()
        await self._queue.put((amount, future, timeout))
        await future

    async def shutdown(self) -> None:
        """Gracefully shut down the worker task.

        Requests queued before this call are still served; the worker stops when it reaches the sentinel.
        """
        await self._queue.put(None)  # Sentinel value
        await self._worker_task

    async def __aenter__(self) -> AsyncLeakyBucket:
        """Enter the context manager by acquiring the default amount of capacity"""
        await self.acquire()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        """Exit the context manager by shutting down the worker task

        All three arguments are ``None`` when the ``async with`` body finished without raising. Nothing is
        returned, so an exception raised in the body is not suppressed.
        """
        await self.shutdown()

__aenter__() async

Enter the context manager by acquiring the default amount (1.0) of capacity from the bucket

Source code in limitor/extra/leaky_bucket/core.py
160
161
162
163
async def __aenter__(self) -> AsyncLeakyBucket:
    """Wait for the default amount of capacity, then hand this limiter to the ``async with`` body"""
    await self.acquire()
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Exit the context manager by shutting down the worker task

Source code in limitor/extra/leaky_bucket/core.py
165
166
167
168
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Exit the context manager by shutting down the worker task

    Args:
        exc_type: Type of the exception raised in the ``async with`` body, or None if it did not raise
        exc_val: The exception instance raised in the body, or None
        exc_tb: Traceback of that exception, or None

    Nothing is returned, so an exception raised in the body is not suppressed.
    """
    await self.shutdown()

acquire(amount=1.0, timeout=None) async

Acquire capacity from the leaky bucket, waiting asynchronously until allowed.

Parameters:

Name Type Description Default
amount float

The amount of capacity to acquire, defaults to 1

1.0
timeout float | None

Optional timeout in seconds for the acquire operation

None
Source code in limitor/extra/leaky_bucket/core.py
144
145
146
147
148
149
150
151
152
153
async def acquire(self, amount: float = 1.0, timeout: float | None = None) -> None:
    """Acquire capacity from the leaky bucket, waiting asynchronously until allowed.

    The request is put on the queue together with a future, and this coroutine waits on that future
    until it is resolved. An exception set on the future is raised here.

    Args:
        amount: The amount of capacity to acquire, defaults to 1
        timeout: Optional timeout in seconds for the acquire operation
    """
    # we are inside a coroutine, so a loop is guaranteed to be running
    future = asyncio.get_running_loop().create_future()
    await self._queue.put((amount, future, timeout))
    await future

capacity_info(amount=1)

Get the current capacity information of the leaky bucket

Parameters:

Name Type Description Default
amount float

The amount of capacity to check for, defaults to 1

1

Returns:

Type Description
Capacity

A named tuple indicating if the bucket has enough capacity and how much more is needed

Source code in limitor/extra/leaky_bucket/core.py
63
64
65
66
67
68
69
70
71
72
73
74
def capacity_info(self, amount: float = 1) -> Capacity:
    """Report whether ``amount`` fits in the bucket right now

    The bucket is leaked first, so the result reflects the level at the time of the call.

    Args:
        amount: The amount of capacity to check for, defaults to 1

    Returns:
        A named tuple holding a flag for whether the amount fits and the capacity still missing
    """
    self._leak()
    # positive: the request would overflow by this much; zero or negative: it fits
    shortfall = (self._bucket_level + amount) - self.capacity
    return Capacity(has_capacity=shortfall <= 0, needed_capacity=shortfall)

shutdown() async

Gracefully shut down the worker task.

Source code in limitor/extra/leaky_bucket/core.py
155
156
157
158
async def shutdown(self) -> None:
    """Stop the worker task and wait until it has finished."""
    stop_signal = None  # sentinel: queued behind any requests that are already waiting
    await self._queue.put(stop_signal)
    await self._worker_task

Capacity

Bases: NamedTuple

Information about the current capacity of the leaky bucket

Source code in limitor/extra/leaky_bucket/core.py
25
26
27
28
29
30
31
32
class Capacity(NamedTuple):
    """Information about the current capacity of the leaky bucket

    This is the value returned by ``AsyncLeakyBucket.capacity_info``.
    """

    # True when ``needed_capacity`` is zero or negative, i.e. the requested amount fits
    has_capacity: bool
    """Indicates if the bucket has enough capacity to accommodate the requested amount"""

    # computed as bucket level + requested amount - capacity: positive means the request would
    # overflow the bucket by that much, zero or negative means it fits
    needed_capacity: float
    """Amount of capacity needed to accommodate the request, if any"""

has_capacity instance-attribute

Indicates if the bucket has enough capacity to accommodate the requested amount

needed_capacity instance-attribute

Amount of capacity needed to accommodate the request, if any