From 7f06ee9bd3e5c8ea4d7d1007e475506099b69549 Mon Sep 17 00:00:00 2001 From: user Date: Thu, 29 Feb 2024 23:22:48 +0300 Subject: [PATCH] dev@locazia: fix startup args --- app/__main__.py | 194 ++++++++++++++++++++++++------------------------ 1 file changed, 96 insertions(+), 98 deletions(-) diff --git a/app/__main__.py b/app/__main__.py index c99e276..254dac2 100644 --- a/app/__main__.py +++ b/app/__main__.py @@ -1,100 +1,98 @@ import sys -print(sys.argv) +import traceback +from asyncio import sleep +from datetime import datetime +import asyncio, sys -# import sys -# import traceback -# from asyncio import sleep -# from datetime import datetime -# import asyncio, sys -# -# from aiogram import Bot -# -# from app.api import app -# from app.bot import dp -# from app.core._config import SANIC_PORT, MYSQL_URI, TELEGRAM_API_KEY -# from app.core._utils.create_maria_tables import create_maria_tables -# from app.core.logger import make_log -# from app.core.models import Memory -# from app.core.storage import engine -# -# -# async def queue_daemon(app): -# await sleep(3) -# -# while True: -# delayed_list = {k: v for k, v in app.ctx.memory._delayed_queue.items()} -# for _execute_ts in delayed_list: -# if _execute_ts <= datetime.now().timestamp(): -# del app.ctx.memory._delayed_queue[_execute_ts] -# app.ctx.memory._execute_queue.append(delayed_list[_execute_ts]) -# -# await sleep(.7) -# -# -# async def execute_queue(app): -# await create_maria_tables(engine) -# -# telegram_bot_username = (await app.ctx.memory._telegram_bot.get_me()).username -# make_log(None, f"Application normally started. HTTP port: {SANIC_PORT}") -# make_log(None, f"Telegram bot: https://t.me/{telegram_bot_username}") -# make_log(None, f"MariaDB host: {MYSQL_URI.split('@')[1].split('/')[0].replace('/', '')}") -# while True: -# try: -# _cmd = app.ctx.memory._execute_queue.pop(0) -# except IndexError: -# await sleep(.05) -# continue -# -# _fn = _cmd.pop(0) -# assert _fn -# _args = _cmd.pop(0) -# assert type(_args) is tuple -# try: -# _kwargs = _cmd.pop(0) -# assert type(_kwargs) is dict -# except IndexError: -# _kwargs = {} -# -# try: -# make_log("Queue.execute", f"{_fn} {_args} {_kwargs}", level='debug') -# await _fn(*_args, **_kwargs) -# except BaseException as e: -# make_log("Queue.execute", f"{_fn} {_args} {_kwargs} => Error: {e}" + '\n' + str(traceback.format_exc())) -# -# -# if __name__ == '__main__': -# startup_target = '__main__' -# try: -# startup_target = sys.argv[3] -# except IndexError: -# pass -# -# if startup_target == '__main__': -# app.ctx.memory = Memory() -# app.ctx.memory._telegram_bot = Bot(TELEGRAM_API_KEY) -# dp._s_memory = app.ctx.memory -# app.ctx.memory._app = app -# -# app.add_task(execute_queue(app)) -# app.add_task(queue_daemon(app)) -# app.add_task(dp.start_polling(app.ctx.memory._telegram_bot)) -# -# app.run(host='0.0.0.0', port=SANIC_PORT) -# else: -# loop = asyncio.get_event_loop() -# startup_fn = None -# if startup_target == 'indexator': -# from app.core.background.indexator_service import main as target_fn -# elif startup_target == 'uploader': -# from app.core.background.uploader_service import main as target_fn -# -# startup_fn = startup_fn or target_fn -# assert startup_fn -# -# try: -# loop.run_until_complete(startup_fn()) -# except BaseException as e: -# make_log(startup_target[0].upper() + startup_target[1:], f"Error: {e}" + '\n' + str(traceback.format_exc()), level='error') -# sys.exit(1) -# finally: -# loop.close() +from aiogram import Bot + +from app.api import app +from app.bot import dp +from app.core._config import SANIC_PORT, MYSQL_URI, TELEGRAM_API_KEY +from app.core._utils.create_maria_tables import create_maria_tables +from app.core.logger import make_log +from app.core.models import Memory +from app.core.storage import engine + + +async def queue_daemon(app): + await sleep(3) + + while True: + delayed_list = {k: v for k, v in app.ctx.memory._delayed_queue.items()} + for _execute_ts in delayed_list: + if _execute_ts <= datetime.now().timestamp(): + del app.ctx.memory._delayed_queue[_execute_ts] + app.ctx.memory._execute_queue.append(delayed_list[_execute_ts]) + + await sleep(.7) + + +async def execute_queue(app): + await create_maria_tables(engine) + + telegram_bot_username = (await app.ctx.memory._telegram_bot.get_me()).username + make_log(None, f"Application normally started. HTTP port: {SANIC_PORT}") + make_log(None, f"Telegram bot: https://t.me/{telegram_bot_username}") + make_log(None, f"MariaDB host: {MYSQL_URI.split('@')[1].split('/')[0].replace('/', '')}") + while True: + try: + _cmd = app.ctx.memory._execute_queue.pop(0) + except IndexError: + await sleep(.05) + continue + + _fn = _cmd.pop(0) + assert _fn + _args = _cmd.pop(0) + assert type(_args) is tuple + try: + _kwargs = _cmd.pop(0) + assert type(_kwargs) is dict + except IndexError: + _kwargs = {} + + try: + make_log("Queue.execute", f"{_fn} {_args} {_kwargs}", level='debug') + await _fn(*_args, **_kwargs) + except BaseException as e: + make_log("Queue.execute", f"{_fn} {_args} {_kwargs} => Error: {e}" + '\n' + str(traceback.format_exc())) + + +if __name__ == '__main__': + startup_target = '__main__' + try: + startup_target = sys.argv[1] + except BaseException: + pass + + if startup_target == '__main__': + app.ctx.memory = Memory() + app.ctx.memory._telegram_bot = Bot(TELEGRAM_API_KEY) + dp._s_memory = app.ctx.memory + app.ctx.memory._app = app + + app.add_task(execute_queue(app)) + app.add_task(queue_daemon(app)) + app.add_task(dp.start_polling(app.ctx.memory._telegram_bot)) + + app.run(host='0.0.0.0', port=SANIC_PORT) + else: + loop = asyncio.get_event_loop() + startup_fn = None + if startup_target == 'indexator': + from app.core.background.indexator_service import main as target_fn + elif startup_target == 'uploader': + from app.core.background.uploader_service import main as target_fn + + startup_fn = startup_fn or target_fn + assert startup_fn + + try: + loop.run_until_complete(startup_fn()) + except BaseException as e: + make_log(startup_target[0].upper() + startup_target[1:], f"Error: {e}" + '\n' + str(traceback.format_exc()), + level='error') + sys.exit(1) + finally: + loop.close()