import asyncio
from contextlib import asynccontextmanager
from datetime import datetime
from datetime import timedelta

from aiogram import Bot

from app.core._config import TELEGRAM_API_KEY, CLIENT_TELEGRAM_API_KEY
from app.core.logger import make_log


class Memory:
    """In-process shared state container.

    Holds two task queues (immediate and delayed), a cooperative
    "transaction" flag, a table of per-service status records and two
    Telegram ``Bot`` clients.

    NOTE(review): nothing in this class drains the queues or updates
    ``known_states``; presumably other components do -- confirm against
    the callers.
    """

    def __init__(self):
        # Tasks to run as soon as possible; each entry is [fn, args, kwargs].
        self._execute_queue = []
        # Tasks to run later, keyed by execution timestamp; each value is
        # [fn, args, kwargs].
        self._delayed_queue = {}
        # True while some coroutine is inside ``transaction()``.
        self.transactions_disabled = False
        # One independent status record per known service name.
        self.known_states = {
            service: {"status": "no status", "timestamp": None}
            for service in (
                "uploader_daemon",
                "indexer",
                "licenses",
                "ton_daemon",
                "convert_service",
            )
        }
        self._telegram_bot = Bot(TELEGRAM_API_KEY)
        self._client_telegram_bot = Bot(CLIENT_TELEGRAM_API_KEY)

    @asynccontextmanager
    async def transaction(self, desc=""):
        """Run the ``async with`` body while holding the transaction flag.

        Polls every 0.2 s until ``transactions_disabled`` is False, sets it
        to True for the duration of the body and always resets it on exit.
        Any exception raised by the body (including ``BaseException``
        subclasses) is logged at error level and re-raised unchanged.

        Args:
            desc: Free-form text included in the debug log messages.

        Yields:
            None.
        """
        make_log("Memory.transaction", f"Starting transaction; {desc}", level='debug')
        # There is no ``await`` between the last check of the flag and the
        # assignment below, so two coroutines on the same event loop cannot
        # both pass this loop at once.
        while self.transactions_disabled:
            await asyncio.sleep(.2)

        self.transactions_disabled = True
        try:
            yield None
        except BaseException as e:
            make_log("Memory.transaction", f"Transaction error: {e}", level='error')
            raise
        else:
            make_log("Memory.transaction", f"Transaction finished; {desc}", level='debug')
        finally:
            # Release the flag on every exit path, even if logging fails.
            self.transactions_disabled = False

    @staticmethod
    def _pop_execute_ts(kwargs):
        """Remove the scheduling keys from *kwargs* and return the timestamp.

        ``delay_s`` (a real number, not a bool) takes precedence over
        ``execute_ts`` and is converted to "now + delay". A ``delay_s`` of
        any other type is left in *kwargs* untouched.

        Returns:
            The execution timestamp truncated to an ``int``, or ``None``
            when no scheduling was requested.

        Raises:
            TypeError, ValueError, OverflowError: the supplied value cannot
                be converted to a positive integer timestamp.
        """
        delay_s = kwargs.get("delay_s")
        if isinstance(delay_s, (int, float)) and not isinstance(delay_s, bool):
            # Drop both keys before any conversion can fail, so the task's
            # kwargs never carry scheduling arguments into the call.
            del kwargs["delay_s"]
            kwargs.pop("execute_ts", None)
            raw_ts = (datetime.now() + timedelta(seconds=delay_s)).timestamp()
        elif "execute_ts" in kwargs:
            raw_ts = kwargs.pop("execute_ts")
        else:
            return None

        execute_ts = int(raw_ts)
        # Explicit check instead of ``assert``: assertions are stripped
        # when Python runs with -O.
        if execute_ts <= 0:
            raise ValueError(f"execute timestamp must be positive, got {execute_ts}")
        return execute_ts

    def add_task(self, _fn, *args, **kwargs):
        """Queue ``_fn(*args, **kwargs)`` for immediate or delayed execution.

        Two optional keyword arguments control scheduling and are removed
        from the kwargs stored with the task:

        * ``delay_s`` -- int/float number of seconds from now.
        * ``execute_ts`` -- timestamp convertible with ``int()``;
          presumably POSIX seconds, matching what the ``delay_s`` path
          produces via ``datetime.timestamp()`` -- confirm with callers.

        With a valid positive timestamp the task goes to the delayed
        queue. If that key is already taken, 0.01 is added repeatedly
        until a free key is found. Without scheduling arguments the task
        goes to the immediate queue. If the scheduling arguments are
        invalid, the problem is logged at error level and the task is put
        on the immediate queue rather than dropped.
        """
        try:
            execute_ts = self._pop_execute_ts(kwargs)
        except (TypeError, ValueError, OverflowError) as e:
            make_log("Queue.add_task", f"Error when adding task to memory: {e}", level='error')
            execute_ts = None

        task = [_fn, args, kwargs]
        if execute_ts is None:
            self._execute_queue.append(task)
            return

        while execute_ts in self._delayed_queue:
            execute_ts += .01

        self._delayed_queue[execute_ts] = task