import asyncio
import json
import os
from datetime import datetime, timedelta
from hashlib import sha256
from typing import Optional, Tuple

import aiofiles
from base58 import b58encode
from httpx import AsyncClient

from app.core.logger import make_log
from app.core._config import PROJECT_HOST, UPLOADS_DIR
from app.core._crypto.signer import Signer
from app.core._secrets import hot_seed
from app.core.models.node_storage import StoredContent

# Keyword arguments that callers are not allowed to set on a new StoredContent;
# they are silently dropped by create_new_content.
_RESERVED_CONTENT_FIELDS = frozenset({'id', 'content_id', 'created', 'onchain_index'})


async def create_new_content(
    db_session, type: str, content_bin: bytes, **kwargs
) -> Tuple[StoredContent, bool]:
    """Store a binary blob, deduplicated by the hash of its bytes.

    The content is identified by the base58-encoded SHA-256 digest of
    ``content_bin``. If a ``StoredContent`` row with that hash already
    exists it is returned unchanged; otherwise a new row is added and
    committed, and the bytes are written to ``UPLOADS_DIR/<hash>``.

    Args:
        db_session: Session object providing ``query``/``add``/``commit``
            (presumably a SQLAlchemy session -- confirm against callers).
            Its calls are not awaited here.
        type: Content type; must start with ``"local/"``.
        content_bin: Raw bytes to hash and store.
        **kwargs: Extra fields passed to the ``StoredContent`` constructor.
            ``id``, ``content_id``, ``created`` and ``onchain_index`` are
            dropped.

    Returns:
        A ``(content, is_new)`` pair; ``is_new`` is ``False`` when an
        existing row was reused.

    Raises:
        AssertionError: If ``type`` does not start with ``"local/"``, or
            if the row cannot be read back after the commit.
    """
    # Raised explicitly (not via `assert`) so the check survives `python -O`.
    if not type.startswith("local/"):
        raise AssertionError("Invalid type")

    extra_fields = {
        key: value
        for key, value in kwargs.items()
        if key not in _RESERVED_CONTENT_FIELDS
    }
    content_hash_b58 = b58encode(sha256(content_bin).digest()).decode()

    existing = (
        db_session.query(StoredContent)
        .filter(StoredContent.hash == content_hash_b58)
        .first()
    )
    if existing:
        return existing, False

    # NOTE(review): the row is committed before the file is written, so a
    # failed write leaves a row without a file. There is no `await` between
    # the lookup above and the commit; moving the file write earlier would
    # change that, so the original order is kept -- confirm before changing.
    db_session.add(
        StoredContent(
            type=type,
            hash=content_hash_b58,
            **extra_fields,
            created=datetime.now(),
        )
    )
    db_session.commit()

    # Read the row back through the session after the commit.
    new_content = (
        db_session.query(StoredContent)
        .filter(StoredContent.hash == content_hash_b58)
        .first()
    )
    assert new_content, "Content not created (through utils)"

    content_filepath = os.path.join(UPLOADS_DIR, content_hash_b58)
    async with aiofiles.open(content_filepath, 'wb') as file:
        await file.write(content_bin)

    return new_content, True


async def create_metadata_for_item(
    db_session,
    title: Optional[str] = None,
    cover_url: Optional[str] = None,
    authors: Optional[list] = None,
    hashtags: Optional[list] = None,
) -> StoredContent:
    """Build an item's metadata JSON and store it via ``create_new_content``.

    The JSON document contains ``name``, ``description``, ``attributes``,
    optionally ``image``, and ``authors`` (in that key order), and is stored
    with type ``"local/content_bin"`` and filename ``"metadata.json"``.

    Args:
        db_session: Session forwarded to ``create_new_content``.
        title: Item title; required, longer than 3 characters. It is cut
            to 100 characters and then stripped of surrounding whitespace.
        cover_url: Optional cover URL; stored under ``image`` when truthy.
        authors: Optional list of author names. Only the first 500 are
            used, and within each name only characters whose encoded form
            is a single byte are kept.
        hashtags: Optional list of hashtags. Each is prefixed with ``#``,
            has spaces replaced by ``_``, and they are joined with spaces
            into ``description``. ``None`` is treated as no hashtags.

    Returns:
        The ``StoredContent`` holding the metadata (new or already existing).

    Raises:
        AssertionError: If ``title`` is missing or not longer than 3
            characters.
    """
    # Raised explicitly (not via `assert`) so validation survives `python -O`.
    if not title:
        raise AssertionError("No title provided")
    if not len(title) > 3:
        raise AssertionError("Title too short")

    title = title[:100].strip()
    item_metadata = {
        'name': title,
        'description': ' '.join(
            f"#{tag.replace(' ', '_')}" for tag in (hashtags or [])
        ),
        # No attributes are emitted; the list is always empty.
        'attributes': [],
    }
    # cover_url is optional: the 'image' key is only present when it is set.
    if cover_url:
        item_metadata['image'] = cover_url

    item_metadata['authors'] = [
        ''.join(ch for ch in author if len(ch.encode()) == 1)
        for author in (authors or [])[:500]
    ]

    # Upload file. Hashing is done inside create_new_content.
    metadata_bin = json.dumps(item_metadata).encode()
    metadata_content, _ = await create_new_content(
        db_session, "local/content_bin", metadata_bin,
        filename="metadata.json",
        meta={'content_type': 'application/json'},
    )
    return metadata_content