import asyncio
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlencode

import httpx
from base58 import b58encode
from sqlalchemy import select

from app.core.logger import make_log
from app.core.storage import db_session
from app.core.models import KnownNode, NodeEvent
from app.core.events.service import (
    store_remote_events,
    upsert_cursor,
    LOCAL_PUBLIC_KEY,
)
from app.core.models.events import NodeEventCursor
from app.core._secrets import hot_pubkey, hot_seed
from app.core.network.nodesig import sign_headers


def _node_public_base(node: KnownNode) -> Optional[str]:
    """Build the base URL (scheme + host, no trailing slash) used to reach *node*.

    Resolution order:
      1. ``node.meta['public_host']`` if set. It is returned as-is when it
         already carries an ``http://`` / ``https://`` prefix, otherwise a
         scheme is prepended.
      2. ``node.ip`` combined with ``node.port`` (the port is omitted when it
         equals the scheme's default).

    The scheme is ``https`` only when ``node.port == 443``; otherwise ``http``.

    Returns:
        The base URL, or ``None`` when neither a public host nor an IP is set.
    """
    # Guard against a non-dict meta so ``.get`` below cannot raise.
    meta = node.meta if isinstance(node.meta, dict) else {}
    scheme = 'https' if node.port == 443 else 'http'

    public_host = (meta.get('public_host') or '').strip()
    if public_host:
        base = public_host.rstrip('/')
        if base.startswith(('http://', 'https://')):
            return base
        return f"{scheme}://{base.lstrip('/')}"

    host = (node.ip or '').strip()
    if not host:
        return None
    default_port = 443 if scheme == 'https' else 80
    if node.port and node.port != default_port:
        return f"{scheme}://{host}:{node.port}"
    return f"{scheme}://{host}"


async def _fetch_events_for_node(node: KnownNode, limit: int = 100) -> Tuple[List[Dict], int]:
    """Fetch events newer than the stored cursor from a remote node.

    Reads the ``NodeEventCursor`` row for ``node.public_key`` to obtain the
    starting sequence, then issues a signed ``GET /api/v1/network.events``
    request against the node's public base URL.

    Args:
        node: The remote node to poll.
        limit: Value sent as the ``limit`` query parameter.

    Returns:
        ``(events, next_since)``. On any failure (no reachable address, HTTP
        403, transport/HTTP error, malformed response body) the event list is
        empty and ``next_since`` is the unchanged cursor value (``0`` when the
        node has no usable address).
    """
    base = _node_public_base(node)
    if not base:
        return [], 0

    async with db_session() as session:
        cursor = (await session.execute(
            select(NodeEventCursor).where(NodeEventCursor.source_public_key == node.public_key)
        )).scalar_one_or_none()
        # A missing row or a NULL last_seq both mean "start from 0".
        since = (cursor.last_seq or 0) if cursor else 0

    query = urlencode({"since": since, "limit": limit})
    path = f"/api/v1/network.events?{query}"
    url = f"{base}{path}"
    pk_b58 = b58encode(hot_pubkey).decode()
    headers = sign_headers("GET", path, b"", hot_seed, pk_b58)

    async with httpx.AsyncClient(timeout=20.0) as client:
        try:
            resp = await client.get(url, headers=headers)
            if resp.status_code == 403:
                make_log("Events", f"Access denied by node {node.public_key}", level="warning")
                return [], since
            resp.raise_for_status()
            data = resp.json()
        except Exception as exc:
            make_log("Events", f"Fetch events failed from {node.public_key}: {exc}", level="debug")
            return [], since

    # The body comes from a remote peer: validate its shape here so that a
    # malformed response is handled like a failed fetch instead of raising
    # into the caller's loop.
    try:
        if not isinstance(data, dict):
            raise TypeError(f"expected JSON object, got {type(data).__name__}")
        events = data.get("events") or []
        if not isinstance(events, list):
            raise TypeError(f"expected 'events' list, got {type(events).__name__}")
        next_since = int(data.get("next_since") or since)
    except (TypeError, ValueError) as exc:
        make_log("Events", f"Fetch events failed from {node.public_key}: malformed response: {exc}", level="debug")
        return [], since
    return events, next_since


async def _apply_event(session, event: NodeEvent):
    """Apply the local side effects of a stored remote event.

    ``stars_payment`` events create or update the ``StarsInvoice`` whose
    ``external_id`` matches the payload's ``invoice_id`` and mark the event
    ``'applied'``. If ``invoice_id``, ``telegram_id`` or ``content_hash`` is
    missing from the payload, the event is left untouched.

    Every other event type (including ``content_indexed`` and
    ``node_registered``) is only marked ``'recorded'``.

    Args:
        session: Database session used for the invoice lookup/insert. This
            function does not commit; the caller does.
        event: The event to apply; its ``status`` (and ``applied_at``) are
            mutated in place.
    """
    if event.event_type != "stars_payment":
        # Per the original note for content_indexed: the index scout picks
        # those up via remote_content_index, so nothing is applied here.
        event.status = 'recorded'
        return

    from app.core.models import StarsInvoice

    payload = event.payload if isinstance(event.payload, dict) else {}
    invoice_id = payload.get("invoice_id")
    telegram_id = payload.get("telegram_id")
    content_hash = payload.get("content_hash")
    amount = payload.get("amount")
    if not invoice_id or not telegram_id or not content_hash:
        return

    # ``payload.get("payment_node", {})`` only falls back when the key is
    # absent; an explicit null (or any non-dict) would break ``.get``.
    payment_node = payload.get("payment_node")
    if not isinstance(payment_node, dict):
        payment_node = {}

    invoice = (await session.execute(
        select(StarsInvoice).where(StarsInvoice.external_id == invoice_id)
    )).scalar_one_or_none()

    if not invoice:
        invoice = StarsInvoice(
            external_id=invoice_id,
            user_id=payload.get("user_id"),
            type=payload.get('type') or 'access',
            telegram_id=telegram_id,
            amount=amount,
            content_hash=content_hash,
            paid=True,
            paid_at=event.created_at,
            payment_node_id=payment_node.get("public_key"),
            payment_node_public_host=payment_node.get("public_host"),
            bot_username=payload.get("bot_username"),
            is_remote=True,
        )
        session.add(invoice)
    else:
        invoice.paid = True
        # Keep the earliest known payment time if one is already recorded.
        invoice.paid_at = invoice.paid_at or event.created_at
        invoice.payment_node_id = payment_node.get("public_key")
        invoice.payment_node_public_host = payment_node.get("public_host")
        invoice.bot_username = payload.get("bot_username") or invoice.bot_username
        # telegram_id is guaranteed truthy by the guard above.
        invoice.telegram_id = telegram_id
        # NOTE(review): originally ``invoice.is_remote or True``, which is
        # always truthy -- confirm that flagging an existing local invoice as
        # remote is intended.
        invoice.is_remote = True
        if payload.get('type'):
            invoice.type = payload['type']

    event.status = 'applied'
    event.applied_at = event.applied_at or event.received_at


async def _sync_node(node: KnownNode, trusted_keys) -> None:
    """Fetch, store and apply new events from one node, then advance its cursor."""
    events, next_since = await _fetch_events_for_node(node)
    public_host = node.meta.get("public_host") if isinstance(node.meta, dict) else None

    if not events:
        # Nothing to store; still persist the cursor when it is non-zero.
        if next_since:
            async with db_session() as session:
                await upsert_cursor(session, node.public_key, next_since, public_host)
                await session.commit()
        return

    async with db_session() as session:
        stored = await store_remote_events(
            session,
            events,
            allowed_public_keys=trusted_keys,
        )
        for ev in stored:
            await _apply_event(session, ev)
        if stored:
            await session.commit()
        await upsert_cursor(session, node.public_key, next_since, public_host)
        await session.commit()


async def main_fn(memory):
    """Run the event sync loop forever.

    Every 10 seconds: load all ``KnownNode`` rows, keep those whose
    ``meta['role']`` is ``'trusted'`` and whose key differs from
    ``LOCAL_PUBLIC_KEY``, and sync each of them in turn. A failure while
    syncing one node is logged and does not prevent the remaining nodes from
    being processed in the same round.

    Args:
        memory: Not used by this function.
    """
    make_log("Events", "Sync service started", level="info")
    while True:
        try:
            async with db_session() as session:
                nodes = (await session.execute(select(KnownNode))).scalars().all()
                trusted_nodes = [
                    n for n in nodes
                    if isinstance(n.meta, dict)
                    and n.meta.get("role") == "trusted"
                    and n.public_key != LOCAL_PUBLIC_KEY
                ]
            trusted_keys = {n.public_key for n in trusted_nodes}
            for node in trusted_nodes:
                try:
                    await _sync_node(node, trusted_keys)
                except Exception as exc:
                    # Isolate per-node failures so one bad peer cannot stall
                    # synchronisation with every node after it.
                    make_log("Events", f"Sync failed for node {node.public_key}: {exc}", level="error")
        except Exception as exc:
            make_log("Events", f"Sync loop error: {exc}", level="error")
        await asyncio.sleep(10)