import asyncio
import json
import os
import string
from datetime import datetime, timedelta
from hashlib import sha256
from typing import Tuple

import aiofiles
from base58 import b58encode
from httpx import AsyncClient

from app.core.logger import make_log
from app.core._config import PROJECT_HOST, UPLOADS_DIR
from app.core._crypto.signer import Signer
from app.core._secrets import hot_seed
from app.core.models.node_storage import StoredContent


async def create_new_content(
    db_session, type: str, content_bin: bytes, **kwargs
) -> Tuple[StoredContent, bool]:
    """Store ``content_bin`` under its content hash, reusing an existing row.

    The content is identified by the base58-encoded SHA-256 digest of its
    bytes. If a ``StoredContent`` row with that hash already exists it is
    returned untouched; otherwise a new row is committed and the bytes are
    written to ``UPLOADS_DIR/<hash>``.

    Args:
        db_session: Session used to query, add and commit ``StoredContent``.
        type: Content type; must start with ``"local/"``.
        content_bin: Raw bytes to hash and persist.
        **kwargs: Extra ``StoredContent`` constructor fields. The keys
            ``id``, ``content_id``, ``created`` and ``onchain_index`` are
            silently dropped.

    Returns:
        ``(content, is_new)`` where ``is_new`` is ``False`` when a row with
        the same hash already existed and ``True`` when one was created.

    Raises:
        AssertionError: If ``type`` does not start with ``"local/"`` or the
            freshly committed row cannot be read back.
    """
    assert type.startswith("local/"), "Invalid type"

    # Fields set by this function (or elsewhere) that callers must not override.
    reserved_fields = ('id', 'content_id', 'created', 'onchain_index')
    kwargs = {k: v for k, v in kwargs.items() if k not in reserved_fields}

    content_hash_bin = sha256(content_bin).digest()
    content_hash_b58 = b58encode(content_hash_bin).decode()

    # Deduplicate by hash: identical bytes map to the same row.
    existing_content = (
        db_session.query(StoredContent)
        .filter(StoredContent.hash == content_hash_b58)
        .first()
    )
    if existing_content:
        return existing_content, False

    new_content = StoredContent(
        type=type,
        hash=content_hash_b58,
        **kwargs,
        created=datetime.now(),
    )
    db_session.add(new_content)
    db_session.commit()

    # Read the row back by hash to confirm it was persisted.
    new_content = (
        db_session.query(StoredContent)
        .filter(StoredContent.hash == content_hash_b58)
        .first()
    )
    assert new_content, "Content not created (through utils)"

    # NOTE(review): the row is committed before the file is written, so a
    # failed write leaves a row whose file is missing, and later calls with
    # the same bytes return early above without writing it — confirm this
    # is acceptable.
    content_filepath = os.path.join(UPLOADS_DIR, content_hash_b58)
    async with aiofiles.open(content_filepath, 'wb') as file:
        await file.write(content_bin)

    return new_content, True


async def create_metadata_for_item(
    db_session,
    title: str = None,
    cover_url: str = None,
    authors: list = None,
    hashtags: list = None,
    downloadable: bool = False,
) -> StoredContent:
    """Build an item's metadata JSON and store it via ``create_new_content``.

    Args:
        db_session: Session forwarded to ``create_new_content``.
        title: Item title; required and longer than 3 characters. It is
            truncated to 100 characters and then stripped.
        cover_url: Optional cover URL; stored under ``image`` when truthy.
        authors: Optional author names. Only the first 500 are kept and
            non-ASCII characters are removed from each name.
        hashtags: Optional hashtags. Each becomes ``#<tag>`` in the
            description, with every character whose lowercase form is not
            an ASCII letter replaced by ``_``.
        downloadable: Stored as-is under ``downloadable``.

    Returns:
        The ``StoredContent`` holding the metadata JSON (either newly
        created or an existing row with identical bytes).

    Raises:
        AssertionError: If ``title`` is empty or not longer than 3
            characters.
    """
    assert title, "No title provided"
    # cover_url is intentionally optional; it is only added when provided.
    assert len(title) > 3, "Title too short"
    title = title[:100].strip()

    description = ' '.join(
        '#' + ''.join(
            ch if ch.lower() in string.ascii_lowercase else '_'
            for ch in tag
        )
        for tag in (hashtags or [])
    )

    item_metadata = {
        'name': title,
        'description': description,
        # No traits are emitted currently; entries would look like
        # {'trait_type': ..., 'value': ...}.
        'attributes': [],
        'downloadable': downloadable,
    }
    if cover_url:
        item_metadata['image'] = cover_url

    # Keep only ASCII characters (those that encode to a single UTF-8 byte).
    item_metadata['authors'] = [
        ''.join(ch for ch in author if ord(ch) < 128)
        for author in (authors or [])[:500]
    ]

    # Upload file
    metadata_bin = json.dumps(item_metadata).encode()
    metadata_content, _ = await create_new_content(
        db_session,
        "local/content_bin",
        metadata_bin,
        filename="metadata.json",
        meta={'content_type': 'application/json'},
    )
    return metadata_content