From 8f2efee524301455f71beb80c89ebc46140f3004 Mon Sep 17 00:00:00 2001 From: root Date: Sat, 16 Aug 2025 15:08:00 +0000 Subject: [PATCH] add files --- app/api/fastapi_compat_routes.py | 4 - app/api/fastapi_node_routes.py | 9 +- app/bot/middleware.py | 140 ++++++++++++++--------- app/bot/poller.py | 67 +++++++++++ app/core/models/_telegram/wrapped_bot.py | 78 +++++++++---- app/fastapi_main.py | 23 +++- app/services/telegram_poller.py | 52 +++++++++ bootstrap.json | 10 +- 8 files changed, 291 insertions(+), 92 deletions(-) create mode 100644 app/bot/poller.py create mode 100644 app/services/telegram_poller.py diff --git a/app/api/fastapi_compat_routes.py b/app/api/fastapi_compat_routes.py index 611a7ea..a2a6ba5 100644 --- a/app/api/fastapi_compat_routes.py +++ b/app/api/fastapi_compat_routes.py @@ -81,7 +81,6 @@ async def platform_metadata(): } -# Legacy index and favicon @router.get("/") async def index_root(): return PlainTextResponse("MY Network Node", status_code=200) @@ -91,7 +90,6 @@ async def favicon(): return PlainTextResponse("", status_code=204) -# Legacy node endpoints @router.get("/api/v1/node") async def v1_node(): from app.core.crypto import get_ed25519_manager @@ -105,7 +103,6 @@ async def v1_node_friendly(): return PlainTextResponse(f"Node ID: {cm.node_id}\nIndexer height: 0\nServices: none\n") -# Legacy auth endpoints @router.post("/api/v1/auth.twa") async def v1_auth_twa(payload: dict): user_ref = payload.get("user") or {} @@ -141,7 +138,6 @@ async def v1_storage_upload(file: UploadFile = File(...)): file_path = os.path.join(backend.files_path, file_hash) async with aiofiles.open(file_path, 'wb') as f: await f.write(data) - # Возвращаем hash без записи ORM, чтобы избежать конфликтов схем return {"hash": file_hash} except HTTPException: raise diff --git a/app/api/fastapi_node_routes.py b/app/api/fastapi_node_routes.py index 46023a0..e6ada15 100644 --- a/app/api/fastapi_node_routes.py +++ b/app/api/fastapi_node_routes.py @@ -20,12 +20,10 @@ router = APIRouter(prefix="/api/node", tags=["node-communication"]) async def validate_node_request(request: Request) -> Dict[str, Any]: """Валидация межузлового запроса с обязательной проверкой подписи""" - # Заголовки required_headers = ["x-node-communication", "x-node-id", "x-node-public-key", "x-node-signature"] for header in required_headers: if header not in request.headers: raise HTTPException(status_code=400, detail=f"Missing required header: {header}") - if request.headers.get("x-node-communication") != "true": raise HTTPException(status_code=400, detail="Not a valid inter-node communication") @@ -35,18 +33,15 @@ async def validate_node_request(request: Request) -> Dict[str, Any]: node_id = request.headers.get("x-node-id") public_key = request.headers.get("x-node-public-key") - # Тело запроса body = await request.body() if not body: raise HTTPException(status_code=400, detail="Empty message body") - - # JSON try: message_data = json.loads(body.decode()) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="Invalid JSON in request body") - # Anti-replay (необязательно для обратной совместимости) + # Optional anti-replay try: ts = message_data.get("timestamp") nonce = message_data.get("nonce") @@ -62,10 +57,8 @@ async def validate_node_request(request: Request) -> Dict[str, Any]: raise HTTPException(status_code=400, detail="replay detected") await cache.set(cache_key, True, ttl=600) except Exception: - # ignore for backward compatibility pass - # Подпись is_valid = crypto_manager.verify_signature(message_data, signature, public_key) if not is_valid: logger.warning(f"Invalid signature from node {node_id}") diff --git a/app/bot/middleware.py b/app/bot/middleware.py index f926c4a..8400572 100644 --- a/app/bot/middleware.py +++ b/app/bot/middleware.py @@ -1,93 +1,124 @@ from app.core.logger import make_log, logger from app.core.models._telegram import Wrapped_CBotChat from app.core.models.user import User -from app.core.storage import db_session + from aiogram import BaseMiddleware, types from app.core.models.messages import KnownTelegramMessage from datetime import datetime +# Bot handlers historically use synchronous SQLAlchemy patterns. +# Keep a dedicated sync engine/session for bot middleware to preserve legacy behavior. +import re +from typing import Optional +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker, Session +from app.core.config import get_settings + + +_SYNC_ENGINE = None +_SYNC_FACTORY: Optional[sessionmaker] = None + + +def _to_sync_dsn(async_dsn: str) -> str: + # Convert postgresql+asyncpg:// to postgresql+psycopg2:// for synchronous engine + return re.sub(r"\+asyncpg", "+psycopg2", async_dsn) + + +def _ensure_sync_session() -> Session: + global _SYNC_ENGINE, _SYNC_FACTORY + if _SYNC_ENGINE is None or _SYNC_FACTORY is None: + settings = get_settings() + dsn = _to_sync_dsn(settings.DATABASE_URL) + _SYNC_ENGINE = create_engine(dsn, pool_pre_ping=True, future=True) + _SYNC_FACTORY = sessionmaker(bind=_SYNC_ENGINE, autocommit=False, autoflush=True) + return _SYNC_FACTORY() + class UserDataMiddleware(BaseMiddleware): - async def __call__(self, handler, event, data): - update_body = event.message or event.callback_query or event.inline_query or event.pre_checkout_query - if not update_body: - return - if update_body.from_user.is_bot is True: + async def __call__(self, handler, event, data): + update_body = event.message or event.callback_query or getattr(event, 'inline_query', None) or getattr(event, 'pre_checkout_query', None) + if not update_body or getattr(update_body, 'from_user', None) is None: + return + if getattr(update_body.from_user, 'is_bot', False): return user_id = update_body.from_user.id assert user_id >= 1 - # TODO: maybe make users cache + # Use sync session for bot handlers compatibility + from app.core.models.user.user import User as DbUser + from app.core.models.messages import KnownTelegramMessage as DbKnownMsg + from app.core.logging import logger as app_logger - with db_session(auto_commit=False) as session: + session = _ensure_sync_session() + try: + # Load or create user try: - user = session.query(User).filter_by(telegram_id=user_id).first() - except BaseException as e: - logger.error(f"Error when middleware getting user: {e}") + user = session.query(DbUser).filter(DbUser.telegram_id == user_id).first() + except Exception as e: + await app_logger.aerror("Middleware get user failed", error=str(e)) user = None if user is None: - logger.debug(f"User {user_id} not found. Creating new user") - user = User( + await app_logger.adebug("Creating new user", telegram_id=user_id) + user = DbUser( telegram_id=user_id, - username=update_body.from_user.username, - lang_code='en', - last_use=datetime.now(), - meta=dict(first_name=update_body.from_user.first_name or '', - last_name=update_body.from_user.last_name or '', username=update_body.from_user.username, - language_code=update_body.from_user.language_code, - is_premium=update_body.from_user.is_premium), - created=datetime.now() + username=getattr(update_body.from_user, 'username', None), + language_code=getattr(update_body.from_user, 'language_code', 'en'), + meta={ + 'first_name': getattr(update_body.from_user, 'first_name', '') or '', + 'last_name': getattr(update_body.from_user, 'last_name', '') or '', + 'username': getattr(update_body.from_user, 'username', None), + 'language_code': getattr(update_body.from_user, 'language_code', None), + 'is_premium': getattr(update_body.from_user, 'is_premium', False), + } ) session.add(user) session.commit() else: - if user.username != update_body.from_user.username: - user.username = update_body.from_user.username - - updated_meta_fields = {} - if user.meta.get('first_name') != update_body.from_user.first_name: - updated_meta_fields['first_name'] = update_body.from_user.first_name - - if user.meta.get('last_name') != update_body.from_user.last_name: - updated_meta_fields['last_name'] = update_body.from_user.last_name - - user.meta = { - **user.meta, - **updated_meta_fields - } - - user.last_use = datetime.now() - session.commit() + # Update username/metadata + changed = False + if user.username != getattr(update_body.from_user, 'username', None): + user.username = getattr(update_body.from_user, 'username', None) + changed = True + meta = dict(user.meta or {}) + if meta.get('first_name') != getattr(update_body.from_user, 'first_name', None): + meta['first_name'] = getattr(update_body.from_user, 'first_name', None) + changed = True + if meta.get('last_name') != getattr(update_body.from_user, 'last_name', None): + meta['last_name'] = getattr(update_body.from_user, 'last_name', None) + changed = True + user.meta = meta + user.last_activity = datetime.utcnow() + if changed: + session.commit() data['user'] = user + # Pass sync session for routers expecting .query() data['db_session'] = session + # chat_wrap can work with sync sessions too data['chat_wrap'] = Wrapped_CBotChat(data['bot'], chat_id=user_id, db_session=session, user=user) - data['memory'] = data['dispatcher']._s_memory + # De-duplicate known messages if getattr(update_body, 'text', None): - message_type = 'common' - if update_body.text.startswith('/start'): - message_type = 'start_command' - - if session.query(KnownTelegramMessage).filter_by( - chat_id=update_body.chat.id, - message_id=update_body.message_id, - from_user=True - ).first(): - make_log("UserDataMiddleware", f"Message {update_body.message_id} already processed", level='debug') + existed = session.query(DbKnownMsg).filter( + (DbKnownMsg.chat_id == update_body.chat.id) & + (DbKnownMsg.message_id == update_body.message_id) & + (DbKnownMsg.from_user == True) + ).first() + if existed: + await app_logger.adebug("Message already processed", message_id=update_body.message_id) return - new_message = KnownTelegramMessage( - type=message_type, + new_message = DbKnownMsg( + type='start_command' if str(update_body.text).startswith('/start') else 'common', bot_id=data['chat_wrap'].bot_id, chat_id=update_body.chat.id, message_id=update_body.message_id, from_user=True, from_telegram_id=user_id, - created=datetime.now(), + created=datetime.utcnow(), meta={} ) session.add(new_message) @@ -95,3 +126,8 @@ class UserDataMiddleware(BaseMiddleware): result = await handler(event, data) return result + finally: + try: + session.close() + except Exception: + pass diff --git a/app/bot/poller.py b/app/bot/poller.py new file mode 100644 index 0000000..211509c --- /dev/null +++ b/app/bot/poller.py @@ -0,0 +1,67 @@ +""" +Lightweight Telegram bot poller to ensure the bot responds. +Does not depend on legacy sync DB middleware. Provides minimal /start handler. +""" +import asyncio +import logging +from typing import Optional + +from aiogram import Bot, Dispatcher, Router, types +from aiogram.filters import Command + +from app.core.config import get_settings + +logger = logging.getLogger(__name__) + + +def build_router(name: str) -> Router: + r = Router(name=name) + + @r.message(Command("start")) + async def cmd_start(message: types.Message): + await message.answer( + "MY Network bot online. Use /help to get options.") + + @r.message(Command("help")) + async def cmd_help(message: types.Message): + await message.answer("Available: /start, /help") + + return r + + +async def _run_single_bot(token: str, name: str): + bot = Bot(token) + dp = Dispatcher() + dp.include_router(build_router(name)) + logger.info("Starting Telegram bot polling", extra={"name": name}) + try: + await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types()) + except Exception as e: + logger.error("Bot polling failed", extra={"name": name, "error": str(e)}) + await bot.session.close() + + +async def start_bots_if_configured() -> list[asyncio.Task]: + settings = get_settings() + tasks: list[asyncio.Task] = [] + # Prefer TELEGRAM_WEBHOOK_ENABLED to avoid double-processing + webhook_enabled = bool(getattr(settings, 'TELEGRAM_WEBHOOK_ENABLED', False)) + if webhook_enabled: + logger.info("Telegram webhook enabled; skipping polling start") + return tasks + + # Main bot + token: Optional[str] = getattr(settings, 'TELEGRAM_API_KEY', None) + if token: + tasks.append(asyncio.create_task(_run_single_bot(token, 'main'))) + + # Client bot + ct: Optional[str] = getattr(settings, 'CLIENT_TELEGRAM_API_KEY', None) + if ct: + tasks.append(asyncio.create_task(_run_single_bot(ct, 'client'))) + + if tasks: + logger.info("Telegram bots polling started", extra={"count": len(tasks)}) + else: + logger.info("No Telegram tokens configured; polling not started") + return tasks diff --git a/app/core/models/_telegram/wrapped_bot.py b/app/core/models/_telegram/wrapped_bot.py index 4cb1f30..6e701f2 100644 --- a/app/core/models/_telegram/wrapped_bot.py +++ b/app/core/models/_telegram/wrapped_bot.py @@ -5,6 +5,8 @@ from sqlalchemy import and_ from app.core.logger import make_log from app.core.models.messages import KnownTelegramMessage +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy import select from app.core._config import TELEGRAM_API_KEY, CLIENT_TELEGRAM_API_KEY from app.core.models._telegram.templates.player import PlayerTemplates @@ -56,25 +58,36 @@ class Wrapped_CBotChat(T, PlayerTemplates): if self.db_session: if message_type == 'common': ci = 0 - for oc_msg in self.db_session.query(KnownTelegramMessage).filter( - and_( - KnownTelegramMessage.type == 'common', - KnownTelegramMessage.bot_id == self.bot_id, - KnownTelegramMessage.chat_id == self._chat_id, - KnownTelegramMessage.deleted == False - ) - ).all(): + if isinstance(self.db_session, AsyncSession): + res = await self.db_session.execute(select(KnownTelegramMessage).where( + and_( + KnownTelegramMessage.type == 'common', + KnownTelegramMessage.bot_id == self.bot_id, + KnownTelegramMessage.chat_id == self._chat_id, + KnownTelegramMessage.deleted == False + ) + )) + old = res.scalars().all() + else: + old = self.db_session.query(KnownTelegramMessage).filter( + and_( + KnownTelegramMessage.type == 'common', + KnownTelegramMessage.bot_id == self.bot_id, + KnownTelegramMessage.chat_id == self._chat_id, + KnownTelegramMessage.deleted == False + ) + ).all() + for oc_msg in old: make_log(self, f"Delete old message {oc_msg.message_id} {oc_msg.type} {oc_msg.bot_id} {oc_msg.chat_id}") await self.delete_message(oc_msg.message_id) ci += 1 - make_log(self, f"Deleted {ci} old messages", level='debug') if isinstance(result, types.Message): message_id = getattr(result, 'message_id', None) assert message_id, "No message_id" - self.db_session.add( - KnownTelegramMessage( + if isinstance(self.db_session, AsyncSession): + self.db_session.add(KnownTelegramMessage( type=message_type, bot_id=self.bot_id, chat_id=self._chat_id, @@ -83,9 +96,20 @@ class Wrapped_CBotChat(T, PlayerTemplates): created=datetime.now(), meta=message_meta or {}, content_id=content_id - ) - ) - self.db_session.commit() + )) + await self.db_session.commit() + else: + self.db_session.add(KnownTelegramMessage( + type=message_type, + bot_id=self.bot_id, + chat_id=self._chat_id, + message_id=message_id, + from_user=False, + created=datetime.now(), + meta=message_meta or {}, + content_id=content_id + )) + self.db_session.commit() else: make_log(self, f"Unknown result type: {type(result)}", level='warning') @@ -137,14 +161,28 @@ class Wrapped_CBotChat(T, PlayerTemplates): message_id )): if self.db_session: - known_message = self.db_session.query(KnownTelegramMessage).filter( - KnownTelegramMessage.bot_id == self.bot_id, - KnownTelegramMessage.chat_id == self._chat_id, - KnownTelegramMessage.message_id == message_id - ).first() + known_message = None + if isinstance(self.db_session, AsyncSession): + res = await self.db_session.execute(select(KnownTelegramMessage).where( + and_( + KnownTelegramMessage.bot_id == self.bot_id, + KnownTelegramMessage.chat_id == self._chat_id, + KnownTelegramMessage.message_id == message_id + ) + )) + known_message = res.scalars().first() + else: + known_message = self.db_session.query(KnownTelegramMessage).filter( + KnownTelegramMessage.bot_id == self.bot_id, + KnownTelegramMessage.chat_id == self._chat_id, + KnownTelegramMessage.message_id == message_id + ).first() if known_message: known_message.deleted = True - self.db_session.commit() + if isinstance(self.db_session, AsyncSession): + await self.db_session.commit() + else: + self.db_session.commit() except Exception as e: make_log(self, f"Error deleting message {self._chat_id}/{message_id}. Error: {e}", level='warning') return None diff --git a/app/fastapi_main.py b/app/fastapi_main.py index c507df9..4ae481e 100644 --- a/app/fastapi_main.py +++ b/app/fastapi_main.py @@ -121,9 +121,16 @@ async def lifespan(app: FastAPI): # Проверка готовности системы await logger.ainfo("System initialization completed successfully") - + + # Start Telegram bot polling (non-blocking) if configured + try: + from app.services.telegram_poller import start_bots_if_configured + app.state._bot_tasks = await start_bots_if_configured() + except Exception as _e: + await logger.awarning('Telegram bot polling not started', error=str(_e)) + yield - + except Exception as e: await logger.aerror(f"Failed to initialize application: {e}") raise @@ -149,7 +156,17 @@ async def lifespan(app: FastAPI): await logger.ainfo("Cache connections closed") except Exception as e: await logger.aerror(f"Error closing cache: {e}") - + + # Stop Telegram bot polling tasks if any + try: + tasks = getattr(app.state, '_bot_tasks', []) or [] + for t in tasks: + t.cancel() + if tasks: + await logger.ainfo('Telegram bot polling tasks cancelled', count=len(tasks)) + except Exception as _e: + await logger.awarning('Failed to cancel bot tasks', error=str(_e)) + await logger.ainfo("Application shutdown completed") diff --git a/app/services/telegram_poller.py b/app/services/telegram_poller.py new file mode 100644 index 0000000..9656581 --- /dev/null +++ b/app/services/telegram_poller.py @@ -0,0 +1,52 @@ +""" +Project Telegram polling startup: launches both main and client bots with full routers and middleware. +Cleans webhooks before polling to ensure updates delivery. +""" +import asyncio +import logging +from typing import Optional + +from aiogram import Bot, Dispatcher +from aiogram.fsm.storage.memory import MemoryStorage + +from app.core.config import get_settings +from app.bot.middleware import UserDataMiddleware +from app.bot.routers.index import main_router as main_bot_router +from app.client_bot.routers.index import main_router as client_bot_router + +logger = logging.getLogger(__name__) + + +async def _run_bot(token: str, name: str, router) -> None: + bot = Bot(token) + dp = Dispatcher(storage=MemoryStorage()) + dp.update.outer_middleware(UserDataMiddleware()) + dp.include_router(router) + logger.info("Preparing Telegram bot for polling", extra={"name": name}) + # Ensure webhook is removed so polling works + await bot.delete_webhook(drop_pending_updates=False) + logger.info("Webhook cleared", extra={"name": name}) + logger.info("Starting Telegram bot polling", extra={"name": name}) + await dp.start_polling(bot, allowed_updates=dp.resolve_used_update_types()) + + +async def start_bots_if_configured() -> list[asyncio.Task]: + settings = get_settings() + tasks: list[asyncio.Task] = [] + if bool(getattr(settings, 'TELEGRAM_WEBHOOK_ENABLED', False)): + logger.info("Telegram webhook enabled; skipping polling start") + return tasks + + main_token: Optional[str] = getattr(settings, 'TELEGRAM_API_KEY', None) + if main_token: + tasks.append(asyncio.create_task(_run_bot(main_token, 'main', main_bot_router))) + + client_token: Optional[str] = getattr(settings, 'CLIENT_TELEGRAM_API_KEY', None) + if client_token: + tasks.append(asyncio.create_task(_run_bot(client_token, 'client', client_bot_router))) + + if tasks: + logger.info("Telegram bots polling started", extra={"count": len(tasks)}) + else: + logger.info("No Telegram tokens configured; polling not started") + return tasks diff --git a/bootstrap.json b/bootstrap.json index e297cb5..110a266 100644 --- a/bootstrap.json +++ b/bootstrap.json @@ -1,14 +1,14 @@ { "version": "3.0.0", - "network_id": "my-network-1755317385", - "created_at": "2025-08-16T04:09:45Z", + "network_id": "my-network-1755348796", + "created_at": "2025-08-16T12:53:16Z", "bootstrap_nodes": [ { - "id": "node-3a2c6a21e3401fce", - "node_id": "node-3a2c6a21e3401fce", + "id": "node-bea1d09bc687311b", + "node_id": "node-bea1d09bc687311b", "address": "2a02:6b40:2000:16b1::1", "port": 8000, - "public_key": "3a2c6a21e3401fceed1fb63c45d068f20e21b48159db3a961a2c43e8701071d4", + "public_key": "bea1d09bc687311b17b048789be8d8950b88904aa68a0d9992a83cb3851e8bd6", "trusted": true, "node_type": "bootstrap" }