text pre-filtering

user 2025-06-01 11:58:51 +03:00
parent a266c8b710
commit 590afd2475
2 changed files with 100 additions and 9 deletions

View File

@@ -83,7 +83,7 @@ async def s_api_v1_blockchain_send_new_content_message(request):
     image_content = None
-    content_title = f"{', '.join(request.json['authors'])} - {request.json['title']}" if request.json['authors'] else request.json['title']
+    content_title = f"{', '.join(request.json['authors'])} {request.json['title']}" if request.json['authors'] else request.json['title']
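The only change here drops the ' - ' separator between the joined authors and the title. With illustrative values (not taken from the codebase):

>>> authors, title = ['Alice', 'Bob'], 'Demo'
>>> f"{', '.join(authors)} {title}" if authors else title
'Alice, Bob Demo'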
     metadata_content = await create_metadata_for_item(
         request.ctx.db_session,

View File

@@ -4,6 +4,8 @@
 import os
 import string
 import aiofiles
 from hashlib import sha256
+import re # Added import
+import unicodedata # Added import
 from base58 import b58encode
 from datetime import datetime, timedelta
@@ -46,6 +48,67 @@ async def create_new_content(
     return new_content, True
 
 
+# New helper functions for string cleaning
+def _remove_emojis(text: str) -> str:
+    """Removes common emoji characters from a string."""
+    # This regex covers many common emojis but might not be exhaustive.
+    emoji_pattern = re.compile(
+        "["
+        "\U0001F600-\U0001F64F" # emoticons
+        "\U0001F300-\U0001F5FF" # symbols & pictographs
+        "\U0001F680-\U0001F6FF" # transport & map symbols
+        "\U0001F1E0-\U0001F1FF" # flags (iOS)
+        "\U00002702-\U000027B0" # Dingbats
+        "\U000024C2-\U0001F251" # Various symbols
+        "]+",
+        flags=re.UNICODE,
+    )
+    return emoji_pattern.sub(r'', text)
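Expected behavior, worked out by hand rather than taken from a test run. Note that the last range, \U000024C2-\U0001F251, is extremely broad and also matches many non-emoji codepoints, including CJK characters, so text in those scripts is stripped here as well:

>>> _remove_emojis("Hello 😀 world 🚀")
'Hello  world '
>>> _remove_emojis("你好") # CJK falls inside the \U000024C2-\U0001F251 range
''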
+
+
+def _clean_text_content(text: str, is_hashtag: bool = False) -> str:
+    """
+    Cleans a string by removing emojis and unusual characters.
+    Level 1: Emoji removal.
+    Level 2: Unusual character cleaning (specific logic for hashtags).
+    """
+    if not isinstance(text, str):
+        return ""
+
+    # Level 1: Remove emojis
+    text_no_emojis = _remove_emojis(text)
+
+    # Level 2: Clean unusual characters
+    if is_hashtag:
+        # Convert to lowercase
+        processed_text = text_no_emojis.lower()
+        # Replace hyphens, dots, spaces (and sequences) with a single underscore
+        processed_text = re.sub(r'[\s.-]+', '_', processed_text)
+        # Keep only lowercase letters (a-z), digits (0-9), and underscores
+        cleaned_text = re.sub(r'[^a-z0-9_]', '', processed_text)
+        # Remove leading/trailing underscores
+        cleaned_text = cleaned_text.strip('_')
+        # Consolidate multiple underscores into one
+        cleaned_text = re.sub(r'_+', '_', cleaned_text)
+        return cleaned_text
+    else: # For title, authors, or general text
+        # Normalize Unicode characters (e.g., NFKD form)
+        nfkd_form = unicodedata.normalize('NFKD', text_no_emojis)
+        # Keep letters (Unicode), numbers (Unicode), spaces, and basic punctuation
+        # This allows for a wider range of characters suitable for titles/names.
+        cleaned_text_chars = []
+        for char_in_nfkd in nfkd_form:
+            if not unicodedata.combining(char_in_nfkd): # remove combining diacritics
+                # Keep letters, numbers, spaces, and specific punctuation
+                cat = unicodedata.category(char_in_nfkd)
+                if cat.startswith('L') or cat.startswith('N') or cat.startswith('Z') or char_in_nfkd in '.,!?-':
+                    cleaned_text_chars.append(char_in_nfkd)
+        cleaned_text = "".join(cleaned_text_chars)
+        # Normalize multiple spaces to a single space and strip leading/trailing spaces
+        cleaned_text = re.sub(r'\s+', ' ', cleaned_text).strip()
+        return cleaned_text
+
+
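Hand-worked examples of the two modes (illustrative inputs; outputs derived from the rules above):

>>> _clean_text_content("Épic & Saga!! 🎸", is_hashtag=False)
'Epic Saga!!'
>>> _clean_text_content("Hip-Hop. Beats!", is_hashtag=True)
'hip_hop_beats'

NFKD plus the combining-mark filter is what turns 'É' into plain 'E'; '&' is dropped because its Unicode category (Po) is neither letter, number, nor space, and it is not in '.,!?-'.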
 async def create_metadata_for_item(
     db_session,
     title: str = None,
@@ -55,13 +118,28 @@ async def create_metadata_for_item(
     downloadable: bool = False,
 ) -> StoredContent:
     assert title, "No title provided"
-    # assert cover_url, "No cover_url provided"
-    assert len(title) > 3, "Title too short"
-    title = title[:100].strip()
+    # assert cover_url, "No cover_url provided" # Original comment, kept as is
+
+    # Clean title using the new helper function
+    cleaned_title = _clean_text_content(title, is_hashtag=False)
+    cleaned_title = cleaned_title[:100].strip() # Truncate and strip after cleaning
+    assert len(cleaned_title) > 3, f"Cleaned title '{cleaned_title}' (from original '{title}') is too short or became empty after cleaning."
+
+    # Process and clean hashtags
+    processed_hashtags = []
+    if hashtags and isinstance(hashtags, list):
+        for _h_tag_text in hashtags:
+            if isinstance(_h_tag_text, str):
+                cleaned_h = _clean_text_content(_h_tag_text, is_hashtag=True)
+                if cleaned_h: # Add only if not empty after cleaning
+                    processed_hashtags.append(cleaned_h)
+    # Ensure uniqueness of hashtags and limit their count (e.g., to first 10 unique)
+    # Using dict.fromkeys to preserve order while ensuring uniqueness
+    processed_hashtags = list(dict.fromkeys(processed_hashtags))[:10]
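End to end, the hashtag pass behaves like this on a made-up input list:

>>> [_clean_text_content(t, is_hashtag=True) for t in ["Lo-Fi", "lo fi", "LO.FI", "🔥", "rock"]]
['lo_fi', 'lo_fi', 'lo_fi', '', 'rock']
>>> list(dict.fromkeys(['lo_fi', 'lo_fi', 'lo_fi', 'rock']))[:10] # the empty result was skipped by the loop
['lo_fi', 'rock']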
     item_metadata = {
-        'name': title,
-        'description': ' '.join([f"#{''.join([(_h_ch if (_h_ch.lower() in string.ascii_lowercase) else '_') for _h_ch in _h])}" for _h in hashtags]),
+        'name': cleaned_title,
         'attributes': [
             # {
             #     'trait_type': 'Artist',
@@ -69,13 +147,26 @@ async def create_metadata_for_item(
             # },
         ],
         'downloadable': downloadable,
+        'tags': processed_hashtags, # New field for storing the list of cleaned hashtags
     }
+
+    # Generate description from the processed hashtags
+    item_metadata['description'] = ' '.join([f"#{h}" for h in processed_hashtags if h])
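With the deduplicated tags from the sketch above, the generated description is just the tags rendered as hashtags:

>>> ' '.join([f"#{h}" for h in ['lo_fi', 'rock'] if h])
'#lo_fi #rock'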
     if cover_url:
         item_metadata['image'] = cover_url
-    item_metadata['authors'] = [
-        ''.join([_a_ch for _a_ch in _a if len(_a_ch.encode()) == 1]) for _a in (authors or [])[:500]
-    ]
+
+    # Clean authors
+    cleaned_authors = []
+    if authors and isinstance(authors, list):
+        for author_name in (authors or [])[:500]: # Limit number of authors
+            if isinstance(author_name, str):
+                # Apply general cleaning to author names
+                # This replaces the old logic: ''.join([_a_ch for _a_ch in _a if len(_a_ch.encode()) == 1])
+                cleaned_author = _clean_text_content(author_name, is_hashtag=False)
+                if cleaned_author.strip(): # Ensure not empty
+                    cleaned_authors.append(cleaned_author.strip()[:100]) # Limit length of each author name
+    item_metadata['authors'] = cleaned_authors
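Unlike the old byte-length filter, which kept only single-byte (ASCII) characters, the new path preserves accented names (illustrative):

>>> _clean_text_content("Beyoncé 😀", is_hashtag=False).strip()[:100]
'Beyonce'

The old list comprehension would have produced 'Beyonc ' here, dropping the accented letter outright.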
 
     # Upload file
     metadata_bin = json.dumps(item_metadata).encode()