import asyncio
import os
import random
import shutil
import tempfile
from datetime import datetime
from typing import List, Optional
from urllib.parse import urlparse

import httpx
from sqlalchemy import select

from app.core.logger import make_log
from app.core.storage import db_session
from app.core.models.my_network import KnownNode, RemoteContentIndex
from app.core.models.events import NodeEvent
from app.core.models.content_v3 import EncryptedContent, ContentDerivative
from app.core.ipfs_client import pin_add, pin_ls, find_providers, swarm_connect, add_streamed_file
from app.core.events.service import LOCAL_PUBLIC_KEY

INTERVAL_SEC = 60
ENV_PIN_CONCURRENCY = int(os.getenv('SYNC_MAX_CONCURRENT_PINS', '4'))
ENV_DISK_WATERMARK_PCT = int(os.getenv('SYNC_DISK_LOW_WATERMARK_PCT', '90'))
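
# Example environment overrides (illustrative values only):
#
#   SYNC_MAX_CONCURRENT_PINS=8      # allow up to 8 simultaneous pin operations
#   SYNC_DISK_LOW_WATERMARK_PCT=80  # stop pinning once the uploads volume is 80% full
#
# At runtime, values stored via ServiceConfig (read in main_fn below) appear to
# take precedence, with these env-derived constants serving as fallbacks.
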
async def fetch_index(base_url: str, etag: Optional[str], since: Optional[str]) -> tuple[List[dict], Optional[str]]:
    """Fetch a peer's content index and return (items, new sync marker).

    Uses the delta endpoint when a `since` cursor is known, otherwise the full
    index endpoint. On any failure the caller's marker is returned unchanged.
    """
    try:
        headers = {}
        params = {}
        if since:
            params['since'] = since
        url = f"{base_url.rstrip('/')}/api/v1/content.delta" if since else f"{base_url.rstrip('/')}/api/v1/content.index"
        if etag:
            headers['If-None-Match'] = etag
        # follow_redirects handles peers that force HTTPS and issue 301s
        async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client:
            r = await client.get(url, headers=headers, params=params)
        if r.status_code != 200:
            # 304 Not Modified and error statuses alike yield no items and
            # leave the marker untouched.
            return [], etag
        j = r.json()
        new_etag = r.headers.get('ETag') or etag
        return j.get('items') or [], (j.get('next_since') or new_etag or etag)
    except Exception:
        return [], etag

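# Response shape assumed by fetch_index and the consumers below (inferred from
# the fields that are read; illustrative, not a formal schema):
#
#   {
#     "items": [
#       {"encrypted_cid": "bafy...", "title": "...", "description": "...",
#        "content_type": "application/octet-stream", "size_bytes": 123,
#        "preview_enabled": true, "preview_conf": {}, "salt_b64": "...",
#        "cover_url": "https://peer.example/thumb.jpg", "issuer_node_id": 1}
#     ],
#     "next_since": "2024-01-01T00:00:00"   # ISO cursor for the delta endpoint
#   }
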
async def upsert_content(item: dict):
    """Insert or update the local EncryptedContent row for one index item."""
    cid = item.get('encrypted_cid')
    if not cid:
        return
    async with db_session() as session:
        row = (await session.execute(select(EncryptedContent).where(EncryptedContent.encrypted_cid == cid))).scalars().first()
        if not row:
            row = EncryptedContent(
                encrypted_cid=cid,
                title=item.get('title') or cid,
                description=item.get('description') or '',
                content_type=item.get('content_type') or 'application/octet-stream',
                enc_size_bytes=item.get('size_bytes'),
                preview_enabled=bool(item.get('preview_enabled')),
                preview_conf=item.get('preview_conf') or {},
                salt_b64=item.get('salt_b64'),
            )
            session.add(row)
        else:
            row.title = item.get('title') or row.title
            row.description = item.get('description') or row.description
            row.content_type = item.get('content_type') or row.content_type
            row.enc_size_bytes = item.get('size_bytes') or row.enc_size_bytes
            row.preview_enabled = bool(item.get('preview_enabled')) if item.get('preview_enabled') is not None else row.preview_enabled
            if item.get('preview_conf'):
                row.preview_conf = item['preview_conf']
            if item.get('salt_b64'):
                row.salt_b64 = item['salt_b64']
        await session.commit()

    # Fetch the thumbnail via HTTP if the peer advertises one and we lack it locally
    cover_url = item.get('cover_url')
    if cover_url:
        try:
            async with db_session() as session:
                ec = (await session.execute(select(EncryptedContent).where(EncryptedContent.encrypted_cid == cid))).scalars().first()
                if not ec:
                    return
                have_thumb = (await session.execute(select(ContentDerivative).where(
                    ContentDerivative.content_id == ec.id,
                    ContentDerivative.kind == 'decrypted_thumbnail',
                    ContentDerivative.status == 'ready',
                ))).scalars().first()
                if not have_thumb:
                    async with httpx.AsyncClient(timeout=30) as client:
                        r = await client.get(cover_url)
                        r.raise_for_status()
                    tmp = tempfile.NamedTemporaryFile(delete=False)
                    tmp.write(r.content)
                    tmp.close()
                    try:
                        # Save into the derivative store (local import, likely to avoid an import cycle)
                        from app.core.background.convert_v3_service import _save_derivative
                        h, size = await _save_derivative(tmp.name, os.path.basename(cover_url) or 'thumb.jpg')
                    finally:
                        # Best-effort cleanup of the temp file; ignore if the store already moved it
                        try:
                            os.unlink(tmp.name)
                        except OSError:
                            pass
                    cd = ContentDerivative(
                        content_id=ec.id,
                        kind='decrypted_thumbnail',
                        local_path=os.path.join(os.getenv('UPLOADS_DIR', '/app/data'), h),
                        content_type=r.headers.get('Content-Type') or 'image/jpeg',
                        size_bytes=size,
                        status='ready',
                    )
                    session.add(cd)
                    await session.commit()
        except Exception as e:
            make_log('index_scout_v3', f"thumbnail fetch failed for {cid}: {e}", level='warning')

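# Usage sketch (hypothetical values):
#
#   await upsert_content({'encrypted_cid': 'bafy...', 'title': 'Demo',
#                         'size_bytes': 1024, 'cover_url': None})
#
# Missing optional fields simply leave the existing row's values in place.
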
def _node_base_url(node: KnownNode) -> Optional[str]:
    """Build the HTTP(S) base URL for a known node, preferring its public host."""
    meta = node.meta or {}
    public_host = (meta.get('public_host') or '').strip()
    if public_host:
        base = public_host.rstrip('/')
        if base.startswith('http://') or base.startswith('https://'):
            return base
        scheme = 'https' if node.port == 443 else 'http'
        return f"{scheme}://{base.lstrip('/')}"
    scheme = 'https' if node.port == 443 else 'http'
    host = (node.ip or '').strip()
    if not host:
        return None
    default_port = 443 if scheme == 'https' else 80
    if node.port and node.port != default_port:
        return f"{scheme}://{host}:{node.port}"
    return f"{scheme}://{host}"

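# Illustrative mappings (hypothetical nodes), derived from the logic above:
#
#   meta.public_host = 'https://peer.example.org/'   -> 'https://peer.example.org'
#   meta.public_host = 'peer.example.org', port 443  -> 'https://peer.example.org'
#   ip = '203.0.113.7', port 8000                    -> 'http://203.0.113.7:8000'
#   ip = '203.0.113.7', port 80                      -> 'http://203.0.113.7'
#   no public_host and no ip                         -> None
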
async def _update_remote_index(node_id: int, items: List[dict], *, incremental: bool):
    """Mirror a peer's index rows locally.

    In full-sync mode (incremental=False) rows the peer no longer advertises
    are deleted; in incremental mode only upserts are applied.
    """
    if not items:
        return
    async with db_session() as session:
        existing_rows = (await session.execute(
            select(RemoteContentIndex).where(RemoteContentIndex.remote_node_id == node_id)
        )).scalars().all()
        existing_map = {row.encrypted_hash: row for row in existing_rows if row.encrypted_hash}
        seen = set()
        now = datetime.utcnow()
        for item in items:
            cid = item.get('encrypted_cid')
            if not cid:
                continue
            seen.add(cid)
            payload_meta = {
                'title': item.get('title'),
                'description': item.get('description'),
                'size_bytes': item.get('size_bytes'),
                'preview_enabled': item.get('preview_enabled'),
                'preview_conf': item.get('preview_conf'),
                'issuer_node_id': item.get('issuer_node_id'),
                'salt_b64': item.get('salt_b64'),
            }
            meta_clean = {k: v for k, v in payload_meta.items() if v is not None}
            row = existing_map.get(cid)
            if row:
                row.content_type = item.get('content_type') or row.content_type
                row.meta = {**(row.meta or {}), **meta_clean}
                row.last_updated = now
            else:
                row = RemoteContentIndex(
                    remote_node_id=node_id,
                    content_type=item.get('content_type') or 'application/octet-stream',
                    encrypted_hash=cid,
                    meta=meta_clean,
                    last_updated=now,
                )
                session.add(row)
                # Track the new row so a duplicate cid later in the batch
                # updates it instead of inserting a second copy.
                existing_map[cid] = row
        if not incremental and existing_map:
            for hash_value, row in list(existing_map.items()):
                if hash_value not in seen:
                    await session.delete(row)
        await session.commit()

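# Semantics sketch: with incremental=False the peer's listing is authoritative,
# e.g. if the node previously advertised {A, B} and now sends only {A}, the
# local row for B is deleted. With incremental=True the same payload would
# leave B untouched, since deltas only carry changes.
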
async def main_fn(memory):
    make_log('index_scout_v3', 'Service started', level='info')
    sem = None
    while True:
        try:
            # Read runtime config from ServiceConfig (falling back to env defaults)
            from app.core.models._config import ServiceConfig
            async with db_session() as session:
                max_pins = int(await ServiceConfig(session).get('SYNC_MAX_CONCURRENT_PINS', ENV_PIN_CONCURRENCY))
                disk_pct = int(await ServiceConfig(session).get('SYNC_DISK_LOW_WATERMARK_PCT', ENV_DISK_WATERMARK_PCT))
            # Recreate the semaphore when the configured limit changes. _value is
            # a private attribute; once all pin tasks from the previous pass have
            # released it, it equals the old limit, so this only differs from
            # max_pins when the configuration changed.
            if sem is None or sem._value != max_pins:
                sem = asyncio.Semaphore(max_pins)

            # Defined before the event loop below so the first pass can already
            # use it; closes over `sem` to bound concurrent pins.
            async def _pin_one(node: KnownNode, cid: str):
                async with sem:
                    try:
                        # Pre-connect to the node's advertised IPFS multiaddrs
                        node_ipfs_meta = (node.meta or {}).get('ipfs') or {}
                        multiaddrs = node_ipfs_meta.get('multiaddrs') or []
                        for addr in multiaddrs:
                            try:
                                await swarm_connect(addr)
                            except Exception:
                                pass
                        # Already pinned locally? Nothing to do.
                        try:
                            existing = await pin_ls(cid)
                            if existing and existing.get('Keys'):
                                make_log('index_scout_v3', f"pin {cid} already present", level='debug')
                                return
                        except Exception:
                            pass
                        # Pre-connect to discovered providers as well
                        try:
                            provs = await find_providers(cid, max_results=5)
                            for p in provs:
                                for addr in (p.get('addrs') or [])[:2]:
                                    try:
                                        await swarm_connect(addr)
                                    except Exception:
                                        pass
                        except Exception:
                            pass
                        try:
                            await asyncio.wait_for(pin_add(cid, recursive=True), timeout=60)
                            return
                        except httpx.HTTPStatusError as http_err:
                            body = (http_err.response.text or '').lower() if http_err.response else ''
                            if 'already pinned' in body or 'pin already set' in body:
                                make_log('index_scout_v3', f"pin {cid} already present", level='debug')
                                return
                            raise
                    except Exception as e:
                        # Attempt an HTTP gateway fallback before logging failure
                        fallback_sources = []
                        node_host = node.meta.get('public_host') if isinstance(node.meta, dict) else None
                        try:
                            # Derive the gateway host: prefer the public_host domain if present;
                            # explicit non-standard ports are reused, otherwise default to 8080.
                            parsed = urlparse(node_host) if node_host else None
                            gateway_host = parsed.hostname if parsed and parsed.hostname else (node.ip or '').split(':')[0]
                            gateway_port = parsed.port if (parsed and parsed.port not in (None, 80, 443)) else 8080
                            if gateway_host:
                                gateway_url = f"http://{gateway_host}:{gateway_port}/ipfs/{cid}"
                                make_log('index_scout_v3', f"fallback download start {cid} via {gateway_url}", level='debug')
                                async with httpx.AsyncClient(timeout=None) as client:
                                    resp = await client.get(gateway_url)
                                    resp.raise_for_status()
                                    data = resp.content
                                # Re-add with identical chunking parameters so the resulting CID matches
                                chunk_bytes = int(os.getenv('CRYPTO_CHUNK_BYTES', '1048576'))
                                add_params = {
                                    'cid-version': 1,
                                    'raw-leaves': 'true',
                                    'chunker': f'size-{chunk_bytes}',
                                    'hash': 'sha2-256',
                                    'pin': 'true',
                                }
                                result = await add_streamed_file([data], filename=f'{cid}.bin', params=add_params)
                                if str(result.get('Hash')) != str(cid):
                                    raise ValueError(f"gateway add returned mismatched CID {result.get('Hash')}")
                                make_log('index_scout_v3', f"pin {cid} fetched via gateway {gateway_host}:{gateway_port}", level='info')
                                return
                            else:
                                fallback_sources.append('gateway-host-missing')
                        except Exception as fallback_err:
                            fallback_sources.append(str(fallback_err))
                        make_log('index_scout_v3', f"pin {cid} failed: {e}; fallback={'; '.join(fallback_sources) if fallback_sources else 'none'}", level='warning')

            async with db_session() as session:
                nodes = (await session.execute(select(KnownNode))).scalars().all()
            node_by_pk = {n.public_key: n for n in nodes if n.public_key}

            # Drain pending 'content_indexed' events and pin their CIDs
            async with db_session() as session:
                pending_events = (await session.execute(
                    select(NodeEvent)
                    .where(NodeEvent.event_type == 'content_indexed', NodeEvent.status.in_(('recorded', 'local', 'processing')))
                    .order_by(NodeEvent.created_at.asc())
                    .limit(25)
                )).scalars().all()
                for ev in pending_events:
                    if ev.status != 'processing':
                        ev.status = 'processing'
                await session.commit()

            for ev in pending_events:
                payload = ev.payload or {}
                cid = payload.get('encrypted_cid') or payload.get('content_cid')
                if ev.origin_public_key == LOCAL_PUBLIC_KEY:
                    # Our own events need no re-pinning; mark them applied.
                    async with db_session() as session:
                        ref = await session.get(NodeEvent, ev.id)
                        if ref:
                            ref.status = 'applied'
                            ref.applied_at = datetime.utcnow()
                        await session.commit()
                    continue
                if not cid:
                    # Nothing to pin; mark the malformed event applied so it is not retried.
                    async with db_session() as session:
                        ref = await session.get(NodeEvent, ev.id)
                        if ref:
                            ref.status = 'applied'
                            ref.applied_at = datetime.utcnow()
                        await session.commit()
                    continue
                node = node_by_pk.get(ev.origin_public_key)
                if not node:
                    async with db_session() as session:
                        node = (await session.execute(select(KnownNode).where(KnownNode.public_key == ev.origin_public_key))).scalar_one_or_none()
                    if node:
                        node_by_pk[node.public_key] = node
                if not node:
                    make_log('index_scout_v3', f"Event {ev.uid} refers to unknown node {ev.origin_public_key}", level='debug')
                    # Requeue: reset to 'recorded' so a later pass can retry.
                    async with db_session() as session:
                        ref = await session.get(NodeEvent, ev.id)
                        if ref:
                            ref.status = 'recorded'
                        await session.commit()
                    continue
                try:
                    await _pin_one(node, cid)
                    async with db_session() as session:
                        ref = await session.get(NodeEvent, ev.id)
                        if ref:
                            ref.status = 'applied'
                            ref.applied_at = datetime.utcnow()
                        await session.commit()
                except Exception as exc:
                    make_log('index_scout_v3', f"Event pin failed for {cid}: {exc}", level='warning')
                    async with db_session() as session:
                        ref = await session.get(NodeEvent, ev.id)
                        if ref:
                            ref.status = 'recorded'
                        await session.commit()

            for n in nodes:
                base = _node_base_url(n)
                if not base:
                    continue
                # Jitter 0..30s per node to avoid stampeding peers in lockstep
                await asyncio.sleep(random.uniform(0, 30))
                etag = (n.meta or {}).get('index_etag')
                since = (n.meta or {}).get('index_since')
                items, marker = await fetch_index(base, etag, since)
                if not items and marker == etag:
                    continue
                # Persist the new sync markers for this node
                try:
                    async with db_session() as session:
                        row = (await session.execute(select(KnownNode).where(KnownNode.id == n.id))).scalars().first()
                        if row:
                            meta = dict(row.meta or {})  # copy so SQLAlchemy detects the change
                            meta['index_etag'] = marker
                            # Only ISO timestamps (containing 'T') are valid delta cursors
                            meta['index_since'] = marker if (marker and 'T' in str(marker)) else meta.get('index_since')
                            row.meta = meta
                            await session.commit()
                except Exception:
                    pass

                if not items:
                    continue
                make_log('index_scout_v3', f"Fetched {len(items)} from {base}")
                try:
                    await _update_remote_index(n.id, items, incremental=bool(since))
                except Exception as exc:
                    make_log('index_scout_v3', f"remote index update failed for node {n.id}: {exc}", level='warning')

                # Skip pinning when the uploads volume is above the disk watermark,
                # e.g. free=10 GiB of total=100 GiB gives used_pct = 90.
                try:
                    from app.core._config import UPLOADS_DIR
                    du = shutil.disk_usage(UPLOADS_DIR)
                    used_pct = int(100 * (1 - du.free / du.total))
                    if used_pct >= disk_pct:
                        make_log('index_scout_v3', f"Disk watermark reached ({used_pct}%), skipping pins")
                        continue
                except Exception:
                    pass

                # Upsert metadata for the batch and queue pins under the semaphore
                tasks = []
                for it in items:
                    await upsert_content(it)
                    cid = it.get('encrypted_cid')
                    if cid:
                        make_log('index_scout_v3', f"queue pin {cid}")
                        tasks.append(asyncio.create_task(_pin_one(n, cid)))
                if tasks:
                    await asyncio.gather(*tasks)
        except Exception as e:
            make_log('index_scout_v3', f"loop error: {e}", level='error')
        await asyncio.sleep(INTERVAL_SEC)
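
# Minimal manual-run sketch (hypothetical; in production main_fn is presumably
# launched by the app's background-service runner, which supplies `memory`):
#
#   if __name__ == '__main__':
#       asyncio.run(main_fn(memory=None))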