From 2476d3b3b6e1dc07776543bc1c752b4fd9339725 Mon Sep 17 00:00:00 2001 From: user Date: Sat, 26 Apr 2025 13:01:10 +0300 Subject: [PATCH] fix someth --- app/core/background/convert_service.py | 298 +++++++++++++++---------- requirements.txt | 1 + 2 files changed, 183 insertions(+), 116 deletions(-) diff --git a/app/core/background/convert_service.py b/app/core/background/convert_service.py index c49bab8..3c03f79 100644 --- a/app/core/background/convert_service.py +++ b/app/core/background/convert_service.py @@ -4,24 +4,24 @@ import os import uuid import json import shutil -import magic # python-magic +import magic # python-magic for MIME detection from base58 import b58decode, b58encode from sqlalchemy import and_, or_ from app.core.models.node_storage import StoredContent from app.core.models._telegram import Wrapped_CBotChat from app.core._utils.send_status import send_status from app.core.logger import make_log +from app.core.models.user import User from app.core.models import WalletConnection from app.core.storage import db_session from app.core._config import UPLOADS_DIR from app.core.content.content_id import ContentId -import converter_module # наш модуль для конвертации async def convert_loop(memory): with db_session() as session: - # 1) Найти несанкционированный контент - item = session.query(StoredContent).filter( + # Query for unprocessed encrypted content + unprocessed_encrypted_content = session.query(StoredContent).filter( and_( StoredContent.type == "onchain/content", or_( @@ -30,50 +30,51 @@ async def convert_loop(memory): ) ) ).first() - if not item: + if not unprocessed_encrypted_content: make_log("ConvertProcess", "No content to convert", level="debug") return - # 2) Достать расшифрованный файл - decrypted = session.query(StoredContent).filter( - StoredContent.id == item.decrypted_content_id + # Достаем расшифрованный файл + decrypted_content = session.query(StoredContent).filter( + StoredContent.id == unprocessed_encrypted_content.decrypted_content_id ).first() - if not decrypted: + if not decrypted_content: make_log("ConvertProcess", "Decrypted content not found", level="error") return - input_path = f"/Storage/storedContent/{decrypted.hash}" - filename = item.filename - ext = filename.split('.')[-1] if '.' in filename else "" + # Определяем путь и расширение входного файла + input_file_path = f"/Storage/storedContent/{decrypted_content.hash}" + input_ext = (unprocessed_encrypted_content.filename.split('.')[-1] + if '.' in unprocessed_encrypted_content.filename else "mp4") - # 3) Определяем MIME-тип через python-magic + # ==== Новая логика: определение MIME-тип через python-magic ==== try: - mime = magic.from_file(input_path, mime=True) + mime_type = magic.from_file(input_file_path, mime=True) except Exception as e: make_log("ConvertProcess", f"magic probe failed: {e}", level="warning") - mime = "" + mime_type = "" - if mime.startswith("video/"): - kind = "video" - elif mime.startswith("audio/"): - kind = "audio" + if mime_type.startswith("video/"): + content_kind = "video" + elif mime_type.startswith("audio/"): + content_kind = "audio" else: - kind = "other" + content_kind = "other" - make_log("ConvertProcess", f"Detected kind={kind}, mime={mime}", level="info") + make_log("ConvertProcess", f"Detected content_kind={content_kind}, mime={mime_type}", level="info") - # 4) Если не видео и не аудио — сохраняем raw copy и выходим - if kind == "other": - raw_hash = item.hash - raw = StoredContent( + # Для прочих типов сохраняем raw копию и выходим + if content_kind == "other": + raw_hash = unprocessed_encrypted_content.hash + raw_content = StoredContent( type="local/content_raw", hash=raw_hash, - user_id=item.user_id, - filename=filename, + user_id=unprocessed_encrypted_content.user_id, + filename=unprocessed_encrypted_content.filename, meta={'source': 'raw_copy'}, created=datetime.now(), ) - session.add(raw) + session.add(raw_content) session.commit() # Копируем файл в UPLOADS_DIR @@ -82,127 +83,192 @@ async def convert_loop(memory): os.remove(dst) except FileNotFoundError: pass - shutil.copy2(input_path, dst) + shutil.copy2(input_file_path, dst) # Обновляем оригинальный объект - item.btfs_cid = ContentId(version=2, content_hash=b58decode(raw_hash)).serialize_v2() - item.ipfs_cid = ContentId(version=2, content_hash=b58decode(raw_hash)).serialize_v2() - item.meta = {**item.meta, 'converted_content': {'raw': raw_hash}} + unprocessed_encrypted_content.btfs_cid = ContentId( + version=2, content_hash=b58decode(raw_hash) + ).serialize_v2() + unprocessed_encrypted_content.ipfs_cid = ContentId( + version=2, content_hash=b58decode(raw_hash) + ).serialize_v2() + unprocessed_encrypted_content.meta = { + **unprocessed_encrypted_content.meta, + 'converted_content': {'raw': raw_hash} + } session.commit() - make_log("ConvertProcess", f"Raw content saved and CIDs set for {raw_hash}", level="info") + make_log("ConvertProcess", f"Raw content saved for {raw_hash}", level="info") return - # 5) Задаём опции конвертации для видео/аудио - if kind == "video": - options = ['high', 'low', 'low_preview'] - else: # audio - options = ['high_audio', 'low_audio'] - - # Preview interval + # ==== Конвертация для видео или аудио: оригинальная логика ==== + # Static preview interval in seconds preview_interval = [0, 30] - if item.onchain_index == 2: + if unprocessed_encrypted_content.onchain_index in [2]: preview_interval = [0, 60] - converted = {} - for opt in options: - # quality и trim - if opt.endswith("_preview"): - quality = opt.replace("_preview", "") - trim = preview_interval + make_log( + "ConvertProcess", + f"Processing content {unprocessed_encrypted_content.id} as {content_kind} with preview interval {preview_interval}", + level="info" + ) + + # Выбираем опции конвертации для видео и аудио + if content_kind == "video": + REQUIRED_CONVERT_OPTIONS = ['high', 'low', 'low_preview'] + else: + REQUIRED_CONVERT_OPTIONS = ['high', 'low'] # no preview for audio + + converted_content = {} + logs_dir = "/Storage/logs/converter" + + for option in REQUIRED_CONVERT_OPTIONS: + # Set quality parameter and trim option (only for preview) + if option == "low_preview": + quality = "low" + trim_value = f"{preview_interval[0]}-{preview_interval[1]}" else: - quality = opt.replace("_audio", "") - trim = None + quality = option + trim_value = None - # уникальная папка вывода - uid = str(uuid.uuid4()) - out_dir = f"/Storage/storedContent/converter-output/{uid}" - os.makedirs(out_dir, exist_ok=True) + # Generate a unique output directory for docker container + output_uuid = str(uuid.uuid4()) + output_dir = f"/Storage/storedContent/converter-output/{output_uuid}" - # ==== Вызов конвертера ==== - # converter_module.convert(input_path, out_dir, ext=ext, - # quality=quality, trim=trim, audio_only=(kind=="audio")) - # Здесь предполагаем, что convert возвращает имя файла результата. - try: - result_fname = await converter_module.convert( - input=input_path, - output_dir=out_dir, - ext=ext, - quality=quality, - trim=trim, - audio_only=(kind == "audio") - ) - except Exception as e: - make_log("ConvertProcess", f"Conversion failed {opt}: {e}", level="error") - return + # Build the docker command + cmd = [ + "docker", "run", "--rm", + "-v", f"{input_file_path}:/app/input", + "-v", f"{output_dir}:/app/output", + "-v", f"{logs_dir}:/app/logs", + "media_converter", + "--ext", input_ext, + "--quality", quality + ] + if trim_value: + cmd.extend(["--trim", trim_value]) + if content_kind == "audio": + cmd.append("--audio-only") # audio-only flag - out_file = os.path.join(out_dir, result_fname) - - # 6) Считаем sha256 и b58 - proc = await asyncio.create_subprocess_exec( - "sha256sum", out_file, + process = await asyncio.create_subprocess_exec( + *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) - so, se = await proc.communicate() - if proc.returncode != 0: - make_log("ConvertProcess", f"sha256sum error: {se.decode()}", level="error") + stdout, stderr = await process.communicate() + if process.returncode != 0: + make_log("ConvertProcess", f"Docker conversion failed for option {option}: {stderr.decode()}", level="error") return - sha_hex = so.decode().split()[0] - h58 = b58encode(bytes.fromhex(sha_hex)).decode() - # 7) Добавляем StoredContent, если нужно - if not session.query(StoredContent).filter(StoredContent.hash == h58).first(): - new = StoredContent( + # List files in output dir + try: + files = os.listdir(output_dir.replace("/Storage/storedContent", "/app/data")) + except Exception as e: + make_log("ConvertProcess", f"Error reading output directory {output_dir}: {e}", level="error") + return + + media_files = [f for f in files if f != "output.json"] + if len(media_files) != 1: + make_log("ConvertProcess", f"Expected one media file, found {len(media_files)} for option {option}", level="error") + return + + output_file = os.path.join( + output_dir.replace("/Storage/storedContent", "/app/data"), + media_files[0] + ) + + # Compute SHA256 hash of the output file + hash_process = await asyncio.create_subprocess_exec( + "sha256sum", output_file, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + hash_stdout, hash_stderr = await hash_process.communicate() + if hash_process.returncode != 0: + make_log("ConvertProcess", f"Error computing sha256sum for option {option}: {hash_stderr.decode()}", level="error") + return + file_hash = hash_stdout.decode().split()[0] + file_hash = b58encode(bytes.fromhex(file_hash)).decode() + + # Save new StoredContent if not exists + if not session.query(StoredContent).filter( + StoredContent.hash == file_hash + ).first(): + new_content = StoredContent( type="local/content_bin", - hash=h58, - user_id=item.user_id, - filename=result_fname, - meta={'encrypted_file_hash': item.hash}, + hash=file_hash, + user_id=unprocessed_encrypted_content.user_id, + filename=media_files[0], + meta={'encrypted_file_hash': unprocessed_encrypted_content.hash}, created=datetime.now(), ) - session.add(new) + session.add(new_content) session.commit() - # 8) Перемещаем в UPLOADS_DIR - dst = os.path.join(UPLOADS_DIR, h58) + save_path = os.path.join(UPLOADS_DIR, file_hash) try: - os.remove(dst) + os.remove(save_path) except FileNotFoundError: pass - shutil.move(out_file, dst) - converted[opt] = h58 + try: + shutil.move(output_file, save_path) + except Exception as e: + make_log("ConvertProcess", f"Error moving output file {output_file} to {save_path}: {e}", level="error") + return - # 9) optional: metadata from converter_module - meta_info = converter_module.get_meta(out_dir) # допустим, так - if meta_info and 'ffprobe_meta' not in item.meta: - item.meta['ffprobe_meta'] = meta_info + converted_content[option] = file_hash - # 10) очистка - shutil.rmtree(out_dir, ignore_errors=True) + # Process output.json for ffprobe_meta + output_json_path = os.path.join( + output_dir.replace("/Storage/storedContent", "/app/data"), + "output.json" + ) + if os.path.exists(output_json_path) and unprocessed_encrypted_content.meta.get('ffprobe_meta') is None: + try: + with open(output_json_path, "r") as f: + ffprobe_meta = json.load(f) + unprocessed_encrypted_content.meta = { + **unprocessed_encrypted_content.meta, + 'ffprobe_meta': ffprobe_meta + } + except Exception as e: + make_log("ConvertProcess", f"Error handling output.json for option {option}: {e}", level="error") - # 11) Обновляем оригинальный объект после всех опций - make_log("ConvertProcess", f"Converted: {converted}", level="info") - main_high = 'high' if kind=='video' else 'high_audio' - main_low = 'low' if kind=='video' else 'low_audio' - item.btfs_cid = ContentId(version=2, - content_hash=b58decode(converted[main_high])).serialize_v2() - item.ipfs_cid = ContentId(version=2, - content_hash=b58decode(converted[main_low])).serialize_v2() - item.meta = {**item.meta, 'converted_content': converted} + # Cleanup output directory + try: + shutil.rmtree(output_dir.replace("/Storage/storedContent", "/app/data")) + except Exception as e: + make_log("ConvertProcess", f"Error removing output dir {output_dir}: {e}", level="warning") + + # Finalize original record + make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Converted content: {converted_content}", level="info") + unprocessed_encrypted_content.btfs_cid = ContentId( + version=2, content_hash=b58decode(converted_content['high' if content_kind=='video' else 'low']) + ).serialize_v2() + unprocessed_encrypted_content.ipfs_cid = ContentId( + version=2, content_hash=b58decode(converted_content['low']) + ).serialize_v2() + unprocessed_encrypted_content.meta = { + **unprocessed_encrypted_content.meta, + 'converted_content': converted_content + } session.commit() - # 12) Отправляем уведомление пользователю - if not item.meta.get('upload_notify_msg_id'): - wc = session.query(WalletConnection).filter( - WalletConnection.wallet_address == item.owner_address + # Notify user if needed + if not unprocessed_encrypted_content.meta.get('upload_notify_msg_id'): + wallet_owner_connection = session.query(WalletConnection).filter( + WalletConnection.wallet_address == unprocessed_encrypted_content.owner_address ).order_by(WalletConnection.id.desc()).first() - if wc: - bot = Wrapped_CBotChat(memory._client_telegram_bot, - chat_id=wc.user.telegram_id, - user=wc.user, db_session=session) - item.meta['upload_notify_msg_id'] = await bot.send_content(session, item) + if wallet_owner_connection: + wallet_owner_user = wallet_owner_connection.user + bot = Wrapped_CBotChat( + memory._client_telegram_bot, + chat_id=wallet_owner_user.telegram_id, + user=wallet_owner_user, + db_session=session + ) + unprocessed_encrypted_content.meta['upload_notify_msg_id'] = await bot.send_content(session, unprocessed_encrypted_content) session.commit() diff --git a/requirements.txt b/requirements.txt index bdb4d1d..08aa8aa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,3 +17,4 @@ pillow==10.2.0 ffmpeg-python==0.2.0 python-magic==0.4.27 +