import json
from hashlib import sha256

from base58 import b58encode
from httpx import AsyncClient

from app.core._config import PROJECT_HOST
from app.core._crypto.signer import Signer
from app.core._secrets import hot_seed
from app.core.models.node_storage import StoredContent


async def create_metadata_for_item(
    db_session,
    title: str = None,
    cover_url: str = None,
    authors: list = None
) -> StoredContent:
    """Build the metadata JSON for an item, upload it, and return its record.

    The metadata document (name, description, attributes, image, authors) is
    serialized to JSON, signed with a ``Signer`` built from ``hot_seed``, and
    POSTed as ``metadata.json`` to ``{PROJECT_HOST}/api/v1/storage``. The
    ``content_sha256`` field of the JSON response is then used to look up
    the matching ``StoredContent`` row through ``db_session``.

    Args:
        db_session: Session object exposing ``query(...)``; presumably a
            synchronous SQLAlchemy session -- confirm against callers.
        title: Item title. Required, must be longer than 3 characters; it is
            truncated to 100 characters and then stripped of surrounding
            whitespace.
        cover_url: Cover image URL. Required; stored under ``image``.
        authors: Optional list of author names. At most the first 500 are
            kept, and every character whose UTF-8 encoding is longer than
            one byte is dropped from each name.

    Returns:
        The ``StoredContent`` row whose ``hash`` equals the ``content_sha256``
        reported by the storage endpoint.

    Raises:
        AssertionError: If ``title`` or ``cover_url`` is missing, the title
            is too short, or the storage endpoint does not answer with
            HTTP 200.
        Exception: If no ``StoredContent`` row matches the reported hash.
    """
    # Explicit raises instead of ``assert`` statements: asserts are removed
    # when Python runs with -O, which would silently disable validation.
    # AssertionError is kept so existing callers observe the same exception.
    if not title:
        raise AssertionError("No title provided")
    if not cover_url:
        raise AssertionError("No cover_url provided")
    if len(title) <= 3:
        raise AssertionError("Title too short")

    title = title[:100].strip()

    # Key order is significant: the serialized bytes are hashed and signed.
    item_metadata = {
        'name': title,
        'description': '@MY Content Ownership Proof NFT',
        # No traits are attached; the key is still emitted as an empty list.
        'attributes': [],
        'image': cover_url,
        'authors': [
            # Keep only characters that encode to a single UTF-8 byte.
            ''.join([_a_ch for _a_ch in _a if len(_a_ch.encode()) == 1])
            for _a in (authors or [])[:500]
        ],
    }

    signer = Signer(hot_seed)

    # Upload file
    metadata_bin = json.dumps(item_metadata).encode()
    metadata_hash = sha256(metadata_bin).digest()
    metadata_hash_b58 = b58encode(metadata_hash).decode()

    async with AsyncClient() as client:
        response = await client.post(
            f"{PROJECT_HOST}/api/v1/storage",
            files={"file": ('metadata.json', metadata_bin, 'application/json')},
            headers={
                'X-Service-Signature': signer.sign(metadata_bin),
                'X-Message-Hash': metadata_hash_b58,
            }
        )

    if response.status_code != 200:
        raise AssertionError(
            f"Unexpected storage response status: {response.status_code}"
        )

    response_json = response.json()
    metadata_sha256 = response_json['content_sha256']

    metadata_content = (
        db_session.query(StoredContent)
        .filter(StoredContent.hash == metadata_sha256)
        .first()
    )
    if metadata_content:
        return metadata_content

    raise Exception("Metadata not created")