Add rate limiting code

This commit is contained in:
Ashish Pratap Singh
2024-07-16 19:53:11 -07:00
parent 494b04b1b4
commit 5a9bf86980
10 changed files with 356 additions and 0 deletions

View File

@@ -0,0 +1,33 @@
import time
class FixedWindowCounter:
def __init__(self, window_size, max_requests):
self.window_size = window_size # Size of the window in seconds
self.max_requests = max_requests # Maximum number of requests per window
self.current_window = time.time() // window_size
self.request_count = 0
def allow_request(self):
current_time = time.time()
window = current_time // self.window_size
# If we've moved to a new window, reset the counter
if window != self.current_window:
self.current_window = window
self.request_count = 0
# Check if we're still within the limit for this window
if self.request_count < self.max_requests:
self.request_count += 1
return True
return False
# Usage example
limiter = FixedWindowCounter(window_size=60, max_requests=5) # 5 requests per minute
for _ in range(10):
print(limiter.allow_request()) # Will print True for the first 5 requests, then False
time.sleep(0.1) # Wait a bit between requests
time.sleep(60) # Wait for the window to reset
print(limiter.allow_request()) # True

View File

@@ -0,0 +1,36 @@
from collections import deque
import time
class LeakyBucket:
def __init__(self, capacity, leak_rate):
self.capacity = capacity # Maximum number of requests in the bucket
self.leak_rate = leak_rate # Rate at which requests leak (requests/second)
self.bucket = deque() # Queue to hold request timestamps
self.last_leak = time.time() # Last time we leaked from the bucket
def allow_request(self):
now = time.time()
# Simulate leaking from the bucket
leak_time = now - self.last_leak
leaked = int(leak_time * self.leak_rate)
if leaked > 0:
# Remove the leaked requests from the bucket
for _ in range(min(leaked, len(self.bucket))):
self.bucket.popleft()
self.last_leak = now
# Check if there's capacity and add the new request
if len(self.bucket) < self.capacity:
self.bucket.append(now)
return True
return False
# Usage example
limiter = LeakyBucket(capacity=5, leak_rate=1) # 5 requests, leak 1 per second
for _ in range(10):
print(limiter.allow_request()) # Will print True for the first 5 requests, then False
time.sleep(0.1) # Wait a bit between requests
time.sleep(1) # Wait for bucket to leak
print(limiter.allow_request()) # True

View File

@@ -0,0 +1,39 @@
import time
class SlidingWindowCounter:
def __init__(self, window_size, max_requests):
self.window_size = window_size # Size of the sliding window in seconds
self.max_requests = max_requests # Maximum number of requests per window
self.current_window = time.time() // window_size
self.request_count = 0
self.previous_count = 0
def allow_request(self):
now = time.time()
window = now // self.window_size
# If we've moved to a new window, update the counts
if window != self.current_window:
self.previous_count = self.request_count
self.request_count = 0
self.current_window = window
# Calculate the weighted request count
window_elapsed = (now % self.window_size) / self.window_size
threshold = self.previous_count * (1 - window_elapsed) + self.request_count
# Check if we're within the limit
if threshold < self.max_requests:
self.request_count += 1
return True
return False
# Usage example
limiter = SlidingWindowCounter(window_size=60, max_requests=5) # 5 requests per minute
for _ in range(10):
print(limiter.allow_request()) # Will print True for the first 5 requests, then gradually become False
time.sleep(0.1) # Wait a bit between requests
time.sleep(30) # Wait for half the window to pass
print(limiter.allow_request()) # Might be True or False depending on the exact timing

View File

@@ -0,0 +1,31 @@
import time
from collections import deque
class SlidingWindowLog:
def __init__(self, window_size, max_requests):
self.window_size = window_size # Size of the sliding window in seconds
self.max_requests = max_requests # Maximum number of requests per window
self.request_log = deque() # Log to keep track of request timestamps
def allow_request(self):
now = time.time()
# Remove timestamps that are outside the current window
while self.request_log and now - self.request_log[0] >= self.window_size:
self.request_log.popleft()
# Check if we're still within the limit
if len(self.request_log) < self.max_requests:
self.request_log.append(now)
return True
return False
# Usage example
limiter = SlidingWindowLog(window_size=60, max_requests=5) # 5 requests per minute
for _ in range(10):
print(limiter.allow_request()) # Will print True for the first 5 requests, then False
time.sleep(0.1) # Wait a bit between requests
time.sleep(60) # Wait for the window to slide
print(limiter.allow_request()) # True

View File

@@ -0,0 +1,31 @@
import time
class TokenBucket:
def __init__(self, capacity, fill_rate):
self.capacity = capacity # Maximum number of tokens the bucket can hold
self.fill_rate = fill_rate # Rate at which tokens are added (tokens/second)
self.tokens = capacity # Current token count, start with a full bucket
self.last_time = time.time() # Last time we checked the token count
def allow_request(self, tokens=1):
now = time.time()
# Calculate how many tokens have been added since the last check
time_passed = now - self.last_time
self.tokens = min(self.capacity, self.tokens + time_passed * self.fill_rate)
self.last_time = now
# Check if we have enough tokens for this request
if self.tokens >= tokens:
self.tokens -= tokens
return True
return False
# Usage example
limiter = TokenBucket(capacity=10, fill_rate=1) # 10 tokens, refill 1 token per second
for _ in range(15):
print(limiter.allow_request()) # Will print True for the first 10 requests, then False
time.sleep(0.1) # Wait a bit between requests
time.sleep(5) # Wait for bucket to refill
print(limiter.allow_request()) # True