uploader-bot/app/core/validation/trust_manager.py

119 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

from __future__ import annotations
import logging
from dataclasses import asdict
from typing import Dict, Optional
from app.core.models.validation.validation_models import TrustScore, NodeTrust
logger = logging.getLogger(__name__)
class TrustManager:
"""
Управление доверием между нодами.
- Хранит score (0.0-1.0), blacklist и флаг manual_override.
- Предоставляет API для оценки/обновления/проверки доверия.
"""
def __init__(self, default_score: float = 0.5, min_trusted: float = 0.6):
self._nodes: Dict[str, NodeTrust] = {}
self.default_score = max(0.0, min(1.0, float(default_score)))
self.min_trusted = max(0.0, min(1.0, float(min_trusted)))
logger.debug("TrustManager initialized: default_score=%s, min_trusted=%s", self.default_score, self.min_trusted)
def _get_or_create(self, node_id: str) -> NodeTrust:
if node_id not in self._nodes:
self._nodes[node_id] = NodeTrust(node_id=node_id, score=self.default_score)
logger.info("TrustManager: new node registered with default score: %s", node_id)
return self._nodes[node_id]
def assess_node_trust(self, node_id: str) -> TrustScore:
"""
Вернуть текущий TrustScore для ноды.
"""
state = self._get_or_create(node_id)
logger.debug("assess_node_trust: %s -> score=%.3f, blacklisted=%s, override=%s",
node_id, state.score, state.blacklisted, state.manual_override)
return TrustScore(node_id=node_id, score=state.score, reason=("blacklisted" if state.blacklisted else None))
def update_trust_score(self, node_id: str, delta: float, *, reason: Optional[str] = None) -> TrustScore:
"""
Обновить score ноды на delta в диапазоне [0.0, 1.0].
Положительное delta увеличивает доверие, отрицательное — уменьшает.
"""
state = self._get_or_create(node_id)
prev = state.score
state.score = max(0.0, min(1.0, prev + float(delta)))
if reason:
state.note = reason
logger.info("update_trust_score: %s: %.3f -> %.3f (reason=%s)", node_id, prev, state.score, reason)
return TrustScore(node_id=node_id, score=state.score, reason=reason)
def set_blacklist(self, node_id: str, blacklisted: bool = True, *, note: Optional[str] = None) -> NodeTrust:
"""
Добавить/убрать ноду из blacklist.
"""
state = self._get_or_create(node_id)
state.blacklisted = bool(blacklisted)
if note:
state.note = note
logger.warning("set_blacklist: %s -> %s", node_id, state.blacklisted)
return state
def set_manual_override(self, node_id: str, override: bool = True, *, note: Optional[str] = None) -> NodeTrust:
"""
Установить ручной override доверия для ноды (форсированное доверие).
"""
state = self._get_or_create(node_id)
state.manual_override = bool(override)
if note:
state.note = note
logger.warning("set_manual_override: %s -> %s", node_id, state.manual_override)
return state
def is_node_trusted(self, node_id: str) -> bool:
"""
Возвращает True если нода считается доверенной:
- НЕ находится в blacklist
- Имеет score >= min_trusted
- ЛИБО установлен manual_override (в этом случае blacklist игнорируется только если override True)
"""
state = self._get_or_create(node_id)
if state.manual_override:
logger.debug("is_node_trusted: %s -> True (manual_override)", node_id)
return True
if state.blacklisted:
logger.debug("is_node_trusted: %s -> False (blacklisted)", node_id)
return False
trusted = state.score >= self.min_trusted
logger.debug("is_node_trusted: %s -> %s (score=%.3f, min_trusted=%.3f)", node_id, trusted, state.score, self.min_trusted)
return trusted
def export_state(self) -> Dict[str, Dict]:
"""
Экспорт текущего состояния (для сериализации/персистентности).
"""
return {nid: self._nodes[nid].to_dict() for nid in self._nodes}
def import_state(self, data: Dict[str, Dict]) -> None:
"""
Импорт состояния (восстановление из персистентного хранилища).
"""
self._nodes.clear()
for nid, raw in data.items():
try:
self._nodes[nid] = NodeTrust(
node_id=raw["node_id"],
score=float(raw.get("score", self.default_score)),
blacklisted=bool(raw.get("blacklisted", False)),
manual_override=bool(raw.get("manual_override", False)),
note=raw.get("note"),
updated_at=raw.get("updated_at"),
)
except Exception as e:
logger.error("Failed to import node trust record %s: %s", nid, e)
logger.info("TrustManager state imported: nodes=%d", len(self._nodes))