import json
import asyncio
import os
import string
import re
import unicodedata
from hashlib import sha256
from datetime import datetime, timedelta

import aiofiles
from base58 import b58encode
from httpx import AsyncClient
from sqlalchemy import select

from app.core.logger import make_log
from app.core._config import PROJECT_HOST, UPLOADS_DIR
from app.core._crypto.signer import Signer
from app.core._secrets import hot_seed
from app.core.models.node_storage import StoredContent


async def create_new_content(
    db_session,
    type: str,
    content_bin: bytes,
    **kwargs
) -> tuple[StoredContent, bool]:  # returns (content, is_new)
    assert type.startswith("local/"), "Invalid type"
    kwargs = {
        k: v for k, v in kwargs.items()
        if k not in ('id', 'content_id', 'created', 'onchain_index')
    }

    content_hash_bin = sha256(content_bin).digest()
    content_hash_b58 = b58encode(content_hash_bin).decode()

    # Deduplicate by content hash: if the content is already stored, return it
    result = await db_session.execute(
        select(StoredContent).where(StoredContent.hash == content_hash_b58)
    )
    new_content = result.scalars().first()
    if new_content:
        return new_content, False

    new_content = StoredContent(
        type=type,
        hash=content_hash_b58,
        **kwargs,
        created=datetime.now(),
    )
    db_session.add(new_content)
    await db_session.commit()

    result = await db_session.execute(
        select(StoredContent).where(StoredContent.hash == content_hash_b58)
    )
    new_content = result.scalars().first()
    assert new_content, "Content not created (through utils)"

    content_filepath = os.path.join(UPLOADS_DIR, content_hash_b58)
    async with aiofiles.open(content_filepath, 'wb') as file:
        await file.write(content_bin)

    return new_content, True


# Helper functions for string cleaning

def _remove_emojis(text: str) -> str:
    """Removes common emoji characters from a string."""
    # This regex covers many common emojis but is not exhaustive, and its
    # broad ranges may also strip some non-emoji symbols.
    emoji_pattern = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags (iOS)
        "\U00002702-\U000027B0"  # dingbats
        "\U000024C2-\U0001F251"  # various symbols
        "]+",
        flags=re.UNICODE,
    )
    return emoji_pattern.sub('', text)


def _clean_text_content(text: str, is_hashtag: bool = False) -> str:
    """
    Cleans a string by removing emojis and unusual characters.

    Level 1: emoji removal.
    Level 2: unusual-character cleaning (with hashtag-specific logic).
    """
    if not isinstance(text, str):
        return ""

    # Level 1: remove emojis
    text_no_emojis = _remove_emojis(text)

    # Level 2: clean unusual characters
    if is_hashtag:
        # Convert to lowercase
        processed_text = text_no_emojis.lower()
        # Replace hyphens, dots and spaces (and runs of them) with a single underscore
        processed_text = re.sub(r'[\s.-]+', '_', processed_text)
        # Keep only lowercase letters (a-z), digits (0-9) and underscores
        cleaned_text = re.sub(r'[^a-z0-9_]', '', processed_text)
        # Remove leading/trailing underscores
        cleaned_text = cleaned_text.strip('_')
        # Collapse multiple underscores into one
        cleaned_text = re.sub(r'_+', '_', cleaned_text)
        return cleaned_text
    else:
        # For titles, authors and general text: normalize Unicode (NFKD),
        # drop combining diacritics, and keep letters, numbers, spaces and
        # basic punctuation, allowing a wider range of characters suitable
        # for titles and names.
        nfkd_form = unicodedata.normalize('NFKD', text_no_emojis)
        cleaned_text_chars = []
        for char_in_nfkd in nfkd_form:
            if unicodedata.combining(char_in_nfkd):  # remove combining diacritics
                continue
            cat = unicodedata.category(char_in_nfkd)
            if cat.startswith(('L', 'N', 'Z')) or char_in_nfkd in '.,!?-':
                cleaned_text_chars.append(char_in_nfkd)
        cleaned_text = "".join(cleaned_text_chars)
        # Collapse runs of whitespace to a single space and strip the ends
        cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
        return cleaned_text


async def create_metadata_for_item(
    db_session,
    title: str = None,
    artist: str = None,
    cover_url: str = None,
    authors: list = None,
    hashtags: list = None,
    downloadable: bool = False,
) -> StoredContent:
    assert title, "No title provided"
    # assert cover_url, "No cover_url provided"

    # Clean the title, then truncate and strip it
    cleaned_title = _clean_text_content(title, is_hashtag=False)
    cleaned_title = cleaned_title[:100].strip()
    assert len(cleaned_title) > 3, (
        f"Cleaned title '{cleaned_title}' (from original '{title}') "
        f"is too short or became empty after cleaning."
    )

    cleaned_artist = None
    if artist:
        cleaned_artist = _clean_text_content(artist, is_hashtag=False)
        cleaned_artist = cleaned_artist[:100].strip()
        if not cleaned_artist:
            cleaned_artist = None

    display_name = f"{cleaned_artist} – {cleaned_title}" if cleaned_artist else cleaned_title

    # Clean hashtags, keeping only those that are non-empty after cleaning
    processed_hashtags = []
    if hashtags and isinstance(hashtags, list):
        for _h_tag_text in hashtags:
            if isinstance(_h_tag_text, str):
                cleaned_h = _clean_text_content(_h_tag_text, is_hashtag=True)
                if cleaned_h:
                    processed_hashtags.append(cleaned_h)
    # Deduplicate while preserving order (dict.fromkeys) and keep the first 10 unique tags
    processed_hashtags = list(dict.fromkeys(processed_hashtags))[:10]

    item_metadata = {
        'name': display_name,
        'title': cleaned_title,
        'display_name': display_name,
        'downloadable': downloadable,
        'tags': processed_hashtags,  # list of cleaned hashtags
        'attributes': [],
    }
    if cleaned_artist:
        item_metadata['artist'] = cleaned_artist
        item_metadata['attributes'].append({
            'trait_type': 'Artist',
            'value': cleaned_artist,
        })

    # Generate the description from the cleaned hashtags
    item_metadata['description'] = ' '.join(f"#{h}" for h in processed_hashtags if h)

    if cover_url:
        item_metadata['image'] = cover_url

    # Clean author names with the same general-text rules as titles
    cleaned_authors = []
    if authors and isinstance(authors, list):
        for author_name in authors[:500]:  # limit the number of authors
            if isinstance(author_name, str):
                cleaned_author = _clean_text_content(author_name, is_hashtag=False)
                if cleaned_author.strip():
                    cleaned_authors.append(cleaned_author.strip()[:100])  # limit each name's length
    item_metadata['authors'] = cleaned_authors

    # Serialize the metadata and store it as a local content blob
    metadata_bin = json.dumps(item_metadata).encode()
    metadata_hash = sha256(metadata_bin).digest()
    metadata_hash_b58 = b58encode(metadata_hash).decode()

    metadata_content, is_new = await create_new_content(
        db_session,
        "local/content_bin",
        metadata_bin,
        filename="metadata.json",
        meta={'content_type': 'application/json'},
    )
    return metadata_content
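

# Usage sketch (illustrative only, not part of the module): shows how
# create_metadata_for_item might be called from an async context. The session
# factory `async_session_maker` and its import path are assumptions for this
# example; the real project wiring may differ.
#
# from app.core.db import async_session_maker  # hypothetical import
#
# async def _example():
#     async with async_session_maker() as db_session:
#         metadata_content = await create_metadata_for_item(
#             db_session,
#             title="My Track (Official Audio) 🎵",
#             artist="Some Artist",
#             hashtags=["Lo-Fi Beats", "chill.music", "Lo-Fi Beats"],
#             downloadable=True,
#         )
#         # metadata_content.hash holds the base58-encoded SHA-256 of metadata.json;
#         # the duplicate/emoji-laden inputs above are cleaned and deduplicated.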