uploader-bot/app/core/content/utils.py

66 lines
1.9 KiB
Python

import json
from hashlib import sha256
from base58 import b58encode
from httpx import AsyncClient
from app.core._config import PROJECT_HOST
from app.core._crypto.signer import Signer
from app.core._secrets import hot_seed
from app.core.models.node_storage import StoredContent
async def create_metadata_for_item(
db_session,
title: str = None,
cover_url: str = None,
authors: list = None
) -> StoredContent:
assert title, "No title provided"
assert cover_url, "No cover_url provided"
assert len(title) > 3, "Title too short"
title = title[:100].strip()
item_metadata = {
'name': title,
'description': '@MY Content Ownership Proof NFT',
'attributes': [
# {
# 'trait_type': 'Artist',
# 'value': 'Unknown'
# },
]
}
if cover_url:
item_metadata['image'] = cover_url
item_metadata['authors'] = [
''.join([_a_ch for _a_ch in _a if len(_a_ch.encode()) == 1]) for _a in (authors or [])[:500]
]
signer = Signer(hot_seed)
# Upload file
metadata_bin = json.dumps(item_metadata).encode()
metadata_hash = sha256(metadata_bin).digest()
metadata_hash_b58 = b58encode(metadata_hash).decode()
async with AsyncClient() as client:
response = await client.post(
f"{PROJECT_HOST}/api/v1/storage",
files={"file": ('metadata.json', metadata_bin, 'application/json')},
headers={
'X-Service-Signature': signer.sign(metadata_bin),
'X-Message-Hash': metadata_hash_b58,
}
)
assert response.status_code == 200
response_json = response.json()
metadata_sha256 = response_json['content_sha256']
metadata_content = db_session.query(StoredContent).filter(StoredContent.hash == metadata_sha256).first()
if metadata_content:
return metadata_content
raise Exception("Metadata not created")