Files
2025-12-09 12:13:01 +01:00

78 lines
1.9 KiB
Python

""" Wrappers over different abstract types
"""
from inspect import isawaitable
from typing import Optional
from .bucket import AbstractBucket
from .rate import RateItem
class BucketAsyncWrapper(AbstractBucket):
    """BucketAsyncWrapper wraps any bucket, synchronous or asynchronous,
    and exposes its operations as coroutines.

    Every method forwards to the wrapped bucket and, whenever the call hands
    back an awaitable, keeps awaiting until a plain value is obtained.
    """

    def __init__(self, bucket: AbstractBucket):
        """Store the bucket that all calls are delegated to.

        Args:
            bucket: the bucket to wrap; must be an ``AbstractBucket`` instance.
        """
        assert isinstance(bucket, AbstractBucket)
        self.bucket = bucket

    @staticmethod
    async def _settle(value: object) -> object:
        """Await ``value`` until it is no longer awaitable and return the result.

        A non-awaitable ``value`` is returned untouched. Looping (rather than
        awaiting once) also covers an awaitable that resolves to another
        awaitable.
        """
        while isawaitable(value):
            value = await value
        return value

    async def put(self, item: RateItem):
        """Call ``put(item)`` on the wrapped bucket and return its settled result."""
        return await self._settle(self.bucket.put(item))

    async def count(self):
        """Call ``count()`` on the wrapped bucket and return its settled result."""
        return await self._settle(self.bucket.count())

    async def leak(self, current_timestamp: Optional[int] = None) -> int:
        """Call ``leak(current_timestamp)`` on the wrapped bucket.

        Returns:
            The settled result, asserted to be an ``int``.
        """
        leaked = await self._settle(self.bucket.leak(current_timestamp))
        assert isinstance(leaked, int)
        return leaked

    async def flush(self) -> None:
        """Call ``flush()`` on the wrapped bucket and wait for it to finish."""
        # TODO: AbstractBucket.flush() may not have correct type annotation?
        await self._settle(self.bucket.flush())
        return None

    async def peek(self, index: int) -> Optional[RateItem]:
        """Call ``peek(index)`` on the wrapped bucket.

        Returns:
            The settled result, asserted to be ``None`` or a ``RateItem``.
        """
        item = await self._settle(self.bucket.peek(index))
        assert item is None or isinstance(item, RateItem)
        return item

    async def waiting(self, item: RateItem) -> int:
        """Run the inherited ``waiting(item)`` and return its settled result.

        Returns:
            The settled result, asserted to be an ``int``.
        """
        # Settled with the same loop as every other method; previously this
        # was the only call that awaited at most once.
        wait = await self._settle(super().waiting(item))
        assert isinstance(wait, int)
        return wait

    @property
    def failing_rate(self):
        """The ``failing_rate`` attribute of the wrapped bucket."""
        return self.bucket.failing_rate

    @property
    def rates(self):
        """The ``rates`` attribute of the wrapped bucket."""
        return self.bucket.rates