"""Process entry point.

The first command-line argument selects what this process runs:

* no argument (or ``__main__``): create the DB tables, then start the HTTP
  application together with both Telegram bot pollers and the in-memory
  command queue tasks;
* ``indexer`` / ``uploader`` / ``ton_daemon`` / ``license_index`` /
  ``convert_process``: run the ``main_fn`` of the matching background service.

NOTE: the order of imports and statements at module level is significant --
several imports are intentionally executed only after the schema bootstrap or
a start-up delay. Do not regroup them at the top of the file.
"""
import asyncio
import sys
import os
import time
import traceback
from asyncio import sleep
from datetime import datetime

# First CLI argument picks the start-up target; default is the main application.
startup_target = sys.argv[1] if len(sys.argv) > 1 else '__main__'

from app.core._utils.create_maria_tables import create_db_tables
from app.core.storage import engine

if startup_target == '__main__':
    # Ensure DB schema exists (async engine)
    asyncio.get_event_loop().run_until_complete(create_db_tables(engine))
else:
    # NOTE(review): presumably gives the main process time to create the
    # schema before a background service starts -- confirm.
    time.sleep(7)

from app.api import app
from app.bot import dp as uploader_bot_dp
from app.client_bot import dp as client_bot_dp
from app.core._config import SANIC_PORT, PROJECT_HOST, DATABASE_URL
from app.core.logger import make_log

if int(os.getenv("SANIC_MAINTENANCE", '0')) == 1:
    make_log("Global", "Application is in maintenance mode")
    # Park the process forever: nothing below this point runs in maintenance mode.
    while True:
        time.sleep(1)

from app.core.models import Memory


async def queue_daemon(app):
    """Move due entries from the delayed queue to the execute queue.

    ``app.ctx.memory._delayed_queue`` maps a UNIX timestamp to a queued
    command. After an initial 3 second pause, every 0.7 seconds each entry
    whose timestamp is not in the future is removed from the delayed queue and
    appended to ``app.ctx.memory._execute_queue``. Never returns.
    """
    await sleep(3)
    while True:
        delayed_queue = app.ctx.memory._delayed_queue
        now = datetime.now().timestamp()
        # Iterate over a snapshot of the keys: entries are removed while looping.
        for execute_ts in list(delayed_queue):
            if execute_ts <= now:
                app.ctx.memory._execute_queue.append(delayed_queue.pop(execute_ts))

        await sleep(.7)


def _unpack_command(cmd):
    """Split a queued command into ``(fn, args, kwargs)``.

    A command is a sequence ``[fn, args]`` or ``[fn, args, kwargs]``; items
    after the third are ignored.

    Raises:
        ValueError: the command has fewer than two items or ``fn`` is falsy.
        TypeError: ``args`` is not a tuple or ``kwargs`` is not a dict.
    """
    if len(cmd) < 2:
        raise ValueError("command must contain at least a callable and an args tuple")

    fn, args = cmd[0], cmd[1]
    kwargs = cmd[2] if len(cmd) > 2 else {}

    if not fn:
        raise ValueError("command callable is empty")
    if not isinstance(args, tuple):
        raise TypeError(f"command args must be a tuple, got {type(args).__name__}")
    if not isinstance(kwargs, dict):
        raise TypeError(f"command kwargs must be a dict, got {type(kwargs).__name__}")

    return fn, args, kwargs


async def execute_queue(app):
    """Log start-up information, then run queued commands forever.

    Commands are taken from the front of ``app.ctx.memory._execute_queue`` and
    awaited one at a time as ``fn(*args, **kwargs)``. A malformed command or a
    command that raises is logged and skipped, so a single bad entry cannot
    stop the executor. Cancellation of this task is not intercepted.
    """
    telegram_bot_username = (await app.ctx.memory._telegram_bot.get_me()).username
    client_telegram_bot_username = (await app.ctx.memory._client_telegram_bot.get_me()).username
    make_log(None, f"Application normally started. HTTP port: {SANIC_PORT}")
    make_log(None, f"Telegram bot: https://t.me/{telegram_bot_username}")
    make_log(None, f"Client Telegram bot: https://t.me/{client_telegram_bot_username}")
    try:
        # Keep only the "host[:port]" part of "scheme://user:pass@host[:port]/db".
        _db_host = DATABASE_URL.split('@')[1].split('/')[0]
    except Exception:
        # Best effort: this value is only used for the log line below.
        _db_host = 'postgres://'
    make_log(None, f"PostgreSQL host: {_db_host}")
    make_log(None, f"API host: {PROJECT_HOST}")

    while True:
        try:
            _cmd = app.ctx.memory._execute_queue.pop(0)
        except IndexError:
            # Queue is empty: yield to the event loop before polling again.
            await sleep(.05)
            continue

        try:
            _fn, _args, _kwargs = _unpack_command(_cmd)
        except (TypeError, ValueError, LookupError) as e:
            # Explicit validation instead of `assert`: a malformed entry is
            # dropped rather than killing the whole executor task.
            make_log("Queue.execute", f"Invalid command {_cmd!r} => Error: {e}", level='error')
            continue

        try:
            make_log("Queue.execute", f"{_fn} {_args} {_kwargs}", level='debug')
            await _fn(*_args, **_kwargs)
        except Exception as e:
            # `Exception`, not `BaseException`: asyncio.CancelledError must
            # propagate so the task can be cancelled while a command runs.
            make_log("Queue.execute", f"{_fn} {_args} {_kwargs} => Error: {e}" + '\n' + str(traceback.format_exc()))


if __name__ == '__main__':
    main_memory = Memory()
    if startup_target == '__main__':
        # Main application: share one Memory instance between the HTTP app and both bots.
        app.ctx.memory = main_memory
        for _target in (uploader_bot_dp, client_bot_dp):
            _target._s_memory = app.ctx.memory

        app.ctx.memory._app = app

        app.add_task(execute_queue(app))
        app.add_task(queue_daemon(app))
        app.add_task(uploader_bot_dp.start_polling(app.ctx.memory._telegram_bot))
        app.add_task(client_bot_dp.start_polling(app.ctx.memory._client_telegram_bot))

        app.run(host='0.0.0.0', port=SANIC_PORT)
    else:
        time.sleep(2)

        # Log channel name: the target with its first letter upper-cased
        # (slicing keeps this safe for an empty target string).
        _log_name = startup_target[:1].upper() + startup_target[1:]

        # Each service module is imported lazily, followed by its own start-up delay.
        if startup_target == 'indexer':
            from app.core.background.indexer_service import main_fn as startup_fn
            time.sleep(1)
        elif startup_target == 'uploader':
            from app.core.background.uploader_service import main_fn as startup_fn
            time.sleep(3)
        elif startup_target == 'ton_daemon':
            from app.core.background.ton_service import main_fn as startup_fn
            time.sleep(5)
        elif startup_target == 'license_index':
            from app.core.background.license_service import main_fn as startup_fn
            time.sleep(7)
        elif startup_target == 'convert_process':
            from app.core.background.convert_service import main_fn as startup_fn
            time.sleep(9)
        else:
            # Fail with a clear message instead of a NameError further down.
            make_log(_log_name, f"Unknown startup target: {startup_target!r}", level='error')
            sys.exit(1)

        async def wrapped_startup_fn(*args):
            """Run the selected service; log any failure and exit with status 1."""
            try:
                await startup_fn(*args)
            except BaseException as e:
                make_log(_log_name, f"Error: {e}" + '\n' + str(traceback.format_exc()), level='error')
                sys.exit(1)

        loop = asyncio.get_event_loop()
        try:
            loop.run_until_complete(wrapped_startup_fn(main_memory))
        except SystemExit:
            # Already logged by wrapped_startup_fn; keep its non-zero exit status.
            raise
        except BaseException as e:
            make_log(_log_name, f"Error: {e}" + '\n' + str(traceback.format_exc()), level='error')
            # A failed service must not report success to its supervisor.
            sys.exit(1)
        finally:
            loop.close()