153 lines
6.5 KiB
Python
153 lines
6.5 KiB
Python
import asyncio
|
|
from typing import Dict, List, Optional, Tuple
|
|
from urllib.parse import urlencode
|
|
|
|
import httpx
|
|
from sqlalchemy import select
|
|
|
|
from app.core.logger import make_log
|
|
from app.core.storage import db_session
|
|
from app.core.models import KnownNode, NodeEvent
|
|
from app.core.events.service import (
|
|
store_remote_events,
|
|
upsert_cursor,
|
|
LOCAL_PUBLIC_KEY,
|
|
)
|
|
from app.core.models.events import NodeEventCursor
|
|
from app.core._secrets import hot_pubkey, hot_seed
|
|
from app.core.network.nodesig import sign_headers
|
|
from base58 import b58encode
|
|
|
|
|
|
def _node_public_base(node: KnownNode) -> Optional[str]:
|
|
meta = node.meta or {}
|
|
public_host = (meta.get('public_host') or '').strip()
|
|
if public_host:
|
|
base = public_host.rstrip('/')
|
|
if base.startswith('http://') or base.startswith('https://'):
|
|
return base
|
|
scheme = 'https' if node.port == 443 else 'http'
|
|
return f"{scheme}://{base.lstrip('/')}"
|
|
scheme = 'https' if node.port == 443 else 'http'
|
|
host = (node.ip or '').strip()
|
|
if not host:
|
|
return None
|
|
default_port = 443 if scheme == 'https' else 80
|
|
if node.port and node.port != default_port:
|
|
return f"{scheme}://{host}:{node.port}"
|
|
return f"{scheme}://{host}"
|
|
|
|
|
|
async def _fetch_events_for_node(node: KnownNode, limit: int = 100) -> Tuple[List[Dict], int]:
|
|
base = _node_public_base(node)
|
|
if not base:
|
|
return [], 0
|
|
async with db_session() as session:
|
|
cursor = (await session.execute(
|
|
select(NodeEventCursor).where(NodeEventCursor.source_public_key == node.public_key)
|
|
)).scalar_one_or_none()
|
|
since = cursor.last_seq if cursor else 0
|
|
query = urlencode({"since": since, "limit": limit})
|
|
path = f"/api/v1/network.events?{query}"
|
|
url = f"{base}{path}"
|
|
pk_b58 = b58encode(hot_pubkey).decode()
|
|
headers = sign_headers("GET", path, b"", hot_seed, pk_b58)
|
|
async with httpx.AsyncClient(timeout=20.0) as client:
|
|
try:
|
|
resp = await client.get(url, headers=headers)
|
|
if resp.status_code == 403:
|
|
make_log("Events", f"Access denied by node {node.public_key}", level="warning")
|
|
return [], since
|
|
resp.raise_for_status()
|
|
data = resp.json()
|
|
except Exception as exc:
|
|
make_log("Events", f"Fetch events failed from {node.public_key}: {exc}", level="debug")
|
|
return [], since
|
|
events = data.get("events") or []
|
|
next_since = int(data.get("next_since") or since)
|
|
return events, next_since
|
|
|
|
|
|
async def _apply_event(session, event: NodeEvent):
|
|
if event.event_type == "stars_payment":
|
|
from app.core.models import StarsInvoice
|
|
payload = event.payload or {}
|
|
invoice_id = payload.get("invoice_id")
|
|
telegram_id = payload.get("telegram_id")
|
|
content_hash = payload.get("content_hash")
|
|
amount = payload.get("amount")
|
|
if not invoice_id or not telegram_id or not content_hash:
|
|
return
|
|
invoice = (await session.execute(select(StarsInvoice).where(StarsInvoice.external_id == invoice_id))).scalar_one_or_none()
|
|
if not invoice:
|
|
invoice = StarsInvoice(
|
|
external_id=invoice_id,
|
|
user_id=payload.get("user_id"),
|
|
type=payload.get('type') or 'access',
|
|
telegram_id=telegram_id,
|
|
amount=amount,
|
|
content_hash=content_hash,
|
|
paid=True,
|
|
paid_at=event.created_at,
|
|
payment_node_id=payload.get("payment_node", {}).get("public_key"),
|
|
payment_node_public_host=payload.get("payment_node", {}).get("public_host"),
|
|
bot_username=payload.get("bot_username"),
|
|
is_remote=True,
|
|
)
|
|
session.add(invoice)
|
|
else:
|
|
invoice.paid = True
|
|
invoice.paid_at = invoice.paid_at or event.created_at
|
|
invoice.payment_node_id = payload.get("payment_node", {}).get("public_key")
|
|
invoice.payment_node_public_host = payload.get("payment_node", {}).get("public_host")
|
|
invoice.bot_username = payload.get("bot_username") or invoice.bot_username
|
|
invoice.telegram_id = telegram_id or invoice.telegram_id
|
|
invoice.is_remote = invoice.is_remote or True
|
|
if payload.get('type'):
|
|
invoice.type = payload['type']
|
|
event.status = 'applied'
|
|
event.applied_at = event.applied_at or event.received_at
|
|
elif event.event_type == "content_indexed":
|
|
# The index scout will pick up via remote_content_index; we only mark event applied
|
|
event.status = 'recorded'
|
|
elif event.event_type == "node_registered":
|
|
event.status = 'recorded'
|
|
else:
|
|
event.status = 'recorded'
|
|
|
|
|
|
async def main_fn(memory):
|
|
make_log("Events", "Sync service started", level="info")
|
|
while True:
|
|
try:
|
|
async with db_session() as session:
|
|
nodes = (await session.execute(select(KnownNode))).scalars().all()
|
|
trusted_nodes = [
|
|
n for n in nodes
|
|
if isinstance(n.meta, dict) and n.meta.get("role") == "trusted" and n.public_key != LOCAL_PUBLIC_KEY
|
|
]
|
|
trusted_keys = {n.public_key for n in trusted_nodes}
|
|
for node in trusted_nodes:
|
|
events, next_since = await _fetch_events_for_node(node)
|
|
if not events:
|
|
if next_since:
|
|
async with db_session() as session:
|
|
await upsert_cursor(session, node.public_key, next_since, node.meta.get("public_host") if isinstance(node.meta, dict) else None)
|
|
await session.commit()
|
|
continue
|
|
async with db_session() as session:
|
|
stored = await store_remote_events(
|
|
session,
|
|
events,
|
|
allowed_public_keys=trusted_keys,
|
|
)
|
|
for ev in stored:
|
|
await _apply_event(session, ev)
|
|
if stored:
|
|
await session.commit()
|
|
await upsert_cursor(session, node.public_key, next_since, node.meta.get("public_host") if isinstance(node.meta, dict) else None)
|
|
await session.commit()
|
|
except Exception as exc:
|
|
make_log("Events", f"Sync loop error: {exc}", level="error")
|
|
await asyncio.sleep(10)
|