import asyncio
from datetime import datetime
import os
import uuid
import json
import shutil
from base58 import b58encode
from sqlalchemy import and_, or_
from app.core.models.node_storage import StoredContent
from app.core._utils.send_status import send_status
from app.core.logger import make_log
from app.core.storage import db_session
from app.core._config import UPLOADS_DIR

# Conversion variants produced for every content item; 'low_preview' is a
# trimmed low-quality clip used as a preview.
REQUIRED_CONVERT_OPTIONS = ['high', 'low', 'low_preview']

# Static preview interval in seconds (start, end) for the 'low_preview' option.
PREVIEW_INTERVAL = (0, 30)


async def convert_loop():
    """Convert one pending 'onchain/content' item into all required variants.

    Picks the first stored content that still lacks a BTFS or IPFS CID, then
    for each option in REQUIRED_CONVERT_OPTIONS:
      * runs the 'media_converter' docker image on the encrypted source file,
      * sha256-hashes the output (base58-encoded) and registers it as a
        'local/content_bin' StoredContent row if not already known,
      * moves the output into UPLOADS_DIR under its hash,
      * on first opportunity, stores ffprobe metadata from output.json
        into the source row's meta.
    Finally records the option->hash mapping in meta['converted_content'].

    Returns early (leaving the item to be retried on the next cycle) on any
    conversion, hashing, or filesystem error.
    """
    with db_session() as session:
        # Query for unprocessed encrypted content: onchain items missing a CID.
        unprocessed_encrypted_content = session.query(StoredContent).filter(
            and_(
                StoredContent.type == "onchain/content",
                or_(
                    StoredContent.btfs_cid == None,
                    StoredContent.ipfs_cid == None,
                )
            )
        ).first()
        if not unprocessed_encrypted_content:
            make_log("ConvertProcess", "No content to convert", level="debug")
            return

        make_log("ConvertProcess", f"Processing content {unprocessed_encrypted_content.id}", level="debug")

        # The decrypted counterpart must exist before conversion can proceed.
        decrypted_content = session.query(StoredContent).filter(
            StoredContent.id == unprocessed_encrypted_content.decrypted_content_id
        ).first()
        if not decrypted_content:
            make_log("ConvertProcess", "Decrypted content not found", level="error")
            return

        converted_content = {}  # option -> base58 sha256 hash of the output file

        # Host-side input path and extension (default to mp4 when the
        # filename carries no extension).
        input_file_path = f"/Storage/storedContent/{unprocessed_encrypted_content.hash}"
        input_ext = (
            unprocessed_encrypted_content.filename.split('.')[-1]
            if '.' in unprocessed_encrypted_content.filename
            else "mp4"
        )
        logs_dir = "/Storage/logs/converter"

        for option in REQUIRED_CONVERT_OPTIONS:
            # Quality parameter and trim window (trim only for the preview).
            if option == "low_preview":
                quality = "low"
                trim_value = f"{PREVIEW_INTERVAL[0]}-{PREVIEW_INTERVAL[1]}"
            else:
                quality = option  # 'high' or 'low'
                trim_value = None

            # Unique host-side output directory for the docker container.
            output_uuid = str(uuid.uuid4())
            output_dir = f"/Storage/storedContent/converter-output/{output_uuid}"
            # This service sees host /Storage/storedContent mounted at /app/data;
            # hoist the translation instead of repeating the replace() everywhere.
            local_output_dir = output_dir.replace("/Storage/storedContent", "/app/data")

            # Docker command with volume mounts for input, output and logs.
            cmd = [
                "docker", "run", "--rm",
                "-v", f"{input_file_path}:/app/input",
                "-v", f"{output_dir}:/app/output",
                "-v", f"{logs_dir}:/app/logs",
                "media_converter",
                "--ext", input_ext,
                "--quality", quality,
            ]
            if trim_value:
                cmd.extend(["--trim", trim_value])

            # Run the converter container without blocking the event loop.
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            stdout, stderr = await process.communicate()
            if process.returncode != 0:
                make_log("ConvertProcess", f"Docker conversion failed for option {option}: {stderr.decode()}", level="error")
                return

            # Expect exactly one media file besides output.json.
            try:
                files = os.listdir(local_output_dir)
            except Exception as e:
                make_log("ConvertProcess", f"Error reading output directory {output_dir}: {e}", level="error")
                return
            media_files = [f for f in files if f != "output.json"]
            if len(media_files) != 1:
                make_log("ConvertProcess", f"Expected one media file, found {len(media_files)} for option {option}", level="error")
                return
            output_file = os.path.join(local_output_dir, media_files[0])

            # Hash the output via the external sha256sum tool (async subprocess).
            hash_process = await asyncio.create_subprocess_exec(
                "sha256sum", output_file,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            hash_stdout, hash_stderr = await hash_process.communicate()
            if hash_process.returncode != 0:
                make_log("ConvertProcess", f"Error computing sha256sum for option {option}: {hash_stderr.decode()}", level="error")
                return
            file_hash = hash_stdout.decode().split()[0]
            file_hash = b58encode(bytes.fromhex(file_hash)).decode()

            # FIX: the original called db_session.query(...) — db_session is the
            # session factory/context manager, not a session, which would raise
            # at runtime. Use the active session.
            if not session.query(StoredContent).filter(
                StoredContent.hash == file_hash
            ).first():
                new_content = StoredContent(
                    type="local/content_bin",
                    hash=file_hash,
                    user_id=unprocessed_encrypted_content.user_id,
                    filename=media_files[0],
                    meta={
                        'encrypted_file_hash': unprocessed_encrypted_content.hash,
                    },
                    created=datetime.now(),
                )
                session.add(new_content)
                session.commit()

            # Move the converted file into the uploads area under its hash,
            # replacing any stale file with the same name.
            save_path = os.path.join(UPLOADS_DIR, file_hash)
            try:
                os.remove(save_path)
            except FileNotFoundError:
                pass
            try:
                shutil.move(output_file, save_path)
            except Exception as e:
                make_log("ConvertProcess", f"Error moving output file {output_file} to {save_path}: {e}", level="error")
                return

            converted_content[option] = file_hash

            # Store ffprobe metadata from output.json once (first run that has it).
            # FIX: the original combined the existence and "meta already set"
            # checks in one condition, so it logged "output.json not found"
            # even when the file existed but ffprobe_meta was already stored.
            output_json_path = os.path.join(local_output_dir, "output.json")
            if not os.path.exists(output_json_path):
                make_log("ConvertProcess", f"output.json not found for option {option}", level="error")
            elif unprocessed_encrypted_content.meta.get('ffprobe_meta') is None:
                try:
                    with open(output_json_path, "r") as f:
                        output_json_content = f.read()
                except Exception as e:
                    make_log("ConvertProcess", f"Error reading output.json for option {option}: {e}", level="error")
                    return
                try:
                    ffprobe_meta = json.loads(output_json_content)
                except Exception as e:
                    make_log("ConvertProcess", f"Error parsing output.json for option {option}: {e}", level="error")
                    return
                # Reassign (not mutate) so SQLAlchemy detects the JSON change.
                unprocessed_encrypted_content.meta = {
                    **unprocessed_encrypted_content.meta,
                    'ffprobe_meta': ffprobe_meta,
                }

            # Remove the per-option output directory after processing.
            try:
                shutil.rmtree(local_output_dir)
            except Exception as e:
                make_log("ConvertProcess", f"Error removing output directory {output_dir}: {e}", level="error")
                # Continue even if deletion fails

        make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Converted content: {converted_content}", level="info")
        unprocessed_encrypted_content.meta = {
            **unprocessed_encrypted_content.meta,
            'converted_content': converted_content,
        }
        session.commit()


async def main_fn(memory):
    """Service entry point: run convert_loop forever, reporting liveness.

    Each cycle runs one conversion pass, sleeps 5s, and sends a status
    heartbeat with an incrementing sequence number. Errors are logged and
    retried after a short back-off.
    """
    make_log("ConvertProcess", "Service started", level="info")
    seqno = 0
    while True:
        try:
            make_log("ConvertProcess", "Service running", level="debug")
            await convert_loop()
            await asyncio.sleep(5)
            await send_status("convert_service", f"working (seqno={seqno})")
            seqno += 1
        except asyncio.CancelledError:
            # FIX: the original's `except BaseException` swallowed task
            # cancellation, preventing clean shutdown — propagate it.
            raise
        except BaseException as e:
            make_log("ConvertProcess", f"Error: {e}", level="error")
            await asyncio.sleep(3)