from __future__ import annotations import os from typing import AsyncIterator, Dict, Any, Iterable, Optional import httpx IPFS_API_URL = os.getenv("IPFS_API_URL", "http://ipfs:5001") IPFS_GATEWAY_URL = os.getenv("IPFS_GATEWAY_URL", "http://ipfs:8080") async def add_streamed_file(stream_iter: Iterable[bytes], filename: str = "file.bin", params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: """ Stream-encrypt pipeline can pass a generator of bytes here. We stream to /api/v0/add as multipart. Returns dict with fields from IPFS: { Name, Hash, Size }. """ params = params or {} # Ensure deterministic chunking and CIDv1 default_params = { "cid-version": 1, "raw-leaves": "true", "chunker": f"size-{int(os.getenv('CRYPTO_CHUNK_BYTES', '1048576'))}", "pin": "true", "wrap-with-directory": "false", "progress": "true", } q = {**default_params, **params} async with httpx.AsyncClient(timeout=None) as client: files = {"file": (filename, stream_iter, "application/octet-stream")} r = await client.post(f"{IPFS_API_URL}/api/v0/add", params=q, files=files) r.raise_for_status() # /add may emit NDJSON lines; most often single JSON try: data = r.json() except Exception: # Fallback: last non-empty line last = [ln for ln in r.text.splitlines() if ln.strip()][-1] import json as _json data = _json.loads(last) return data async def pin_add(cid: str, recursive: bool = True) -> Dict[str, Any]: async with httpx.AsyncClient(timeout=None) as client: r = await client.post(f"{IPFS_API_URL}/api/v0/pin/add", params={"arg": cid, "recursive": str(recursive).lower(), "progress": "true"}) r.raise_for_status() return r.json() async def pin_ls(cid: str) -> Dict[str, Any]: async with httpx.AsyncClient(timeout=30) as client: r = await client.post(f"{IPFS_API_URL}/api/v0/pin/ls", params={"arg": cid}) r.raise_for_status() return r.json() async def swarm_connect(multiaddr: str) -> Dict[str, Any]: async with httpx.AsyncClient(timeout=10) as client: r = await client.post(f"{IPFS_API_URL}/api/v0/swarm/connect", params={"arg": multiaddr}) r.raise_for_status() return r.json() async def cat_stream(cid: str): client = httpx.AsyncClient(timeout=None) try: async with client.stream("POST", f"{IPFS_API_URL}/api/v0/cat", params={"arg": cid}) as r: r.raise_for_status() async for chunk in r.aiter_bytes(): if chunk: yield chunk finally: await client.aclose() async def find_providers(cid: str, max_results: int = 8): """Query DHT for providers of a CID and return a list of {peer, addrs[]}. Uses /api/v0/dht/findprovs and parses NDJSON stream. """ out = [] async with httpx.AsyncClient(timeout=30) as client: async with client.stream("POST", f"{IPFS_API_URL}/api/v0/dht/findprovs", params={"arg": cid}) as r: r.raise_for_status() async for line in r.aiter_lines(): if not line: continue try: j = httpx.Response(200, text=line).json() except Exception: import json as _json try: j = _json.loads(line) except Exception: continue # Entries can include 'Extra' or 'Responses' resps = j.get('Responses') or [] for resp in resps: peer = resp.get('ID') or resp.get('ID', '') addrs = resp.get('Addrs') or [] if peer: out.append({"peer": peer, "addrs": addrs}) if len(out) >= max_results: return out return out async def bitswap_stat() -> Dict[str, Any]: async with httpx.AsyncClient(timeout=10) as client: r = await client.post(f"{IPFS_API_URL}/api/v0/bitswap/stat") r.raise_for_status() return r.json() async def repo_stat() -> Dict[str, Any]: async with httpx.AsyncClient(timeout=10) as client: r = await client.post(f"{IPFS_API_URL}/api/v0/repo/stat") r.raise_for_status() return r.json()