uploader-bot/app/core/content/utils.py

import json
import asyncio
import os
import string
import aiofiles
from hashlib import sha256
import re
import unicodedata
from base58 import b58encode
from datetime import datetime, timedelta
from httpx import AsyncClient
from app.core.logger import make_log
from app.core._config import PROJECT_HOST, UPLOADS_DIR
from app.core._crypto.signer import Signer
from app.core._secrets import hot_seed
from app.core.models.node_storage import StoredContent


async def create_new_content(
    db_session, type: str, content_bin: bytes, **kwargs
) -> tuple[StoredContent, bool]:  # returns (content, is_new)
    assert type.startswith("local/"), "Invalid type"
    # Drop fields that are managed by the database/caller and must not be overridden.
    kwargs = {k: v for k, v in kwargs.items() if k not in ('id', 'content_id', 'created', 'onchain_index')}
    content_hash_bin = sha256(content_bin).digest()
    content_hash_b58 = b58encode(content_hash_bin).decode()

    from sqlalchemy import select

    # Deduplicate by content hash: if identical bytes were stored before, reuse that row.
    result = await db_session.execute(select(StoredContent).where(StoredContent.hash == content_hash_b58))
    new_content = result.scalars().first()
    if new_content:
        return new_content, False

    new_content = StoredContent(
        type=type,
        hash=content_hash_b58,
        **kwargs,
        created=datetime.now(),
    )
    db_session.add(new_content)
    await db_session.commit()

    # Re-select to get the row as persisted (including DB-generated fields).
    result = await db_session.execute(select(StoredContent).where(StoredContent.hash == content_hash_b58))
    new_content = result.scalars().first()
    assert new_content, "Content not created (through utils)"

    # Persist the raw bytes on disk under the base58 hash.
    content_filepath = os.path.join(UPLOADS_DIR, content_hash_b58)
    async with aiofiles.open(content_filepath, 'wb') as file:
        await file.write(content_bin)
    return new_content, True
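

# Usage sketch (illustrative, not part of the original module): assuming an async
# SQLAlchemy session `db_session` and raw file bytes, content can be stored and
# deduplicated like this.  The "local/content_bin" type string and the
# `filename`/`meta` kwargs mirror the call made in create_metadata_for_item below;
# the function and filename here are invented for the example.
async def _example_store_bytes(db_session, raw_bytes: bytes) -> StoredContent:
    content, is_new = await create_new_content(
        db_session, "local/content_bin", raw_bytes,
        filename="example.bin", meta={'content_type': 'application/octet-stream'},
    )
    # `is_new` is False when identical bytes were already stored under the same hash.
    return content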


# Helper functions for string cleaning.
def _remove_emojis(text: str) -> str:
    """Removes common emoji characters from a string."""
    # This regex covers many common emojis but might not be exhaustive.
    emoji_pattern = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags (iOS)
        "\U00002702-\U000027B0"  # dingbats
        "\U000024C2-\U0001F251"  # various enclosed/miscellaneous symbols
        "]+",
        flags=re.UNICODE,
    )
    return emoji_pattern.sub(r'', text)


def _clean_text_content(text: str, is_hashtag: bool = False) -> str:
    """
    Cleans a string by removing emojis and unusual characters.

    Level 1: Emoji removal.
    Level 2: Unusual character cleaning (specific logic for hashtags).
    """
    if not isinstance(text, str):
        return ""

    # Level 1: Remove emojis
    text_no_emojis = _remove_emojis(text)

    # Level 2: Clean unusual characters
    if is_hashtag:
        # Convert to lowercase
        processed_text = text_no_emojis.lower()
        # Replace hyphens, dots, spaces (and sequences thereof) with a single underscore
        processed_text = re.sub(r'[\s.-]+', '_', processed_text)
        # Keep only lowercase letters (a-z), digits (0-9), and underscores
        cleaned_text = re.sub(r'[^a-z0-9_]', '', processed_text)
        # Remove leading/trailing underscores
        cleaned_text = cleaned_text.strip('_')
        # Consolidate multiple underscores into one
        cleaned_text = re.sub(r'_+', '_', cleaned_text)
        return cleaned_text
    else:  # For title, authors, or general text
        # Normalize Unicode characters (NFKD form) so diacritics become combining marks
        nfkd_form = unicodedata.normalize('NFKD', text_no_emojis)
        # Keep letters (Unicode), numbers (Unicode), spaces, and basic punctuation.
        # This allows for a wider range of characters suitable for titles/names.
        cleaned_text_chars = []
        for char_in_nfkd in nfkd_form:
            if not unicodedata.combining(char_in_nfkd):  # drop combining diacritics
                cat = unicodedata.category(char_in_nfkd)
                if cat.startswith(('L', 'N', 'Z')) or char_in_nfkd in '.,!?-':
                    cleaned_text_chars.append(char_in_nfkd)
        cleaned_text = "".join(cleaned_text_chars)
        # Normalize multiple spaces to a single space and strip leading/trailing spaces
        cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
        return cleaned_text
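

# Quick sanity-check sketch for the helpers above (illustrative; the concrete
# example strings are my own, not from the original module).  The expected outputs
# follow from the rules implemented in _remove_emojis and _clean_text_content:
# NFKD folding plus diacritic stripping for titles, and lowercase/underscore
# normalisation for hashtags.
def _demo_clean_text_content() -> None:
    # Emoji removal only touches the emoji code points; surrounding spaces remain.
    assert _remove_emojis("ok 🚀") == "ok "
    # Title cleaning: the emoji and en dash are dropped, accents are folded away.
    assert _clean_text_content("Café – Déjà Vu 🎉") == "Cafe Deja Vu"
    # Hashtag cleaning: lowercased, separators collapsed to single underscores.
    assert _clean_text_content("Lo-Fi Beats 🎵", is_hashtag=True) == "lo_fi_beats"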


async def create_metadata_for_item(
    db_session,
    title: str = None,
    artist: str = None,
    cover_url: str = None,
    authors: list = None,
    hashtags: list = None,
    downloadable: bool = False,
) -> StoredContent:
    assert title, "No title provided"
    # assert cover_url, "No cover_url provided"

    # Clean the title using the helper above.
    cleaned_title = _clean_text_content(title, is_hashtag=False)
    cleaned_title = cleaned_title[:100].strip()  # truncate and strip after cleaning
    assert len(cleaned_title) > 3, (
        f"Cleaned title '{cleaned_title}' (from original '{title}') is too short or became empty after cleaning."
    )

    cleaned_artist = None
    if artist:
        cleaned_artist = _clean_text_content(artist, is_hashtag=False)
        cleaned_artist = cleaned_artist[:100].strip()
        if not cleaned_artist:
            cleaned_artist = None

    display_name = f"{cleaned_artist} {cleaned_title}" if cleaned_artist else cleaned_title

    # Process and clean hashtags.
    processed_hashtags = []
    if hashtags and isinstance(hashtags, list):
        for _h_tag_text in hashtags:
            if isinstance(_h_tag_text, str):
                cleaned_h = _clean_text_content(_h_tag_text, is_hashtag=True)
                if cleaned_h:  # add only if not empty after cleaning
                    processed_hashtags.append(cleaned_h)
    # Deduplicate while preserving order (dict.fromkeys) and keep at most the first 10.
    processed_hashtags = list(dict.fromkeys(processed_hashtags))[:10]

    item_metadata = {
        'name': display_name,
        'title': cleaned_title,
        'display_name': display_name,
        'downloadable': downloadable,
        'tags': processed_hashtags,  # list of cleaned hashtags
        'attributes': [],
    }
    if cleaned_artist:
        item_metadata['artist'] = cleaned_artist
        item_metadata['attributes'].append({
            'trait_type': 'Artist',
            'value': cleaned_artist,
        })
    # Generate the description from the processed hashtags.
    item_metadata['description'] = ' '.join([f"#{h}" for h in processed_hashtags if h])
    if cover_url:
        item_metadata['image'] = cover_url

    # Clean author names with the same general-text rules as titles.
    cleaned_authors = []
    if authors and isinstance(authors, list):
        for author_name in (authors or [])[:500]:  # limit the number of authors
            if isinstance(author_name, str):
                cleaned_author = _clean_text_content(author_name, is_hashtag=False)
                if cleaned_author.strip():  # ensure not empty
                    cleaned_authors.append(cleaned_author.strip()[:100])  # limit length of each name
    item_metadata['authors'] = cleaned_authors

    # Serialize the metadata and store it as a JSON content blob.
    metadata_bin = json.dumps(item_metadata).encode()
    metadata_hash = sha256(metadata_bin).digest()
    metadata_hash_b58 = b58encode(metadata_hash).decode()  # not used below; create_new_content re-derives the hash
    metadata_content, is_new = await create_new_content(
        db_session, "local/content_bin", metadata_bin, filename="metadata.json",
        meta={'content_type': 'application/json'},
    )
    return metadata_content
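

# Usage sketch (illustrative, not part of the original module): assuming an async
# SQLAlchemy session, building and storing metadata for an item might look like
# this.  The argument values and cover URL are invented for demonstration.
async def _example_create_metadata(db_session) -> StoredContent:
    metadata_content = await create_metadata_for_item(
        db_session,
        title="Déjà Vu (Remastered) 🎉",
        artist="Some Artist",
        cover_url="https://example.org/cover.png",  # hypothetical URL
        hashtags=["Lo-Fi Beats", "chill.music"],
        downloadable=True,
    )
    # metadata_content.hash (base58 of the sha256) identifies the stored metadata.json blob.
    return metadata_content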