uploader-bot/app/core/background/event_sync_service.py

153 lines
6.5 KiB
Python

import asyncio
from typing import Dict, List, Optional, Tuple
from urllib.parse import urlencode
import httpx
from sqlalchemy import select
from app.core.logger import make_log
from app.core.storage import db_session
from app.core.models import KnownNode, NodeEvent
from app.core.events.service import (
store_remote_events,
upsert_cursor,
LOCAL_PUBLIC_KEY,
)
from app.core.models.events import NodeEventCursor
from app.core._secrets import hot_pubkey, hot_seed
from app.core.network.nodesig import sign_headers
from base58 import b58encode
def _node_public_base(node: KnownNode) -> Optional[str]:
    """Build the base URL (scheme and host, no trailing slash) for reaching *node*.

    Resolution order:

    1. ``node.meta['public_host']`` when it is a non-empty string. A value that
       already starts with ``http://`` or ``https://`` (any letter case) is
       returned with trailing slashes removed; a bare host gets a scheme
       prepended and surrounding slashes removed.
    2. ``node.ip``, with ``node.port`` appended unless it is the default port
       of the chosen scheme.

    The scheme is ``https`` only when ``node.port`` equals 443, else ``http``.

    Args:
        node: Object exposing ``meta``, ``ip`` and ``port`` attributes.

    Returns:
        The base URL, or ``None`` when neither source yields a usable host.
    """
    # meta may be missing or of an unexpected type; treat anything that is not
    # a dict as "no metadata" instead of failing on .get().
    meta = node.meta if isinstance(node.meta, dict) else {}
    scheme = 'https' if node.port == 443 else 'http'

    raw_public_host = meta.get('public_host')
    public_host = raw_public_host.strip() if isinstance(raw_public_host, str) else ''
    if public_host:
        base = public_host.rstrip('/')
        # Compare case-insensitively so "HTTPS://host" is not prefixed again.
        if base.lower().startswith(('http://', 'https://')):
            return base
        bare_host = base.lstrip('/')
        if bare_host:
            return f"{scheme}://{bare_host}"
        # public_host consisted only of slashes: fall through to the IP
        # address instead of returning a host-less "http://".

    host = (node.ip or '').strip()
    if not host:
        return None

    default_port = 443 if scheme == 'https' else 80
    if node.port and node.port != default_port:
        return f"{scheme}://{host}:{node.port}"
    return f"{scheme}://{host}"
async def _fetch_events_for_node(node: KnownNode, limit: int = 100) -> Tuple[List[Dict], int]:
    """Fetch events newer than the stored cursor from a remote node.

    The stored ``NodeEventCursor.last_seq`` for ``node.public_key`` (0 when no
    cursor exists) is sent as ``since`` to ``/api/v1/network.events`` on the
    node's base URL. The request carries the headers returned by
    ``sign_headers`` for this node's hot key.

    This function never raises for transport or payload problems: HTTP 403,
    any other request failure and a malformed response body are all logged
    and reported as "no events" with the cursor left where it was.

    Args:
        node: Remote node to query.
        limit: Value of the ``limit`` query parameter.

    Returns:
        ``(events, next_since)``. ``events`` is the ``"events"`` list of the
        response (empty on any failure). ``next_since`` is the response's
        ``"next_since"`` as an int, falling back to the current cursor value;
        it is ``0`` when no base URL can be derived for the node.
    """
    base = _node_public_base(node)
    if not base:
        return [], 0

    async with db_session() as session:
        cursor = (await session.execute(
            select(NodeEventCursor).where(NodeEventCursor.source_public_key == node.public_key)
        )).scalar_one_or_none()
        # Guard against a NULL last_seq so the query never carries "since=None".
        since = (cursor.last_seq or 0) if cursor else 0

    query = urlencode({"since": since, "limit": limit})
    # The path including the query string is what gets signed, so build it
    # once and reuse it for the URL.
    path = f"/api/v1/network.events?{query}"
    url = f"{base}{path}"
    pk_b58 = b58encode(hot_pubkey).decode()
    headers = sign_headers("GET", path, b"", hot_seed, pk_b58)

    async with httpx.AsyncClient(timeout=20.0) as client:
        try:
            resp = await client.get(url, headers=headers)
            if resp.status_code == 403:
                make_log("Events", f"Access denied by node {node.public_key}", level="warning")
                return [], since
            resp.raise_for_status()
            data = resp.json()
            # The body comes from a remote peer: validate its shape inside the
            # try so a malformed reply is handled like any other fetch failure
            # instead of propagating into the caller's sync loop.
            if not isinstance(data, dict):
                raise ValueError(f"unexpected response type {type(data).__name__}")
            events = data.get("events") or []
            if not isinstance(events, list):
                raise ValueError(f"unexpected 'events' type {type(events).__name__}")
            next_since = int(data.get("next_since") or since)
        except Exception as exc:
            make_log("Events", f"Fetch events failed from {node.public_key}: {exc}", level="debug")
            return [], since

    return events, next_since
async def _apply_event(session, event: NodeEvent):
    """Apply the local side effects of a stored remote event and set its status.

    * ``stars_payment``: create the matching ``StarsInvoice`` (looked up by
      ``external_id == payload['invoice_id']``) as paid, or mark the existing
      one paid and refresh its payment-node fields; then set the event status
      to ``'applied'``. When ``invoice_id``, ``telegram_id`` or
      ``content_hash`` is missing from the payload the event is left
      untouched (its status is not changed).
    * every other event type (including ``content_indexed`` and
      ``node_registered``): only marked ``'recorded'``.

    Nothing is committed here; the caller owns the transaction.

    Args:
        session: Async database session used for the invoice lookup and insert.
        event: The stored event to apply; mutated in place.
    """
    if event.event_type != "stars_payment":
        # content_indexed is picked up by the index scout via
        # remote_content_index; node_registered and unknown types need no
        # local action either, so the event is only marked as recorded.
        event.status = 'recorded'
        return

    # Payloads originate from remote nodes: tolerate a missing or non-dict one.
    payload = event.payload if isinstance(event.payload, dict) else {}
    invoice_id = payload.get("invoice_id")
    telegram_id = payload.get("telegram_id")
    content_hash = payload.get("content_hash")
    if not invoice_id or not telegram_id or not content_hash:
        return

    # Function-scope import, as in the original code.
    from app.core.models import StarsInvoice

    # "payment_node" may be absent, null or not an object; .get(key, {}) only
    # covers the first case, so normalise explicitly.
    payment_node = payload.get("payment_node")
    if not isinstance(payment_node, dict):
        payment_node = {}
    payment_node_id = payment_node.get("public_key")
    payment_node_public_host = payment_node.get("public_host")

    invoice = (await session.execute(
        select(StarsInvoice).where(StarsInvoice.external_id == invoice_id)
    )).scalar_one_or_none()

    if invoice is None:
        invoice = StarsInvoice(
            external_id=invoice_id,
            user_id=payload.get("user_id"),
            type=payload.get('type') or 'access',
            telegram_id=telegram_id,
            amount=payload.get("amount"),
            content_hash=content_hash,
            paid=True,
            paid_at=event.created_at,
            payment_node_id=payment_node_id,
            payment_node_public_host=payment_node_public_host,
            bot_username=payload.get("bot_username"),
            is_remote=True,
        )
        session.add(invoice)
    else:
        invoice.paid = True
        # Keep the first recorded payment time if there already is one.
        invoice.paid_at = invoice.paid_at or event.created_at
        # The payment-node fields are overwritten unconditionally (unlike
        # bot_username), so a payload without them clears the stored values.
        invoice.payment_node_id = payment_node_id
        invoice.payment_node_public_host = payment_node_public_host
        invoice.bot_username = payload.get("bot_username") or invoice.bot_username
        # telegram_id is known to be truthy here (validated above).
        invoice.telegram_id = telegram_id
        invoice.is_remote = True
        if payload.get('type'):
            invoice.type = payload['type']

    event.status = 'applied'
    event.applied_at = event.applied_at or event.received_at
async def main_fn(memory):
    """Run the event sync loop forever.

    Every cycle loads all ``KnownNode`` rows, keeps those whose ``meta`` is a
    dict with ``role == "trusted"`` and whose public key is not
    ``LOCAL_PUBLIC_KEY``, and syncs each of them in turn. The loop then sleeps
    10 seconds before the next cycle.

    Failures are logged and never terminate the loop. Each node is synced
    inside its own ``try`` so that one failing node does not prevent the
    remaining trusted nodes from being processed in the same cycle.

    Args:
        memory: Not used by this function.
    """
    make_log("Events", "Sync service started", level="info")
    while True:
        try:
            async with db_session() as session:
                nodes = (await session.execute(select(KnownNode))).scalars().all()
            trusted_nodes = [
                n for n in nodes
                if isinstance(n.meta, dict) and n.meta.get("role") == "trusted" and n.public_key != LOCAL_PUBLIC_KEY
            ]
            trusted_keys = {n.public_key for n in trusted_nodes}
            for node in trusted_nodes:
                try:
                    await _sync_node(node, trusted_keys)
                except Exception as exc:
                    make_log("Events", f"Sync failed for node {node.public_key}: {exc}", level="error")
        except Exception as exc:
            make_log("Events", f"Sync loop error: {exc}", level="error")
        await asyncio.sleep(10)


async def _sync_node(node: "KnownNode", trusted_keys: set) -> None:
    """Fetch one batch of events from *node*, store and apply them, advance its cursor.

    Args:
        node: Trusted remote node to pull from.
        trusted_keys: Public keys passed to ``store_remote_events`` as
            ``allowed_public_keys``.
    """
    public_host = node.meta.get("public_host") if isinstance(node.meta, dict) else None
    events, next_since = await _fetch_events_for_node(node)

    if not events:
        # Nothing to store; still persist the cursor when the fetch reported
        # a non-zero position.
        if next_since:
            async with db_session() as session:
                await upsert_cursor(session, node.public_key, next_since, public_host)
                await session.commit()
        return

    async with db_session() as session:
        stored = await store_remote_events(
            session,
            events,
            allowed_public_keys=trusted_keys,
        )
        for ev in stored:
            await _apply_event(session, ev)
        if stored:
            await session.commit()
        # The cursor is advanced only after the events were stored and applied.
        await upsert_cursor(session, node.public_key, next_since, public_host)
        await session.commit()