uploader-bot/app/core/models/memory.py

51 lines
1.7 KiB
Python

import asyncio
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from app.core.logger import make_log
class Memory:
    """In-process task queues plus a cooperative transaction flag.

    Tasks are stored as ``[fn, args, kwargs]`` lists in one of two places:

    * ``_execute_queue`` -- a list of tasks with no scheduled time.
    * ``_delayed_queue`` -- a dict mapping a Unix timestamp to a task.

    The code that drains these queues is not part of this class.
    """

    def __init__(self):
        # Tasks without a (valid) scheduled time, in insertion order.
        self._execute_queue = []
        # Scheduled tasks keyed by Unix timestamp (int, or float after a
        # collision bump -- see ``add_task``).
        self._delayed_queue = {}
        # True while some ``transaction()`` block is active.
        self.transactions_disabled = False

    @asynccontextmanager
    async def transaction(self, desc=""):
        """Run the ``async with`` body while holding the transaction flag.

        Waits (polling every 0.2 s) until ``transactions_disabled`` is False,
        sets it for the duration of the body and always clears it afterwards.

        Args:
            desc: Free-form text included in the start/finish log messages.

        Yields:
            None.

        Raises:
            BaseException: Whatever the body raised, re-raised unchanged after
                being logged at ``error`` level.
        """
        make_log("Memory.transaction", f"Starting transaction; {desc}", level='debug')
        # Cooperative wait: there is no await between the check and the set
        # below, so tasks on the same event loop cannot interleave there.
        while self.transactions_disabled:
            await asyncio.sleep(.2)

        self.transactions_disabled = True
        try:
            yield None
        except BaseException as e:
            make_log("Memory.transaction", f"Transaction error: {e}", level='error')
            # Bare raise keeps the original traceback intact.
            raise
        finally:
            # Single release point for both the success and the error path.
            self.transactions_disabled = False

        make_log("Memory.transaction", f"Transaction finished; {desc}", level='debug')

    def add_task(self, _fn, *args, **kwargs):
        """Queue ``_fn`` with its arguments, optionally at a scheduled time.

        Two keyword arguments are interpreted here and removed from
        ``kwargs`` before the task is stored:

        * ``delay_s`` (int or float, not bool): seconds from now; converted
          to ``execute_ts`` and takes precedence over a supplied
          ``execute_ts``. ``delay_s=None`` means "no delay" and is dropped.
          Any other value is left in ``kwargs`` untouched.
        * ``execute_ts``: Unix timestamp, truncated with ``int()``; must be
          greater than zero.

        A task with a valid timestamp is stored in ``_delayed_queue``. If the
        timestamp key is already taken it is bumped by 0.01 until free. A task
        with no timestamp is appended to ``_execute_queue``. A task whose
        timestamp or delay is invalid is logged at ``error`` level and also
        appended to ``_execute_queue`` rather than being lost.

        Args:
            _fn: The callable to queue.
            *args: Positional arguments stored with the task.
            **kwargs: Keyword arguments stored with the task, minus the
                scheduling options described above.
        """
        delay_s = kwargs.get("delay_s")
        if delay_s is None:
            # Do not leak an explicit ``delay_s=None`` into the task kwargs.
            kwargs.pop("delay_s", None)

        execute_ts = None
        try:
            # bool is an int subclass but was never accepted as a delay.
            if isinstance(delay_s, (int, float)) and not isinstance(delay_s, bool):
                kwargs.pop("delay_s")
                due = datetime.now() + timedelta(seconds=delay_s)
                kwargs["execute_ts"] = due.timestamp()

            if "execute_ts" in kwargs:
                raw_ts = kwargs.pop("execute_ts")
                execute_ts = int(raw_ts)
                # Explicit check instead of ``assert`` so it survives ``-O``.
                if execute_ts <= 0:
                    raise ValueError(f"execute_ts must be positive, got {raw_ts!r}")
        except (TypeError, ValueError, OverflowError) as e:
            # TypeError/OverflowError cover values such as None or inf that
            # ``int()`` / ``timedelta`` reject; fall back to the plain queue.
            make_log("Queue.add_task", f"Error when adding task to memory: {e}", level='error')
            execute_ts = None

        task = [_fn, args, kwargs]
        if execute_ts is None:
            self._execute_queue.append(task)
            return

        # Keys must be unique; nudge colliding timestamps forward slightly.
        while execute_ts in self._delayed_queue:
            execute_ts += .01
        self._delayed_queue[execute_ts] = task