import asyncio
from contextlib import asynccontextmanager
from datetime import datetime, timedelta

from app.core.logger import make_log


class Memory:
    """In-memory holder for queued tasks plus a simple async transaction gate.

    Attributes:
        _execute_queue: list of ``[fn, args, kwargs]`` entries that carry no
            execution timestamp.
        _delayed_queue: dict mapping an execution timestamp (seconds since the
            epoch) to a single ``[fn, args, kwargs]`` entry.
        transactions_disabled: ``True`` while a ``transaction()`` block is
            active; other ``transaction()`` callers wait until it is cleared.

    NOTE(review): the consumers of both queues are not visible in this file.
    """

    def __init__(self):
        self._execute_queue = []
        self._delayed_queue = {}
        self.transactions_disabled = False

    @asynccontextmanager
    async def transaction(self, desc=""):
        """Run the ``async with`` body while holding the transaction flag.

        Waits, polling every 0.2 s, until ``transactions_disabled`` is
        ``False``, sets it for the duration of the body, and always clears it
        again afterwards.

        Args:
            desc: free-form text included in the debug log messages.

        Yields:
            None.

        Raises:
            Whatever the body raises; the exception is logged and re-raised
            unchanged.
        """
        make_log("Memory.transaction", f"Starting transaction; {desc}", level='debug')
        while self.transactions_disabled:
            await asyncio.sleep(.2)

        self.transactions_disabled = True
        try:
            yield None
        except BaseException as e:
            # BaseException on purpose: cancellation must be logged too.
            make_log("Memory.transaction", f"Transaction error: {e}", level='error')
            raise
        else:
            make_log("Memory.transaction", f"Transaction finished; {desc}", level='debug')
        finally:
            # Single release point, so the flag can never stay stuck at True.
            self.transactions_disabled = False

    @staticmethod
    def _pop_execute_ts(kwargs):
        """Remove the scheduling keys from *kwargs* and return the timestamp.

        Returns the execution timestamp as a positive ``int``, or ``None`` when
        no ``execute_ts`` was requested. Raises ``TypeError``, ``ValueError``
        or ``OverflowError`` when the requested value is unusable.
        """
        delay_s = kwargs.get("delay_s")
        # bool is a subclass of int but has never been accepted as a delay.
        if isinstance(delay_s, (int, float)) and not isinstance(delay_s, bool):
            del kwargs["delay_s"]
            # A valid delay takes precedence over an explicit execute_ts.
            kwargs["execute_ts"] = (datetime.now() + timedelta(seconds=delay_s)).timestamp()

        if "execute_ts" not in kwargs:
            return None

        raw_ts = kwargs.pop("execute_ts")
        execute_ts = int(raw_ts)
        # Explicit check instead of `assert`, which is stripped under `-O`.
        if execute_ts <= 0:
            raise ValueError(f"execute_ts must be positive, got {raw_ts!r}")
        return execute_ts

    def add_task(self, _fn, *args, **kwargs):
        """Queue ``_fn(*args, **kwargs)`` for immediate or delayed execution.

        Two optional keyword arguments control scheduling and are removed
        before the remaining kwargs are stored with the task:

        * ``delay_s`` (int or float): seconds from now; converted into
          ``execute_ts``. Values of any other type are left in ``kwargs``.
        * ``execute_ts``: timestamp in seconds since the epoch; truncated to
          ``int`` and required to be positive.

        A task with a valid timestamp is stored in ``_delayed_queue``. A task
        without one is appended to ``_execute_queue``. A task whose timestamp
        is invalid is logged as an error and also appended to
        ``_execute_queue``.

        Args:
            _fn: the callable to store.
            *args: positional arguments stored alongside ``_fn``.
            **kwargs: keyword arguments stored alongside ``_fn``.
        """
        try:
            execute_ts = self._pop_execute_ts(kwargs)
        except (TypeError, ValueError, OverflowError) as e:
            make_log("Queue.add_task", f"Error when adding task to memory: {e}", level='error')
            execute_ts = None

        if execute_ts is None:
            self._execute_queue.append([_fn, args, kwargs])
            return

        # Each key holds exactly one task, so nudge colliding timestamps.
        while execute_ts in self._delayed_queue:
            execute_ts += .01

        self._delayed_queue[execute_ts] = [_fn, args, kwargs]