#!/usr/bin/env python3 import argparse import os import sys import subprocess import logging from datetime import datetime import json # ============================================================================= # Utility Functions # ============================================================================= def sanitize_filename(name): """Keep only alphanumeric characters, dash, underscore and dot in filename.""" base = os.path.basename(name) return "".join(c if c.isalnum() or c in ('-', '_', '.') else '_' for c in base) def setup_logging(quality, in_ext, out_ext): """ Sets up logging to both console and a log file in the logs/ directory. Log filename format: YYYYMMDD-HHMMSS-input_-output_-.log """ os.makedirs("logs", exist_ok=True) current_time = datetime.now().strftime("%Y%m%d-%H%M%S") log_filename = f"logs/{current_time}-input_{sanitize_filename(in_ext)}-output_{sanitize_filename(out_ext)}-{quality}.log" logger = logging.getLogger() logger.setLevel(logging.INFO) formatter = logging.Formatter(fmt="%(asctime)s - %(levelname)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S") file_handler = logging.FileHandler(log_filename) file_handler.setFormatter(formatter) console_handler = logging.StreamHandler() console_handler.setFormatter(formatter) if logger.hasHandlers(): logger.handlers.clear() logger.addHandler(file_handler) logger.addHandler(console_handler) logging.info("Logging initialized. Log file: %s", log_filename) def parse_metadata(input_file, output_dir): """ Parse all metadata from the input file using ffprobe and save them as JSON. The metadata is saved to 'output.json' in the specified output directory. """ try: cmd = [ "ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", "-show_streams", input_file ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) metadata_json = result.stdout output_file = os.path.join(output_dir, "output.json") with open(output_file, "w", encoding="utf-8") as f: f.write(metadata_json) logging.info("Metadata saved to %s", output_file) except subprocess.CalledProcessError as e: logging.error("Error parsing metadata: %s", e.stderr) sys.exit(1) # ============================================================================= # Media Detection Module # ============================================================================= class MediaDetector: """Detect media type (video or audio) using ffprobe and JSON parsing.""" @staticmethod def detect_media_type(input_file): """ Uses ffprobe with JSON output to analyze streams and decide if file is video or audio. Heuristic: - If there is exactly one video stream with codec 'mjpeg' and at least one audio stream, treat as audio. - Otherwise, if there's any video stream that is not an attached picture, treat as video. - Else, if audio stream exists, treat as audio. - Otherwise, return unknown. """ try: cmd = [ "ffprobe", "-v", "error", "-print_format", "json", "-show_entries", "stream=index,codec_type,codec_name,disposition,tags", input_file ] result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True) data = json.loads(result.stdout) streams = data.get("streams", []) video_streams = [] audio_count = 0 for stream in streams: codec_type = stream.get("codec_type", "") if codec_type == "video": video_streams.append(stream) elif codec_type == "audio": audio_count += 1 # If there is exactly one video stream with codec 'mjpeg' and audio is present, treat as audio if len(video_streams) == 1 and audio_count > 0: codec_name = video_streams[0].get("codec_name", "") if codec_name.lower() == "mjpeg": return "audio" # Otherwise, if any video stream is not an attached picture, treat as video for stream in video_streams: disposition = stream.get("disposition", {}) try: if int(disposition.get("attached_pic", 0)) != 1: return "video" except ValueError: return "video" if audio_count > 0: return "audio" else: return "unknown" except subprocess.CalledProcessError as e: logging.error("Error running ffprobe: %s", e.stderr) sys.exit(1) # ============================================================================= # Conversion Strategy Base Class and Implementations # ============================================================================= class ConversionStrategy: """Abstract base class for conversion strategies.""" def convert(self, input_file, output_file): raise NotImplementedError("Subclasses must implement the convert method.") def execute_command(self, cmd): """Execute the provided ffmpeg command, capturing stdout/stderr into logs.""" logging.info("Executing command: %s", " ".join(cmd)) process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) for line in process.stdout: logging.info(line.strip()) process.stdout.close() return_code = process.wait() if return_code != 0: logging.error("Error during conversion. Return code: %d", return_code) sys.exit(1) logging.info("Conversion completed successfully.") class VideoHighStrategy(ConversionStrategy): """High quality video conversion: Lossless H.264 (video) + PCM (audio).""" def convert(self, input_file, output_file): cmd = [ "ffmpeg", "-y", "-i", input_file, "-c:v", "libx264", "-preset", "veryslow", "-crf", "18", "-pix_fmt", "yuv420p", "-movflags", "+faststart", "-c:a", "pcm_s16le" # Lossless audio using PCM ] if hasattr(self, "custom_params"): cmd.extend(self.custom_params) cmd.append(output_file) self.execute_command(cmd) class VideoLowStrategy(ConversionStrategy): """Low quality video conversion: H.264 (video) + AAC (audio).""" def convert(self, input_file, output_file): cmd = [ "ffmpeg", "-y", "-i", input_file, "-c:v", "libx264", "-preset", "fast", "-crf", "23", "-c:a", "aac", "-b:a", "192k" ] if hasattr(self, "custom_params"): cmd.extend(self.custom_params) cmd.append(output_file) self.execute_command(cmd) class AudioHighStrategy(ConversionStrategy): """High quality audio conversion using FLAC.""" def convert(self, input_file, output_file): # Using -vn and -map to select only audio stream(s) cmd = [ "ffmpeg", "-y", "-i", input_file, "-vn", "-map", "0:a", "-c:a", "flac" ] if hasattr(self, "custom_params"): cmd.extend(self.custom_params) cmd.append(output_file) self.execute_command(cmd) class AudioLowStrategy(ConversionStrategy): """Low quality audio conversion using MP3 (libmp3lame).""" def convert(self, input_file, output_file): # Using -vn and -map to select only audio stream(s) cmd = [ "ffmpeg", "-y", "-i", input_file, "-vn", "-map", "0:a", "-c:a", "libmp3lame", "-b:a", "192k" ] if hasattr(self, "custom_params"): cmd.extend(self.custom_params) cmd.append(output_file) self.execute_command(cmd) # ============================================================================= # Converter Class: Core Logic # ============================================================================= class MediaConverter: """ Main class that selects the appropriate conversion strategy based on media type and quality, then performs the conversion. """ def __init__(self, input_file, output_file, quality, custom_params=None): self.input_file = input_file self.output_file = output_file self.quality = quality self.custom_params = custom_params or [] self.media_type = MediaDetector.detect_media_type(self.input_file) logging.info("Detected media type: %s", self.media_type) self.strategy = self.get_strategy() def get_strategy(self): """Selects and returns the proper conversion strategy.""" strategy_map = { ("video", "high"): VideoHighStrategy(), ("video", "low"): VideoLowStrategy(), ("audio", "high"): AudioHighStrategy(), ("audio", "low"): AudioLowStrategy(), } key = (self.media_type, self.quality) strategy = strategy_map.get(key) if strategy is None: logging.error("No conversion strategy defined for media type '%s' with quality '%s'.", self.media_type, self.quality) sys.exit(1) if self.custom_params: strategy.custom_params = self.custom_params return strategy def convert(self): """Executes the conversion using the selected strategy.""" self.strategy.convert(self.input_file, self.output_file) # ============================================================================= # Main Function and Argument Parsing # ============================================================================= def main(): parser = argparse.ArgumentParser(description="Advanced Media Converter using ffmpeg.") # The --ext argument specifies the input file extension (e.g. webm, mp4, etc.) parser.add_argument("--ext", required=True, help="Input file extension (e.g. webm, mp4, etc.)") parser.add_argument("--quality", required=True, choices=["high", "low"], help="Quality mode: high or low") parser.add_argument("--custom", nargs='*', help="Additional custom ffmpeg parameters", default=[]) args = parser.parse_args() # Fixed paths input_path = "/app/input" subprocess.run(f"rm -rf /app/output/* /app/output/.* 2>/dev/null", shell=True) os.makedirs("/app/output", exist_ok=True) if not os.path.isfile(input_path): logging.error("Input file not found at %s", input_path) sys.exit(1) # Determine media type media_type = MediaDetector.detect_media_type(input_path) # Set output extension based on media type and quality if media_type == "video": out_ext = ".mov" if args.quality == "high" else ".mp4" elif media_type == "audio": out_ext = ".flac" if args.quality == "high" else ".mp3" else: logging.error("Unsupported media type: %s", media_type) sys.exit(1) output_path = f"/app/output/output{out_ext}" # Setup logging setup_logging(args.quality, args.ext, out_ext) logging.info("Starting conversion with quality='%s'", args.quality) logging.info("Input file extension provided: %s", args.ext) logging.info("Detected media type (in main): %s", media_type) logging.info("Output file will be: %s", output_path) if args.custom: logging.info("Custom ffmpeg parameters: %s", args.custom) # Parse all metadata and save it as output.json in the output directory parse_metadata(input_path, os.path.dirname(output_path)) converter = MediaConverter(input_path, output_path, args.quality, args.custom) converter.convert() if __name__ == "__main__": main()