dev@locazia: test fn

This commit is contained in:
user 2024-02-29 23:20:23 +03:00
parent 308099f8de
commit a427d75d61
1 changed files with 98 additions and 95 deletions

View File

@ -1,97 +1,100 @@
import sys import sys
import traceback print(sys.argv)
from asyncio import sleep
from datetime import datetime
import asyncio, sys
from aiogram import Bot # import sys
# import traceback
from app.api import app # from asyncio import sleep
from app.bot import dp # from datetime import datetime
from app.core._config import SANIC_PORT, MYSQL_URI, TELEGRAM_API_KEY # import asyncio, sys
from app.core._utils.create_maria_tables import create_maria_tables #
from app.core.logger import make_log # from aiogram import Bot
from app.core.models import Memory #
from app.core.storage import engine # from app.api import app
# from app.bot import dp
# from app.core._config import SANIC_PORT, MYSQL_URI, TELEGRAM_API_KEY
async def queue_daemon(app): # from app.core._utils.create_maria_tables import create_maria_tables
await sleep(3) # from app.core.logger import make_log
# from app.core.models import Memory
while True: # from app.core.storage import engine
delayed_list = {k: v for k, v in app.ctx.memory._delayed_queue.items()} #
for _execute_ts in delayed_list: #
if _execute_ts <= datetime.now().timestamp(): # async def queue_daemon(app):
del app.ctx.memory._delayed_queue[_execute_ts] # await sleep(3)
app.ctx.memory._execute_queue.append(delayed_list[_execute_ts]) #
# while True:
await sleep(.7) # delayed_list = {k: v for k, v in app.ctx.memory._delayed_queue.items()}
# for _execute_ts in delayed_list:
# if _execute_ts <= datetime.now().timestamp():
async def execute_queue(app): # del app.ctx.memory._delayed_queue[_execute_ts]
await create_maria_tables(engine) # app.ctx.memory._execute_queue.append(delayed_list[_execute_ts])
#
telegram_bot_username = (await app.ctx.memory._telegram_bot.get_me()).username # await sleep(.7)
make_log(None, f"Application normally started. HTTP port: {SANIC_PORT}") #
make_log(None, f"Telegram bot: https://t.me/{telegram_bot_username}") #
make_log(None, f"MariaDB host: {MYSQL_URI.split('@')[1].split('/')[0].replace('/', '')}") # async def execute_queue(app):
while True: # await create_maria_tables(engine)
try: #
_cmd = app.ctx.memory._execute_queue.pop(0) # telegram_bot_username = (await app.ctx.memory._telegram_bot.get_me()).username
except IndexError: # make_log(None, f"Application normally started. HTTP port: {SANIC_PORT}")
await sleep(.05) # make_log(None, f"Telegram bot: https://t.me/{telegram_bot_username}")
continue # make_log(None, f"MariaDB host: {MYSQL_URI.split('@')[1].split('/')[0].replace('/', '')}")
# while True:
_fn = _cmd.pop(0) # try:
assert _fn # _cmd = app.ctx.memory._execute_queue.pop(0)
_args = _cmd.pop(0) # except IndexError:
assert type(_args) is tuple # await sleep(.05)
try: # continue
_kwargs = _cmd.pop(0) #
assert type(_kwargs) is dict # _fn = _cmd.pop(0)
except IndexError: # assert _fn
_kwargs = {} # _args = _cmd.pop(0)
# assert type(_args) is tuple
try: # try:
make_log("Queue.execute", f"{_fn} {_args} {_kwargs}", level='debug') # _kwargs = _cmd.pop(0)
await _fn(*_args, **_kwargs) # assert type(_kwargs) is dict
except BaseException as e: # except IndexError:
make_log("Queue.execute", f"{_fn} {_args} {_kwargs} => Error: {e}" + '\n' + str(traceback.format_exc())) # _kwargs = {}
#
# try:
if __name__ == '__main__': # make_log("Queue.execute", f"{_fn} {_args} {_kwargs}", level='debug')
startup_target = '__main__' # await _fn(*_args, **_kwargs)
try: # except BaseException as e:
startup_target = sys.argv[3] # make_log("Queue.execute", f"{_fn} {_args} {_kwargs} => Error: {e}" + '\n' + str(traceback.format_exc()))
except IndexError: #
pass #
# if __name__ == '__main__':
if startup_target == '__main__': # startup_target = '__main__'
app.ctx.memory = Memory() # try:
app.ctx.memory._telegram_bot = Bot(TELEGRAM_API_KEY) # startup_target = sys.argv[3]
dp._s_memory = app.ctx.memory # except IndexError:
app.ctx.memory._app = app # pass
#
app.add_task(execute_queue(app)) # if startup_target == '__main__':
app.add_task(queue_daemon(app)) # app.ctx.memory = Memory()
app.add_task(dp.start_polling(app.ctx.memory._telegram_bot)) # app.ctx.memory._telegram_bot = Bot(TELEGRAM_API_KEY)
# dp._s_memory = app.ctx.memory
app.run(host='0.0.0.0', port=SANIC_PORT) # app.ctx.memory._app = app
else: #
loop = asyncio.get_event_loop() # app.add_task(execute_queue(app))
startup_fn = None # app.add_task(queue_daemon(app))
if startup_target == 'indexator': # app.add_task(dp.start_polling(app.ctx.memory._telegram_bot))
from app.core.background.indexator_service import main as target_fn #
elif startup_target == 'uploader': # app.run(host='0.0.0.0', port=SANIC_PORT)
from app.core.background.uploader_service import main as target_fn # else:
# loop = asyncio.get_event_loop()
startup_fn = startup_fn or target_fn # startup_fn = None
assert startup_fn # if startup_target == 'indexator':
# from app.core.background.indexator_service import main as target_fn
try: # elif startup_target == 'uploader':
loop.run_until_complete(startup_fn()) # from app.core.background.uploader_service import main as target_fn
except BaseException as e: #
make_log(startup_target[0].upper() + startup_target[1:], f"Error: {e}" + '\n' + str(traceback.format_exc()), level='error') # startup_fn = startup_fn or target_fn
sys.exit(1) # assert startup_fn
finally: #
loop.close() # try:
# loop.run_until_complete(startup_fn())
# except BaseException as e:
# make_log(startup_target[0].upper() + startup_target[1:], f"Error: {e}" + '\n' + str(traceback.format_exc()), level='error')
# sys.exit(1)
# finally:
# loop.close()