import sys import traceback from asyncio import sleep from datetime import datetime import asyncio, sys import time from app.api import app from app.bot import dp as uploader_bot_dp from app.client_bot import dp as client_bot_dp from app.core._config import SANIC_PORT, MYSQL_URI, TELEGRAM_API_KEY from app.core._utils.create_maria_tables import create_maria_tables from app.core.logger import make_log from app.core.models import Memory from app.core.storage import engine async def queue_daemon(app): await sleep(3) while True: delayed_list = {k: v for k, v in app.ctx.memory._delayed_queue.items()} for _execute_ts in delayed_list: if _execute_ts <= datetime.now().timestamp(): del app.ctx.memory._delayed_queue[_execute_ts] app.ctx.memory._execute_queue.append(delayed_list[_execute_ts]) await sleep(.7) async def execute_queue(app): await create_maria_tables(engine) telegram_bot_username = (await app.ctx.memory._telegram_bot.get_me()).username make_log(None, f"Application normally started. HTTP port: {SANIC_PORT}") make_log(None, f"Telegram bot: https://t.me/{telegram_bot_username}") make_log(None, f"MariaDB host: {MYSQL_URI.split('@')[1].split('/')[0].replace('/', '')}") while True: try: _cmd = app.ctx.memory._execute_queue.pop(0) except IndexError: await sleep(.05) continue _fn = _cmd.pop(0) assert _fn _args = _cmd.pop(0) assert type(_args) is tuple try: _kwargs = _cmd.pop(0) assert type(_kwargs) is dict except IndexError: _kwargs = {} try: make_log("Queue.execute", f"{_fn} {_args} {_kwargs}", level='debug') await _fn(*_args, **_kwargs) except BaseException as e: make_log("Queue.execute", f"{_fn} {_args} {_kwargs} => Error: {e}" + '\n' + str(traceback.format_exc())) if __name__ == '__main__': startup_target = '__main__' try: startup_target = sys.argv[1] except BaseException: pass if startup_target == '__main__': app.ctx.memory = Memory() for _target in [uploader_bot_dp, client_bot_dp]: _target._s_memory = app.ctx.memory app.ctx.memory._app = app app.add_task(execute_queue(app)) app.add_task(queue_daemon(app)) app.add_task(uploader_bot_dp.start_polling(app.ctx.memory._telegram_bot)) app.add_task(client_bot_dp.start_polling(app.ctx.memory._client_telegram_bot)) app.run(host='0.0.0.0', port=SANIC_PORT) else: time.sleep(3) startup_fn = None if startup_target == 'indexator': from app.core.background.indexator_service import main_fn as target_fn elif startup_target == 'uploader': from app.core.background.uploader_service import main_fn as target_fn elif startup_target == 'ton_daemon': from app.core.background.ton_service import main as target_fn startup_fn = startup_fn or target_fn assert startup_fn loop = asyncio.get_event_loop() try: loop.run_until_complete(startup_fn()) except BaseException as e: make_log(startup_target[0].upper() + startup_target[1:], f"Error: {e}" + '\n' + str(traceback.format_exc()), level='error') sys.exit(0) finally: loop.close()