Skip to content

core

A simple synchronous implementation of the Generic Cell Rate Algorithm (GCRA)

References: - https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm - https://en.wikipedia.org/wiki/Leaky_bucket

GCRAConfig dataclass

Configuration for the Token Bucket Rate Limiter

Source code in limitor/generic_cell_rate/core.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@dataclass
class GCRAConfig:
    """Configuration for the Token Bucket Rate Limiter"""

    capacity: int = 10
    """Maximum number of units we can hold i.e. number of requests that can be processed at once"""

    seconds: float = 1
    """Up to `capacity` acquisitions are allowed within this time period in a burst"""

    def __post_init__(self):
        """Validate the configuration parameters"""
        fill_rate_per_sec = self.capacity / self.seconds
        if fill_rate_per_sec <= 0:
            raise ValueError("fill_rate_per_sec must be positive and non-zero")

        if self.capacity < 1:
            raise ValueError("capacity must be at least 1")

capacity = 10 class-attribute instance-attribute

Maximum number of units we can hold i.e. number of requests that can be processed at once

seconds = 1 class-attribute instance-attribute

Up to capacity acquisitions are allowed within this time period in a burst

__post_init__()

Validate the configuration parameters

Source code in limitor/generic_cell_rate/core.py
24
25
26
27
28
29
30
31
def __post_init__(self):
    """Validate the configuration parameters"""
    fill_rate_per_sec = self.capacity / self.seconds
    if fill_rate_per_sec <= 0:
        raise ValueError("fill_rate_per_sec must be positive and non-zero")

    if self.capacity < 1:
        raise ValueError("capacity must be at least 1")

SyncLeakyBucketGCRA

Continuous-state Leaky Bucket Rate Limiter

Parameters:

Name Type Description Default
gcra_config GCRAConfig | None

Configuration for the GCR algorithm with the max capacity and time period in seconds

required
Note

This implementation is synchronous and supports bursts up to the capacity within the specified time period

References

https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm

Source code in limitor/generic_cell_rate/core.py
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
class SyncLeakyBucketGCRA:
    """Continuous-state Leaky Bucket Rate Limiter

    Args:
        gcra_config: Configuration for the GCR algorithm with the max capacity and time period in seconds

    Note:
        This implementation is synchronous and supports bursts up to the capacity within the specified time period

    References:
        https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm
    """

    def __init__(self, gcra_config: GCRAConfig | None):
        # import config and set attributes
        config = gcra_config or GCRAConfig()
        for key, value in vars(config).items():
            setattr(self, key, value)

        self.leak_rate = self.capacity / self.seconds  # units per second
        self.T = 1 / self.leak_rate  # time to leak one unit

        # burst rate, but can't do this if the amount is variable
        # self.tau = self.T * self.burst

        self._bucket_level = 0  # current volume in the bucket
        self._last_leak = None  # same as last conforming time or LCT

    def acquire(self, amount: float = 1) -> None:
        """Acquire resources, blocking if necessary to conform to the rate limit

        Args:
            amount: The amount of resources to acquire (default is 1)

        Raises:
            ValueError: If the amount exceeds the configured capacity
        """
        if amount > self.capacity:
            raise ValueError(f"Cannot acquire more than the capacity: {self.capacity}")

        t_a = time.monotonic()
        if self._last_leak is None:
            # first cell
            self._bucket_level = 0
            self._last_leak = t_a

        elapsed = t_a - self._last_leak
        self._bucket_level = self._bucket_level - elapsed

        # note: we can also make `self.capacity - amount` as class param = burst i.e. independent of capacity
        tau = self.T * (self.capacity - amount)
        if self._bucket_level > tau:
            delay = self._bucket_level - tau
            time.sleep(delay)

            self._bucket_level = self._bucket_level - delay
            t_a += delay

        self._bucket_level = max(0.0, self._bucket_level) + amount * self.T
        self._last_leak = t_a

    def __enter__(self) -> SyncLeakyBucketGCRA:
        """Enter the context manager, acquiring resources if necessary

        Returns:
            An instance of the LeakyBucketGCRA class
        """
        self.acquire()
        return self

    def __exit__(self, exc_type: type[BaseException], exc_val: BaseException, exc_tb: TracebackType) -> None:
        """Exit the context manager, releasing any resources if necessary

        Args:
            exc_type: The type of the exception raised
            exc_val: The value of the exception raised
            exc_tb: The traceback object
        """
        return None

__enter__()

Enter the context manager, acquiring resources if necessary

Returns:

Type Description
SyncLeakyBucketGCRA

An instance of the LeakyBucketGCRA class

Source code in limitor/generic_cell_rate/core.py
168
169
170
171
172
173
174
175
def __enter__(self) -> SyncLeakyBucketGCRA:
    """Enter the context manager, acquiring resources if necessary

    Returns:
        An instance of the LeakyBucketGCRA class
    """
    self.acquire()
    return self

__exit__(exc_type, exc_val, exc_tb)

Exit the context manager, releasing any resources if necessary

Parameters:

Name Type Description Default
exc_type type[BaseException]

The type of the exception raised

required
exc_val BaseException

The value of the exception raised

required
exc_tb TracebackType

The traceback object

required
Source code in limitor/generic_cell_rate/core.py
177
178
179
180
181
182
183
184
185
def __exit__(self, exc_type: type[BaseException], exc_val: BaseException, exc_tb: TracebackType) -> None:
    """Exit the context manager, releasing any resources if necessary

    Args:
        exc_type: The type of the exception raised
        exc_val: The value of the exception raised
        exc_tb: The traceback object
    """
    return None

acquire(amount=1)

Acquire resources, blocking if necessary to conform to the rate limit

Parameters:

Name Type Description Default
amount float

The amount of resources to acquire (default is 1)

1

Raises:

Type Description
ValueError

If the amount exceeds the configured capacity

Source code in limitor/generic_cell_rate/core.py
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def acquire(self, amount: float = 1) -> None:
    """Acquire resources, blocking if necessary to conform to the rate limit

    Args:
        amount: The amount of resources to acquire (default is 1)

    Raises:
        ValueError: If the amount exceeds the configured capacity
    """
    if amount > self.capacity:
        raise ValueError(f"Cannot acquire more than the capacity: {self.capacity}")

    t_a = time.monotonic()
    if self._last_leak is None:
        # first cell
        self._bucket_level = 0
        self._last_leak = t_a

    elapsed = t_a - self._last_leak
    self._bucket_level = self._bucket_level - elapsed

    # note: we can also make `self.capacity - amount` as class param = burst i.e. independent of capacity
    tau = self.T * (self.capacity - amount)
    if self._bucket_level > tau:
        delay = self._bucket_level - tau
        time.sleep(delay)

        self._bucket_level = self._bucket_level - delay
        t_a += delay

    self._bucket_level = max(0.0, self._bucket_level) + amount * self.T
    self._last_leak = t_a

SyncVirtualSchedulingGCRA

Virtual Scheduling Generic Cell Rate Algorithm Rate Limiter

Parameters:

Name Type Description Default
gcra_config GCRAConfig | None

Configuration for the GCR algorithm with the max capacity and time period in seconds

required
Note

This implementation is synchronous and supports bursts up to the capacity within the specified time period

References

https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm

Source code in limitor/generic_cell_rate/core.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
class SyncVirtualSchedulingGCRA:
    """Virtual Scheduling Generic Cell Rate Algorithm Rate Limiter

    Args:
        gcra_config: Configuration for the GCR algorithm with the max capacity and time period in seconds

    Note:
        This implementation is synchronous and supports bursts up to the capacity within the specified time period

    References:
        https://en.wikipedia.org/wiki/Generic_cell_rate_algorithm
    """

    def __init__(self, gcra_config: GCRAConfig | None):
        # import config and set attributes
        config = gcra_config or GCRAConfig()
        for key, value in vars(config).items():
            setattr(self, key, value)

        self.leak_rate = self.capacity / self.seconds  # units per second
        self.T = 1 / self.leak_rate  # time to leak one unit

        # burst rate, but can't do this if the amount is variable
        # self.tau = self.T * self.burst

        # theoretical arrival time (TAT)
        self._tat = None

    def acquire(self, amount: float = 1) -> None:
        """Acquire resources, blocking if necessary to conform to the rate limit

        Args:
            amount: The amount of resources to acquire (default is 1)

        Raises:
            ValueError: If the amount exceeds the configured capacity
        """
        if amount > self.capacity:
            raise ValueError(f"Cannot acquire more than the capacity: {self.capacity}")

        t_a = time.monotonic()
        if self._tat is None:
            # first cell
            self._tat = t_a

        # note: we can also make `self.capacity - amount` as class param = burst i.e. independent of capacity
        tau = self.T * (self.capacity - amount)
        if t_a < self._tat - tau:
            delay = (self._tat - tau) - t_a
            time.sleep(delay)

        self._tat = max(t_a, self._tat) + amount * self.T

    def __enter__(self) -> SyncVirtualSchedulingGCRA:
        """Enter the context manager, acquiring resources if necessary

        Returns:
            An instance of the VirtualSchedulingGCRA class
        """
        self.acquire()
        return self

    def __exit__(self, exc_type: type[BaseException], exc_val: BaseException, exc_tb: TracebackType) -> None:
        """Exit the context manager, releasing any resources if necessary

        Args:
            exc_type: The type of the exception raised
            exc_val: The value of the exception raised
            exc_tb: The traceback object
        """
        return None

__enter__()

Enter the context manager, acquiring resources if necessary

Returns:

Type Description
SyncVirtualSchedulingGCRA

An instance of the VirtualSchedulingGCRA class

Source code in limitor/generic_cell_rate/core.py
87
88
89
90
91
92
93
94
def __enter__(self) -> SyncVirtualSchedulingGCRA:
    """Enter the context manager, acquiring resources if necessary

    Returns:
        An instance of the VirtualSchedulingGCRA class
    """
    self.acquire()
    return self

__exit__(exc_type, exc_val, exc_tb)

Exit the context manager, releasing any resources if necessary

Parameters:

Name Type Description Default
exc_type type[BaseException]

The type of the exception raised

required
exc_val BaseException

The value of the exception raised

required
exc_tb TracebackType

The traceback object

required
Source code in limitor/generic_cell_rate/core.py
 96
 97
 98
 99
100
101
102
103
104
def __exit__(self, exc_type: type[BaseException], exc_val: BaseException, exc_tb: TracebackType) -> None:
    """Exit the context manager, releasing any resources if necessary

    Args:
        exc_type: The type of the exception raised
        exc_val: The value of the exception raised
        exc_tb: The traceback object
    """
    return None

acquire(amount=1)

Acquire resources, blocking if necessary to conform to the rate limit

Parameters:

Name Type Description Default
amount float

The amount of resources to acquire (default is 1)

1

Raises:

Type Description
ValueError

If the amount exceeds the configured capacity

Source code in limitor/generic_cell_rate/core.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def acquire(self, amount: float = 1) -> None:
    """Acquire resources, blocking if necessary to conform to the rate limit

    Args:
        amount: The amount of resources to acquire (default is 1)

    Raises:
        ValueError: If the amount exceeds the configured capacity
    """
    if amount > self.capacity:
        raise ValueError(f"Cannot acquire more than the capacity: {self.capacity}")

    t_a = time.monotonic()
    if self._tat is None:
        # first cell
        self._tat = t_a

    # note: we can also make `self.capacity - amount` as class param = burst i.e. independent of capacity
    tau = self.T * (self.capacity - amount)
    if t_a < self._tat - tau:
        delay = (self._tat - tau) - t_a
        time.sleep(delay)

    self._tat = max(t_a, self._tat) + amount * self.T