From 0e4268fb4dc52f28a79d8be2f7a5b1ddaf66c92d Mon Sep 17 00:00:00 2001 From: user Date: Sat, 26 Apr 2025 12:52:32 +0300 Subject: [PATCH] python magic for images --- Dockerfile | 2 + app/core/background/convert_service.py | 319 ++++++++++++------------- requirements.txt | 1 + 3 files changed, 158 insertions(+), 164 deletions(-) diff --git a/Dockerfile b/Dockerfile index 51a30bd..d83375e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -21,5 +21,7 @@ RUN apt-get update && apt-get install -y \ apt-get update && \ apt-get install -y docker-ce-cli +RUN apt-get install libmagic1 -y + CMD ["python", "app"] diff --git a/app/core/background/convert_service.py b/app/core/background/convert_service.py index bd0df39..c49bab8 100644 --- a/app/core/background/convert_service.py +++ b/app/core/background/convert_service.py @@ -4,23 +4,24 @@ import os import uuid import json import shutil +import magic # python-magic from base58 import b58decode, b58encode from sqlalchemy import and_, or_ from app.core.models.node_storage import StoredContent from app.core.models._telegram import Wrapped_CBotChat from app.core._utils.send_status import send_status from app.core.logger import make_log -from app.core.models.user import User from app.core.models import WalletConnection from app.core.storage import db_session from app.core._config import UPLOADS_DIR from app.core.content.content_id import ContentId +import converter_module # наш модуль для конвертации async def convert_loop(memory): with db_session() as session: - # Query for unprocessed encrypted content - unprocessed_encrypted_content = session.query(StoredContent).filter( + # 1) Найти несанкционированный контент + item = session.query(StoredContent).filter( and_( StoredContent.type == "onchain/content", or_( @@ -29,192 +30,182 @@ async def convert_loop(memory): ) ) ).first() - if not unprocessed_encrypted_content: + if not item: make_log("ConvertProcess", "No content to convert", level="debug") return - - # Static preview interval in seconds - preview_interval = [0, 30] - if unprocessed_encrypted_content.onchain_index in [2]: - preview_interval = [0, 60] - make_log("ConvertProcess", f"Processing content {unprocessed_encrypted_content.id} with preview interval {preview_interval}", level="info") - decrypted_content = session.query(StoredContent).filter( - StoredContent.id == unprocessed_encrypted_content.decrypted_content_id + # 2) Достать расшифрованный файл + decrypted = session.query(StoredContent).filter( + StoredContent.id == item.decrypted_content_id ).first() - if not decrypted_content: + if not decrypted: make_log("ConvertProcess", "Decrypted content not found", level="error") return + input_path = f"/Storage/storedContent/{decrypted.hash}" + filename = item.filename + ext = filename.split('.')[-1] if '.' in filename else "" - # List of conversion options to process - REQUIRED_CONVERT_OPTIONS = ['high', 'low', 'low_preview'] - converted_content = {} # Mapping: option -> sha256 hash of output file + # 3) Определяем MIME-тип через python-magic + try: + mime = magic.from_file(input_path, mime=True) + except Exception as e: + make_log("ConvertProcess", f"magic probe failed: {e}", level="warning") + mime = "" - # Define input file path and extract its extension from filename - input_file_path = f"/Storage/storedContent/{decrypted_content.hash}" + if mime.startswith("video/"): + kind = "video" + elif mime.startswith("audio/"): + kind = "audio" + else: + kind = "other" + make_log("ConvertProcess", f"Detected kind={kind}, mime={mime}", level="info") - input_ext = unprocessed_encrypted_content.filename.split('.')[-1] if '.' in unprocessed_encrypted_content.filename else "mp4" - - # Logs directory mapping - logs_dir = "/Storage/logs/converter" - - # Process each conversion option in sequence - for option in REQUIRED_CONVERT_OPTIONS: - # Set quality parameter and trim option (only for preview) - if option == "low_preview": - quality = "low" - trim_value = f"{preview_interval[0]}-{preview_interval[1]}" - else: - quality = option # 'high' or 'low' - trim_value = None - - # Generate a unique output directory for docker container - output_uuid = str(uuid.uuid4()) - output_dir = f"/Storage/storedContent/converter-output/{output_uuid}" - - # Build the docker command with appropriate volume mounts and parameters - cmd = [ - "docker", "run", "--rm", - "-v", f"{input_file_path}:/app/input", - "-v", f"{output_dir}:/app/output", - "-v", f"{logs_dir}:/app/logs", - "media_converter", - "--ext", input_ext, - "--quality", quality - ] - if trim_value: - cmd.extend(["--trim", trim_value]) - - # Run the docker container asynchronously - process = await asyncio.create_subprocess_exec( - *cmd, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE + # 4) Если не видео и не аудио — сохраняем raw copy и выходим + if kind == "other": + raw_hash = item.hash + raw = StoredContent( + type="local/content_raw", + hash=raw_hash, + user_id=item.user_id, + filename=filename, + meta={'source': 'raw_copy'}, + created=datetime.now(), ) - stdout, stderr = await process.communicate() - if process.returncode != 0: - make_log("ConvertProcess", f"Docker conversion failed for option {option}: {stderr.decode()}", level="error") - return + session.add(raw) + session.commit() - # List files in the output directory + # Копируем файл в UPLOADS_DIR + dst = os.path.join(UPLOADS_DIR, raw_hash) try: - files = os.listdir(output_dir.replace("/Storage/storedContent", "/app/data")) - except Exception as e: - make_log("ConvertProcess", f"Error reading output directory {output_dir}: {e}", level="error") - return - - # Exclude 'output.json' and expect exactly one media output file - media_files = [f for f in files if f != "output.json"] - if len(media_files) != 1: - make_log("ConvertProcess", f"Expected one media file, found {len(media_files)} for option {option}", level="error") - return - - output_file = os.path.join(output_dir.replace("/Storage/storedContent", "/app/data"), media_files[0]) - - # Compute SHA256 hash of the output file using async subprocess - hash_process = await asyncio.create_subprocess_exec( - "sha256sum", output_file, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE - ) - hash_stdout, hash_stderr = await hash_process.communicate() - if hash_process.returncode != 0: - make_log("ConvertProcess", f"Error computing sha256sum for option {option}: {hash_stderr.decode()}", level="error") - return - file_hash = hash_stdout.decode().split()[0] - file_hash = b58encode(bytes.fromhex(file_hash)).decode() - - if not session.query(StoredContent).filter( - StoredContent.hash == file_hash - ).first(): - new_content = StoredContent( - type="local/content_bin", - hash=file_hash, - user_id=unprocessed_encrypted_content.user_id, - filename=media_files[0], - meta={ - 'encrypted_file_hash': unprocessed_encrypted_content.hash, - }, - created=datetime.now(), - ) - session.add(new_content) - session.commit() - - save_path = os.path.join(UPLOADS_DIR, file_hash) - try: - os.remove(save_path) + os.remove(dst) except FileNotFoundError: pass + shutil.copy2(input_path, dst) + # Обновляем оригинальный объект + item.btfs_cid = ContentId(version=2, content_hash=b58decode(raw_hash)).serialize_v2() + item.ipfs_cid = ContentId(version=2, content_hash=b58decode(raw_hash)).serialize_v2() + item.meta = {**item.meta, 'converted_content': {'raw': raw_hash}} + session.commit() + + make_log("ConvertProcess", f"Raw content saved and CIDs set for {raw_hash}", level="info") + return + + # 5) Задаём опции конвертации для видео/аудио + if kind == "video": + options = ['high', 'low', 'low_preview'] + else: # audio + options = ['high_audio', 'low_audio'] + + # Preview interval + preview_interval = [0, 30] + if item.onchain_index == 2: + preview_interval = [0, 60] + + converted = {} + for opt in options: + # quality и trim + if opt.endswith("_preview"): + quality = opt.replace("_preview", "") + trim = preview_interval + else: + quality = opt.replace("_audio", "") + trim = None + + # уникальная папка вывода + uid = str(uuid.uuid4()) + out_dir = f"/Storage/storedContent/converter-output/{uid}" + os.makedirs(out_dir, exist_ok=True) + + # ==== Вызов конвертера ==== + # converter_module.convert(input_path, out_dir, ext=ext, + # quality=quality, trim=trim, audio_only=(kind=="audio")) + # Здесь предполагаем, что convert возвращает имя файла результата. try: - shutil.move(output_file, save_path) + result_fname = await converter_module.convert( + input=input_path, + output_dir=out_dir, + ext=ext, + quality=quality, + trim=trim, + audio_only=(kind == "audio") + ) except Exception as e: - make_log("ConvertProcess", f"Error moving output file {output_file} to {save_path}: {e}", level="error") + make_log("ConvertProcess", f"Conversion failed {opt}: {e}", level="error") return - converted_content[option] = file_hash + out_file = os.path.join(out_dir, result_fname) - # Process output.json: read its contents and update meta['ffprobe_meta'] - output_json_path = os.path.join(output_dir.replace("/Storage/storedContent", "/app/data"), "output.json") - if os.path.exists(output_json_path): - if unprocessed_encrypted_content.meta.get('ffprobe_meta') is None: - try: - with open(output_json_path, "r") as f: - output_json_content = f.read() - except Exception as e: - make_log("ConvertProcess", f"Error reading output.json for option {option}: {e}", level="error") - return - - try: - ffprobe_meta = json.loads(output_json_content) - except Exception as e: - make_log("ConvertProcess", f"Error parsing output.json for option {option}: {e}", level="error") - return - - unprocessed_encrypted_content.meta = { - **unprocessed_encrypted_content.meta, - 'ffprobe_meta': ffprobe_meta - } - else: - make_log("ConvertProcess", f"output.json not found for option {option}", level="error") - - # Remove the output directory after processing - try: - shutil.rmtree(output_dir.replace("/Storage/storedContent", "/app/data")) - except Exception as e: - make_log("ConvertProcess", f"Error removing output directory {output_dir}: {e}", level="error") - # Continue even if deletion fails - - make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Converted content: {converted_content}", level="info") - - unprocessed_encrypted_content.btfs_cid = ContentId( - version=2, content_hash=b58decode(converted_content['high']) - ).serialize_v2() - unprocessed_encrypted_content.ipfs_cid = ContentId( - version=2, content_hash=b58decode(converted_content['low']) - ).serialize_v2() - unprocessed_encrypted_content.meta = { - **unprocessed_encrypted_content.meta, - 'converted_content': converted_content - } - - session.commit() - if not unprocessed_encrypted_content.meta.get('upload_notify_msg_id'): - wallet_owner_connection = session.query(WalletConnection).filter( - WalletConnection.wallet_address == unprocessed_encrypted_content.owner_address - ).order_by(WalletConnection.id.desc()).first() - if wallet_owner_connection: - wallet_owner_user = wallet_owner_connection.user - wallet_owner_bot = Wrapped_CBotChat(memory._client_telegram_bot, chat_id=wallet_owner_user.telegram_id, user=wallet_owner_user, db_session=session) - unprocessed_encrypted_content.meta = { - **unprocessed_encrypted_content.meta, - 'upload_notify_msg_id': await wallet_owner_bot.send_content(session, unprocessed_encrypted_content) - } + # 6) Считаем sha256 и b58 + proc = await asyncio.create_subprocess_exec( + "sha256sum", out_file, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + so, se = await proc.communicate() + if proc.returncode != 0: + make_log("ConvertProcess", f"sha256sum error: {se.decode()}", level="error") + return + sha_hex = so.decode().split()[0] + h58 = b58encode(bytes.fromhex(sha_hex)).decode() + # 7) Добавляем StoredContent, если нужно + if not session.query(StoredContent).filter(StoredContent.hash == h58).first(): + new = StoredContent( + type="local/content_bin", + hash=h58, + user_id=item.user_id, + filename=result_fname, + meta={'encrypted_file_hash': item.hash}, + created=datetime.now(), + ) + session.add(new) session.commit() + # 8) Перемещаем в UPLOADS_DIR + dst = os.path.join(UPLOADS_DIR, h58) + try: + os.remove(dst) + except FileNotFoundError: + pass + shutil.move(out_file, dst) + + converted[opt] = h58 + + # 9) optional: metadata from converter_module + meta_info = converter_module.get_meta(out_dir) # допустим, так + if meta_info and 'ffprobe_meta' not in item.meta: + item.meta['ffprobe_meta'] = meta_info + + # 10) очистка + shutil.rmtree(out_dir, ignore_errors=True) + + # 11) Обновляем оригинальный объект после всех опций + make_log("ConvertProcess", f"Converted: {converted}", level="info") + main_high = 'high' if kind=='video' else 'high_audio' + main_low = 'low' if kind=='video' else 'low_audio' + item.btfs_cid = ContentId(version=2, + content_hash=b58decode(converted[main_high])).serialize_v2() + item.ipfs_cid = ContentId(version=2, + content_hash=b58decode(converted[main_low])).serialize_v2() + item.meta = {**item.meta, 'converted_content': converted} + session.commit() + + # 12) Отправляем уведомление пользователю + if not item.meta.get('upload_notify_msg_id'): + wc = session.query(WalletConnection).filter( + WalletConnection.wallet_address == item.owner_address + ).order_by(WalletConnection.id.desc()).first() + if wc: + bot = Wrapped_CBotChat(memory._client_telegram_bot, + chat_id=wc.user.telegram_id, + user=wc.user, db_session=session) + item.meta['upload_notify_msg_id'] = await bot.send_content(session, item) + session.commit() + + async def main_fn(memory): make_log("ConvertProcess", "Service started", level="info") seqno = 0 diff --git a/requirements.txt b/requirements.txt index c3a449c..bdb4d1d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,4 +15,5 @@ aiofiles==23.2.1 pydub==0.25.1 pillow==10.2.0 ffmpeg-python==0.2.0 +python-magic==0.4.27