from __future__ import annotations import ipaddress from dataclasses import dataclass, field from typing import Dict, Optional from app.core.logger import make_log @dataclass class ASNResolver: cache: Dict[str, int] = field(default_factory=dict) def normalise(self, ip: str | None) -> Optional[str]: if not ip: return None try: return str(ipaddress.ip_address(ip)) except Exception: return None def resolve(self, ip: str | None) -> Optional[int]: norm = self.normalise(ip) if not norm: return None return self.cache.get(norm) def learn(self, ip: str, asn: int) -> None: norm = self.normalise(ip) if not norm: make_log("ASNResolver", f"Invalid IP provided for learn: {ip}", level="warning") return self.cache[norm] = asn async def resolve_async(self, ip: str | None, db_session=None) -> Optional[int]: """Resolve ASN via persistent cache; fallback to RDAP API; store result. - Checks in-memory cache first. - If not found, checks DB table rdap_cache when available. - If still not found, queries a public API and persists. """ norm = self.normalise(ip) if not norm: return None # In-memory cache first if norm in self.cache: return self.cache[norm] # DB lookup if possible try: if db_session is not None: from sqlalchemy import select from app.core.models.rdap import RdapCache row = (await db_session.execute(select(RdapCache).where(RdapCache.ip == norm))).scalars().first() if row and row.asn is not None: self.cache[norm] = int(row.asn) return int(row.asn) except Exception as e: make_log("ASNResolver", f"DB lookup failed for {norm}: {e}", level="warning") # Remote lookup (best-effort) asn: Optional[int] = None try: import httpx url = f"https://api.iptoasn.com/v1/as/ip/{norm}" async with httpx.AsyncClient(timeout=5.0) as client: r = await client.get(url) if r.status_code == 200: j = r.json() num = j.get("as_number") if isinstance(num, int) and num > 0: asn = num except Exception as e: make_log("ASNResolver", f"RDAP lookup failed for {norm}: {e}", level="warning") if asn is not None: self.cache[norm] = asn # Persist to DB if possible try: if db_session is not None: from app.core.models.rdap import RdapCache row = await db_session.get(RdapCache, norm) if row is None: row = RdapCache(ip=norm, asn=asn, source="iptoasn") db_session.add(row) else: row.asn = asn row.source = "iptoasn" await db_session.commit() except Exception as e: make_log("ASNResolver", f"DB persist failed for {norm}: {e}", level="warning") return asn resolver = ASNResolver()