From f140181c45aebc55b4dd5bbd6db7703b1fb11cc6 Mon Sep 17 00:00:00 2001 From: unexpected Date: Wed, 15 Oct 2025 16:57:21 +0000 Subject: [PATCH] events & global sync. unstable --- app/__main__.py | 3 + app/api/__init__.py | 6 + app/api/routes/admin.py | 330 +++++++++++++++++++++- app/api/routes/content.py | 34 ++- app/api/routes/network.py | 17 ++ app/api/routes/network_events.py | 77 +++++ app/api/routes/upload_tus.py | 22 +- app/bot/routers/home.py | 30 ++ app/client_bot/routers/home.py | 30 ++ app/core/_utils/create_maria_tables.py | 34 +++ app/core/background/event_sync_service.py | 152 ++++++++++ app/core/background/index_scout_v3.py | 148 +++++++++- app/core/background/indexer_service.py | 31 ++ app/core/background/license_service.py | 42 ++- app/core/events/__init__.py | 17 ++ app/core/events/service.py | 185 ++++++++++++ app/core/models/__init__.py | 1 + app/core/models/events.py | 48 ++++ app/core/models/transaction.py | 7 + app/core/models/user/__init__.py | 3 +- app/core/network/nodesig.py | 10 +- 21 files changed, 1196 insertions(+), 31 deletions(-) create mode 100644 app/api/routes/network_events.py create mode 100644 app/core/background/event_sync_service.py create mode 100644 app/core/events/__init__.py create mode 100644 app/core/events/service.py create mode 100644 app/core/models/events.py diff --git a/app/__main__.py b/app/__main__.py index 6d12449..970ceae 100644 --- a/app/__main__.py +++ b/app/__main__.py @@ -151,6 +151,9 @@ if __name__ == '__main__': elif startup_target == 'derivative_janitor': from app.core.background.derivative_cache_janitor import main_fn as target_fn time.sleep(5) + elif startup_target == 'events_sync': + from app.core.background.event_sync_service import main_fn as target_fn + time.sleep(5) startup_fn = startup_fn or target_fn assert startup_fn diff --git a/app/api/__init__.py b/app/api/__init__.py index fac6588..4a47eb9 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -20,6 +20,7 @@ from app.api.routes.network import ( s_api_v1_network_nodes, s_api_v1_network_handshake, ) +from app.api.routes.network_events import s_api_v1_network_events from app.api.routes.auth import s_api_v1_auth_twa, s_api_v1_auth_select_wallet, s_api_v1_auth_me from app.api.routes.statics import s_api_tonconnect_manifest, s_api_platform_metadata from app.api.routes.node_storage import s_api_v1_storage_post, s_api_v1_storage_get, \ @@ -36,9 +37,11 @@ from app.api.routes.admin import ( s_api_v1_admin_blockchain, s_api_v1_admin_cache_cleanup, s_api_v1_admin_cache_setlimits, + s_api_v1_admin_events, s_api_v1_admin_licenses, s_api_v1_admin_login, s_api_v1_admin_logout, + s_api_v1_admin_users_setadmin, s_api_v1_admin_node_setrole, s_api_v1_admin_nodes, s_api_v1_admin_overview, @@ -66,6 +69,7 @@ app.add_route(s_api_system_send_status, "/api/system.sendStatus", methods=["POST app.add_route(s_api_v1_network_info, "/api/v1/network.info", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_network_nodes, "/api/v1/network.nodes", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_network_handshake, "/api/v1/network.handshake", methods=["POST", "OPTIONS"]) +app.add_route(s_api_v1_network_events, "/api/v1/network.events", methods=["GET", "OPTIONS"]) app.add_route(s_api_tonconnect_manifest, "/api/tonconnect-manifest.json", methods=["GET", "OPTIONS"]) app.add_route(s_api_platform_metadata, "/api/platform-metadata.json", methods=["GET", "OPTIONS"]) @@ -102,8 +106,10 @@ app.add_route(s_api_v1_admin_overview, "/api/v1/admin.overview", methods=["GET", app.add_route(s_api_v1_admin_storage, "/api/v1/admin.storage", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_uploads, "/api/v1/admin.uploads", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_users, "/api/v1/admin.users", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_users_setadmin, "/api/v1/admin.users.setAdmin", methods=["POST", "OPTIONS"]) app.add_route(s_api_v1_admin_licenses, "/api/v1/admin.licenses", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_stars, "/api/v1/admin.stars", methods=["GET", "OPTIONS"]) +app.add_route(s_api_v1_admin_events, "/api/v1/admin.events", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_system, "/api/v1/admin.system", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_blockchain, "/api/v1/admin.blockchain", methods=["GET", "OPTIONS"]) app.add_route(s_api_v1_admin_node_setrole, "/api/v1/admin.node.setRole", methods=["POST", "OPTIONS"]) diff --git a/app/api/routes/admin.py b/app/api/routes/admin.py index 6a6bc82..8013822 100644 --- a/app/api/routes/admin.py +++ b/app/api/routes/admin.py @@ -7,11 +7,12 @@ import shutil from collections import defaultdict from datetime import datetime from pathlib import Path -from typing import Any, Dict, List, Optional +from typing import Any, Dict, List, Optional, Tuple +from urllib.parse import urlparse from base58 import b58encode from sanic import response -from sqlalchemy import Integer, String, and_, case, cast, func, or_, select +from sqlalchemy import Integer, String, and_, case, cast, func, or_, select, Text from app.api.routes._system import get_git_info from app.core._blockchain.ton.platform import platform @@ -20,6 +21,7 @@ from app.core._config import ( BACKEND_LOGS_DIR_HOST, LOG_DIR, CLIENT_TELEGRAM_BOT_USERNAME, + TELEGRAM_BOT_USERNAME, PROJECT_HOST, UPLOADS_DIR, ) @@ -34,7 +36,8 @@ from app.core.models.content_v3 import ( IpfsSync, UploadSession, ) -from app.core.models.my_network import KnownNode +from app.core.models.my_network import KnownNode, RemoteContentIndex +from app.core.models.events import NodeEvent from app.core.models.tasks import BlockchainTask from app.core.models.node_storage import StoredContent from app.core.models.user import User @@ -44,6 +47,7 @@ from app.core.models.wallet_connection import WalletConnection from app.core.models.user_activity import UserActivity from app.core._utils.share_links import build_content_links from app.core.content.content_id import ContentId +from app.core.events.service import record_event MIN_ONCHAIN_INDEX = int(os.getenv("MIN_ONCHAIN_INDEX", "8")) @@ -161,6 +165,38 @@ def _service_states(request) -> List[Dict[str, Any]]: return items +def _node_public_base(node: KnownNode) -> Optional[str]: + meta = node.meta or {} + public_host = (meta.get('public_host') or '').strip() + if public_host: + base = public_host.rstrip('/') + if base.startswith('http://') or base.startswith('https://'): + return base + scheme = 'https' if node.port == 443 else 'http' + return f"{scheme}://{base.lstrip('/')}" + scheme = 'https' if node.port == 443 else 'http' + host = (node.ip or '').strip() + if not host: + return None + default_port = 443 if scheme == 'https' else 80 + if node.port and node.port != default_port: + return f"{scheme}://{host}:{node.port}" + return f"{scheme}://{host}" + + +def _node_gateway_base(node: KnownNode) -> Optional[str]: + meta = node.meta or {} + public_host = meta.get('public_host') or node.ip or '' + if not public_host: + return None + parsed = urlparse(public_host if '://' in public_host else f"https://{public_host}") + hostname = parsed.hostname or (node.ip or '').strip() + if not hostname: + return None + port = parsed.port or 8080 + return f"http://{hostname}:{port}" + + def _format_dt(value: Optional[datetime]) -> Optional[str]: return value.isoformat() + 'Z' if isinstance(value, datetime) else None @@ -478,6 +514,18 @@ async def s_api_v1_admin_uploads(request): for content_id, count in license_rows: license_counts[int(content_id)] = int(count) + remote_map: Dict[str, List[Tuple[RemoteContentIndex, KnownNode]]] = defaultdict(list) + if encrypted_cids: + remote_rows = (await session.execute( + select(RemoteContentIndex, KnownNode) + .join(KnownNode, RemoteContentIndex.remote_node_id == KnownNode.id) + .where(RemoteContentIndex.encrypted_hash.in_(encrypted_cids)) + )).all() + for remote_row, node in remote_rows: + if not remote_row.encrypted_hash: + continue + remote_map[remote_row.encrypted_hash].append((remote_row, node)) + contents_payload: List[Dict[str, Any]] = [] category_totals: Dict[str, int] = {key: 0 for key in ALLOWED_UPLOAD_FILTERS if key != 'all'} matched_total = 0 @@ -599,6 +647,66 @@ async def s_api_v1_admin_uploads(request): if url ] + distribution_nodes: List[Dict[str, Any]] = [] + meta_local_host = urlparse(PROJECT_HOST) if PROJECT_HOST else None + if stored: + distribution_nodes.append({ + 'node_id': None, + 'is_local': True, + 'host': (meta_local_host.hostname if meta_local_host and meta_local_host.hostname else 'local'), + 'public_host': PROJECT_HOST.rstrip('/') if PROJECT_HOST else None, + 'version': None, + 'role': 'self', + 'last_seen': None, + 'content': { + 'encrypted_cid': content.encrypted_cid, + 'content_type': content.content_type, + 'size_bytes': content.enc_size_bytes, + 'preview_enabled': content.preview_enabled, + 'updated_at': _format_dt(content.updated_at), + 'metadata_cid': metadata_cid, + 'issuer_node_id': None, + }, + 'links': { + 'web_view': web_view_url, + 'api_view': f"{PROJECT_HOST}/api/v1/content.view/{share_target}" if PROJECT_HOST else None, + 'gateway_view': None, + }, + }) + + remote_entries = remote_map.get(content.encrypted_cid, []) + for remote_row, node in remote_entries: + node_meta = node.meta or {} + base_url = _node_public_base(node) + gateway_base = _node_gateway_base(node) + remote_meta = remote_row.meta if isinstance(remote_row.meta, dict) else {} + remote_share_target = remote_meta.get('share_target') or content.encrypted_cid + distribution_nodes.append({ + 'node_id': node.id, + 'is_local': False, + 'host': node.ip, + 'public_host': node_meta.get('public_host'), + 'version': node_meta.get('version'), + 'role': node_meta.get('role') or 'read-only', + 'last_seen': _format_dt(node.last_sync), + 'content': { + 'encrypted_cid': remote_row.encrypted_hash, + 'content_type': remote_row.content_type, + 'size_bytes': remote_meta.get('size_bytes'), + 'preview_enabled': remote_meta.get('preview_enabled'), + 'updated_at': _format_dt(remote_row.last_updated), + 'metadata_cid': remote_meta.get('metadata_cid'), + 'issuer_node_id': remote_meta.get('issuer_node_id'), + }, + 'links': { + 'web_view': f"{base_url}/viewContent?content={remote_share_target}" if base_url else None, + 'api_view': f"{base_url}/api/v1/content.view/{remote_share_target}" if base_url else None, + 'gateway_view': f"{gateway_base}/ipfs/{content.encrypted_cid}" if gateway_base else None, + }, + }) + if len(distribution_nodes) > 1: + distribution_nodes.sort(key=lambda entry: (0 if entry.get('is_local') else 1, entry.get('host') or '')) + upload_state_norm = (latest_upload.state or '').lower() if latest_upload else '' conversion_state_norm = (conversion_state or '').lower() if conversion_state else '' ipfs_state_norm = (ipfs_sync.pin_state or '').lower() if (ipfs_sync and ipfs_sync.pin_state) else '' @@ -733,6 +841,10 @@ async def s_api_v1_admin_uploads(request): 'download_primary': primary_download, 'download_derivatives': derivative_downloads, }, + 'distribution': { + 'local_present': bool(stored), + 'nodes': distribution_nodes, + }, 'flags': flags, }) @@ -821,6 +933,7 @@ async def s_api_v1_admin_users(request): 'items': [], 'summary': { 'users_returned': 0, + 'admins_total': 0, 'wallets_total': 0, 'wallets_active': 0, 'licenses_total': 0, @@ -828,6 +941,7 @@ async def s_api_v1_admin_users(request): 'stars_total': 0, 'stars_paid': 0, 'stars_unpaid': 0, + 'stars_amount_total': 0, 'stars_amount_paid': 0, 'stars_amount_unpaid': 0, 'unique_ips_total': 0, @@ -994,6 +1108,7 @@ async def s_api_v1_admin_users(request): items: List[Dict[str, Any]] = [] summary = { 'users_returned': 0, + 'admins_total': 0, 'wallets_total': 0, 'wallets_active': 0, 'licenses_total': 0, @@ -1009,6 +1124,8 @@ async def s_api_v1_admin_users(request): for user in user_rows: summary['users_returned'] += 1 + if getattr(user, 'is_admin', False): + summary['admins_total'] += 1 meta = user.meta or {} wallet_list = wallet_map.get(user.id, []) @@ -1077,6 +1194,7 @@ async def s_api_v1_admin_users(request): 'created_at': _format_dt(user.created), 'updated_at': _format_dt(user.updated), 'last_use': _format_dt(user.last_use), + 'is_admin': bool(user.is_admin), 'meta': { 'ref_id': meta.get('ref_id'), 'referrer_id': meta.get('referrer_id'), @@ -1111,6 +1229,59 @@ async def s_api_v1_admin_users(request): return response.json(base_payload) +async def s_api_v1_admin_users_setadmin(request): + if (unauth := _ensure_admin(request)): + return unauth + + data = request.json or {} + try: + user_id = int(data.get('user_id')) + except (TypeError, ValueError): + return response.json({"error": "BAD_USER_ID"}, status=400) + + is_admin_raw = data.get('is_admin') + if isinstance(is_admin_raw, str): + normalized = is_admin_raw.strip().lower() + if normalized in {'1', 'true', 'yes', 'y', 'on'}: + is_admin = True + elif normalized in {'0', 'false', 'no', 'n', 'off'}: + is_admin = False + else: + return response.json({"error": "BAD_FLAG"}, status=400) + else: + is_admin = bool(is_admin_raw) + + session = request.ctx.db_session + user = await session.get(User, user_id) + if not user: + return response.json({"error": "NOT_FOUND"}, status=404) + + user.is_admin = is_admin + user.updated = datetime.utcnow() + try: + await record_event( + session, + 'user_role_changed', + { + 'user_id': user.id, + 'telegram_id': user.telegram_id, + 'is_admin': is_admin, + }, + origin_host=PROJECT_HOST, + ) + except Exception as exc: + make_log('Admin', f"Failed to record user_role_changed event: {exc}", level='warning') + + await session.commit() + return response.json({ + "ok": True, + "user": { + "id": user.id, + "is_admin": bool(user.is_admin), + } + }) + + async def s_api_v1_admin_licenses(request): if (unauth := _ensure_admin(request)): return unauth @@ -1597,6 +1768,15 @@ async def s_api_v1_admin_stars(request): 'amount': invoice.amount, 'paid': bool(invoice.paid), 'invoice_url': invoice.invoice_url, + 'telegram_id': invoice.telegram_id, + 'bot_username': invoice.bot_username, + 'payment_node': { + 'public_key': invoice.payment_node_id, + 'host': invoice.payment_node_public_host, + }, + 'payment_tx_id': invoice.payment_tx_id, + 'paid_at': _format_dt(invoice.paid_at), + 'is_remote': bool(invoice.is_remote), 'created_at': _format_dt(invoice.created), 'user': user_payload, 'content': content_payload, @@ -1618,6 +1798,136 @@ async def s_api_v1_admin_stars(request): return response.json(base_payload) +async def s_api_v1_admin_events(request): + if (unauth := _ensure_admin(request)): + return unauth + + session = request.ctx.db_session + + try: + limit = int(request.args.get('limit') or 50) + except (TypeError, ValueError): + limit = 50 + limit = max(1, min(limit, 200)) + + try: + offset = int(request.args.get('offset') or 0) + except (TypeError, ValueError): + offset = 0 + offset = max(0, offset) + + type_param = (request.args.get('type') or '').strip() + status_param = (request.args.get('status') or '').strip() + origin_param = (request.args.get('origin') or '').strip() + search_param = (request.args.get('search') or '').strip() + + filters = [] + applied_filters: Dict[str, Any] = {} + + if type_param: + type_values = [value.strip() for value in type_param.split(',') if value.strip()] + if type_values: + filters.append(NodeEvent.event_type.in_(type_values)) + applied_filters['type'] = type_values + + if status_param: + status_values = [value.strip() for value in status_param.split(',') if value.strip()] + if status_values: + filters.append(NodeEvent.status.in_(status_values)) + applied_filters['status'] = status_values + + if origin_param: + origin_values = [value.strip() for value in origin_param.split(',') if value.strip()] + if origin_values: + filters.append(NodeEvent.origin_public_key.in_(origin_values)) + applied_filters['origin'] = origin_values + + if search_param: + search_like = f"%{search_param}%" + filters.append(or_( + NodeEvent.uid.ilike(search_like), + cast(NodeEvent.payload, Text).ilike(search_like), + )) + applied_filters['search'] = search_param + + total_stmt = select(func.count()).select_from(NodeEvent) + if filters: + total_stmt = total_stmt.where(and_(*filters)) + total = (await session.execute(total_stmt)).scalar_one() + + query_stmt = ( + select(NodeEvent) + .order_by(NodeEvent.created_at.desc()) + .offset(offset) + .limit(limit) + ) + if filters: + query_stmt = query_stmt.where(and_(*filters)) + + rows = (await session.execute(query_stmt)).scalars().all() + + def _event_links(row: NodeEvent) -> Dict[str, Optional[str]]: + links: Dict[str, Optional[str]] = {} + payload = row.payload or {} + cid = payload.get('encrypted_cid') or payload.get('content_cid') or payload.get('content_id') + if cid: + links['admin_uploads'] = f"uploads?search={cid}" + if PROJECT_HOST: + links['content_view'] = f"{PROJECT_HOST}/viewContent?content={cid}" + invoice_id = payload.get('invoice_id') + if invoice_id: + links['admin_stars'] = f"stars?search={invoice_id}" + user_id = payload.get('user_id') + telegram_id = payload.get('telegram_id') + if user_id: + links['admin_user'] = f"users?search={user_id}" + elif telegram_id: + links['admin_user'] = f"users?search={telegram_id}" + return links + + items: List[Dict[str, Any]] = [] + for row in rows: + items.append({ + 'id': row.id, + 'origin_public_key': row.origin_public_key, + 'origin_host': row.origin_host, + 'seq': int(row.seq), + 'uid': row.uid, + 'event_type': row.event_type, + 'status': row.status, + 'created_at': _format_dt(row.created_at), + 'received_at': _format_dt(row.received_at), + 'applied_at': _format_dt(row.applied_at), + 'payload': row.payload or {}, + 'links': _event_links(row), + }) + + type_stmt = select(NodeEvent.event_type, func.count()).group_by(NodeEvent.event_type) + status_stmt = select(NodeEvent.status, func.count()).group_by(NodeEvent.status) + origin_stmt = select(NodeEvent.origin_public_key, func.count()).group_by(NodeEvent.origin_public_key) + if filters: + type_stmt = type_stmt.where(and_(*filters)) + status_stmt = status_stmt.where(and_(*filters)) + origin_stmt = origin_stmt.where(and_(*filters)) + type_rows = (await session.execute(type_stmt)).all() + status_rows = (await session.execute(status_stmt)).all() + origin_rows = (await session.execute(origin_stmt)).all() + + payload = { + 'total': int(total or 0), + 'limit': limit, + 'offset': offset, + 'filters': applied_filters, + 'items': items, + 'available_filters': { + 'types': {event_type or 'unknown': int(count or 0) for event_type, count in type_rows}, + 'statuses': {status or 'unknown': int(count or 0) for status, count in status_rows}, + 'origins': {origin or 'unknown': int(count or 0) for origin, count in origin_rows}, + } + } + return response.json(payload) + + async def s_api_v1_admin_system(request): if (unauth := _ensure_admin(request)): return unauth @@ -1643,6 +1953,19 @@ async def s_api_v1_admin_system(request): 'LOG_LEVEL': os.getenv('LOG_LEVEL'), 'TESTNET': os.getenv('TESTNET'), } + telegram_bots: List[Dict[str, Any]] = [] + if TELEGRAM_BOT_USERNAME: + telegram_bots.append({ + 'role': 'uploader', + 'username': TELEGRAM_BOT_USERNAME, + 'url': f'https://t.me/{TELEGRAM_BOT_USERNAME}', + }) + if CLIENT_TELEGRAM_BOT_USERNAME: + telegram_bots.append({ + 'role': 'client', + 'username': CLIENT_TELEGRAM_BOT_USERNAME, + 'url': f'https://t.me/{CLIENT_TELEGRAM_BOT_USERNAME}', + }) blockchain_counts_rows = (await session.execute( select(BlockchainTask.status, func.count()).group_by(BlockchainTask.status) @@ -1666,6 +1989,7 @@ async def s_api_v1_admin_system(request): 'services': _service_states(request), 'blockchain_tasks': blockchain_counts, 'latest_index_items': index_entries, + 'telegram_bots': telegram_bots, } return response.json(payload) diff --git a/app/api/routes/content.py b/app/api/routes/content.py index eeedc12..c5db81c 100644 --- a/app/api/routes/content.py +++ b/app/api/routes/content.py @@ -1,6 +1,6 @@ from datetime import datetime, timedelta from sanic import response -from sqlalchemy import select, and_, func +from sqlalchemy import select, and_, func, or_ from aiogram import Bot, types from sqlalchemy import and_ from app.core.logger import make_log @@ -9,7 +9,7 @@ from app.core.models.node_storage import StoredContent from app.core.models.keys import KnownKey from app.core.models import StarsInvoice from app.core.models.content.user_content import UserContent -from app.core._config import CLIENT_TELEGRAM_API_KEY, PROJECT_HOST +from app.core._config import CLIENT_TELEGRAM_API_KEY, CLIENT_TELEGRAM_BOT_USERNAME, PROJECT_HOST from app.core.models.content_v3 import EncryptedContent as ECv3, ContentDerivative as CDv3, UploadSession from app.core.content.content_id import ContentId import json @@ -103,18 +103,26 @@ async def s_api_v1_content_view(request, content_address: str): have_access = False if request.ctx.user: user_wallet_address = await request.ctx.user.wallet_address_async(request.ctx.db_session) + user_telegram_id = getattr(request.ctx.user, 'telegram_id', None) + or_clauses = [StarsInvoice.user_id == request.ctx.user.id] + if user_telegram_id is not None: + or_clauses.append(StarsInvoice.telegram_id == user_telegram_id) + stars_access = False + if or_clauses: + stars_access = bool((await request.ctx.db_session.execute(select(StarsInvoice).where( + and_( + StarsInvoice.content_hash == content['encrypted_content'].hash, + StarsInvoice.paid.is_(True), + or_(*or_clauses) + ) + ))).scalars().first()) + have_access = ( (content['encrypted_content'].owner_address == user_wallet_address) or bool((await request.ctx.db_session.execute(select(UserContent).where( and_(UserContent.owner_address == user_wallet_address, UserContent.status == 'active', UserContent.content_id == content['encrypted_content'].id) - ))).scalars().first()) \ - or bool((await request.ctx.db_session.execute(select(StarsInvoice).where( - and_( - StarsInvoice.user_id == request.ctx.user.id, - StarsInvoice.content_hash == content['encrypted_content'].hash, - StarsInvoice.paid == True - ) ))).scalars().first()) + or stars_access ) if not have_access: @@ -123,8 +131,10 @@ async def s_api_v1_content_view(request, content_address: str): current_star_rate = 0.00000001 stars_cost = int(int(content['encrypted_content'].meta['license']['resale']['price']) / 1e9 / current_star_rate * 1.2) - if request.ctx.user.telegram_id in [5587262915, 6861699286]: + if getattr(request.ctx.user, 'is_admin', False): stars_cost = 2 + else: + stars_cost = int(int(content['encrypted_content'].meta['license']['resale']['price']) / 1e9 / current_star_rate * 1.2) invoice_id = f"access_{uuid.uuid4().hex}" exist_invoice = (await request.ctx.db_session.execute(select(StarsInvoice).where( @@ -155,7 +165,9 @@ async def s_api_v1_content_view(request, content_address: str): amount=stars_cost, user_id=request.ctx.user.id, content_hash=content['encrypted_content'].hash, - invoice_url=invoice_url + invoice_url=invoice_url, + telegram_id=getattr(request.ctx.user, 'telegram_id', None), + bot_username=CLIENT_TELEGRAM_BOT_USERNAME, ) ) await request.ctx.db_session.commit() diff --git a/app/api/routes/network.py b/app/api/routes/network.py index cb68e6e..e338f7f 100644 --- a/app/api/routes/network.py +++ b/app/api/routes/network.py @@ -17,6 +17,8 @@ from app.core.network.semver import compatibility from app.core.network.guard import check_rate_limit, check_timestamp_fresh, check_and_remember_nonce from app.core.network.config import HANDSHAKE_TS_TOLERANCE_SEC from app.core.ipfs_client import swarm_connect +from app.core._config import PROJECT_HOST +from app.core.events.service import record_event def _port_from_public_host(public_host: str) -> int: @@ -171,6 +173,21 @@ async def s_api_v1_network_handshake(request): } ) await _connect_ipfs_multiaddrs(ipfs_meta.get("multiaddrs")) + try: + await record_event( + request.ctx.db_session, + 'node_registered', + { + 'public_key': str(data.get("public_key")), + 'public_host': data.get("public_host"), + 'node_type': data.get("node_type"), + 'version': peer_version, + 'capabilities': data.get("capabilities", {}), + }, + origin_host=PROJECT_HOST, + ) + except Exception as ev_exc: + make_log("Events", f"Failed to record node_registered event: {ev_exc}", level="warning") except Exception as e: make_log("Handshake", f"Upsert peer failed: {e}", level='warning') diff --git a/app/api/routes/network_events.py b/app/api/routes/network_events.py new file mode 100644 index 0000000..351e21f --- /dev/null +++ b/app/api/routes/network_events.py @@ -0,0 +1,77 @@ +from __future__ import annotations + +from typing import Dict, Any + +from sanic import response +from sqlalchemy import select + +from app.core.logger import make_log +from app.core.models import NodeEvent, KnownNode +from app.core.network.nodesig import verify_request +from app.core.network.guard import check_rate_limit +from app.core._config import PROJECT_HOST +from app.core.events.service import LOCAL_PUBLIC_KEY + + +def _origin_host() -> str | None: + return PROJECT_HOST.rstrip('/') if PROJECT_HOST else None + + +async def s_api_v1_network_events(request): + remote_ip = (request.headers.get('X-Forwarded-For') or request.remote_addr or request.ip or '').split(',')[0].strip() + if not check_rate_limit(request.app.ctx.memory, remote_ip): + return response.json({"error": "RATE_LIMIT"}, status=429) + + ok, node_id, reason = verify_request(request, request.app.ctx.memory) + if not ok: + return response.json({"error": reason or "UNAUTHORIZED"}, status=401) + + session = request.ctx.db_session + trusted = (await session.execute( + select(KnownNode).where(KnownNode.public_key == node_id) + )).scalar_one_or_none() + role = (trusted.meta or {}).get('role') if trusted and trusted.meta else None + if role != 'trusted': + make_log("Events", f"Rejected events fetch from non-trusted node {node_id}", level="warning") + return response.json({"error": "FORBIDDEN"}, status=403) + + try: + since = int(request.args.get('since') or 0) + except (TypeError, ValueError): + since = 0 + since = max(since, 0) + + try: + limit = int(request.args.get('limit') or 100) + except (TypeError, ValueError): + limit = 100 + limit = max(1, min(limit, 200)) + + result = await session.execute( + select(NodeEvent) + .where(NodeEvent.origin_public_key == LOCAL_PUBLIC_KEY, NodeEvent.seq > since) + .order_by(NodeEvent.seq.asc()) + .limit(limit) + ) + rows = result.scalars().all() + + events: list[Dict[str, Any]] = [] + next_since = since + for row in rows: + next_since = max(next_since, int(row.seq)) + events.append({ + "origin_public_key": row.origin_public_key, + "origin_host": row.origin_host or _origin_host(), + "seq": int(row.seq), + "uid": row.uid, + "event_type": row.event_type, + "payload": row.payload, + "signature": row.signature, + "created_at": (row.created_at.isoformat() + 'Z') if row.created_at else None, + }) + + payload = { + "events": events, + "next_since": next_since, + } + return response.json(payload) diff --git a/app/api/routes/upload_tus.py b/app/api/routes/upload_tus.py index 27a4e77..8e9ad0e 100644 --- a/app/api/routes/upload_tus.py +++ b/app/api/routes/upload_tus.py @@ -10,7 +10,7 @@ import aiofiles from base58 import b58encode from sanic import response -from app.core._config import UPLOADS_DIR +from app.core._config import UPLOADS_DIR, PROJECT_HOST from app.core._secrets import hot_pubkey from app.core.crypto.aes_gcm_stream import encrypt_file_to_encf, CHUNK_BYTES from app.core.crypto.keywrap import wrap_dek, KeyWrapError @@ -20,6 +20,7 @@ from app.core.models.content_v3 import EncryptedContent, ContentKey, IpfsSync, C from app.core.models.node_storage import StoredContent from app.core.storage import db_session from app.core._utils.resolve_content import resolve_content +from app.core.events.service import record_event from sqlalchemy import select @@ -235,6 +236,25 @@ async def s_api_v1_upload_tus_hook(request): sig = "" session.add(ContentIndexItem(encrypted_cid=encrypted_cid, payload=item, sig=sig)) + try: + await record_event( + session, + 'content_uploaded', + { + 'encrypted_cid': encrypted_cid, + 'content_hash': encrypted_hash_b58, + 'title': title, + 'description': description, + 'content_type': content_type, + 'size_bytes': enc_size, + 'user_id': request.ctx.user.id if getattr(request.ctx, 'user', None) else None, + 'telegram_id': getattr(getattr(request.ctx, 'user', None), 'telegram_id', None), + }, + origin_host=PROJECT_HOST, + ) + except Exception as exc: + make_log("Events", f"Failed to record content_uploaded event: {exc}", level="warning") + await session.commit() # Update upload session with result and purge staging to avoid duplicates diff --git a/app/bot/routers/home.py b/app/bot/routers/home.py index 686d34c..48fb17b 100644 --- a/app/bot/routers/home.py +++ b/app/bot/routers/home.py @@ -7,6 +7,7 @@ from sqlalchemy import select, and_ from app.core._keyboards import get_inline_keyboard from app.core._utils.tg_process_template import tg_process_template from app.core.models.wallet_connection import WalletConnection +from app.core._config import PROJECT_HOST main_router = Router() @@ -83,6 +84,35 @@ async def t_home_menu(__msg, **extra): return await send_home_menu(chat_wrap, user, wallet_connection, message_id=message_id) +async def t_admin_panel(message: types.Message, **extra): + user = extra.get('user') + chat_wrap = extra.get('chat_wrap') + admin_host = (PROJECT_HOST or '').rstrip('/') + if not user or not getattr(user, 'is_admin', False): + await chat_wrap.send_message("Доступ к админ-панели ограничен.") + return + if not admin_host: + await chat_wrap.send_message("Адрес админ-панели не настроен на этой ноде.") + return + admin_url = f"{admin_host}/admin" + buttons = [] + if admin_url.startswith('https://'): + buttons.append({ + 'text': 'Открыть в Telegram', + 'web_app': types.WebAppInfo(url=admin_url), + }) + buttons.append({ + 'text': 'Открыть в браузере', + 'url': admin_url, + }) + keyboard = get_inline_keyboard([buttons]) if buttons else None + await chat_wrap.send_message( + "Админ-панель доступна по кнопке ниже.", + keyboard=keyboard, + ) + + main_router.message.register(t_home_menu, Command('start')) +main_router.message.register(t_admin_panel, Command('admin')) main_router.callback_query.register(t_home_menu, F.data == 'home') router = main_router diff --git a/app/client_bot/routers/home.py b/app/client_bot/routers/home.py index 6422578..90a3f4c 100644 --- a/app/client_bot/routers/home.py +++ b/app/client_bot/routers/home.py @@ -9,6 +9,7 @@ from app.core._utils.tg_process_template import tg_process_template from app.core.logger import make_log from app.core.models.wallet_connection import WalletConnection from app.core.models.node_storage import StoredContent +from app.core._config import PROJECT_HOST main_router = Router() @@ -95,6 +96,35 @@ async def t_home_menu(__msg, **extra): return await send_home_menu(chat_wrap, user, wallet_connection, message_id=message_id) +async def t_admin_panel(message: types.Message, **extra): + user = extra.get('user') + chat_wrap = extra.get('chat_wrap') + admin_host = (PROJECT_HOST or '').rstrip('/') + if not user or not getattr(user, 'is_admin', False): + await chat_wrap.send_message("Доступ к админ-панели ограничен.") + return + if not admin_host: + await chat_wrap.send_message("Адрес админ-панели не настроен на этой ноде.") + return + admin_url = f"{admin_host}/admin" + buttons = [] + if admin_url.startswith('https://'): + buttons.append({ + 'text': 'Открыть в Telegram', + 'web_app': types.WebAppInfo(url=admin_url), + }) + buttons.append({ + 'text': 'Открыть в браузере', + 'url': admin_url, + }) + keyboard = get_inline_keyboard([buttons]) if buttons else None + await chat_wrap.send_message( + "Админ-панель доступна по кнопке ниже.", + keyboard=keyboard, + ) + + main_router.message.register(t_home_menu, Command('start')) +main_router.message.register(t_admin_panel, Command('admin')) main_router.callback_query.register(t_home_menu, F.data == 'home') router = main_router diff --git a/app/core/_utils/create_maria_tables.py b/app/core/_utils/create_maria_tables.py index 33b8538..67911ee 100644 --- a/app/core/_utils/create_maria_tables.py +++ b/app/core/_utils/create_maria_tables.py @@ -1,4 +1,6 @@ from sqlalchemy.ext.asyncio import AsyncEngine +from sqlalchemy import text + from app.core.models import BlockchainTask from app.core.models.base import AlchemyBase @@ -9,4 +11,36 @@ async def create_db_tables(engine: AsyncEngine): BlockchainTask() async with engine.begin() as conn: await conn.run_sync(AlchemyBase.metadata.create_all) + await conn.execute(text(""" + ALTER TABLE users + ADD COLUMN IF NOT EXISTS is_admin BOOLEAN DEFAULT FALSE + """)) + await conn.execute(text(""" + ALTER TABLE stars_invoices + ADD COLUMN IF NOT EXISTS telegram_id BIGINT + """)) + await conn.execute(text(""" + ALTER TABLE stars_invoices + ADD COLUMN IF NOT EXISTS paid_at TIMESTAMPTZ + """)) + await conn.execute(text(""" + ALTER TABLE stars_invoices + ADD COLUMN IF NOT EXISTS payment_tx_id VARCHAR(256) + """)) + await conn.execute(text(""" + ALTER TABLE stars_invoices + ADD COLUMN IF NOT EXISTS payment_node_id VARCHAR(128) + """)) + await conn.execute(text(""" + ALTER TABLE stars_invoices + ADD COLUMN IF NOT EXISTS payment_node_public_host VARCHAR(256) + """)) + await conn.execute(text(""" + ALTER TABLE stars_invoices + ADD COLUMN IF NOT EXISTS bot_username VARCHAR(128) + """)) + await conn.execute(text(""" + ALTER TABLE stars_invoices + ADD COLUMN IF NOT EXISTS is_remote BOOLEAN DEFAULT FALSE + """)) diff --git a/app/core/background/event_sync_service.py b/app/core/background/event_sync_service.py new file mode 100644 index 0000000..31b8070 --- /dev/null +++ b/app/core/background/event_sync_service.py @@ -0,0 +1,152 @@ +import asyncio +from typing import Dict, List, Optional, Tuple +from urllib.parse import urlencode + +import httpx +from sqlalchemy import select + +from app.core.logger import make_log +from app.core.storage import db_session +from app.core.models import KnownNode, NodeEvent +from app.core.events.service import ( + store_remote_events, + upsert_cursor, + LOCAL_PUBLIC_KEY, +) +from app.core.models.events import NodeEventCursor +from app.core._secrets import hot_pubkey, hot_seed +from app.core.network.nodesig import sign_headers +from base58 import b58encode + + +def _node_public_base(node: KnownNode) -> Optional[str]: + meta = node.meta or {} + public_host = (meta.get('public_host') or '').strip() + if public_host: + base = public_host.rstrip('/') + if base.startswith('http://') or base.startswith('https://'): + return base + scheme = 'https' if node.port == 443 else 'http' + return f"{scheme}://{base.lstrip('/')}" + scheme = 'https' if node.port == 443 else 'http' + host = (node.ip or '').strip() + if not host: + return None + default_port = 443 if scheme == 'https' else 80 + if node.port and node.port != default_port: + return f"{scheme}://{host}:{node.port}" + return f"{scheme}://{host}" + + +async def _fetch_events_for_node(node: KnownNode, limit: int = 100) -> Tuple[List[Dict], int]: + base = _node_public_base(node) + if not base: + return [], 0 + async with db_session() as session: + cursor = (await session.execute( + select(NodeEventCursor).where(NodeEventCursor.source_public_key == node.public_key) + )).scalar_one_or_none() + since = cursor.last_seq if cursor else 0 + query = urlencode({"since": since, "limit": limit}) + path = f"/api/v1/network.events?{query}" + url = f"{base}{path}" + pk_b58 = b58encode(hot_pubkey).decode() + headers = sign_headers("GET", path, b"", hot_seed, pk_b58) + async with httpx.AsyncClient(timeout=20.0) as client: + try: + resp = await client.get(url, headers=headers) + if resp.status_code == 403: + make_log("Events", f"Access denied by node {node.public_key}", level="warning") + return [], since + resp.raise_for_status() + data = resp.json() + except Exception as exc: + make_log("Events", f"Fetch events failed from {node.public_key}: {exc}", level="debug") + return [], since + events = data.get("events") or [] + next_since = int(data.get("next_since") or since) + return events, next_since + + +async def _apply_event(session, event: NodeEvent): + if event.event_type == "stars_payment": + from app.core.models import StarsInvoice + payload = event.payload or {} + invoice_id = payload.get("invoice_id") + telegram_id = payload.get("telegram_id") + content_hash = payload.get("content_hash") + amount = payload.get("amount") + if not invoice_id or not telegram_id or not content_hash: + return + invoice = (await session.execute(select(StarsInvoice).where(StarsInvoice.external_id == invoice_id))).scalar_one_or_none() + if not invoice: + invoice = StarsInvoice( + external_id=invoice_id, + user_id=payload.get("user_id"), + type=payload.get('type') or 'access', + telegram_id=telegram_id, + amount=amount, + content_hash=content_hash, + paid=True, + paid_at=event.created_at, + payment_node_id=payload.get("payment_node", {}).get("public_key"), + payment_node_public_host=payload.get("payment_node", {}).get("public_host"), + bot_username=payload.get("bot_username"), + is_remote=True, + ) + session.add(invoice) + else: + invoice.paid = True + invoice.paid_at = invoice.paid_at or event.created_at + invoice.payment_node_id = payload.get("payment_node", {}).get("public_key") + invoice.payment_node_public_host = payload.get("payment_node", {}).get("public_host") + invoice.bot_username = payload.get("bot_username") or invoice.bot_username + invoice.telegram_id = telegram_id or invoice.telegram_id + invoice.is_remote = invoice.is_remote or True + if payload.get('type'): + invoice.type = payload['type'] + event.status = 'applied' + event.applied_at = event.applied_at or event.received_at + elif event.event_type == "content_indexed": + # The index scout will pick up via remote_content_index; we only mark event applied + event.status = 'recorded' + elif event.event_type == "node_registered": + event.status = 'recorded' + else: + event.status = 'recorded' + + +async def main_fn(memory): + make_log("Events", "Sync service started", level="info") + while True: + try: + async with db_session() as session: + nodes = (await session.execute(select(KnownNode))).scalars().all() + trusted_nodes = [ + n for n in nodes + if isinstance(n.meta, dict) and n.meta.get("role") == "trusted" and n.public_key != LOCAL_PUBLIC_KEY + ] + trusted_keys = {n.public_key for n in trusted_nodes} + for node in trusted_nodes: + events, next_since = await _fetch_events_for_node(node) + if not events: + if next_since: + async with db_session() as session: + await upsert_cursor(session, node.public_key, next_since, node.meta.get("public_host") if isinstance(node.meta, dict) else None) + await session.commit() + continue + async with db_session() as session: + stored = await store_remote_events( + session, + events, + allowed_public_keys=trusted_keys, + ) + for ev in stored: + await _apply_event(session, ev) + if stored: + await session.commit() + await upsert_cursor(session, node.public_key, next_since, node.meta.get("public_host") if isinstance(node.meta, dict) else None) + await session.commit() + except Exception as exc: + make_log("Events", f"Sync loop error: {exc}", level="error") + await asyncio.sleep(10) diff --git a/app/core/background/index_scout_v3.py b/app/core/background/index_scout_v3.py index ae0eeb3..0bdaf81 100644 --- a/app/core/background/index_scout_v3.py +++ b/app/core/background/index_scout_v3.py @@ -1,5 +1,6 @@ import asyncio import os +from datetime import datetime from typing import List, Optional import httpx @@ -10,9 +11,11 @@ from sqlalchemy import select from app.core.logger import make_log from app.core.storage import db_session -from app.core.models.my_network import KnownNode +from app.core.models.my_network import KnownNode, RemoteContentIndex +from app.core.models.events import NodeEvent from app.core.models.content_v3 import EncryptedContent, ContentDerivative from app.core.ipfs_client import pin_add, pin_ls, find_providers, swarm_connect, add_streamed_file +from app.core.events.service import LOCAL_PUBLIC_KEY INTERVAL_SEC = 60 @@ -105,6 +108,71 @@ async def upsert_content(item: dict): make_log('index_scout_v3', f"thumbnail fetch failed for {cid}: {e}", level='warning') +def _node_base_url(node: KnownNode) -> Optional[str]: + meta = node.meta or {} + public_host = (meta.get('public_host') or '').strip() + if public_host: + base = public_host.rstrip('/') + if base.startswith('http://') or base.startswith('https://'): + return base + scheme = 'https' if node.port == 443 else 'http' + return f"{scheme}://{base.lstrip('/')}" + scheme = 'https' if node.port == 443 else 'http' + host = (node.ip or '').strip() + if not host: + return None + default_port = 443 if scheme == 'https' else 80 + if node.port and node.port != default_port: + return f"{scheme}://{host}:{node.port}" + return f"{scheme}://{host}" + + +async def _update_remote_index(node_id: int, items: List[dict], *, incremental: bool): + if not items: + return + async with db_session() as session: + existing_rows = (await session.execute( + select(RemoteContentIndex).where(RemoteContentIndex.remote_node_id == node_id) + )).scalars().all() + existing_map = {row.encrypted_hash: row for row in existing_rows if row.encrypted_hash} + seen = set() + now = datetime.utcnow() + for item in items: + cid = item.get('encrypted_cid') + if not cid: + continue + seen.add(cid) + payload_meta = { + 'title': item.get('title'), + 'description': item.get('description'), + 'size_bytes': item.get('size_bytes'), + 'preview_enabled': item.get('preview_enabled'), + 'preview_conf': item.get('preview_conf'), + 'issuer_node_id': item.get('issuer_node_id'), + 'salt_b64': item.get('salt_b64'), + } + meta_clean = {k: v for k, v in payload_meta.items() if v is not None} + row = existing_map.get(cid) + if row: + row.content_type = item.get('content_type') or row.content_type + row.meta = {**(row.meta or {}), **meta_clean} + row.last_updated = now + else: + row = RemoteContentIndex( + remote_node_id=node_id, + content_type=item.get('content_type') or 'application/octet-stream', + encrypted_hash=cid, + meta=meta_clean, + last_updated=now, + ) + session.add(row) + if not incremental and existing_map: + for hash_value, row in list(existing_map.items()): + if hash_value not in seen: + await session.delete(row) + await session.commit() + + async def main_fn(memory): make_log('index_scout_v3', 'Service started', level='info') sem = None @@ -119,8 +187,70 @@ async def main_fn(memory): sem = asyncio.Semaphore(max_pins) async with db_session() as session: nodes = (await session.execute(select(KnownNode))).scalars().all() + node_by_pk = {n.public_key: n for n in nodes if n.public_key} + async with db_session() as session: + pending_events = (await session.execute( + select(NodeEvent) + .where(NodeEvent.event_type == 'content_indexed', NodeEvent.status.in_(('recorded', 'local', 'processing'))) + .order_by(NodeEvent.created_at.asc()) + .limit(25) + )).scalars().all() + for ev in pending_events: + if ev.status != 'processing': + ev.status = 'processing' + await session.commit() + for ev in pending_events: + payload = ev.payload or {} + cid = payload.get('encrypted_cid') or payload.get('content_cid') + if ev.origin_public_key == LOCAL_PUBLIC_KEY: + async with db_session() as session: + ref = await session.get(NodeEvent, ev.id) + if ref: + ref.status = 'applied' + ref.applied_at = datetime.utcnow() + await session.commit() + continue + if not cid: + async with db_session() as session: + ref = await session.get(NodeEvent, ev.id) + if ref: + ref.status = 'applied' + ref.applied_at = datetime.utcnow() + await session.commit() + continue + node = node_by_pk.get(ev.origin_public_key) + if not node: + async with db_session() as session: + node = (await session.execute(select(KnownNode).where(KnownNode.public_key == ev.origin_public_key))).scalar_one_or_none() + if node: + node_by_pk[node.public_key] = node + if not node: + make_log('index_scout_v3', f"Event {ev.uid} refers to unknown node {ev.origin_public_key}", level='debug') + async with db_session() as session: + ref = await session.get(NodeEvent, ev.id) + if ref: + ref.status = 'recorded' + await session.commit() + continue + try: + await _pin_one(node, cid) + async with db_session() as session: + ref = await session.get(NodeEvent, ev.id) + if ref: + ref.status = 'applied' + ref.applied_at = datetime.utcnow() + await session.commit() + except Exception as exc: + make_log('index_scout_v3', f"Event pin failed for {cid}: {exc}", level='warning') + async with db_session() as session: + ref = await session.get(NodeEvent, ev.id) + if ref: + ref.status = 'recorded' + await session.commit() for n in nodes: - base = f"http://{n.ip}:{n.port}" + base = _node_base_url(n) + if not base: + continue # jitter 0..30s per node to reduce stampede await asyncio.sleep(random.uniform(0, 30)) etag = (n.meta or {}).get('index_etag') @@ -144,6 +274,10 @@ async def main_fn(memory): if not items: continue make_log('index_scout_v3', f"Fetched {len(items)} from {base}") + try: + await _update_remote_index(n.id, items, incremental=bool(since)) + except Exception as exc: + make_log('index_scout_v3', f"remote index update failed for node {n.id}: {exc}", level='warning') # Check disk watermark try: @@ -156,10 +290,10 @@ async def main_fn(memory): except Exception: pass - async def _pin_one(cid: str): + async def _pin_one(node: KnownNode, cid: str): async with sem: try: - node_ipfs_meta = (n.meta or {}).get('ipfs') or {} + node_ipfs_meta = (node.meta or {}).get('ipfs') or {} multiaddrs = node_ipfs_meta.get('multiaddrs') or [] for addr in multiaddrs: try: @@ -196,11 +330,11 @@ async def main_fn(memory): except Exception as e: # Attempt HTTP gateway fallback before logging failure fallback_sources = [] - node_host = n.meta.get('public_host') if isinstance(n.meta, dict) else None + node_host = node.meta.get('public_host') if isinstance(node.meta, dict) else None try: # Derive gateway host: prefer public_host domain if present parsed = urlparse(node_host) if node_host else None - gateway_host = parsed.hostname if parsed and parsed.hostname else (n.ip or '').split(':')[0] + gateway_host = parsed.hostname if parsed and parsed.hostname else (node.ip or '').split(':')[0] gateway_port = parsed.port if (parsed and parsed.port not in (None, 80, 443)) else 8080 if gateway_host: gateway_url = f"http://{gateway_host}:{gateway_port}/ipfs/{cid}" @@ -234,7 +368,7 @@ async def main_fn(memory): cid = it.get('encrypted_cid') if cid: make_log('index_scout_v3', f"queue pin {cid}") - tasks.append(asyncio.create_task(_pin_one(cid))) + tasks.append(asyncio.create_task(_pin_one(n, cid))) if tasks: await asyncio.gather(*tasks) except Exception as e: diff --git a/app/core/background/indexer_service.py b/app/core/background/indexer_service.py index 53b6acd..9f8ed10 100644 --- a/app/core/background/indexer_service.py +++ b/app/core/background/indexer_service.py @@ -8,6 +8,7 @@ from sqlalchemy import String, and_, desc, cast from tonsdk.boc import Cell from tonsdk.utils import Address from app.core._config import CLIENT_TELEGRAM_BOT_USERNAME, PROJECT_HOST +from app.core.events.service import record_event from app.core._blockchain.ton.platform import platform from app.core._blockchain.ton.toncenter import toncenter from app.core._utils.send_status import send_status @@ -284,6 +285,21 @@ async def indexer_loop(memory, platform_found: bool, seqno: int) -> [bool, int]: **item_metadata_packed } encrypted_stored_content.content_id = item_content_cid_str + try: + await record_event( + session, + 'content_indexed', + { + 'onchain_index': item_index, + 'content_hash': item_content_hash_str, + 'encrypted_cid': item_content_cid_str, + 'item_address': item_address.to_string(1, 1, 1), + 'owner_address': item_owner_address.to_string(1, 1, 1) if item_owner_address else None, + }, + origin_host=PROJECT_HOST, + ) + except Exception as exc: + make_log("Events", f"Failed to record content_indexed event: {exc}", level="warning") await session.commit() return platform_found, seqno @@ -308,6 +324,21 @@ async def indexer_loop(memory, platform_found: bool, seqno: int) -> [bool, int]: updated=datetime.now() ) session.add(onchain_stored_content) + try: + await record_event( + session, + 'content_indexed', + { + 'onchain_index': item_index, + 'content_hash': item_content_hash_str, + 'encrypted_cid': item_content_cid_str, + 'item_address': item_address.to_string(1, 1, 1), + 'owner_address': item_owner_address.to_string(1, 1, 1) if item_owner_address else None, + }, + origin_host=PROJECT_HOST, + ) + except Exception as exc: + make_log("Events", f"Failed to record content_indexed event: {exc}", level="warning") await session.commit() make_log("Indexer", f"Item indexed: {item_content_hash_str}", level="info") last_known_index += 1 diff --git a/app/core/background/license_service.py b/app/core/background/license_service.py index c824ae0..9e94252 100644 --- a/app/core/background/license_service.py +++ b/app/core/background/license_service.py @@ -18,9 +18,12 @@ from app.core.models.wallet_connection import WalletConnection from app.core._keyboards import get_inline_keyboard from app.core.models._telegram import Wrapped_CBotChat from app.core.storage import db_session -from app.core._config import CLIENT_TELEGRAM_API_KEY +from app.core._config import CLIENT_TELEGRAM_API_KEY, CLIENT_TELEGRAM_BOT_USERNAME, PROJECT_HOST from app.core.models.user import User from app.core.models import StarsInvoice +from app.core.events.service import record_event +from app.core._secrets import hot_pubkey +from base58 import b58encode import os import traceback @@ -53,15 +56,42 @@ async def license_index_loop(memory, platform_found: bool, seqno: int) -> [bool, if star_payment.amount == existing_invoice.amount: if not existing_invoice.paid: + user = (await session.execute(select(User).where(User.id == existing_invoice.user_id))).scalars().first() existing_invoice.paid = True + existing_invoice.paid_at = datetime.utcnow() + existing_invoice.telegram_id = getattr(user, 'telegram_id', None) + existing_invoice.payment_tx_id = getattr(star_payment, 'id', None) + existing_invoice.payment_node_id = b58encode(hot_pubkey).decode() + existing_invoice.payment_node_public_host = PROJECT_HOST + existing_invoice.bot_username = CLIENT_TELEGRAM_BOT_USERNAME + existing_invoice.is_remote = False + await record_event( + session, + 'stars_payment', + { + 'invoice_id': existing_invoice.external_id, + 'content_hash': existing_invoice.content_hash, + 'amount': existing_invoice.amount, + 'user_id': existing_invoice.user_id, + 'telegram_id': existing_invoice.telegram_id, + 'bot_username': CLIENT_TELEGRAM_BOT_USERNAME, + 'type': existing_invoice.type, + 'payment_node': { + 'public_key': b58encode(hot_pubkey).decode(), + 'public_host': PROJECT_HOST, + }, + 'paid_at': existing_invoice.paid_at.isoformat() + 'Z' if existing_invoice.paid_at else None, + 'payment_tx_id': existing_invoice.payment_tx_id, + }, + origin_host=PROJECT_HOST, + ) await session.commit() licensed_content = (await session.execute(select(StoredContent).where(StoredContent.hash == existing_invoice.content_hash))).scalars().first() - user = (await session.execute(select(User).where(User.id == existing_invoice.user_id))).scalars().first() - - await (Wrapped_CBotChat(memory._client_telegram_bot, chat_id=user.telegram_id, user=user, db_session=session)).send_content( - session, licensed_content - ) + if user and user.telegram_id and licensed_content: + await (Wrapped_CBotChat(memory._client_telegram_bot, chat_id=user.telegram_id, user=user, db_session=session)).send_content( + session, licensed_content + ) except BaseException as e: make_log("StarsProcessing", f"Local error: {e}" + '\n' + traceback.format_exc(), level="error") diff --git a/app/core/events/__init__.py b/app/core/events/__init__.py new file mode 100644 index 0000000..9419976 --- /dev/null +++ b/app/core/events/__init__.py @@ -0,0 +1,17 @@ +from .service import ( + record_event, + store_remote_events, + verify_event_signature, + next_local_seq, + upsert_cursor, + prune_events, +) + +__all__ = [ + 'record_event', + 'store_remote_events', + 'verify_event_signature', + 'next_local_seq', + 'upsert_cursor', + 'prune_events', +] diff --git a/app/core/events/service.py b/app/core/events/service.py new file mode 100644 index 0000000..1d31ba8 --- /dev/null +++ b/app/core/events/service.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +import json +from datetime import datetime, timedelta, timezone +from typing import Any, Dict, Iterable, List, Optional +from uuid import uuid4 + +from base58 import b58decode, b58encode +import nacl.signing +from sqlalchemy import select, delete +from sqlalchemy.ext.asyncio import AsyncSession + +from app.core.logger import make_log +from app.core._secrets import hot_pubkey, hot_seed +from app.core.models import NodeEvent, NodeEventCursor + + +LOCAL_PUBLIC_KEY = b58encode(hot_pubkey).decode() + + +def _normalize_dt(value: Optional[datetime]) -> datetime: + if value is None: + return datetime.utcnow() + if value.tzinfo is not None: + return value.astimezone(timezone.utc).replace(tzinfo=None) + return value + + +def _parse_iso_dt(iso_value: Optional[str]) -> datetime: + if not iso_value: + return datetime.utcnow() + try: + parsed = datetime.fromisoformat(iso_value.replace('Z', '+00:00')) + except Exception: + return datetime.utcnow() + return _normalize_dt(parsed) + + +def _canonical_blob(data: Dict[str, Any]) -> bytes: + return json.dumps(data, sort_keys=True, separators=(",", ":")).encode() + + +def _sign_event(blob: Dict[str, Any]) -> str: + signing_key = nacl.signing.SigningKey(hot_seed) + signature = signing_key.sign(_canonical_blob(blob)).signature + return b58encode(signature).decode() + + +def verify_event_signature(event: Dict[str, Any]) -> bool: + try: + origin_key = event["origin_public_key"] + signature = event["signature"] + payload = { + "origin_public_key": origin_key, + "origin_host": event.get("origin_host"), + "seq": event["seq"], + "uid": event["uid"], + "event_type": event["event_type"], + "payload": event.get("payload") or {}, + "created_at": event.get("created_at"), + } + verify_key = nacl.signing.VerifyKey(b58decode(origin_key)) + verify_key.verify(_canonical_blob(payload), b58decode(signature)) + return True + except Exception as exc: + make_log("Events", f"Signature validation failed: {exc}", level="warning") + return False + + +async def next_local_seq(session: AsyncSession) -> int: + result = await session.execute( + select(NodeEvent.seq) + .where(NodeEvent.origin_public_key == LOCAL_PUBLIC_KEY) + .order_by(NodeEvent.seq.desc()) + .limit(1) + ) + row = result.scalar_one_or_none() + return int(row or 0) + 1 + + +async def record_event( + session: AsyncSession, + event_type: str, + payload: Dict[str, Any], + origin_host: Optional[str] = None, + created_at: Optional[datetime] = None, +) -> NodeEvent: + seq = await next_local_seq(session) + created_dt = _normalize_dt(created_at) + event_body = { + "origin_public_key": LOCAL_PUBLIC_KEY, + "origin_host": origin_host, + "seq": seq, + "uid": uuid4().hex, + "event_type": event_type, + "payload": payload, + "created_at": created_dt.replace(tzinfo=timezone.utc).isoformat().replace('+00:00', 'Z'), + } + signature = _sign_event(event_body) + node_event = NodeEvent( + origin_public_key=LOCAL_PUBLIC_KEY, + origin_host=origin_host, + seq=seq, + uid=event_body["uid"], + event_type=event_type, + payload=payload, + signature=signature, + created_at=created_dt, + status='local', + ) + session.add(node_event) + await session.flush() + make_log("Events", f"Recorded local event {event_type} seq={seq}") + return node_event + + +async def upsert_cursor(session: AsyncSession, source_public_key: str, seq: int, host: Optional[str]): + existing = (await session.execute( + select(NodeEventCursor).where(NodeEventCursor.source_public_key == source_public_key) + )).scalar_one_or_none() + if existing: + if seq > existing.last_seq: + existing.last_seq = seq + if host: + existing.source_public_host = host + else: + cursor = NodeEventCursor( + source_public_key=source_public_key, + last_seq=seq, + source_public_host=host, + ) + session.add(cursor) + await session.flush() + + +async def store_remote_events( + session: AsyncSession, + events: Iterable[Dict[str, Any]], + allowed_public_keys: Optional[set[str]] = None, +) -> List[NodeEvent]: + stored: List[NodeEvent] = [] + for event in events: + if not verify_event_signature(event): + continue + origin_pk = event["origin_public_key"] + if allowed_public_keys is not None and origin_pk not in allowed_public_keys: + make_log("Events", f"Ignored event from untrusted node {origin_pk}", level="warning") + continue + seq = int(event["seq"]) + exists = (await session.execute( + select(NodeEvent).where( + NodeEvent.origin_public_key == origin_pk, + NodeEvent.seq == seq, + ) + )).scalar_one_or_none() + if exists: + continue + created_dt = _parse_iso_dt(event.get("created_at")) + received_dt = datetime.utcnow() + node_event = NodeEvent( + origin_public_key=origin_pk, + origin_host=event.get("origin_host"), + seq=seq, + uid=event["uid"], + event_type=event["event_type"], + payload=event.get("payload") or {}, + signature=event["signature"], + created_at=created_dt, + status='recorded', + received_at=received_dt, + ) + session.add(node_event) + stored.append(node_event) + await upsert_cursor(session, origin_pk, seq, event.get("origin_host")) + make_log("Events", f"Ingested remote event {event['event_type']} from {origin_pk} seq={seq}", level="debug") + if stored: + await session.flush() + return stored + + +async def prune_events(session: AsyncSession, max_age_days: int = 90): + cutoff = datetime.utcnow() - timedelta(days=max_age_days) + await session.execute( + delete(NodeEvent).where(NodeEvent.created_at < cutoff) + ) diff --git a/app/core/models/__init__.py b/app/core/models/__init__.py index 28d1840..9364194 100644 --- a/app/core/models/__init__.py +++ b/app/core/models/__init__.py @@ -11,6 +11,7 @@ from app.core.models.content.user_content import UserContent, UserAction from app.core.models._config import ServiceConfigValue, ServiceConfig from app.core.models.asset import Asset from app.core.models.my_network import KnownNode, KnownNodeIncident, RemoteContentIndex +from app.core.models.events import NodeEvent, NodeEventCursor from app.core.models.promo import PromoAction from app.core.models.tasks import BlockchainTask from app.core.models.content_v3 import ( diff --git a/app/core/models/events.py b/app/core/models/events.py new file mode 100644 index 0000000..cbdac04 --- /dev/null +++ b/app/core/models/events.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +from datetime import datetime +from sqlalchemy import ( + Column, + Integer, + BigInteger, + String, + DateTime, + JSON, + UniqueConstraint, +) + +from .base import AlchemyBase + + +class NodeEvent(AlchemyBase): + __tablename__ = 'node_events' + __table_args__ = ( + UniqueConstraint('origin_public_key', 'seq', name='uq_node_events_origin_seq'), + UniqueConstraint('uid', name='uq_node_events_uid'), + ) + + id = Column(Integer, autoincrement=True, primary_key=True) + origin_public_key = Column(String(128), nullable=False) + origin_host = Column(String(256), nullable=True) + seq = Column(BigInteger, nullable=False) + uid = Column(String(64), nullable=False) + event_type = Column(String(64), nullable=False) + payload = Column(JSON, nullable=False, default=dict) + signature = Column(String(512), nullable=False) + created_at = Column(DateTime, nullable=False, default=datetime.utcnow) + received_at = Column(DateTime, nullable=False, default=datetime.utcnow) + applied_at = Column(DateTime, nullable=True) + status = Column(String(32), nullable=False, default='recorded') + + +class NodeEventCursor(AlchemyBase): + __tablename__ = 'node_event_cursors' + __table_args__ = ( + UniqueConstraint('source_public_key', name='uq_event_cursor_source'), + ) + + id = Column(Integer, autoincrement=True, primary_key=True) + source_public_key = Column(String(128), nullable=False) + last_seq = Column(BigInteger, nullable=False, default=0) + updated_at = Column(DateTime, nullable=False, default=datetime.utcnow, onupdate=datetime.utcnow) + source_public_host = Column(String(256), nullable=True) diff --git a/app/core/models/transaction.py b/app/core/models/transaction.py index 8a6b94e..21fa1fc 100644 --- a/app/core/models/transaction.py +++ b/app/core/models/transaction.py @@ -49,8 +49,15 @@ class StarsInvoice(AlchemyBase): user_id = Column(Integer, ForeignKey('users.id'), nullable=True) content_hash = Column(String(256), nullable=True) + telegram_id = Column(Integer, nullable=True) invoice_url = Column(String(256), nullable=True) paid = Column(Boolean, nullable=False, default=False) + paid_at = Column(DateTime, nullable=True) + payment_tx_id = Column(String(256), nullable=True) + payment_node_id = Column(String(128), nullable=True) + payment_node_public_host = Column(String(256), nullable=True) + bot_username = Column(String(128), nullable=True) + is_remote = Column(Boolean, nullable=False, default=False) created = Column(DateTime, nullable=False, default=datetime.utcnow) diff --git a/app/core/models/user/__init__.py b/app/core/models/user/__init__.py index 750918d..0276619 100644 --- a/app/core/models/user/__init__.py +++ b/app/core/models/user/__init__.py @@ -1,5 +1,5 @@ from datetime import datetime -from sqlalchemy import Column, Integer, String, BigInteger, DateTime, JSON +from sqlalchemy import Column, Integer, String, BigInteger, DateTime, JSON, Boolean from sqlalchemy.orm import relationship from app.core.auth_v1 import AuthenticationMixin as AuthenticationMixin_V1 @@ -23,6 +23,7 @@ class User(AlchemyBase, DisplayMixin, TranslationCore, AuthenticationMixin_V1, W username = Column(String(512), nullable=True) lang_code = Column(String(8), nullable=False, default="en") meta = Column(JSON, nullable=False, default=dict) + is_admin = Column(Boolean, nullable=False, default=False) last_use = Column(DateTime, nullable=False, default=datetime.utcnow) updated = Column(DateTime, nullable=False, default=datetime.utcnow) diff --git a/app/core/network/nodesig.py b/app/core/network/nodesig.py index e428eb1..1aac32e 100644 --- a/app/core/network/nodesig.py +++ b/app/core/network/nodesig.py @@ -62,9 +62,15 @@ def verify_request(request, memory) -> Tuple[bool, str, str]: import nacl.signing vk = nacl.signing.VerifyKey(b58decode(node_id)) sig = b58decode(sig_b58) - msg = canonical_string(request.method, request.path, request.body or b"", ts, nonce, node_id) + path = request.path + query_string = getattr(request, 'query_string', None) + if query_string: + if not isinstance(query_string, str): + query_string = query_string.decode() if isinstance(query_string, bytes) else str(query_string) + if query_string: + path = f"{path}?{query_string}" + msg = canonical_string(request.method, path, request.body or b"", ts, nonce, node_id) vk.verify(msg, sig) return True, node_id, "" except Exception as e: return False, "", f"BAD_SIGNATURE: {e}" -