# uploader-bot/app/core/background/index_scout_v3.py
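"""Background service that syncs the v3 content index from known peer nodes.

Every INTERVAL_SEC seconds the service polls each KnownNode's content index
(or delta feed), upserts the advertised EncryptedContent metadata locally,
downloads cover thumbnails when offered, and pins the encrypted CIDs to the
local IPFS node, bounded by a pin-concurrency limit and a disk-usage watermark.
"""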

import asyncio
import os
from typing import List, Optional
import httpx
import random
import shutil
import tempfile
from sqlalchemy import select
from app.core.logger import make_log
from app.core.storage import db_session
from app.core.models.my_network import KnownNode
from app.core.models.content_v3 import EncryptedContent, ContentDerivative
from app.core.ipfs_client import pin_add, find_providers, swarm_connect

# Poll interval and environment defaults; the SYNC_* values can also be
# overridden at runtime via ServiceConfig (see main_fn).
INTERVAL_SEC = 60
ENV_PIN_CONCURRENCY = int(os.getenv('SYNC_MAX_CONCURRENT_PINS', '4'))
ENV_DISK_WATERMARK_PCT = int(os.getenv('SYNC_DISK_LOW_WATERMARK_PCT', '90'))

async def fetch_index(base_url: str, etag: Optional[str], since: Optional[str]) -> tuple[List[dict], Optional[str]]:
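    """Fetch the content index (or delta) from a peer node.

    Returns (items, marker) where marker is the next_since cursor or ETag to
    persist for the node. On error or 304 Not Modified the previous etag is
    returned unchanged with an empty item list.
    """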
    try:
        headers = {}
        params = {}
        if since:
            params['since'] = since
        # Use the delta endpoint when we have a since-cursor, the full index otherwise
        base = base_url.rstrip('/')
        url = f"{base}/api/v1/content.delta" if since else f"{base}/api/v1/content.index"
        if etag:
            headers['If-None-Match'] = etag
        async with httpx.AsyncClient(timeout=20) as client:
            r = await client.get(url, headers=headers, params=params)
            if r.status_code != 200:
                # 304 Not Modified or any error: nothing new, keep the current marker
                return [], etag
            j = r.json()
            new_etag = r.headers.get('ETag') or etag
            return j.get('items') or [], (j.get('next_since') or new_etag or etag)
    except Exception:
        return [], etag

async def upsert_content(item: dict):
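    """Insert or update EncryptedContent metadata for one index item.

    Also downloads the item's cover thumbnail when a cover_url is advertised
    and no ready 'decrypted_thumbnail' derivative exists locally, storing it
    as a ContentDerivative row.
    """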
    cid = item.get('encrypted_cid')
    if not cid:
        return
    async with db_session() as session:
        row = (await session.execute(
            select(EncryptedContent).where(EncryptedContent.encrypted_cid == cid)
        )).scalars().first()
        if not row:
            row = EncryptedContent(
                encrypted_cid=cid,
                title=item.get('title') or cid,
                description=item.get('description') or '',
                content_type=item.get('content_type') or 'application/octet-stream',
                enc_size_bytes=item.get('size_bytes'),
                preview_enabled=bool(item.get('preview_enabled')),
                preview_conf=item.get('preview_conf') or {},
                salt_b64=item.get('salt_b64'),
            )
            session.add(row)
        else:
            # Update only the fields the peer actually provided
            row.title = item.get('title') or row.title
            row.description = item.get('description') or row.description
            row.content_type = item.get('content_type') or row.content_type
            row.enc_size_bytes = item.get('size_bytes') or row.enc_size_bytes
            if item.get('preview_enabled') is not None:
                row.preview_enabled = bool(item['preview_enabled'])
            if item.get('preview_conf'):
                row.preview_conf = item['preview_conf']
            if item.get('salt_b64'):
                row.salt_b64 = item['salt_b64']
        await session.commit()

    # Fetch the thumbnail via HTTP if provided and not present locally
    cover_url = item.get('cover_url')
    if not cover_url:
        return
    try:
        async with db_session() as session:
            ec = (await session.execute(
                select(EncryptedContent).where(EncryptedContent.encrypted_cid == cid)
            )).scalars().first()
            have_thumb = (await session.execute(
                select(ContentDerivative).where(
                    ContentDerivative.content_id == ec.id,
                    ContentDerivative.kind == 'decrypted_thumbnail',
                    ContentDerivative.status == 'ready',
                )
            )).scalars().first()
            if have_thumb:
                return
            async with httpx.AsyncClient(timeout=30) as client:
                r = await client.get(cover_url)
                r.raise_for_status()
            tmp = tempfile.NamedTemporaryFile(delete=False)
            tmp.write(r.content)
            tmp.close()
            # Save into the derivative store and register the result
            from app.core.background.convert_v3_service import _save_derivative
            h, size = await _save_derivative(tmp.name, os.path.basename(cover_url) or 'thumb.jpg')
            cd = ContentDerivative(
                content_id=ec.id,
                kind='decrypted_thumbnail',
                local_path=os.path.join(os.getenv('UPLOADS_DIR', '/app/data'), h),
                content_type=r.headers.get('Content-Type') or 'image/jpeg',
                size_bytes=size,
                status='ready',
            )
            session.add(cd)
            await session.commit()
    except Exception as e:
        make_log('index_scout_v3', f"thumbnail fetch failed for {cid}: {e}", level='warning')

async def main_fn(memory):
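    """Main loop: poll every known node, sync its index, and pin new CIDs."""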
    make_log('index_scout_v3', 'Service started', level='info')
    sem = None
    sem_size = None
    while True:
        try:
            # Read runtime config from ServiceConfig (fall back to env defaults)
            from app.core.models._config import ServiceConfig
            async with db_session() as session:
                max_pins = int(await ServiceConfig(session).get('SYNC_MAX_CONCURRENT_PINS', ENV_PIN_CONCURRENCY))
                disk_pct = int(await ServiceConfig(session).get('SYNC_DISK_LOW_WATERMARK_PCT', ENV_DISK_WATERMARK_PCT))
            # Recreate the semaphore only when the configured limit changes
            if sem is None or sem_size != max_pins:
                sem = asyncio.Semaphore(max_pins)
                sem_size = max_pins
            async with db_session() as session:
                nodes = (await session.execute(select(KnownNode))).scalars().all()
            for n in nodes:
                base = f"http://{n.ip}:{n.port}"
                # Jitter 0..30s per node to avoid a request stampede
                await asyncio.sleep(random.uniform(0, 30))
                etag = (n.meta or {}).get('index_etag')
                since = (n.meta or {}).get('index_since')
                items, marker = await fetch_index(base, etag, since)
                if not items and marker == etag:
                    continue
                # Persist the new ETag / since marker on the node record
                try:
                    async with db_session() as session:
                        row = (await session.execute(select(KnownNode).where(KnownNode.id == n.id))).scalars().first()
                        if row:
                            meta = row.meta or {}
                            meta['index_etag'] = marker
                            # Only markers that look like ISO timestamps (contain 'T') are kept as since-cursors
                            meta['index_since'] = marker if (marker and 'T' in str(marker)) else meta.get('index_since')
                            row.meta = meta
                            await session.commit()
                except Exception:
                    pass
                if not items:
                    continue
                make_log('index_scout_v3', f"Fetched {len(items)} from {base}")
                # Skip pinning when local disk usage is above the watermark
                try:
                    from app.core._config import UPLOADS_DIR
                    du = shutil.disk_usage(UPLOADS_DIR)
                    used_pct = int(100 * (1 - du.free / du.total))
                    if used_pct >= disk_pct:
                        make_log('index_scout_v3', f"Disk watermark reached ({used_pct}%), skipping pins")
                        continue
                except Exception:
                    pass

                async def _pin_one(cid: str):
                    # Pin a single CID, pre-connecting to discovered providers first
                    async with sem:
                        try:
                            try:
                                provs = await find_providers(cid, max_results=5)
                                for p in provs:
                                    for addr in (p.get('addrs') or [])[:2]:
                                        try:
                                            await swarm_connect(addr)
                                        except Exception:
                                            pass
                            except Exception:
                                pass
                            await pin_add(cid, recursive=True)
                        except Exception as e:
                            make_log('index_scout_v3', f"pin {cid} failed: {e}", level='warning')

                tasks = []
                for it in items:
                    await upsert_content(it)
                    cid = it.get('encrypted_cid')
                    if cid:
                        tasks.append(asyncio.create_task(_pin_one(cid)))
                if tasks:
                    await asyncio.gather(*tasks)
        except Exception as e:
            make_log('index_scout_v3', f"loop error: {e}", level='error')
        await asyncio.sleep(INTERVAL_SEC)