uploader-bot/scripts/debug_sync.py

#!/usr/bin/env python3
import argparse
import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, List, Optional

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
log = logging.getLogger("debug_sync")

try:
    from app.core.content.sync_manager import ContentSyncManager
    from app.core.models.content.chunk import ContentChunk
except Exception as e:
    print(f"[FATAL] Cannot import app modules: {e}", file=sys.stderr)
    sys.exit(2)
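
# Note: the app package must be importable for the guard above to pass
# (e.g. run from the repo root or with PYTHONPATH pointing at it);
# otherwise the script exits early with code 2.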

def load_chunks_from_json(path: str) -> List[ContentChunk]:
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    chunks_raw = data.get("chunks") if isinstance(data, dict) else data
    if not isinstance(chunks_raw, list):
        raise ValueError("Invalid chunks JSON: expected list or {'chunks': [...]} object")
    out: List[ContentChunk] = []
    for d in chunks_raw:
        out.append(ContentChunk.from_dict(d))
    return out
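
# Expected input shape, reconstructed from the loader above (hedged: the exact
# per-chunk fields depend on ContentChunk.from_dict, which is not shown here;
# chunk_index is the only field this script relies on directly):
#
#   {"chunks": [{"content_id": "...", "chunk_index": 0, "data": "...", "hash": "..."}]}
#
# or simply a bare list of the same chunk objects.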

async def run_request_chunks(target_url: str, content_id: str, indexes: List[int], batch_size: int) -> Dict[str, Any]:
    mgr = ContentSyncManager()
    log.info("Requesting chunks from %s for content_id=%s indexes=%s", target_url, content_id, indexes)
    res = await mgr.request_chunks(target_url, content_id, indexes, batch_size=batch_size)
    log.info("Request done. requested=%s received=%s errors=%s", res.get("requested"), res.get("received"), len(res.get("errors", [])))
    return res
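
# Judging by the .get() calls above, request_chunks returns a dict with at
# least "requested", "received", and "errors" keys; the full schema lives in
# ContentSyncManager and is not reproduced here.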

async def run_provide_chunks(input_json: str, content_id: str, indexes: List[int], batch_limit: int) -> Dict[str, Any]:
    """
    Local check of chunk serving: read pre-built chunks from a JSON file and
    run them through provide_chunks (including the local integrity check).
    """
    all_chunks = load_chunks_from_json(input_json)
    by_idx: Dict[int, ContentChunk] = {c.chunk_index: c for c in all_chunks}

    def storage_reader(cid: str, idx: int) -> Optional[ContentChunk]:
        if cid != content_id:
            return None
        return by_idx.get(idx)

    mgr = ContentSyncManager()
    log.info("Providing chunks for content_id=%s indexes=%s (batch_limit=%d)", content_id, indexes, batch_limit)
    res = await mgr.provide_chunks(content_id, indexes, storage_reader=storage_reader, batch_limit=batch_limit)
    log.info("Provide done. ok=%d errors=%d", len(res.get("chunks", [])), len(res.get("errors", [])))
    return res
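
# storage_reader stands in for the node's real chunk storage: provide_chunks
# calls it per (content_id, index) pair and presumably treats None as "chunk
# not held locally", which is why reads for a foreign content_id return None.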

async def run_sync_content(nodes: List[str], content_id: str, have_indexes: List[int], total_chunks: int) -> Dict[str, Any]:
    mgr = ContentSyncManager()
    log.info("Sync content start: nodes=%s content_id=%s have=%d total=%d", nodes, content_id, len(have_indexes), total_chunks)
    res = await mgr.sync_content(nodes, content_id, have_indexes=have_indexes, total_chunks=total_chunks)
    log.info("Sync result: downloaded=%s", res.get("downloaded"))
    return res
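
# sync_content presumably derives the missing set from have_indexes and
# total_chunks (indexes 0..total_chunks-1 not already held) and fetches them
# from the listed nodes; the "downloaded" key logged above suggests the result
# reports how many chunks were actually pulled.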

def parse_int_list(s: str) -> List[int]:
    if not s:
        return []
    parts = [p.strip() for p in s.split(",") if p.strip()]
    out: List[int] = []
    for p in parts:
        if "-" in p:
            a, b = p.split("-", 1)
            out.extend(range(int(a), int(b) + 1))
        else:
            out.append(int(p))
    return out
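
# Examples of the accepted index syntax:
#   parse_int_list("0,1,2")    -> [0, 1, 2]
#   parse_int_list("5-8")      -> [5, 6, 7, 8]
#   parse_int_list("0,5-7,12") -> [0, 5, 6, 7, 12]
# Note: negative numbers are not supported, since "-" is taken as a range separator.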

def main() -> int:
    ap = argparse.ArgumentParser(description="Diagnostics for content synchronization between nodes")
    sub = ap.add_subparsers(dest="cmd", required=True)

    p_req = sub.add_parser("request", help="Request chunks from a remote node")
    p_req.add_argument("--target", required=True, help="Base URL of the remote node (e.g. http://localhost:8000)")
    p_req.add_argument("--content-id", required=True, help="Content identifier")
    p_req.add_argument("--indexes", required=True, help="Index list (e.g. '0,1,2,5-10')")
    p_req.add_argument("--batch-size", type=int, default=32, help="Batch size (default 32)")

    p_prov = sub.add_parser("provide", help="Check local chunk serving from a JSON file")
    p_prov.add_argument("--input-json", required=True, help="Path to a JSON file with chunks (list[chunk] or {'chunks': [...]})")
    p_prov.add_argument("--content-id", required=True, help="Content identifier")
    p_prov.add_argument("--indexes", required=True, help="Index list (e.g. '0,1,2,5-10')")
    p_prov.add_argument("--batch-limit", type=int, default=128, help="Response size limit")

    p_sync = sub.add_parser("sync", help="Full synchronization procedure across multiple nodes")
    p_sync.add_argument("--nodes", required=True, help="Comma-separated list of nodes")
    p_sync.add_argument("--content-id", required=True)
    p_sync.add_argument("--have-indexes", default="", help="Indexes already present locally")
    p_sync.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks")

    args = ap.parse_args()

    if args.cmd == "request":
        indexes = parse_int_list(args.indexes)
        res = asyncio.run(run_request_chunks(args.target, args.content_id, indexes, batch_size=args.batch_size))
        print(json.dumps(res, ensure_ascii=False, indent=2))
        return 0
    if args.cmd == "provide":
        indexes = parse_int_list(args.indexes)
        res = asyncio.run(run_provide_chunks(args.input_json, args.content_id, indexes, batch_limit=args.batch_limit))
        print(json.dumps(res, ensure_ascii=False, indent=2))
        return 0
    if args.cmd == "sync":
        nodes = [n.strip() for n in args.nodes.split(",") if n.strip()]
        have = parse_int_list(args.have_indexes)
        res = asyncio.run(run_sync_content(nodes, args.content_id, have_indexes=have, total_chunks=args.total_chunks))
        print(json.dumps(res, ensure_ascii=False, indent=2))
        return 0
    return 2


if __name__ == "__main__":
    raise SystemExit(main())
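
# Example invocations (URLs, IDs, and file names are illustrative):
#   python scripts/debug_sync.py request --target http://localhost:8000 --content-id abc123 --indexes 0,1,5-10
#   python scripts/debug_sync.py provide --input-json chunks.json --content-id abc123 --indexes 0-3 --batch-limit 64
#   python scripts/debug_sync.py sync --nodes http://node-a:8000,http://node-b:8000 --content-id abc123 --total-chunks 64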