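"""Background daemons: content replication, replication heartbeats, and DHT gossip."""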
from __future__ import annotations

import asyncio

import httpx
from sqlalchemy import select

from app.core.logger import make_log
from app.core.models.node_storage import StoredContent
from app.core.network.dht import dht_config
from app.core.network.nodes import list_known_public_nodes
from app.core.storage import db_session


async def replication_daemon(app):
    """Periodically ensure every locally stored content hash is replicated across trusted nodes."""
    await asyncio.sleep(5)
    memory = getattr(app.ctx, "memory", None)
    if not memory:
        make_log("Replication", "No memory context; replication daemon exiting", level="warning")
        return
    make_log("Replication", "daemon started")
    while True:
        try:
            membership_state = memory.membership.state
            async with db_session(auto_commit=False) as session:
                rows = await session.execute(select(StoredContent.hash))
                content_hashes = [row[0] for row in rows.all()]
                # Build the set of allowed (trusted) node ids
                from app.core.models.my_network import KnownNode
                from app.core._utils.b58 import b58decode
                from app.core.network.dht.crypto import compute_node_id
                trusted_rows = (await session.execute(select(KnownNode))).scalars().all()
                allowed_nodes = set()
                for kn in trusted_rows:
                    role = (kn.meta or {}).get('role') if kn.meta else None
                    if role == 'trusted' and kn.public_key:
                        try:
                            nid = compute_node_id(b58decode(kn.public_key))
                            allowed_nodes.add(nid)
                        except Exception:
                            pass
                # Always include ourselves
                allowed_nodes.add(memory.node_id)
                for content_hash in content_hashes:
                    try:
                        state = memory.replication.ensure_replication(content_hash, membership_state, allowed_nodes=allowed_nodes)
                        memory.replication.heartbeat(content_hash, memory.node_id)
                        make_log("Replication", f"Replicated {content_hash} leader={state.leader}", level="debug")
                    except Exception as exc:
                        make_log("Replication", f"ensure failed for {content_hash}: {exc}", level="warning")
        except Exception as exc:
            make_log("Replication", f"daemon iteration failed: {exc}", level="error")
        await asyncio.sleep(dht_config.heartbeat_interval)


async def heartbeat_daemon(app):
    """Periodically refresh replication heartbeats for every locally stored content hash."""
    await asyncio.sleep(dht_config.heartbeat_interval // 2)
    memory = getattr(app.ctx, "memory", None)
    if not memory:
        return
    while True:
        try:
            async with db_session(auto_commit=False) as session:
                rows = await session.execute(select(StoredContent.hash))
                content_hashes = [row[0] for row in rows.all()]
                for content_hash in content_hashes:
                    memory.replication.heartbeat(content_hash, memory.node_id)
        except Exception as exc:
            make_log("Replication", f"heartbeat failed: {exc}", level="warning")
        await asyncio.sleep(dht_config.heartbeat_interval)


async def dht_gossip_daemon(app):
    """Periodically publish a snapshot of the local DHT store to known public (trusted) nodes."""
    await asyncio.sleep(7)
    memory = getattr(app.ctx, "memory", None)
    if not memory:
        return
    while True:
        try:
            # Collect the public hosts of trusted nodes
            async with db_session(auto_commit=True) as session:
                from app.core.models.my_network import KnownNode
                nodes = (await session.execute(select(KnownNode))).scalars().all()
                urls = []
                peers = []
                for n in nodes:
                    role = (n.meta or {}).get('role') if n.meta else None
                    if role != 'trusted':
                        continue
                    pub = (n.meta or {}).get('public_host') or n.ip
                    if not pub:
                        continue
                    base = pub.rstrip('/')
                    if not base.startswith('http'):
                        base = f"http://{base}:{n.port or 80}"
                    url = base + "/api/v1/dht.put"
                    urls.append(url)
                    peers.append(base)
            if urls:
                snapshot = memory.dht_store.snapshot()
                records = list(snapshot.values())
                payload = {
                    # The receiver verifies each record's own signature, not the sender's identity
                    'public_key': None,
                    'records': records,
                }
                # public_key is optional in batch mode; records carry their own signatures
                timeout = httpx.Timeout(5.0, read=10.0)
                async with httpx.AsyncClient(timeout=timeout) as client:
                    # Throttle/backoff per peer
                    from time import time as _t
                    from app.core.network.dht.prometheus import record_gossip_success, record_gossip_failure, record_gossip_skipped
                    if not hasattr(dht_gossip_daemon, '_backoff'):
                        dht_gossip_daemon._backoff = {}
                    backoff = dht_gossip_daemon._backoff  # type: ignore
                    for url, peer in zip(urls, peers):
                        now = _t()
                        st = backoff.get(peer) or {'fail': 0, 'next': 0.0}
                        if now < st['next']:
                            record_gossip_skipped(peer, 'backoff')
                            continue
                        try:
                            r = await client.post(url, json=payload)
                            if 200 <= r.status_code < 300:
                                backoff[peer] = {'fail': 0, 'next': 0.0}
                                record_gossip_success(peer)
                            else:
                                raise Exception(f"HTTP {r.status_code}")
                        except Exception as exc:
                            # Exponential backoff: base * 2**(failures - 1), capped at the configured ceiling
                            st['fail'] = st.get('fail', 0) + 1
                            base = max(1, int(dht_config.gossip_backoff_base_sec))
                            cap = max(base, int(dht_config.gossip_backoff_cap_sec))
                            wait = min(cap, base * (2 ** max(0, st['fail'] - 1)))
                            st['next'] = now + wait
                            backoff[peer] = st
                            record_gossip_failure(peer, wait)
                            make_log('DHT.gossip', f'gossip failed {url}: {exc}; backoff {wait}s', level='debug')
        except Exception as exc:
            make_log('DHT.gossip', f'iteration error: {exc}', level='warning')
        await asyncio.sleep(dht_config.gossip_interval_sec)
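
# Wiring sketch, assuming the host application is Sanic (implied by the use of
# ``app.ctx``); the listener name and function below are illustrative, not part
# of this module. The daemons would then be registered as background tasks:
#
#     @app.listener("after_server_start")
#     async def _start_daemons(app, _loop):
#         app.add_task(replication_daemon(app))
#         app.add_task(heartbeat_daemon(app))
#         app.add_task(dht_gossip_daemon(app))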