Skip to content

core

Leaky Bucket Rate Limiter Implementation

AsyncLeakyBucket

Asynchronous Leaky Bucket Rate Limiter

Parameters:

Name Type Description Default
leaky_bucket_config LeakyBucketConfig | None

Configuration for the leaky bucket with the max capacity and time period in seconds

None
max_concurrent int | None

Maximum number of concurrent requests allowed to acquire capacity

None
Note

This implementation is synchronous and supports bursts up to the capacity within the specified time period

Source code in limitor/leaky_bucket/core.py
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
class AsyncLeakyBucket:
    """Asynchronous Leaky Bucket Rate Limiter

    Args:
        leaky_bucket_config: Configuration for the leaky bucket with the max capacity and time period in seconds
        max_concurrent: Maximum number of concurrent requests allowed to acquire capacity

    Note:
        This implementation is synchronous and supports bursts up to the capacity within the specified time period
    """

    def __init__(self, leaky_bucket_config: LeakyBucketConfig | None = None, max_concurrent: int | None = None):
        config = leaky_bucket_config or LeakyBucketConfig()
        for key, value in vars(config).items():
            setattr(self, key, value)

        self.leak_rate = self.capacity / self.seconds
        self._bucket_level = 0.0
        self._last_leak = time.monotonic()

        self.max_concurrent = max_concurrent
        self._lock = asyncio.Lock()

    def _leak(self) -> None:
        """Leak the bucket based on the elapsed time since the last leak"""
        now = time.monotonic()
        elapsed = now - self._last_leak
        self._bucket_level = max(0.0, self._bucket_level - elapsed * self.leak_rate)
        self._last_leak = now

    def capacity_info(self, amount: float = 1) -> Capacity:
        """Get the current capacity information of the leaky bucket

        Args:
            amount: The amount of capacity to check for, defaults to 1

        Returns:
            A named tuple indicating if the bucket has enough capacity and how much more is needed
        """
        self._leak()
        needed = self._bucket_level + amount - self.capacity
        return Capacity(has_capacity=needed <= 0, needed_capacity=needed)

    async def _acquire_logic(self, amount: float = 1) -> None:
        """Core logic for acquiring capacity from the leaky bucket.

        Args:
            amount: The amount of capacity to check for, defaults to 1

        Notes:
            Adding a lock here ensures that the acquire logic is atomic, but it also means that the
                requests are going to be done in the order they were received  i.e. not out-of-order like
                most async programs.
            The benefit is that with multiple concurrent requests, we can ensure that the bucket level
                is updated correctly and that we don't have multiple requests trying to update the bucket level
                at the same time, which could lead to an inconsistent state i.e. a race condition.
        """
        async with self._lock:  # ensures atomicity given we can have multiple concurrent requests
            capacity_info = self.capacity_info()
            while not capacity_info.has_capacity:
                needed = capacity_info.needed_capacity
                # amount we need to wait to leak (either part or the entire capacity)
                # needed is guaranteed to be positive here, so we can use it directly
                wait_time = needed / self.leak_rate
                if wait_time > 0:
                    await asyncio.sleep(wait_time)

                capacity_info = self.capacity_info()

            self._bucket_level += amount

    async def _semaphore_acquire(self, amount: float = 1) -> None:
        """Acquire capacity using a semaphore to limit concurrency.

        Args:
            amount: The amount of capacity to acquire, defaults to 1
        """
        semaphore = asyncio.Semaphore(self.max_concurrent) if self.max_concurrent else nullcontext()
        async with semaphore:
            await self._acquire_logic(amount)

    async def acquire(self, amount: float = 1, timeout: float | None = None) -> None:
        """Acquire capacity from the leaky bucket, waiting asynchronously until allowed.

        Supports timeout and cancellation.

        Args:
            amount: The amount of capacity to acquire, defaults to 1
            timeout: Optional timeout in seconds for the acquire operation

        Raises:
            ValueError: If the requested amount exceeds the bucket's capacity
            TimeoutError: If the acquire operation times out after the specified timeout period
        """
        if amount > self.capacity:
            raise ValueError(f"Cannot acquire more than the bucket's capacity: {self.capacity}")

        if timeout is not None:
            try:
                await asyncio.wait_for(self._semaphore_acquire(amount), timeout=timeout)
            except TimeoutError as error:
                raise TimeoutError(f"Acquire timed out after {timeout} seconds for amount={amount}") from error
        else:
            await self._semaphore_acquire(amount)

    async def __aenter__(self) -> AsyncLeakyBucket:
        """Enter the context manager, acquiring resources if necessary"""
        await self.acquire()
        return self

    async def __aexit__(self, exc_type: type[BaseException], exc_val: BaseException, exc_tb: TracebackType) -> None:
        """Exit the context manager, releasing any resources if necessary"""
        return None

__aenter__() async

Enter the context manager, acquiring resources if necessary

Source code in limitor/leaky_bucket/core.py
230
231
232
233
async def __aenter__(self) -> AsyncLeakyBucket:
    """Enter the context manager, acquiring resources if necessary"""
    await self.acquire()
    return self

__aexit__(exc_type, exc_val, exc_tb) async

Exit the context manager, releasing any resources if necessary

Source code in limitor/leaky_bucket/core.py
235
236
237
async def __aexit__(self, exc_type: type[BaseException], exc_val: BaseException, exc_tb: TracebackType) -> None:
    """Exit the context manager, releasing any resources if necessary"""
    return None

acquire(amount=1, timeout=None) async

Acquire capacity from the leaky bucket, waiting asynchronously until allowed.

Supports timeout and cancellation.

Parameters:

Name Type Description Default
amount float

The amount of capacity to acquire, defaults to 1

1
timeout float | None

Optional timeout in seconds for the acquire operation

None

Raises:

Type Description
ValueError

If the requested amount exceeds the bucket's capacity

TimeoutError

If the acquire operation times out after the specified timeout period

Source code in limitor/leaky_bucket/core.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
async def acquire(self, amount: float = 1, timeout: float | None = None) -> None:
    """Acquire capacity from the leaky bucket, waiting asynchronously until allowed.

    Supports timeout and cancellation.

    Args:
        amount: The amount of capacity to acquire, defaults to 1
        timeout: Optional timeout in seconds for the acquire operation

    Raises:
        ValueError: If the requested amount exceeds the bucket's capacity
        TimeoutError: If the acquire operation times out after the specified timeout period
    """
    if amount > self.capacity:
        raise ValueError(f"Cannot acquire more than the bucket's capacity: {self.capacity}")

    if timeout is not None:
        try:
            await asyncio.wait_for(self._semaphore_acquire(amount), timeout=timeout)
        except TimeoutError as error:
            raise TimeoutError(f"Acquire timed out after {timeout} seconds for amount={amount}") from error
    else:
        await self._semaphore_acquire(amount)

capacity_info(amount=1)

Get the current capacity information of the leaky bucket

Parameters:

Name Type Description Default
amount float

The amount of capacity to check for, defaults to 1

1

Returns:

Type Description
Capacity

A named tuple indicating if the bucket has enough capacity and how much more is needed

Source code in limitor/leaky_bucket/core.py
155
156
157
158
159
160
161
162
163
164
165
166
def capacity_info(self, amount: float = 1) -> Capacity:
    """Get the current capacity information of the leaky bucket

    Args:
        amount: The amount of capacity to check for, defaults to 1

    Returns:
        A named tuple indicating if the bucket has enough capacity and how much more is needed
    """
    self._leak()
    needed = self._bucket_level + amount - self.capacity
    return Capacity(has_capacity=needed <= 0, needed_capacity=needed)

Capacity

Bases: NamedTuple

Information about the current capacity of the leaky bucket

Source code in limitor/leaky_bucket/core.py
33
34
35
36
37
38
39
40
class Capacity(NamedTuple):
    """Information about the current capacity of the leaky bucket"""

    has_capacity: bool
    """Indicates if the bucket has enough capacity to accommodate the requested amount"""

    needed_capacity: float
    """Amount of capacity needed to accommodate the request, if any"""

has_capacity instance-attribute

Indicates if the bucket has enough capacity to accommodate the requested amount

needed_capacity instance-attribute

Amount of capacity needed to accommodate the request, if any

LeakyBucketConfig dataclass

Configuration for the Leaky Bucket Rate Limiter

Source code in limitor/leaky_bucket/core.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@dataclass
class LeakyBucketConfig:
    """Configuration for the Leaky Bucket Rate Limiter"""

    capacity: int = 10
    """Maximum number of items the bucket can hold i.e. number of requests that can be processed at once"""

    seconds: float = 1
    """Up to `capacity` acquisitions are allowed within this time period in a burst"""

    def __post_init__(self):
        """Validate the configuration parameters"""
        leak_rate_per_sec = self.capacity / self.seconds
        if leak_rate_per_sec <= 0:
            raise ValueError("leak_rate_per_sec must be positive and non-zero")

        if self.capacity < 1:
            raise ValueError("capacity must be at least 1")

capacity = 10 class-attribute instance-attribute

Maximum number of items the bucket can hold i.e. number of requests that can be processed at once

seconds = 1 class-attribute instance-attribute

Up to capacity acquisitions are allowed within this time period in a burst

__post_init__()

Validate the configuration parameters

Source code in limitor/leaky_bucket/core.py
23
24
25
26
27
28
29
30
def __post_init__(self):
    """Validate the configuration parameters"""
    leak_rate_per_sec = self.capacity / self.seconds
    if leak_rate_per_sec <= 0:
        raise ValueError("leak_rate_per_sec must be positive and non-zero")

    if self.capacity < 1:
        raise ValueError("capacity must be at least 1")

SyncLeakyBucket

Leaky Bucket Rate Limiter

Parameters:

Name Type Description Default
leaky_bucket_config LeakyBucketConfig | None

Configuration for the leaky bucket with the max capacity and time period in seconds

required
Note

This implementation is synchronous and supports bursts up to the capacity within the specified time period

Source code in limitor/leaky_bucket/core.py
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
class SyncLeakyBucket:
    """Leaky Bucket Rate Limiter

    Args:
        leaky_bucket_config: Configuration for the leaky bucket with the max capacity and time period in seconds

    Note:
        This implementation is synchronous and supports bursts up to the capacity within the specified time period
    """

    def __init__(self, leaky_bucket_config: LeakyBucketConfig | None):
        # import config and set attributes
        config = leaky_bucket_config or LeakyBucketConfig()
        for key, value in vars(config).items():
            setattr(self, key, value)

        self.leak_rate = self.capacity / self.seconds  # units per second

        self._bucket_level = 0  # current volume in the bucket
        self._last_leak = time.monotonic()  # last leak time

    def _leak(self) -> None:
        """Leak the bucket based on the elapsed time since the last leak"""
        now = time.monotonic()
        elapsed = now - self._last_leak
        self._bucket_level = max(0.0, self._bucket_level - elapsed * self.leak_rate)
        self._last_leak = now

    def capacity_info(self, amount: float = 1) -> Capacity:
        """Get the current capacity information of the leaky bucket

        Args:
            amount: The amount of capacity to check for, defaults to 1

        Returns:
            A named tuple indicating if the bucket has enough capacity and how much more is needed
        """
        self._leak()
        needed = self._bucket_level + amount - self.capacity
        return Capacity(has_capacity=needed <= 0, needed_capacity=needed)

    def acquire(self, amount: float = 1) -> None:
        """Acquire capacity from the leaky bucket, blocking until enough capacity is available.

        This method will block and sleep until the requested amount can be acquired
        without exceeding the bucket's capacity, simulating rate limiting.

        Args:
            amount: The amount of capacity to acquire, defaults to 1

        Raises:
            ValueError: If the requested amount exceeds the bucket's capacity

        Notes:
            The while loop is just to make sure nothing funny happens while waiting
        """
        if amount > self.capacity:
            raise ValueError(f"Cannot acquire more than the bucket's capacity: {self.capacity}")

        capacity_info = self.capacity_info()
        while not capacity_info.has_capacity:
            needed = capacity_info.needed_capacity
            # amount we need to wait to leak (either part or the entire capacity)
            # needed is guaranteed to be positive here, so we can use it directly
            wait_time = needed / self.leak_rate
            if wait_time > 0:
                time.sleep(wait_time)

            capacity_info = self.capacity_info()

        self._bucket_level += amount

    def __enter__(self) -> SyncLeakyBucket:
        """Enter the context manager, acquiring resources if necessary"""
        self.acquire()
        return self

    def __exit__(self, exc_type: type[BaseException], exc_val: BaseException, exc_tb: TracebackType) -> None:
        """Exit the context manager, releasing any resources if necessary"""
        return None

__enter__()

Enter the context manager, acquiring resources if necessary

Source code in limitor/leaky_bucket/core.py
115
116
117
118
def __enter__(self) -> SyncLeakyBucket:
    """Enter the context manager, acquiring resources if necessary"""
    self.acquire()
    return self

__exit__(exc_type, exc_val, exc_tb)

Exit the context manager, releasing any resources if necessary

Source code in limitor/leaky_bucket/core.py
120
121
122
def __exit__(self, exc_type: type[BaseException], exc_val: BaseException, exc_tb: TracebackType) -> None:
    """Exit the context manager, releasing any resources if necessary"""
    return None

acquire(amount=1)

Acquire capacity from the leaky bucket, blocking until enough capacity is available.

This method will block and sleep until the requested amount can be acquired without exceeding the bucket's capacity, simulating rate limiting.

Parameters:

Name Type Description Default
amount float

The amount of capacity to acquire, defaults to 1

1

Raises:

Type Description
ValueError

If the requested amount exceeds the bucket's capacity

Notes

The while loop is just to make sure nothing funny happens while waiting

Source code in limitor/leaky_bucket/core.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def acquire(self, amount: float = 1) -> None:
    """Acquire capacity from the leaky bucket, blocking until enough capacity is available.

    This method will block and sleep until the requested amount can be acquired
    without exceeding the bucket's capacity, simulating rate limiting.

    Args:
        amount: The amount of capacity to acquire, defaults to 1

    Raises:
        ValueError: If the requested amount exceeds the bucket's capacity

    Notes:
        The while loop is just to make sure nothing funny happens while waiting
    """
    if amount > self.capacity:
        raise ValueError(f"Cannot acquire more than the bucket's capacity: {self.capacity}")

    capacity_info = self.capacity_info()
    while not capacity_info.has_capacity:
        needed = capacity_info.needed_capacity
        # amount we need to wait to leak (either part or the entire capacity)
        # needed is guaranteed to be positive here, so we can use it directly
        wait_time = needed / self.leak_rate
        if wait_time > 0:
            time.sleep(wait_time)

        capacity_info = self.capacity_info()

    self._bucket_level += amount

capacity_info(amount=1)

Get the current capacity information of the leaky bucket

Parameters:

Name Type Description Default
amount float

The amount of capacity to check for, defaults to 1

1

Returns:

Type Description
Capacity

A named tuple indicating if the bucket has enough capacity and how much more is needed

Source code in limitor/leaky_bucket/core.py
71
72
73
74
75
76
77
78
79
80
81
82
def capacity_info(self, amount: float = 1) -> Capacity:
    """Get the current capacity information of the leaky bucket

    Args:
        amount: The amount of capacity to check for, defaults to 1

    Returns:
        A named tuple indicating if the bucket has enough capacity and how much more is needed
    """
    self._leak()
    needed = self._bucket_level + amount - self.capacity
    return Capacity(has_capacity=needed <= 0, needed_capacity=needed)