fix db connections

commit 4401916104
parent 3a6f787a78
Author: user
Date:   2025-08-24 17:17:58 +03:00
4 changed files with 49 additions and 129 deletions

View File

@@ -18,10 +18,6 @@ if startup_target != '__main__':
     # Background services get a short delay before startup
     time.sleep(7)
-from app.api import app
-from app.bot import dp as uploader_bot_dp
-from app.client_bot import dp as client_bot_dp
-from app.core._config import SANIC_PORT, PROJECT_HOST, DATABASE_URL
 from app.core.logger import make_log
 if int(os.getenv("SANIC_MAINTENANCE", '0')) == 1:
@@ -84,13 +80,37 @@ async def execute_queue(app):
 if __name__ == '__main__':
     main_memory = Memory()
     if startup_target == '__main__':
+        # Defer heavy imports to avoid side effects in background services
+        # Mark this process as the primary node for seeding/config init
+        os.environ.setdefault('NODE_ROLE', 'primary')
+        # Create DB tables synchronously before importing HTTP app to satisfy _secrets
+        try:
+            from sqlalchemy import create_engine
+            from app.core.models import AlchemyBase  # imports all models
+            db_url = os.environ.get('DATABASE_URL')
+            if not db_url:
+                raise RuntimeError('DATABASE_URL is not set')
+            # Normalize to sync driver
+            if '+asyncpg' in db_url:
+                db_url_sync = db_url.replace('+asyncpg', '+psycopg2')
+            else:
+                db_url_sync = db_url
+            sync_engine = create_engine(db_url_sync, pool_pre_ping=True)
+            AlchemyBase.metadata.create_all(sync_engine)
+        except Exception as e:
+            make_log('Startup', f'DB sync init failed: {e}', level='error')
+        from app.api import app
+        from app.bot import dp as uploader_bot_dp
+        from app.client_bot import dp as client_bot_dp
+        from app.core._config import SANIC_PORT, PROJECT_HOST, DATABASE_URL
         app.ctx.memory = main_memory
         for _target in [uploader_bot_dp, client_bot_dp]:
             _target._s_memory = app.ctx.memory
         app.ctx.memory._app = app
-        # Ensure DB schema exists using the same event loop as Sanic
+        # Ensure DB schema exists using the same event loop as Sanic (idempotent)
         app.add_task(create_db_tables(engine))
         app.add_task(execute_queue(app))
@@ -132,6 +152,7 @@ if __name__ == '__main__':
         loop = asyncio.get_event_loop()
         try:
+            # Background services no longer perform schema initialization
             loop.run_until_complete(wrapped_startup_fn(main_memory))
         except BaseException as e:
             make_log(startup_target[0].upper() + startup_target[1:], f"Error: {e}" + '\n' + str(traceback.format_exc()),
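
Note: the create_db_tables(engine) task referenced above is not part of this diff. As a rough sketch only, an idempotent async initializer matching the "(idempotent)" comment could look like the following, assuming an asyncpg DATABASE_URL and the same AlchemyBase metadata:

# Sketch only; create_db_tables is not shown in this commit, and the engine URL is an assumption.
import os
from sqlalchemy.ext.asyncio import create_async_engine
from app.core.models import AlchemyBase  # assumed to import all model classes

async def create_db_tables(engine=None):
    engine = engine or create_async_engine(os.environ['DATABASE_URL'])
    async with engine.begin() as conn:
        # create_all skips tables that already exist, so re-running it is safe
        await conn.run_sync(AlchemyBase.metadata.create_all)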

View File

@@ -8,8 +8,10 @@ from tonsdk.utils import Address
 from app.core._blockchain.ton.wallet_v3cr3 import WalletV3CR3
 from app.core.logger import make_log
-from sqlalchemy import create_engine, text
+from sqlalchemy import create_engine, inspect
+from sqlalchemy.orm import Session
 from typing import Optional
+from app.core.models._config import ServiceConfigValue
 def _load_seed_from_env_or_generate() -> bytes:
@@ -32,8 +34,8 @@ def _init_seed_via_db() -> bytes:
     def db_ready(conn) -> bool:
         try:
-            r = conn.execute(text("SELECT to_regclass('public.service_config')")).scalar()
-            return r is not None
+            inspector = inspect(conn)
+            return inspector.has_table('service_config')
         except Exception:
             return False
@@ -46,15 +48,15 @@ def _init_seed_via_db() -> bytes:
         raise TimeoutError("service_config table not available")
     def read_seed() -> Optional[bytes]:
-        row = conn.execute(text("SELECT packed_value FROM service_config WHERE key = :k LIMIT 1"), {"k": "private_key"}).first()
-        if not row:
-            return None
-        packed = row[0] or {}
-        try:
-            # packed_value is JSON; ensure dict
-            if isinstance(packed, str):
-                packed = json.loads(packed)
-            seed_hex = packed.get("value")
-            return bytes.fromhex(seed_hex) if seed_hex else None
-        except Exception:
-            return None
+        try:
+            with Session(bind=conn) as s:
+                row = s.query(ServiceConfigValue).filter(ServiceConfigValue.key == 'private_key').first()
+                if not row:
+                    return None
+                packed = row.packed_value or {}
+                if isinstance(packed, str):
+                    packed = json.loads(packed)
+                seed_hex = packed.get('value')
+                return bytes.fromhex(seed_hex) if seed_hex else None
+        except Exception:
+            return None
@@ -67,15 +69,16 @@ def _init_seed_via_db() -> bytes:
     seed = _load_seed_from_env_or_generate()
     # Try insert; if another primary raced, ignore
     try:
-        conn.execute(
-            text("INSERT INTO service_config (key, packed_value) VALUES (:k, CAST(:v AS JSON))"),
-            {"k": "private_key", "v": json.dumps({"value": seed.hex()})}
-        )
-        conn.commit()
+        with Session(bind=conn) as s:
+            s.add(ServiceConfigValue(key='private_key', packed_value={"value": seed.hex()}))
+            s.commit()
         make_log("HotWallet", "Seed saved in service_config by primary", level='info')
         return seed
     except Exception:
+        try:
             conn.rollback()
+        except Exception:
+            pass
         # Read again in case of race
         seed2 = read_seed()
         if seed2:
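
Note: the readiness check and seed storage now go through SQLAlchemy's inspector and ORM session instead of raw SQL. A standalone sketch of the same wait-for-table and race-tolerant get-or-insert pattern, with the engine, timeout, and seed value as placeholder assumptions, would be:

# Sketch only; mirrors the pattern above with assumed engine/timeout parameters.
import time
from sqlalchemy import inspect
from sqlalchemy.orm import Session
from app.core.models._config import ServiceConfigValue  # mapped to service_config

def wait_for_table(engine, name, timeout=60.0):
    # Poll until the table exists; create_all in the primary process makes it appear.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if inspect(engine).has_table(name):
            return
        time.sleep(1)
    raise TimeoutError(f"{name} table not available")

def get_or_create_seed(engine, seed_hex):
    with Session(bind=engine) as s:
        row = s.query(ServiceConfigValue).filter(ServiceConfigValue.key == 'private_key').first()
        if row:
            return (row.packed_value or {}).get('value')
        try:
            s.add(ServiceConfigValue(key='private_key', packed_value={'value': seed_hex}))
            s.commit()
            return seed_hex
        except Exception:
            s.rollback()
            # Another process may have inserted first; read its value instead.
            row = s.query(ServiceConfigValue).filter(ServiceConfigValue.key == 'private_key').first()
            return (row.packed_value or {}).get('value') if row else None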

View File

@@ -5,7 +5,6 @@ from httpx import AsyncClient
 from app.core._config import PROJECT_HOST
 from app.core._crypto.signer import Signer
-from app.core._secrets import hot_seed
 from app.core.logger import make_log
@@ -17,6 +16,8 @@ async def send_status(service: str, status: str):
         'status': status,
     }
     message_bytes = dumps(message).encode()
+    # Lazy import to avoid triggering _secrets before DB is ready
+    from app.core._secrets import hot_seed
     signer = Signer(hot_seed)
     message_signature = signer.sign(message_bytes)
     async with AsyncClient() as client:
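
Note: moving the hot_seed import inside send_status means app.core._secrets (which touches the database at import time) only runs on the first call, after the schema exists. The general shape of that lazy-import pattern, with hypothetical names:

# Sketch only; the module and names below are hypothetical stand-ins.
def send_report():
    # Importing here defers any module-level side effects (e.g. DB access in
    # app.core._secrets) until the function is actually called.
    from mypackage.secrets import secret_value
    return secret_value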

View File

@@ -1,105 +0,0 @@
-version: '3'
-services:
-  maria_db:
-    image: mariadb:11.2
-    ports:
-      - "3307:3306"
-    env_file:
-      - .env
-    volumes:
-      - /Storage/sqlStorage:/var/lib/mysql
-    restart: always
-    healthcheck:
-      test: [ "CMD", "healthcheck.sh", "--connect", "--innodb_initialized" ]
-      interval: 10s
-      timeout: 5s
-      retries: 3
-  app:
-    build:
-      context: .
-      dockerfile: Dockerfile
-    command: python -m app
-    env_file:
-      - .env
-    restart: always
-    links:
-      - maria_db
-    ports:
-      - "15100:15100"
-    volumes:
-      - /Storage/logs:/app/logs
-      - /Storage/storedContent:/app/data
-    depends_on:
-      maria_db:
-        condition: service_healthy
-  indexer: # Sends a notification when a new NFT-listen appears. Sets the CID field on all content. Checks the collection item following the last indexed one, looks for new content, and notifies the uploader that their content was found. Assigns an onchain_index to encrypted_content
-    build:
-      context: .
-      dockerfile: Dockerfile
-    restart: always
-    command: python -m app indexer
-    env_file:
-      - .env
-    links:
-      - maria_db
-    volumes:
-      - /Storage/logs:/app/logs
-      - /Storage/storedContent:/app/data
-    depends_on:
-      maria_db:
-        condition: service_healthy
-  ton_daemon: # Works with the TON network: service wallet tasks and contract deployment
-    build:
-      context: .
-      dockerfile: Dockerfile
-    command: python -m app ton_daemon
-    restart: always
-    env_file:
-      - .env
-    links:
-      - maria_db
-    volumes:
-      - /Storage/logs:/app/logs
-      - /Storage/storedContent:/app/data
-    depends_on:
-      maria_db:
-        condition: service_healthy
-  license_index: # Checks user wallets for new NFTs, polls those NFTs via the known GET method that identifies a specific license, and stores its information
-    build:
-      context: .
-      dockerfile: Dockerfile
-    command: python -m app license_index
-    restart: always
-    env_file:
-      - .env
-    links:
-      - maria_db
-    volumes:
-      - /Storage/logs:/app/logs
-      - /Storage/storedContent:/app/data
-    depends_on:
-      maria_db:
-        condition: service_healthy
-  convert_process:
-    build:
-      context: .
-      dockerfile: Dockerfile
-    command: python -m app convert_process
-    restart: always
-    env_file:
-      - .env
-    links:
-      - maria_db
-    volumes:
-      - /Storage/logs:/app/logs
-      - /Storage/storedContent:/app/data
-      - /var/run/docker.sock:/var/run/docker.sock
-    depends_on:
-      maria_db:
-        condition: service_healthy