import asyncio from datetime import datetime import os import uuid import json import shutil import magic # python-magic for MIME detection from base58 import b58decode, b58encode from sqlalchemy import and_, or_, select from app.core.models.node_storage import StoredContent from app.core.models._telegram import Wrapped_CBotChat from app.core._utils.send_status import send_status from app.core.logger import make_log from app.core.models.user import User from app.core.models import WalletConnection from app.core.storage import db_session from app.core._config import UPLOADS_DIR, BACKEND_DATA_DIR_HOST, BACKEND_LOGS_DIR_HOST from app.core.content.content_id import ContentId async def convert_loop(memory): async with db_session() as session: # Query for unprocessed encrypted content unprocessed_encrypted_content = (await session.execute(select(StoredContent).where( and_( StoredContent.type == "onchain/content", or_( StoredContent.btfs_cid == None, StoredContent.ipfs_cid == None, ) ) ))).scalars().first() if not unprocessed_encrypted_content: make_log("ConvertProcess", "No content to convert", level="debug") return # Достаем расшифрованный файл decrypted_content = (await session.execute(select(StoredContent).where( StoredContent.id == unprocessed_encrypted_content.decrypted_content_id ))).scalars().first() if not decrypted_content: make_log("ConvertProcess", "Decrypted content not found", level="error") return # Определяем путь и расширение входного файла # Путь внутри текущего контейнера (доступен Python процессу) input_file_container = os.path.join(UPLOADS_DIR, decrypted_content.hash) # Хостовый путь (нужен для docker -v маппинга при запуске конвертера) input_file_host = os.path.join(BACKEND_DATA_DIR_HOST, decrypted_content.hash) input_ext = (unprocessed_encrypted_content.filename.split('.')[-1] if '.' in unprocessed_encrypted_content.filename else "mp4") # ==== Новая логика: определение MIME-тип через python-magic ==== try: mime_type = magic.from_file(input_file_container, mime=True) except Exception as e: make_log("ConvertProcess", f"magic probe failed: {e}", level="warning") mime_type = "" if mime_type.startswith("video/"): content_kind = "video" elif mime_type.startswith("audio/"): content_kind = "audio" else: content_kind = "other" make_log("ConvertProcess", f"Detected content_kind={content_kind}, mime={mime_type}", level="info") # Для прочих типов сохраняем raw копию и выходим if content_kind == "other": make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Not audio/video, copy just", level="info") unprocessed_encrypted_content.btfs_cid = ContentId( version=2, content_hash=b58decode(decrypted_content.hash) ).serialize_v2() unprocessed_encrypted_content.ipfs_cid = ContentId( version=2, content_hash=b58decode(decrypted_content.hash) ).serialize_v2() unprocessed_encrypted_content.meta = { **unprocessed_encrypted_content.meta, 'converted_content': { option_name: decrypted_content.hash for option_name in ['high', 'low', 'low_preview'] } } await session.commit() return # ==== Конвертация для видео или аудио: оригинальная логика ==== # Static preview interval in seconds preview_interval = [0, 30] if unprocessed_encrypted_content.onchain_index in [2]: preview_interval = [0, 60] make_log( "ConvertProcess", f"Processing content {unprocessed_encrypted_content.id} as {content_kind} with preview interval {preview_interval}", level="info" ) # Выбираем опции конвертации для видео и аудио if content_kind == "video": REQUIRED_CONVERT_OPTIONS = ['high', 'low', 'low_preview'] else: REQUIRED_CONVERT_OPTIONS = ['high', 'low'] # no preview for audio converted_content = {} # Директория логов на хосте для docker-контейнера конвертера logs_dir_host = BACKEND_LOGS_DIR_HOST for option in REQUIRED_CONVERT_OPTIONS: # Set quality parameter and trim option (only for preview) if option == "low_preview": quality = "low" trim_value = f"{preview_interval[0]}-{preview_interval[1]}" else: quality = option trim_value = None # Generate a unique output directory for docker container output_uuid = str(uuid.uuid4()) # Директория вывода в текущем контейнере (та же что и в UPLOADS_DIR, смонтирована с хоста) output_dir_container = os.path.join(UPLOADS_DIR, "converter-output", output_uuid) os.makedirs(output_dir_container, exist_ok=True) # Соответствующая директория на хосте — нужна для docker -v output_dir_host = os.path.join(BACKEND_DATA_DIR_HOST, "converter-output", output_uuid) # Build the docker command cmd = [ "docker", "run", "--rm", # Важно: источники - это ХОСТОВЫЕ пути, так как docker демону они нужны на хосте "-v", f"{input_file_host}:/app/input:ro", "-v", f"{output_dir_host}:/app/output", "-v", f"{logs_dir_host}:/app/logs", "media_converter", "--ext", input_ext, "--quality", quality ] if trim_value: cmd.extend(["--trim", trim_value]) # converter auto-detects audio/video, no explicit flag required process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) stdout, stderr = await process.communicate() if process.returncode != 0: make_log("ConvertProcess", f"Docker conversion failed for option {option}: {stderr.decode()}", level="error") return # List files in output dir try: files = os.listdir(output_dir_container) except Exception as e: make_log("ConvertProcess", f"Error reading output directory {output_dir}: {e}", level="error") return media_files = [f for f in files if f != "output.json"] if len(media_files) != 1: make_log("ConvertProcess", f"Expected one media file, found {len(media_files)} for option {option}", level="error") return output_file = os.path.join(output_dir_container, media_files[0]) # Compute SHA256 hash of the output file hash_process = await asyncio.create_subprocess_exec( "sha256sum", output_file, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE ) hash_stdout, hash_stderr = await hash_process.communicate() if hash_process.returncode != 0: make_log("ConvertProcess", f"Error computing sha256sum for option {option}: {hash_stderr.decode()}", level="error") return file_hash = hash_stdout.decode().split()[0] file_hash = b58encode(bytes.fromhex(file_hash)).decode() # Save new StoredContent if not exists if not (await session.execute(select(StoredContent).where(StoredContent.hash == file_hash))).scalars().first(): new_content = StoredContent( type="local/content_bin", hash=file_hash, user_id=unprocessed_encrypted_content.user_id, filename=media_files[0], meta={'encrypted_file_hash': unprocessed_encrypted_content.hash}, created=datetime.now(), ) session.add(new_content) await session.commit() save_path = os.path.join(UPLOADS_DIR, file_hash) try: os.remove(save_path) except FileNotFoundError: pass try: shutil.move(output_file, save_path) except Exception as e: make_log("ConvertProcess", f"Error moving output file {output_file} to {save_path}: {e}", level="error") return converted_content[option] = file_hash # Process output.json for ffprobe_meta output_json_path = os.path.join(output_dir_container, "output.json") if os.path.exists(output_json_path) and unprocessed_encrypted_content.meta.get('ffprobe_meta') is None: try: with open(output_json_path, "r") as f: ffprobe_meta = json.load(f) unprocessed_encrypted_content.meta = { **unprocessed_encrypted_content.meta, 'ffprobe_meta': ffprobe_meta } except Exception as e: make_log("ConvertProcess", f"Error handling output.json for option {option}: {e}", level="error") # Cleanup output directory try: shutil.rmtree(output_dir_container) except Exception as e: make_log("ConvertProcess", f"Error removing output dir {output_dir}: {e}", level="warning") # Finalize original record make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Converted content: {converted_content}", level="info") unprocessed_encrypted_content.btfs_cid = ContentId( version=2, content_hash=b58decode(converted_content['high' if content_kind=='video' else 'low']) ).serialize_v2() unprocessed_encrypted_content.ipfs_cid = ContentId( version=2, content_hash=b58decode(converted_content['low']) ).serialize_v2() unprocessed_encrypted_content.meta = { **unprocessed_encrypted_content.meta, 'converted_content': converted_content } await session.commit() # Notify user if needed if not unprocessed_encrypted_content.meta.get('upload_notify_msg_id'): wallet_owner_connection = (await session.execute(select(WalletConnection).where( WalletConnection.wallet_address == unprocessed_encrypted_content.owner_address ).order_by(WalletConnection.id.desc()))).scalars().first() if wallet_owner_connection: wallet_owner_user = wallet_owner_connection.user bot = Wrapped_CBotChat( memory._client_telegram_bot, chat_id=wallet_owner_user.telegram_id, user=wallet_owner_user, db_session=session ) unprocessed_encrypted_content.meta['upload_notify_msg_id'] = await bot.send_content(session, unprocessed_encrypted_content) await session.commit() async def main_fn(memory): make_log("ConvertProcess", "Service started", level="info") seqno = 0 while True: try: rid = __import__('uuid').uuid4().hex[:8] try: from app.core.log_context import ctx_rid ctx_rid.set(rid) except BaseException: pass make_log("ConvertProcess", "Service running", level="debug", rid=rid) await convert_loop(memory) await asyncio.sleep(5) await send_status("convert_service", f"working (seqno={seqno})") seqno += 1 except BaseException as e: make_log("ConvertProcess", f"Error: {e}", level="error", rid=locals().get('rid')) await asyncio.sleep(3) finally: try: from app.core.log_context import ctx_rid ctx_rid.set(None) except BaseException: pass