uploader-bot/app/core/models/memory.py

77 lines
2.4 KiB
Python

import asyncio
from contextlib import asynccontextmanager
from datetime import datetime
from datetime import timedelta
from aiogram import Bot
from app.core._config import TELEGRAM_API_KEY, CLIENT_TELEGRAM_API_KEY
from app.core.logger import make_log
class Memory:
    """In-process shared state: task queues, a transaction flag and bot clients.

    Attributes:
        _execute_queue: list of ``[fn, args, kwargs]`` entries added without a
            schedule, in insertion order.
        _delayed_queue: dict mapping an execution timestamp (seconds since the
            epoch) to a single ``[fn, args, kwargs]`` entry.
        transactions_disabled: True while a ``transaction()`` block is active;
            other ``transaction()`` callers wait until it is False again.
        known_states: per-component ``{"status", "timestamp"}`` records.
        _telegram_bot / _client_telegram_bot: aiogram ``Bot`` instances built
            from ``TELEGRAM_API_KEY`` and ``CLIENT_TELEGRAM_API_KEY``.

    NOTE(review): the code that drains the two queues and updates
    ``known_states`` is not part of this class — see the callers.
    """

    def __init__(self):
        self._execute_queue = []
        self._delayed_queue = {}

        self.transactions_disabled = False

        # Every component gets its own dict so records are never shared.
        self.known_states = {
            "uploader_daemon": {
                "status": "no status",
                "timestamp": None
            },
            # "uploader_bot": {
            #     "status": "no status",
            #     "timestamp": None
            # },
            "indexer": {
                "status": "no status",
                "timestamp": None
            },
            "ton_daemon": {
                "status": "no status",
                "timestamp": None
            }
        }

        self._telegram_bot = Bot(TELEGRAM_API_KEY)
        self._client_telegram_bot = Bot(CLIENT_TELEGRAM_API_KEY)

    @asynccontextmanager
    async def transaction(self, desc=""):
        """Run the ``async with`` body while holding the transaction flag.

        Polls every 0.2 s until ``transactions_disabled`` is False, sets it to
        True for the duration of the body and clears it on exit. The context
        manager yields ``None``.

        Args:
            desc: free-form text included in the debug log lines.

        Raises:
            Whatever the body raises; the error is logged and re-raised
            unchanged after the flag has been scheduled for release.
        """
        make_log("Memory.transaction", f"Starting transaction; {desc}", level='debug')

        # No await sits between the final check and the assignment below, so
        # two coroutines on the same event loop cannot both pass the gate.
        while self.transactions_disabled:
            await asyncio.sleep(.2)

        self.transactions_disabled = True
        try:
            yield None
        except BaseException as e:
            # BaseException on purpose: cancellation must be reported too.
            make_log("Memory.transaction", f"Transaction error: {e}", level='error')
            raise
        else:
            make_log("Memory.transaction", f"Transaction finished; {desc}", level='debug')
        finally:
            # Single release point: runs on success, on error, and even when
            # the logging call itself fails.
            self.transactions_disabled = False

    def add_task(self, _fn, *args, **kwargs):
        """Queue ``_fn(*args, **kwargs)`` for immediate or delayed execution.

        Two keyword arguments are consumed here and not forwarded to ``_fn``:

        * ``delay_s`` (int or float, not bool): run that many seconds from
          now. Takes precedence over ``execute_ts``. A ``delay_s`` of any
          other type is left in ``kwargs`` untouched.
        * ``execute_ts``: absolute timestamp in seconds since the epoch. It is
          truncated to a whole second with ``int()``. ``None`` means "no
          schedule".

        A task with a valid positive timestamp goes to ``_delayed_queue``; if
        the slot is taken, the key is bumped by 0.01 s until it is free. Every
        other task goes to ``_execute_queue``. An unusable schedule value
        (non-numeric, NaN, infinite, out of range, or not positive) is logged
        and the task is queued for immediate execution instead of being lost.
        """
        delay_s = kwargs.get("delay_s")
        has_delay = isinstance(delay_s, (int, float)) and not isinstance(delay_s, bool)
        if has_delay:
            del kwargs["delay_s"]

        # Strip the scheduling kwarg up front so it can never leak into the
        # task's own kwargs, whichever path is taken below.
        raw_ts = kwargs.pop("execute_ts", None)

        execute_ts = None
        try:
            if has_delay:
                raw_ts = (datetime.now() + timedelta(seconds=delay_s)).timestamp()
            if raw_ts is not None:
                execute_ts = int(raw_ts)
                # Explicit check instead of `assert`, which `python -O` strips.
                if execute_ts <= 0:
                    raise ValueError(f"execute_ts must be positive, got {raw_ts!r}")
        except (TypeError, ValueError, OverflowError) as e:
            make_log("Queue.add_task", f"Error when adding task to memory: {e}", level='error')
            execute_ts = None

        task = [_fn, args, kwargs]
        if execute_ts is None:
            self._execute_queue.append(task)
            return

        # One task per key: nudge the timestamp until the slot is free.
        while execute_ts in self._delayed_queue:
            execute_ts += .01
        self._delayed_queue[execute_ts] = task