uploader-bot/app/__main__.py

115 lines
3.8 KiB
Python

import sys
import traceback
from asyncio import sleep
from datetime import datetime
import asyncio, sys
import time
from app.api import app
from app.bot import dp as uploader_bot_dp
from app.client_bot import dp as client_bot_dp
from app.core._config import SANIC_PORT, MYSQL_URI, PROJECT_HOST
from app.core._utils.create_maria_tables import create_maria_tables
from app.core.logger import make_log
from app.core.models import Memory
from app.core.storage import engine
async def queue_daemon(app):
await sleep(3)
while True:
delayed_list = {k: v for k, v in app.ctx.memory._delayed_queue.items()}
for _execute_ts in delayed_list:
if _execute_ts <= datetime.now().timestamp():
del app.ctx.memory._delayed_queue[_execute_ts]
app.ctx.memory._execute_queue.append(delayed_list[_execute_ts])
await sleep(.7)
async def execute_queue(app):
await create_maria_tables(engine)
telegram_bot_username = (await app.ctx.memory._telegram_bot.get_me()).username
make_log(None, f"Application normally started. HTTP port: {SANIC_PORT}")
make_log(None, f"Telegram bot: https://t.me/{telegram_bot_username}")
make_log(None, f"MariaDB host: {MYSQL_URI.split('@')[1].split('/')[0].replace('/', '')}")
make_log(None, f"API host: {PROJECT_HOST}")
while True:
try:
_cmd = app.ctx.memory._execute_queue.pop(0)
except IndexError:
await sleep(.05)
continue
_fn = _cmd.pop(0)
assert _fn
_args = _cmd.pop(0)
assert type(_args) is tuple
try:
_kwargs = _cmd.pop(0)
assert type(_kwargs) is dict
except IndexError:
_kwargs = {}
try:
make_log("Queue.execute", f"{_fn} {_args} {_kwargs}", level='debug')
await _fn(*_args, **_kwargs)
except BaseException as e:
make_log("Queue.execute", f"{_fn} {_args} {_kwargs} => Error: {e}" + '\n' + str(traceback.format_exc()))
if __name__ == '__main__':
startup_target = '__main__'
try:
startup_target = sys.argv[1]
except BaseException:
pass
if startup_target == '__main__':
app.ctx.memory = Memory()
for _target in [uploader_bot_dp, client_bot_dp]:
_target._s_memory = app.ctx.memory
app.ctx.memory._app = app
app.add_task(execute_queue(app))
app.add_task(queue_daemon(app))
app.add_task(uploader_bot_dp.start_polling(app.ctx.memory._telegram_bot))
app.add_task(client_bot_dp.start_polling(app.ctx.memory._client_telegram_bot))
app.run(host='0.0.0.0', port=SANIC_PORT)
else:
time.sleep(3)
startup_fn = None
if startup_target == 'indexator':
from app.core.background.indexator_service import main_fn as target_fn
elif startup_target == 'uploader':
from app.core.background.uploader_service import main_fn as target_fn
elif startup_target == 'ton_daemon':
from app.core.background.ton_service import main_fn as target_fn
startup_fn = startup_fn or target_fn
assert startup_fn
async def wrapped_startup_fn():
try:
await startup_fn()
except BaseException as e:
make_log(startup_target[0].upper() + startup_target[1:], f"Error: {e}" + '\n' + str(traceback.format_exc()),
level='error')
sys.exit(1)
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(wrapped_startup_fn())
except BaseException as e:
make_log(startup_target[0].upper() + startup_target[1:], f"Error: {e}" + '\n' + str(traceback.format_exc()),
level='error')
sys.exit(0)
finally:
loop.close()