from __future__ import annotations import logging from dataclasses import asdict from typing import Dict, Optional from app.core.models.validation.validation_models import TrustScore, NodeTrust logger = logging.getLogger(__name__) class TrustManager: """ Управление доверием между нодами. - Хранит score (0.0-1.0), blacklist и флаг manual_override. - Предоставляет API для оценки/обновления/проверки доверия. """ def __init__(self, default_score: float = 0.5, min_trusted: float = 0.6): self._nodes: Dict[str, NodeTrust] = {} self.default_score = max(0.0, min(1.0, float(default_score))) self.min_trusted = max(0.0, min(1.0, float(min_trusted))) logger.debug("TrustManager initialized: default_score=%s, min_trusted=%s", self.default_score, self.min_trusted) def _get_or_create(self, node_id: str) -> NodeTrust: if node_id not in self._nodes: self._nodes[node_id] = NodeTrust(node_id=node_id, score=self.default_score) logger.info("TrustManager: new node registered with default score: %s", node_id) return self._nodes[node_id] def assess_node_trust(self, node_id: str) -> TrustScore: """ Вернуть текущий TrustScore для ноды. """ state = self._get_or_create(node_id) logger.debug("assess_node_trust: %s -> score=%.3f, blacklisted=%s, override=%s", node_id, state.score, state.blacklisted, state.manual_override) return TrustScore(node_id=node_id, score=state.score, reason=("blacklisted" if state.blacklisted else None)) def update_trust_score(self, node_id: str, delta: float, *, reason: Optional[str] = None) -> TrustScore: """ Обновить score ноды на delta в диапазоне [0.0, 1.0]. Положительное delta увеличивает доверие, отрицательное — уменьшает. """ state = self._get_or_create(node_id) prev = state.score state.score = max(0.0, min(1.0, prev + float(delta))) if reason: state.note = reason logger.info("update_trust_score: %s: %.3f -> %.3f (reason=%s)", node_id, prev, state.score, reason) return TrustScore(node_id=node_id, score=state.score, reason=reason) def set_blacklist(self, node_id: str, blacklisted: bool = True, *, note: Optional[str] = None) -> NodeTrust: """ Добавить/убрать ноду из blacklist. """ state = self._get_or_create(node_id) state.blacklisted = bool(blacklisted) if note: state.note = note logger.warning("set_blacklist: %s -> %s", node_id, state.blacklisted) return state def set_manual_override(self, node_id: str, override: bool = True, *, note: Optional[str] = None) -> NodeTrust: """ Установить ручной override доверия для ноды (форсированное доверие). """ state = self._get_or_create(node_id) state.manual_override = bool(override) if note: state.note = note logger.warning("set_manual_override: %s -> %s", node_id, state.manual_override) return state def is_node_trusted(self, node_id: str) -> bool: """ Возвращает True если нода считается доверенной: - НЕ находится в blacklist - Имеет score >= min_trusted - ЛИБО установлен manual_override (в этом случае blacklist игнорируется только если override True) """ state = self._get_or_create(node_id) if state.manual_override: logger.debug("is_node_trusted: %s -> True (manual_override)", node_id) return True if state.blacklisted: logger.debug("is_node_trusted: %s -> False (blacklisted)", node_id) return False trusted = state.score >= self.min_trusted logger.debug("is_node_trusted: %s -> %s (score=%.3f, min_trusted=%.3f)", node_id, trusted, state.score, self.min_trusted) return trusted def export_state(self) -> Dict[str, Dict]: """ Экспорт текущего состояния (для сериализации/персистентности). """ return {nid: self._nodes[nid].to_dict() for nid in self._nodes} def import_state(self, data: Dict[str, Dict]) -> None: """ Импорт состояния (восстановление из персистентного хранилища). """ self._nodes.clear() for nid, raw in data.items(): try: self._nodes[nid] = NodeTrust( node_id=raw["node_id"], score=float(raw.get("score", self.default_score)), blacklisted=bool(raw.get("blacklisted", False)), manual_override=bool(raw.get("manual_override", False)), note=raw.get("note"), updated_at=raw.get("updated_at"), ) except Exception as e: logger.error("Failed to import node trust record %s: %s", nid, e) logger.info("TrustManager state imported: nodes=%d", len(self._nodes))