76 lines
2.4 KiB
Python
76 lines
2.4 KiB
Python
import asyncio
|
|
from contextlib import asynccontextmanager
|
|
from datetime import datetime, timedelta
|
|
|
|
from aiogram import Bot
|
|
from app.core.logger import make_log
|
|
from app.core._config import TELEGRAM_API_KEY, CLIENT_TELEGRAM_API_KEY
|
|
from datetime import datetime
|
|
|
|
|
|
class Memory:
    """In-process shared state: task queues, component status slots and bot clients.

    Attributes:
        _execute_queue: list of ``[fn, args, kwargs]`` tasks with no schedule time.
        _delayed_queue: dict mapping a numeric timestamp to one
            ``[fn, args, kwargs]`` task.
        transactions_disabled: flag held while a ``transaction()`` block is open.
        known_states: per-component ``{"status", "timestamp"}`` records.
    """

    # Components that get a status record in ``known_states``.
    _KNOWN_COMPONENTS = ("uploader", "uploader_bot", "indexator", "ton_daemon")

    def __init__(self):
        self._execute_queue = []
        self._delayed_queue = {}

        self.transactions_disabled = False

        # Each component gets its own dict so records can be updated independently.
        self.known_states = {
            component: {"status": "no status", "timestamp": None}
            for component in self._KNOWN_COMPONENTS
        }

        self._telegram_bot = Bot(TELEGRAM_API_KEY)
        self._client_telegram_bot = Bot(CLIENT_TELEGRAM_API_KEY)

    @asynccontextmanager
    async def transaction(self, desc=""):
        """Async context manager that serializes access via ``transactions_disabled``.

        Polls every 0.2 s until the flag is clear, sets it for the duration of
        the ``async with`` body, and always clears it on exit.

        Args:
            desc: free-form text included in the start/finish debug log lines.

        Raises:
            Whatever the ``async with`` body raises; the error is logged and
            re-raised unchanged.
        """
        make_log("Memory.transaction", f"Starting transaction; {desc}", level='debug')
        while self.transactions_disabled:
            await asyncio.sleep(.2)

        # There is no await between the last check above and this assignment,
        # so coroutines on the same event loop cannot interleave here.
        self.transactions_disabled = True
        try:
            yield
        except BaseException as e:
            # BaseException is deliberate: cancellation must also be reported.
            make_log("Memory.transaction", f"Transaction error: {e}", level='error')
            raise
        else:
            make_log("Memory.transaction", f"Transaction finished; {desc}", level='debug')
        finally:
            # Single release point: the flag is cleared on success and on error.
            self.transactions_disabled = False

    @staticmethod
    def _pop_execute_ts(kwargs):
        """Remove scheduling options from *kwargs* and return the run timestamp.

        A numeric ``delay_s`` is converted to an absolute ``execute_ts``
        (now + delay) and takes precedence over a supplied ``execute_ts``.

        Returns:
            The positive integer timestamp, or ``None`` when no schedule was
            requested.

        Raises:
            TypeError, ValueError, OverflowError: the supplied value cannot be
                converted to a positive integer timestamp.
        """
        delay_s = kwargs.get("delay_s")
        # bool is excluded explicitly: it is an int subclass but not a duration.
        if isinstance(delay_s, (int, float)) and not isinstance(delay_s, bool):
            kwargs.pop("delay_s")
            kwargs["execute_ts"] = (datetime.now() + timedelta(seconds=delay_s)).timestamp()

        if "execute_ts" not in kwargs:
            return None

        execute_ts = int(kwargs.pop("execute_ts"))
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if execute_ts <= 0:
            raise ValueError(f"execute_ts must be positive, got {execute_ts}")
        return execute_ts

    def add_task(self, _fn, *args, **kwargs):
        """Queue ``_fn(*args, **kwargs)`` for immediate or delayed execution.

        Keyword options consumed here (not forwarded to ``_fn``):
            delay_s: int/float number of seconds from now to schedule the task.
            execute_ts: absolute timestamp, truncated to whole seconds.

        With a valid schedule the task is stored in ``_delayed_queue`` under
        its timestamp. Otherwise, including when the schedule is invalid (which
        is logged as an error), it is appended to ``_execute_queue``.
        """
        try:
            execute_ts = self._pop_execute_ts(kwargs)
        except (TypeError, ValueError, OverflowError) as e:
            make_log("Queue.add_task", f"Error when adding task to memory: {e}", level='error')
            execute_ts = None

        task = [_fn, args, kwargs]
        if execute_ts is None:
            self._execute_queue.append(task)
            return

        # One task per key: nudge the key forward until a free slot is found.
        while execute_ts in self._delayed_queue:
            execute_ts += .01

        self._delayed_queue[execute_ts] = task
|
|
|
|
|