"""Process entry point.

Without a command-line argument (or with ``__main__``) this starts the HTTP
application together with the bot polling task and the two in-memory queue
workers defined here. With ``indexator``, ``uploader`` or ``ton_daemon`` as
the first argument it runs that single background service instead.
"""
import asyncio
import sys
import time
import traceback
from asyncio import sleep
from datetime import datetime

from aiogram import Bot

from app.api import app
from app.bot import dp
from app.core._config import SANIC_PORT, MYSQL_URI, TELEGRAM_API_KEY
from app.core._utils.create_maria_tables import create_maria_tables
from app.core.logger import make_log
from app.core.models import Memory
from app.core.storage import engine


def _mysql_host(uri):
    """Return the ``host[:port]`` part of a database URI for logging.

    Credentials (everything up to the last ``@``) and the path after the
    host are dropped. Unlike a plain ``split('@')[1]`` this neither fails
    on a URI without credentials nor picks the wrong piece when the
    password itself contains ``@``.
    """
    without_scheme = uri.split('://', 1)[-1]
    authority = without_scheme.split('/', 1)[0]
    return authority.rpartition('@')[2]


def _parse_command(cmd):
    """Split a queued command into ``(fn, args, kwargs)``.

    A command is a sequence ``[fn, args]`` or ``[fn, args, kwargs]`` where
    ``fn`` is callable, ``args`` is a tuple and ``kwargs`` (optional) is a
    dict. Elements past the third are ignored. The command itself is not
    modified.

    Raises:
        ValueError: the command has fewer than two elements.
        TypeError: an element has the wrong type (or ``cmd`` is not a
            sequence at all).
    """
    if len(cmd) < 2:
        raise ValueError("expected [fn, args] or [fn, args, kwargs]")

    fn, args = cmd[0], cmd[1]
    kwargs = cmd[2] if len(cmd) > 2 else {}

    if not callable(fn):
        raise TypeError(f"command target is not callable: {fn!r}")
    if not isinstance(args, tuple):
        raise TypeError(f"args must be a tuple, got {type(args).__name__}")
    if not isinstance(kwargs, dict):
        raise TypeError(f"kwargs must be a dict, got {type(kwargs).__name__}")
    return fn, args, kwargs


async def queue_daemon(app):
    """Move due entries from the delayed queue to the execute queue.

    ``app.ctx.memory._delayed_queue`` maps an execution timestamp (seconds
    since the epoch) to a command. After an initial 3 second delay, this
    loop checks every 0.7 seconds for timestamps that are not in the future
    and appends their commands to ``app.ctx.memory._execute_queue``.
    Never returns.
    """
    await sleep(3)

    while True:
        memory = app.ctx.memory
        now_ts = datetime.now().timestamp()
        # Collect the due keys first so the dict is not mutated while it is
        # being iterated.
        due = [ts for ts in memory._delayed_queue if ts <= now_ts]
        for ts in due:
            memory._execute_queue.append(memory._delayed_queue.pop(ts))

        await sleep(.7)


async def execute_queue(app):
    """Create the database tables, log startup info, then run queued commands.

    Commands are taken one at a time from the front of
    ``app.ctx.memory._execute_queue`` and awaited as
    ``fn(*args, **kwargs)``. A malformed command or a failing call is
    logged and skipped so that a single bad entry cannot stop the worker.
    Never returns.
    """
    await create_maria_tables(engine)

    telegram_bot_username = (await app.ctx.memory._telegram_bot.get_me()).username
    make_log(None, f"Application normally started. HTTP port: {SANIC_PORT}")
    make_log(None, f"Telegram bot: https://t.me/{telegram_bot_username}")
    make_log(None, f"MariaDB host: {_mysql_host(MYSQL_URI)}")

    while True:
        try:
            cmd = app.ctx.memory._execute_queue.pop(0)
        except IndexError:
            # Queue is empty: yield to other tasks and poll again shortly.
            await sleep(.05)
            continue

        try:
            fn, args, kwargs = _parse_command(cmd)
        except (TypeError, ValueError) as e:
            make_log("Queue.execute", f"Malformed command skipped: {cmd!r} => Error: {e}", level='error')
            continue

        try:
            make_log("Queue.execute", f"{fn} {args} {kwargs}", level='debug')
            await fn(*args, **kwargs)
        except Exception as e:
            # Exception rather than BaseException: asyncio.CancelledError,
            # KeyboardInterrupt and SystemExit must propagate so the task
            # can actually be cancelled when the application shuts down.
            make_log(
                "Queue.execute",
                f"{fn} {args} {kwargs} => Error: {e}" + '\n' + str(traceback.format_exc()),
                level='error',
            )


def _run_application():
    """Wire up shared state, schedule the background tasks and serve HTTP."""
    app.ctx.memory = Memory()
    app.ctx.memory._telegram_bot = Bot(TELEGRAM_API_KEY)
    dp._s_memory = app.ctx.memory
    app.ctx.memory._app = app

    app.add_task(execute_queue(app))
    app.add_task(queue_daemon(app))
    app.add_task(dp.start_polling(app.ctx.memory._telegram_bot))

    app.run(host='0.0.0.0', port=SANIC_PORT)


def _run_background_service(startup_target):
    """Run the background service named by ``startup_target`` to completion.

    The service module is imported lazily so only the selected service's
    dependencies are loaded. An unknown name is logged and terminates the
    process with exit status 1.
    """
    time.sleep(3)

    if startup_target == 'indexator':
        from app.core.background.indexator_service import main as startup_fn
    elif startup_target == 'uploader':
        from app.core.background.uploader_service import main as startup_fn
    elif startup_target == 'ton_daemon':
        from app.core.background.ton_service import main as startup_fn
    else:
        make_log(None, f"Unknown startup target: {startup_target!r}", level='error')
        sys.exit(1)

    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(startup_fn())
    except BaseException as e:
        make_log(
            startup_target[0].upper() + startup_target[1:],
            f"Error: {e}" + '\n' + str(traceback.format_exc()),
            level='error',
        )
        # NOTE(review): exits with status 0 even though the service failed;
        # kept as-is — confirm whether a supervisor relies on this before
        # switching to a non-zero status.
        sys.exit(0)
    finally:
        loop.close()


if __name__ == '__main__':
    startup_target = sys.argv[1] if len(sys.argv) > 1 else '__main__'

    if startup_target == '__main__':
        _run_application()
    else:
        _run_background_service(startup_target)