uploader-bot/app/core/content/utils.py

import json
import asyncio
import os
import string
import aiofiles
from hashlib import sha256
import re
import unicodedata
from base58 import b58encode
from datetime import datetime, timedelta
from httpx import AsyncClient
from app.core.logger import make_log
from app.core._config import PROJECT_HOST, UPLOADS_DIR
from app.core._crypto.signer import Signer
from app.core._secrets import hot_seed
from app.core.models.node_storage import StoredContent


async def create_new_content(
    db_session, type: str, content_bin: bytes, **kwargs
) -> tuple[StoredContent, bool]:
    """Stores content bytes on disk and in the DB; returns (content, is_new)."""
    assert type.startswith("local/"), "Invalid type"
    # Drop generated fields so callers cannot override them.
    kwargs = {k: v for k, v in kwargs.items() if k not in ('id', 'content_id', 'created', 'onchain_index')}
    content_hash_bin = sha256(content_bin).digest()
    content_hash_b58 = b58encode(content_hash_bin).decode()
    new_content = db_session.query(StoredContent).filter(StoredContent.hash == content_hash_b58).first()
    if new_content:
        # Identical bytes were already stored: deduplicate by hash.
        return new_content, False
    new_content = StoredContent(
        type=type,
        hash=content_hash_b58,
        **kwargs,
        created=datetime.now(),
    )
    db_session.add(new_content)
    db_session.commit()
    new_content = db_session.query(StoredContent).filter(StoredContent.hash == content_hash_b58).first()
    assert new_content, "Content not created (through utils)"
    content_filepath = os.path.join(UPLOADS_DIR, content_hash_b58)
    async with aiofiles.open(content_filepath, 'wb') as file:
        await file.write(content_bin)
    return new_content, True
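
# Usage sketch for create_new_content: `payload` and the kwargs below are
# illustrative, not fixed names; extra keyword arguments become StoredContent
# fields.
#
#     content, is_new = await create_new_content(
#         db_session, "local/content_bin", payload,
#         filename="payload.bin",
#         meta={'content_type': 'application/octet-stream'},
#     )
#     # is_new is False when bytes with the same sha256 were already stored.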


# Helper functions for string cleaning.
def _remove_emojis(text: str) -> str:
    """Removes common emoji characters from a string."""
    # This regex covers many common emojis but is not exhaustive.
    emoji_pattern = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags (iOS)
        "\U00002702-\U000027B0"  # dingbats
        "\U000024C2-\U0001F251"  # various other symbols
        "]+",
        flags=re.UNICODE,
    )
    return emoji_pattern.sub('', text)
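
# Example: the party popper (U+1F389) falls in the "symbols & pictographs"
# range above and is stripped, leaving the surrounding spaces intact:
#
#     _remove_emojis("party 🎉 time")  ->  "party  time"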


def _clean_text_content(text: str, is_hashtag: bool = False) -> str:
    """
    Cleans a string by removing emojis and unusual characters.

    Level 1: emoji removal.
    Level 2: unusual-character cleaning (with hashtag-specific logic).
    """
    if not isinstance(text, str):
        return ""
    # Level 1: remove emojis.
    text_no_emojis = _remove_emojis(text)
    # Level 2: clean unusual characters.
    if is_hashtag:
        # Convert to lowercase.
        processed_text = text_no_emojis.lower()
        # Replace hyphens, dots, spaces (and sequences of them) with a single underscore.
        processed_text = re.sub(r'[\s.-]+', '_', processed_text)
        # Keep only lowercase letters (a-z), digits (0-9), and underscores.
        cleaned_text = re.sub(r'[^a-z0-9_]', '', processed_text)
        # Remove leading/trailing underscores.
        cleaned_text = cleaned_text.strip('_')
        # Consolidate multiple underscores into one.
        cleaned_text = re.sub(r'_+', '_', cleaned_text)
        return cleaned_text
    else:  # For titles, authors, or general text.
        # Normalize to NFKD form so diacritics become separate combining
        # marks that can be dropped below.
        nfkd_form = unicodedata.normalize('NFKD', text_no_emojis)
        # Keep letters (Unicode), numbers (Unicode), spaces, and basic
        # punctuation -- a wider range of characters, suitable for titles/names.
        cleaned_text_chars = []
        for char_in_nfkd in nfkd_form:
            if not unicodedata.combining(char_in_nfkd):  # drop combining diacritics
                cat = unicodedata.category(char_in_nfkd)
                if cat.startswith(('L', 'N', 'Z')) or char_in_nfkd in '.,!?-':
                    cleaned_text_chars.append(char_in_nfkd)
        cleaned_text = "".join(cleaned_text_chars)
        # Normalize runs of whitespace to a single space and strip the ends.
        cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
        return cleaned_text
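
# Worked examples, hand-checked against the logic above (a sanity sketch,
# not an exhaustive spec):
#
#     _clean_text_content("Hello 🌍 World!")                      -> "Hello World!"
#     _clean_text_content("Café driver")                          -> "Cafe driver"
#     _clean_text_content("Sci-Fi. Books 2024", is_hashtag=True)  -> "sci_fi_books_2024"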


async def create_metadata_for_item(
    db_session,
    title: str = None,
    cover_url: str = None,
    authors: list = None,
    hashtags: list = None,  # None instead of a mutable [] default
    downloadable: bool = False,
) -> StoredContent:
    assert title, "No title provided"
    # assert cover_url, "No cover_url provided"
    # Clean the title, then truncate and strip it.
    cleaned_title = _clean_text_content(title, is_hashtag=False)
    cleaned_title = cleaned_title[:100].strip()
    assert len(cleaned_title) > 3, (
        f"Cleaned title '{cleaned_title}' (from original '{title}') "
        f"is too short or became empty after cleaning."
    )
    # Process and clean the hashtags.
    processed_hashtags = []
    if hashtags and isinstance(hashtags, list):
        for _h_tag_text in hashtags:
            if isinstance(_h_tag_text, str):
                cleaned_h = _clean_text_content(_h_tag_text, is_hashtag=True)
                if cleaned_h:  # keep only tags that survive cleaning
                    processed_hashtags.append(cleaned_h)
    # Deduplicate while preserving order (dict.fromkeys) and keep the
    # first 10 unique tags.
    processed_hashtags = list(dict.fromkeys(processed_hashtags))[:10]
    item_metadata = {
        'name': cleaned_title,
        'attributes': [
            # {
            #     'trait_type': 'Artist',
            #     'value': 'Unknown'
            # },
        ],
        'downloadable': downloadable,
        'tags': processed_hashtags,  # the list of cleaned hashtags
    }
    # Generate the description from the processed hashtags.
    item_metadata['description'] = ' '.join(f"#{h}" for h in processed_hashtags if h)
    if cover_url:
        item_metadata['image'] = cover_url
    # Clean the author names with the same general-purpose cleaning as titles.
    cleaned_authors = []
    if authors and isinstance(authors, list):
        for author_name in authors[:500]:  # limit the number of authors
            if isinstance(author_name, str):
                cleaned_author = _clean_text_content(author_name, is_hashtag=False)
                if cleaned_author.strip():  # ensure the name is non-empty
                    cleaned_authors.append(cleaned_author.strip()[:100])  # cap each name's length
    item_metadata['authors'] = cleaned_authors
    # Serialize and store the metadata; create_new_content hashes and
    # deduplicates the bytes itself, so no separate hashing is needed here.
    metadata_bin = json.dumps(item_metadata).encode()
    metadata_content, _is_new = await create_new_content(
        db_session, "local/content_bin", metadata_bin, filename="metadata.json",
        meta={'content_type': 'application/json'},
    )
    return metadata_content
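
# Usage sketch for create_metadata_for_item (all argument values below are
# illustrative, not fixed names):
#
#     metadata_content = await create_metadata_for_item(
#         db_session,
#         title="My Mixtape Vol. 1",
#         cover_url="https://example.com/cover.png",
#         authors=["DJ Example"],
#         hashtags=["Mixtape", "Lo-Fi"],
#         downloadable=True,
#     )


if __name__ == "__main__":
    # Ad-hoc smoke test for the pure string helpers: a sanity sketch, not a
    # test suite. Run as a module from the project root so the app.* imports
    # resolve; the async DB-backed functions need a real session and are not
    # exercised here.
    assert _remove_emojis("party 🎉 time") == "party  time"
    assert _clean_text_content("Hello 🌍 World!") == "Hello World!"
    assert _clean_text_content("Café driver") == "Cafe driver"
    assert _clean_text_content("Sci-Fi. Books 2024", is_hashtag=True) == "sci_fi_books_2024"
    print("string-cleaning helpers behave as expected")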