From 590afd2475d5822b3e5cdff5f81ca41f183020ff Mon Sep 17 00:00:00 2001
From: user
Date: Sun, 1 Jun 2025 11:58:51 +0300
Subject: [PATCH] text pre-filtering

---
 app/api/routes/_blockchain.py |   2 +-
 app/core/content/utils.py     | 107 +++++++++++++++++++++++++++++++---
 2 files changed, 100 insertions(+), 9 deletions(-)

diff --git a/app/api/routes/_blockchain.py b/app/api/routes/_blockchain.py
index 48232ef..a6a47aa 100644
--- a/app/api/routes/_blockchain.py
+++ b/app/api/routes/_blockchain.py
@@ -83,7 +83,7 @@ async def s_api_v1_blockchain_send_new_content_message(request):
 
     image_content = None
 
-    content_title = f"{', '.join(request.json['authors'])} - {request.json['title']}" if request.json['authors'] else request.json['title']
+    content_title = f"{', '.join(request.json['authors'])} – {request.json['title']}" if request.json['authors'] else request.json['title']
 
     metadata_content = await create_metadata_for_item(
         request.ctx.db_session,
diff --git a/app/core/content/utils.py b/app/core/content/utils.py
index 53ad875..a607d3d 100644
--- a/app/core/content/utils.py
+++ b/app/core/content/utils.py
@@ -4,6 +4,8 @@ import os
 import string
 import aiofiles
 from hashlib import sha256
+import re
+import unicodedata
 
 from base58 import b58encode
 from datetime import datetime, timedelta
@@ -46,6 +48,67 @@ async def create_new_content(
 
     return new_content, True
 
+# Helper functions for string cleaning
+def _remove_emojis(text: str) -> str:
+    """Removes common emoji characters from a string."""
+    # Common emoji blocks; kept narrow so CJK and other non-Latin letters survive
+    emoji_pattern = re.compile(
+        "["
+        "\U0001F600-\U0001F64F"  # emoticons
+        "\U0001F300-\U0001F5FF"  # symbols & pictographs
+        "\U0001F680-\U0001F6FF"  # transport & map symbols
+        "\U0001F1E0-\U0001F1FF"  # flags (iOS)
+        "\U00002702-\U000027B0"  # dingbats
+        "\U00002600-\U000026FF"  # miscellaneous symbols
+        "]+",
+        flags=re.UNICODE,
+    )
+    return emoji_pattern.sub('', text)
+
+def _clean_text_content(text: str, is_hashtag: bool = False) -> str:
+    """
+    Cleans a string by removing emojis and unusual characters.
+    Level 1: emoji removal.
+    Level 2: unusual-character cleaning, with hashtag-specific rules.
+    """
+    if not isinstance(text, str):
+        return ""
+
+    # Level 1: remove emojis
+    text_no_emojis = _remove_emojis(text)
+
+    # Level 2: clean unusual characters
+    if is_hashtag:
+        # Convert to lowercase
+        processed_text = text_no_emojis.lower()
+        # Replace runs of whitespace, dots, and hyphens with a single underscore
+        processed_text = re.sub(r'[\s.-]+', '_', processed_text)
+        # Keep only lowercase letters (a-z), digits (0-9), and underscores
+        cleaned_text = re.sub(r'[^a-z0-9_]', '', processed_text)
+        # Consolidate underscores left behind by removed characters
+        cleaned_text = re.sub(r'_+', '_', cleaned_text)
+        # Remove leading/trailing underscores
+        cleaned_text = cleaned_text.strip('_')
+        return cleaned_text
+    else:  # titles, author names, and other general text
+        # Decompose Unicode (NFKD) so diacritics become separate combining marks
+        nfkd_form = unicodedata.normalize('NFKD', text_no_emojis)
+        # Keep Unicode letters, numbers, spaces, and basic punctuation for titles/names
+
+        cleaned_text_chars = []
+        for char_in_nfkd in nfkd_form:
+            if not unicodedata.combining(char_in_nfkd):  # drop combining diacritics
+                cat = unicodedata.category(char_in_nfkd)
+                # keep L*/N*/Z* categories plus '.,!?-–' ('–' is the title separator in _blockchain.py)
+                if cat.startswith(('L', 'N', 'Z')) or char_in_nfkd in '.,!?-–':
+                    cleaned_text_chars.append(char_in_nfkd)
+
+        cleaned_text = "".join(cleaned_text_chars)
+        # Collapse repeated whitespace and strip the ends
+        cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
+        return cleaned_text
+
+
 async def create_metadata_for_item(
     db_session,
     title: str = None,
@@ -55,13 +118,28 @@ async def create_metadata_for_item(
     downloadable: bool = False,
 ) -> StoredContent:
     assert title, "No title provided"
-    # assert cover_url, "No cover_url provided"
-    assert len(title) > 3, "Title too short"
-    title = title[:100].strip()
+    # assert cover_url, "No cover_url provided"  # still intentionally disabled
+
+    # Clean the title with the new helper
+    cleaned_title = _clean_text_content(title, is_hashtag=False)
+    cleaned_title = cleaned_title[:100].strip()  # truncate and strip after cleaning
+    assert len(cleaned_title) > 3, f"Cleaned title '{cleaned_title}' (from '{title}') is too short"
+
+    # Clean the hashtags
+    processed_hashtags = []
+    if hashtags and isinstance(hashtags, list):
+        for _h_tag_text in hashtags:
+            if isinstance(_h_tag_text, str):
+                cleaned_h = _clean_text_content(_h_tag_text, is_hashtag=True)
+                if cleaned_h:  # keep only tags that survive cleaning
+                    processed_hashtags.append(cleaned_h)
+
+    # Deduplicate (dict.fromkeys preserves insertion order) and keep
+    # at most the first 10 unique hashtags
+    processed_hashtags = list(dict.fromkeys(processed_hashtags))[:10]
 
     item_metadata = {
-        'name': title,
-        'description': ' '.join([f"#{''.join([(_h_ch if (_h_ch.lower() in string.ascii_lowercase) else '_') for _h_ch in _h])}" for _h in hashtags]),
+        'name': cleaned_title,
         'attributes': [
             # {
             #     'trait_type': 'Artist',
@@ -69,13 +147,26 @@ async def create_metadata_for_item(
             # },
         ],
         'downloadable': downloadable,
+        'tags': processed_hashtags,  # the cleaned hashtags, stored as a list
     }
+
+    # Build the description from the cleaned hashtags
+    item_metadata['description'] = ' '.join([f"#{h}" for h in processed_hashtags])
+
     if cover_url:
         item_metadata['image'] = cover_url
 
-    item_metadata['authors'] = [
-        ''.join([_a_ch for _a_ch in _a if len(_a_ch.encode()) == 1]) for _a in (authors or [])[:500]
-    ]
+    # Clean the author names
+    cleaned_authors = []
+    if authors and isinstance(authors, list):
+        for author_name in authors[:500]:  # limit the number of authors
+            if isinstance(author_name, str):
+                # Apply the general (title-style) cleaning; this replaces the old
+                # filter that dropped every character wider than one byte
+                cleaned_author = _clean_text_content(author_name, is_hashtag=False)
+                if cleaned_author:  # keep only names that survive cleaning
+                    cleaned_authors.append(cleaned_author[:100])  # cap each name's length
+    item_metadata['authors'] = cleaned_authors
 
     # Upload file
     metadata_bin = json.dumps(item_metadata).encode()
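
A quick sanity-check sketch of the new helpers (illustrative, not part of the
patch). It assumes the patched module is importable as app.core.content.utils,
matching the file path above, with the module's dependencies installed; the
expected values are derived from the cleaning rules in this patch rather than
from existing test fixtures.

    from app.core.content.utils import _clean_text_content

    # Hashtags: lowercased, separators collapsed to '_', other symbols dropped
    assert _clean_text_content("Sci-Fi & Fantasy!", is_hashtag=True) == "sci_fi_fantasy"

    # General text: emojis stripped, diacritics folded via NFKD, spacing normalized
    assert _clean_text_content("Café ☕ Stories, Vol. 1") == "Cafe Stories, Vol. 1"
    assert _clean_text_content("DJ Snake 🐍") == "DJ Snake"

    # The en dash is whitelisted, so titles assembled in _blockchain.py
    # ("Authors – Title") keep their separator after cleaning
    assert _clean_text_content("user – My Mixtape") == "user – My Mixtape"

    # Hashtag deduplication keeps first-seen order, as in create_metadata_for_item
    assert list(dict.fromkeys(["rap", "rock", "rap"]))[:10] == ["rap", "rock"]

Note that hashtag cleaning strips all non-ASCII letters, so a fully non-Latin
hashtag cleans to the empty string and is skipped by the `if cleaned_h` check
in create_metadata_for_item.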