#!/usr/bin/env python3
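"""
Diagnostic CLI for content synchronization between nodes.

Subcommands:
  request  - fetch chunks from a remote node via ContentSyncManager.request_chunks
  provide  - serve chunks locally from a prepared JSON file via provide_chunks
  sync     - run the full synchronization procedure across several nodes via sync_content

Example invocations (the script name, file names, and node URLs below are placeholders):
  python debug_sync.py request --target http://localhost:8000 --content-id <id> --indexes 0,1,2,5-10
  python debug_sync.py provide --input-json chunks.json --content-id <id> --indexes 0-3
  python debug_sync.py sync --nodes http://localhost:8000,http://localhost:8001 --content-id <id> --total-chunks 64
"""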
import argparse
import asyncio
import json
import logging
import os
import sys
from typing import Any, Dict, List, Optional

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s")
log = logging.getLogger("debug_sync")

try:
    from app.core.content.sync_manager import ContentSyncManager
    from app.core.models.content.chunk import ContentChunk
except Exception as e:
    print(f"[FATAL] Cannot import app modules: {e}", file=sys.stderr)
    sys.exit(2)
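

# Expected JSON input: either a bare list of chunk dicts or an object {'chunks': [...]};
# each entry must be accepted by ContentChunk.from_dict (the exact fields are defined
# by the ContentChunk model, e.g. chunk_index is used for lookups below).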
def load_chunks_from_json(path: str) -> List[ContentChunk]:
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    chunks_raw = data.get("chunks") if isinstance(data, dict) else data
    if not isinstance(chunks_raw, list):
        raise ValueError("Invalid chunks JSON: expected list or {'chunks': [...]} object")
    out: List[ContentChunk] = []
    for d in chunks_raw:
        out.append(ContentChunk.from_dict(d))
    return out
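

# Client-side path: request the given chunk indexes from a remote node and log
# how many were requested/received, plus any errors reported by request_chunks.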
async def run_request_chunks(target_url: str, content_id: str, indexes: List[int], batch_size: int) -> Dict[str, Any]:
    mgr = ContentSyncManager()
    log.info("Requesting chunks from %s for content_id=%s indexes=%s", target_url, content_id, indexes)
    res = await mgr.request_chunks(target_url, content_id, indexes, batch_size=batch_size)
    log.info("Request done. requested=%s received=%s errors=%s", res.get("requested"), res.get("received"), len(res.get("errors", [])))
    return res


async def run_provide_chunks(input_json: str, content_id: str, indexes: List[int], batch_limit: int) -> Dict[str, Any]:
    """
    Local check of chunk serving: read pre-built chunks from a JSON file and
    run them through provide_chunks (including its local integrity check).
    """
    all_chunks = load_chunks_from_json(input_json)
    by_idx: Dict[int, ContentChunk] = {c.chunk_index: c for c in all_chunks}

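    # Minimal in-memory stand-in for node storage: provide_chunks uses it to look up
    # a chunk by (content_id, index); here it just reads from the dict built above.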
    def storage_reader(cid: str, idx: int) -> Optional[ContentChunk]:
        if cid != content_id:
            return None
        return by_idx.get(idx)

    mgr = ContentSyncManager()
    log.info("Providing chunks for content_id=%s indexes=%s (batch_limit=%d)", content_id, indexes, batch_limit)
    res = await mgr.provide_chunks(content_id, indexes, storage_reader=storage_reader, batch_limit=batch_limit)
    log.info("Provide done. ok=%d errors=%d", len(res.get("chunks", [])), len(res.get("errors", [])))
    return res
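

# Full sync path: given the indexes already present locally and the expected total,
# sync_content is expected to pull the missing chunks from the listed nodes.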
async def run_sync_content(nodes: List[str], content_id: str, have_indexes: List[int], total_chunks: int) -> Dict[str, Any]:
    mgr = ContentSyncManager()
    log.info("Sync content start: nodes=%s content_id=%s have=%d total=%d", nodes, content_id, len(have_indexes), total_chunks)
    res = await mgr.sync_content(nodes, content_id, have_indexes=have_indexes, total_chunks=total_chunks)
    log.info("Sync result: downloaded=%s", res.get("downloaded"))
    return res
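

# Parses comma-separated indexes with optional ranges,
# e.g. "0,1,2,5-10" -> [0, 1, 2, 5, 6, 7, 8, 9, 10].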
def parse_int_list(s: str) -> List[int]:
    if not s:
        return []
    parts = [p.strip() for p in s.split(",") if p.strip()]
    out: List[int] = []
    for p in parts:
        if "-" in p:
            a, b = p.split("-", 1)
            out.extend(range(int(a), int(b) + 1))
        else:
            out.append(int(p))
    return out
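

# CLI entry point: one subparser per scenario (request / provide / sync);
# returns a process exit code, consumed by SystemExit below.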
def main() -> int:
    ap = argparse.ArgumentParser(description="Diagnostics for content synchronization between nodes")
    sub = ap.add_subparsers(dest="cmd", required=True)

    p_req = sub.add_parser("request", help="Request chunks from a remote node")
    p_req.add_argument("--target", required=True, help="Base URL of the remote node (e.g. http://localhost:8000)")
    p_req.add_argument("--content-id", required=True, help="Content identifier")
    p_req.add_argument("--indexes", required=True, help="List of indexes (e.g. '0,1,2,5-10')")
    p_req.add_argument("--batch-size", type=int, default=32, help="Batch size (default 32)")

    p_prov = sub.add_parser("provide", help="Check local chunk serving from a JSON file")
    p_prov.add_argument("--input-json", required=True, help="Path to a JSON file with chunks (list[chunk] or {'chunks': [...]})")
    p_prov.add_argument("--content-id", required=True, help="Content identifier")
    p_prov.add_argument("--indexes", required=True, help="List of indexes (e.g. '0,1,2,5-10')")
    p_prov.add_argument("--batch-limit", type=int, default=128, help="Response size limit")

    p_sync = sub.add_parser("sync", help="Full synchronization procedure across several nodes")
    p_sync.add_argument("--nodes", required=True, help="Comma-separated list of nodes")
    p_sync.add_argument("--content-id", required=True)
    p_sync.add_argument("--have-indexes", default="", help="Indexes already present locally")
    p_sync.add_argument("--total-chunks", type=int, required=True, help="Total number of chunks")

    args = ap.parse_args()

    if args.cmd == "request":
        indexes = parse_int_list(args.indexes)
        res = asyncio.run(run_request_chunks(args.target, args.content_id, indexes, batch_size=args.batch_size))
        print(json.dumps(res, ensure_ascii=False, indent=2))
        return 0

    if args.cmd == "provide":
        indexes = parse_int_list(args.indexes)
        res = asyncio.run(run_provide_chunks(args.input_json, args.content_id, indexes, batch_limit=args.batch_limit))
        print(json.dumps(res, ensure_ascii=False, indent=2))
        return 0

    if args.cmd == "sync":
        nodes = [n.strip() for n in args.nodes.split(",") if n.strip()]
        have = parse_int_list(args.have_indexes)
        res = asyncio.run(run_sync_content(nodes, args.content_id, have_indexes=have, total_chunks=args.total_chunks))
        print(json.dumps(res, ensure_ascii=False, indent=2))
        return 0

    return 2


if __name__ == "__main__":
    raise SystemExit(main())