64 lines
1.9 KiB
Python
64 lines
1.9 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import Dict, Set
|
|
|
|
from app.core.network.config import HANDSHAKE_RATE_LIMIT_PER_MIN, HANDSHAKE_TS_TOLERANCE_SEC
|
|
|
|
|
|
def check_rate_limit(memory, remote_ip: str) -> bool:
|
|
"""Simple per-IP rate limit within current minute window.
|
|
Returns True if allowed, False if limited.
|
|
"""
|
|
now = int(time.time())
|
|
minute = now // 60
|
|
rl = getattr(memory, "_handshake_rl", None)
|
|
if rl is None or rl.get("minute") != minute:
|
|
rl = {"minute": minute, "counts": {}}
|
|
memory._handshake_rl = rl
|
|
counts = rl["counts"]
|
|
cnt = counts.get(remote_ip, 0)
|
|
if cnt >= HANDSHAKE_RATE_LIMIT_PER_MIN:
|
|
return False
|
|
counts[remote_ip] = cnt + 1
|
|
return True
|
|
|
|
|
|
def check_timestamp_fresh(ts: int) -> bool:
|
|
now = int(time.time())
|
|
return abs(now - int(ts)) <= HANDSHAKE_TS_TOLERANCE_SEC
|
|
|
|
|
|
def check_and_remember_nonce(memory, pubkey_b58: str, nonce: str) -> bool:
|
|
"""Return True if nonce is new; remember nonce with TTL ~ tolerance window.
|
|
We keep a compact in-memory set per pubkey.
|
|
"""
|
|
now = int(time.time())
|
|
store = getattr(memory, "_handshake_nonces", None)
|
|
if store is None:
|
|
store = {}
|
|
memory._handshake_nonces = store
|
|
|
|
entry = store.get(pubkey_b58)
|
|
if entry is None:
|
|
entry = {"nonces": {}, "updated": now}
|
|
store[pubkey_b58] = entry
|
|
|
|
nonces: Dict[str, int] = entry["nonces"]
|
|
# prune old nonces
|
|
to_delete = [k for k, t in nonces.items() if now - int(t) > HANDSHAKE_TS_TOLERANCE_SEC]
|
|
for k in to_delete:
|
|
nonces.pop(k, None)
|
|
|
|
if nonce in nonces:
|
|
return False
|
|
# prevent unbounded growth
|
|
if len(nonces) > 2048:
|
|
# drop half oldest
|
|
for k, _ in sorted(nonces.items(), key=lambda kv: kv[1])[:1024]:
|
|
nonces.pop(k, None)
|
|
nonces[nonce] = now
|
|
entry["updated"] = now
|
|
return True
|
|
|