# uploader-bot/app/core/content/utils.py
#
# (file-viewer metadata at time of capture: 87 lines, 2.7 KiB, Python)

import json
import asyncio
import os
import aiofiles
from hashlib import sha256
from base58 import b58encode
from datetime import datetime, timedelta
from httpx import AsyncClient
from app.core.logger import make_log
from app.core._config import PROJECT_HOST, UPLOADS_DIR
from app.core._crypto.signer import Signer
from app.core._secrets import hot_seed
from app.core.models.node_storage import StoredContent
async def create_new_content(
    db_session, type: str, content_bin: bytes, **kwargs
) -> "tuple[StoredContent, bool]":
    """Store a binary blob as content-addressed local content.

    The blob is identified by the base58-encoded SHA-256 digest of
    ``content_bin``. If a ``StoredContent`` row with that hash already
    exists, it is returned untouched. Otherwise the bytes are written to
    ``UPLOADS_DIR/<hash>`` and a new row is added and committed.

    Args:
        db_session: Session object; this function uses its ``query``,
            ``add`` and ``commit`` methods.
        type: Content type label; must start with ``"local/"``.
        content_bin: Raw bytes to hash and persist.
        **kwargs: Extra field values forwarded to ``StoredContent``. The
            keys ``id``, ``content_id``, ``created`` and ``onchain_index``
            are dropped because they are not caller-controlled.

    Returns:
        ``(content, is_new)`` where ``is_new`` is ``False`` when a row with
        the same hash already existed.

    Raises:
        AssertionError: If ``type`` does not start with ``"local/"`` or the
            row cannot be read back after the commit.
    """
    # Raised explicitly (instead of using `assert`) so the check survives
    # `python -O` while callers still see the same exception type.
    if not type.startswith("local/"):
        raise AssertionError("Invalid type")

    reserved_keys = ('id', 'content_id', 'created', 'onchain_index')
    extra_fields = {k: v for k, v in kwargs.items() if k not in reserved_keys}

    content_hash_b58 = b58encode(sha256(content_bin).digest()).decode()

    def _find_by_hash():
        return (
            db_session.query(StoredContent)
            .filter(StoredContent.hash == content_hash_b58)
            .first()
        )

    existing = _find_by_hash()
    if existing:
        return existing, False

    # Persist the bytes BEFORE committing the row. The file name is the
    # content hash, so a write that is repeated after a failed commit is
    # harmless, whereas a committed row without its file would be returned
    # as "already exists" on every later call and the file never written.
    content_filepath = os.path.join(UPLOADS_DIR, content_hash_b58)
    async with aiofiles.open(content_filepath, 'wb') as file:
        await file.write(content_bin)

    db_session.add(
        StoredContent(
            type=type,
            hash=content_hash_b58,
            **extra_fields,
            created=datetime.now(),
        )
    )
    db_session.commit()

    # Read the row back so the caller gets the persisted state.
    new_content = _find_by_hash()
    if not new_content:
        raise AssertionError("Content not created (through utils)")
    return new_content, True
async def create_metadata_for_item(
    db_session,
    title: str = None,
    cover_url: str = None,
    authors: list = None,
    hashtags: list = None,
) -> StoredContent:
    """Build the JSON metadata document for an item and store it as content.

    The document contains ``name`` (the title, truncated to 100 characters
    and stripped), ``description`` (the hashtags joined as ``#tag`` words),
    an empty ``attributes`` list, ``image`` (only when ``cover_url`` is
    given) and ``authors``. It is serialized with ``json.dumps`` and stored
    through ``create_new_content`` as ``local/content_bin`` with the file
    name ``metadata.json``.

    Args:
        db_session: Session object passed through to ``create_new_content``.
        title: Item title; required and longer than 3 characters.
        cover_url: Optional cover image URL.
        authors: Optional list of author names. At most the first 500
            entries are used, and characters that are not single-byte in
            UTF-8 are removed from each name.
        hashtags: Optional list of hashtags (without the leading ``#``).
            ``None`` is treated as no hashtags.

    Returns:
        The ``StoredContent`` holding the metadata document (either newly
        created or the already existing one with identical bytes).

    Raises:
        AssertionError: If ``title`` is missing or too short.
    """
    # Raised explicitly (instead of using `assert`) so validation survives
    # `python -O` while callers still see the same exception type.
    if not title:
        raise AssertionError("No title provided")
    if len(title) <= 3:
        raise AssertionError("Title too short")

    title = title[:100].strip()

    # Key insertion order is kept stable on purpose: the serialized bytes
    # are hashed by create_new_content, so reordering would change the hash.
    item_metadata = {
        'name': title,
        'description': ' '.join(f"#{tag}" for tag in (hashtags or [])),
        'attributes': [],
    }
    # cover_url is optional; the 'image' key is omitted when it is absent.
    if cover_url:
        item_metadata['image'] = cover_url

    # Keep only code points below 128, i.e. the characters that take one
    # byte in UTF-8. Comparing code points avoids encoding every character
    # and does not raise on lone surrogates.
    item_metadata['authors'] = [
        ''.join(ch for ch in author if ord(ch) < 128)
        for author in (authors or [])[:500]
    ]

    metadata_bin = json.dumps(item_metadata).encode()
    metadata_content, _ = await create_new_content(
        db_session, "local/content_bin", metadata_bin, filename="metadata.json",
        meta={'content_type': 'application/json'},
    )
    return metadata_content