converter-module/converter/converter.py

341 lines
14 KiB
Python

#!/usr/bin/env python3
import argparse
import os
import sys
import subprocess
import logging
from datetime import datetime
import json
# =============================================================================
# Utility Functions
# =============================================================================
def sanitize_filename(name):
    """Return the basename of ``name`` with every unsafe character replaced by '_'.

    Characters that are alphanumeric (per ``str.isalnum``) or one of
    '-', '_' and '.' are kept as they are; any directory part of ``name``
    is discarded first.
    """
    safe_punctuation = {"-", "_", "."}
    cleaned = []
    for ch in os.path.basename(name):
        keep = ch.isalnum() or ch in safe_punctuation
        cleaned.append(ch if keep else "_")
    return "".join(cleaned)
def setup_logging(quality, in_ext, out_ext):
    """
    Sets up logging to both console and a log file in the logs/ directory.
    Log filename format: YYYYMMDD-HHMMSS-input_<in_ext>-output_<out_ext>-<quality>.log

    The root logger is reconfigured: its level is set to INFO and any
    handlers installed earlier are removed *and closed* before the new file
    and console handlers are attached.

    Args:
        quality: Quality label embedded in the log file name.
        in_ext: Input extension embedded in the log file name.
        out_ext: Output extension embedded in the log file name.
    """
    os.makedirs("logs", exist_ok=True)
    current_time = datetime.now().strftime("%Y%m%d-%H%M%S")
    # Every user-influenced part of the name goes through sanitize_filename so
    # the log can never be written outside logs/.
    log_filename = (
        f"logs/{current_time}"
        f"-input_{sanitize_filename(in_ext)}"
        f"-output_{sanitize_filename(out_ext)}"
        f"-{sanitize_filename(str(quality))}.log"
    )

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter(fmt="%(asctime)s - %(levelname)s - %(message)s",
                                  datefmt="%Y-%m-%d %H:%M:%S")

    # Explicit UTF-8 so tool output containing non-ASCII text is written
    # the same way regardless of the process locale.
    file_handler = logging.FileHandler(log_filename, encoding="utf-8")
    file_handler.setFormatter(formatter)
    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    # Detach and close stale handlers; merely clearing the list would leave a
    # previously opened log file descriptor dangling.
    for stale_handler in list(logger.handlers):
        logger.removeHandler(stale_handler)
        stale_handler.close()

    logger.addHandler(file_handler)
    logger.addHandler(console_handler)
    logging.info("Logging initialized. Log file: %s", log_filename)
def parse_metadata(input_file, output_dir):
    """
    Parse all metadata from the input file using ffprobe and save them as JSON.
    The metadata is saved to 'output.json' in the specified output directory.

    Args:
        input_file: Path of the media file handed to ffprobe.
        output_dir: Existing directory that receives ``output.json``.

    Exits the process with status 1 (after logging the reason) when ffprobe
    cannot be started, when ffprobe reports a failure, or when the JSON file
    cannot be written.
    """
    cmd = [
        # "-v error" (instead of "quiet") keeps stdout pure JSON but lets
        # ffprobe explain failures on stderr, which is what gets logged below.
        "ffprobe", "-v", "error",
        "-print_format", "json",
        "-show_format",
        "-show_streams",
        input_file,
    ]
    try:
        result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                text=True, check=True)
    except subprocess.CalledProcessError as e:
        logging.error("Error parsing metadata: %s", e.stderr)
        sys.exit(1)
    except OSError as e:
        # Raised when the ffprobe executable is missing or not runnable.
        logging.error("Error parsing metadata: %s", e)
        sys.exit(1)

    output_file = os.path.join(output_dir, "output.json")
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(result.stdout)
    except OSError as e:
        logging.error("Error parsing metadata: could not write %s: %s", output_file, e)
        sys.exit(1)
    logging.info("Metadata saved to %s", output_file)
# =============================================================================
# Media Detection Module
# =============================================================================
class MediaDetector:
    """Detect media type (video or audio) using ffprobe and JSON parsing."""

    @staticmethod
    def _classify_streams(streams):
        """Map a list of ffprobe stream dicts to "video", "audio" or "unknown"."""
        video_streams = [s for s in streams if s.get("codec_type") == "video"]
        has_audio = any(s.get("codec_type") == "audio" for s in streams)

        # A single MJPEG stream next to audio is treated as cover art.
        # NOTE(review): this also classifies genuine MJPEG video with sound as
        # audio; kept because it is the documented heuristic.
        if len(video_streams) == 1 and has_audio:
            codec_name = video_streams[0].get("codec_name") or ""
            if codec_name.lower() == "mjpeg":
                return "audio"

        for stream in video_streams:
            disposition = stream.get("disposition") or {}
            try:
                is_attached_pic = int(disposition.get("attached_pic", 0)) == 1
            except (TypeError, ValueError):
                # Unparseable flag: err on the side of real video.
                is_attached_pic = False
            if not is_attached_pic:
                return "video"

        return "audio" if has_audio else "unknown"

    @staticmethod
    def detect_media_type(input_file):
        """
        Uses ffprobe with JSON output to analyze streams and decide if file is video or audio.
        Heuristic:
        - If there is exactly one video stream with codec 'mjpeg' and at least one audio stream, treat as audio.
        - Otherwise, if there's any video stream that is not an attached picture, treat as video.
        - Else, if audio stream exists, treat as audio.
        - Otherwise, return unknown.

        Args:
            input_file: Path of the media file to inspect.

        Returns:
            One of the strings "video", "audio" or "unknown".

        Exits the process with status 1 (after logging) when ffprobe cannot be
        started, reports a failure, or prints output that is not valid JSON.
        """
        cmd = [
            "ffprobe", "-v", "error",
            "-print_format", "json",
            # -show_streams always includes each stream's "disposition" block,
            # which the attached-picture check relies on.
            "-show_streams",
            input_file,
        ]
        try:
            result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                                    text=True, check=True)
        except subprocess.CalledProcessError as e:
            logging.error("Error running ffprobe: %s", e.stderr)
            sys.exit(1)
        except OSError as e:
            # ffprobe executable missing or not runnable.
            logging.error("Error running ffprobe: %s", e)
            sys.exit(1)

        try:
            data = json.loads(result.stdout)
        except json.JSONDecodeError as e:
            logging.error("Error running ffprobe: invalid JSON output (%s)", e)
            sys.exit(1)

        streams = data.get("streams") or []
        return MediaDetector._classify_streams(streams)
# =============================================================================
# Conversion Strategy Base Class and Implementations
# =============================================================================
class ConversionStrategy:
    """Abstract base class for conversion strategies."""

    def convert(self, input_file, output_file):
        """Convert ``input_file`` into ``output_file``; must be overridden."""
        raise NotImplementedError("Subclasses must implement the convert method.")

    def execute_command(self, cmd):
        """Execute the provided ffmpeg command, capturing stdout/stderr into logs.

        Args:
            cmd: Command as a list of strings (program followed by arguments).

        Every output line of the child process (stderr merged into stdout) is
        logged at INFO level. The process exits with status 1 when the program
        cannot be started or finishes with a non-zero return code.
        """
        logging.info("Executing command: %s", " ".join(cmd))
        try:
            process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                # Tool output may echo metadata in arbitrary encodings; never
                # let a decode error abort a running conversion.
                errors="replace",
            )
        except OSError as exc:
            # Executable missing or not runnable.
            logging.error("Error during conversion. Could not start command: %s", exc)
            sys.exit(1)

        # The context manager closes the pipe and waits for the child even if
        # logging raises part-way through.
        with process:
            for line in process.stdout:
                logging.info("%s", line.strip())

        return_code = process.returncode
        if return_code != 0:
            logging.error("Error during conversion. Return code: %d", return_code)
            sys.exit(1)
        logging.info("Conversion completed successfully.")
class VideoHighStrategy(ConversionStrategy):
    """High quality video conversion: H.264 (libx264, CRF 18, veryslow) + PCM (pcm_s16le) audio."""

    def convert(self, input_file, output_file):
        """Assemble the ffmpeg argument list and pass it to ``execute_command``."""
        # Optional trim window, emitted right after the input arguments.
        trim_args = []
        if hasattr(self, "trim"):
            begin, finish = self.trim
            trim_args = ["-ss", str(begin), "-to", str(finish)]

        # Caller-supplied extra arguments go last, just before the output path.
        extra_args = list(self.custom_params) if hasattr(self, "custom_params") else []

        command = [
            "ffmpeg", "-y", "-i", input_file,
            *trim_args,
            "-c:v", "libx264", "-preset", "veryslow", "-crf", "18",
            "-pix_fmt", "yuv420p",
            "-movflags", "+faststart",
            "-c:a", "pcm_s16le",  # uncompressed 16-bit PCM audio
            *extra_args,
            output_file,
        ]
        self.execute_command(command)
class VideoLowStrategy(ConversionStrategy):
    """Low quality video conversion: H.264 (libx264, CRF 23, fast) + AAC audio at 192k."""

    def convert(self, input_file, output_file):
        """Assemble the ffmpeg argument list and pass it to ``execute_command``."""
        # Optional trim window, emitted right after the input arguments.
        trim_args = []
        if hasattr(self, "trim"):
            begin, finish = self.trim
            trim_args = ["-ss", str(begin), "-to", str(finish)]

        # Caller-supplied extra arguments go last, just before the output path.
        extra_args = list(self.custom_params) if hasattr(self, "custom_params") else []

        command = [
            "ffmpeg", "-y", "-i", input_file,
            *trim_args,
            "-c:v", "libx264", "-preset", "fast", "-crf", "23",
            "-c:a", "aac", "-b:a", "192k",
            *extra_args,
            output_file,
        ]
        self.execute_command(command)
class AudioHighStrategy(ConversionStrategy):
    """High quality audio conversion using FLAC."""

    def convert(self, input_file, output_file):
        """Build and run the ffmpeg command that encodes the first audio stream as FLAC.

        Args:
            input_file: Path of the source media file.
            output_file: Path of the file ffmpeg should write.

        Optional instance attributes honoured when present and not None:
        ``trim`` as a (start, end) pair in seconds and ``custom_params`` as a
        list of extra ffmpeg arguments appended before the output path.
        """
        cmd = ["ffmpeg", "-y", "-i", input_file]

        # getattr with a default tolerates the attribute being absent or None.
        trim = getattr(self, "trim", None)
        if trim is not None:
            start, end = trim
            cmd += ["-ss", str(start), "-to", str(end)]

        cmd += [
            "-vn",
            # Map only the first audio stream: mapping all of them ("0:a")
            # makes single-stream muxers such as FLAC reject multi-track input.
            "-map", "0:a:0",
            "-c:a", "flac",
        ]

        custom_params = getattr(self, "custom_params", None)
        if custom_params:
            cmd.extend(custom_params)

        cmd.append(output_file)
        self.execute_command(cmd)
class AudioLowStrategy(ConversionStrategy):
    """Low quality audio conversion using MP3 (libmp3lame)."""

    def convert(self, input_file, output_file):
        """Build and run the ffmpeg command that encodes the first audio stream as 192k MP3.

        Args:
            input_file: Path of the source media file.
            output_file: Path of the file ffmpeg should write.

        Optional instance attributes honoured when present and not None:
        ``trim`` as a (start, end) pair in seconds and ``custom_params`` as a
        list of extra ffmpeg arguments appended before the output path.
        """
        cmd = ["ffmpeg", "-y", "-i", input_file]

        # getattr with a default tolerates the attribute being absent or None.
        trim = getattr(self, "trim", None)
        if trim is not None:
            start, end = trim
            cmd += ["-ss", str(start), "-to", str(end)]

        cmd += [
            "-vn",
            # Map only the first audio stream: mapping all of them ("0:a")
            # makes single-stream muxers such as MP3 reject multi-track input.
            "-map", "0:a:0",
            "-c:a", "libmp3lame", "-b:a", "192k",
        ]

        custom_params = getattr(self, "custom_params", None)
        if custom_params:
            cmd.extend(custom_params)

        cmd.append(output_file)
        self.execute_command(cmd)
# =============================================================================
# Converter Class: Core Logic
# =============================================================================
class MediaConverter:
    """
    Front end of the conversion pipeline: detects the media type of the input,
    picks the strategy registered for (media type, quality) and runs it.
    """

    def __init__(self, input_file, output_file, quality, custom_params=None, trim=None):
        """Store the job parameters, detect the media type and bind a strategy."""
        self.input_file = input_file
        self.output_file = output_file
        self.quality = quality
        self.custom_params = custom_params or []
        # (start, end) in seconds, or None when no trimming was requested.
        self.trim = trim

        self.media_type = MediaDetector.detect_media_type(self.input_file)
        logging.info("Detected media type: %s", self.media_type)

        self.strategy = self.get_strategy()
        if self.trim:
            self.strategy.trim = self.trim

    def get_strategy(self):
        """Return a strategy instance for the current media type and quality.

        Logs an error and exits with status 1 when no strategy is registered
        for the combination.
        """
        strategy_classes = {
            ("video", "high"): VideoHighStrategy,
            ("video", "low"): VideoLowStrategy,
            ("audio", "high"): AudioHighStrategy,
            ("audio", "low"): AudioLowStrategy,
        }
        chosen_class = strategy_classes.get((self.media_type, self.quality))
        if chosen_class is None:
            logging.error("No conversion strategy defined for media type '%s' with quality '%s'.",
                          self.media_type, self.quality)
            sys.exit(1)

        chosen = chosen_class()
        if self.custom_params:
            chosen.custom_params = self.custom_params
        return chosen

    def convert(self):
        """Run the bound strategy on the stored input and output paths."""
        self.strategy.convert(self.input_file, self.output_file)
# =============================================================================
# Main Function and Argument Parsing
# =============================================================================
def _clear_directory(directory):
    """Best-effort removal of everything inside ``directory`` (the directory itself stays)."""
    import shutil

    try:
        entries = list(os.scandir(directory))
    except OSError:
        # Missing or unreadable directory: nothing to clean.
        return
    for entry in entries:
        try:
            if entry.is_dir(follow_symlinks=False):
                shutil.rmtree(entry.path, ignore_errors=True)
            else:
                os.unlink(entry.path)
        except OSError as exc:
            # Cleanup is deliberately best-effort; report and carry on.
            logging.warning("Could not remove %s: %s", entry.path, exc)


def _parse_trim(value):
    """Parse "start-end" into a (start, end) float tuple; raise ValueError if malformed."""
    import math

    start_str, end_str = value.split("-")
    start, end = float(start_str), float(end_str)
    if not (math.isfinite(start) and math.isfinite(end)):
        # float() accepts "nan"/"inf", which are meaningless as timestamps.
        raise ValueError("trim bounds must be finite numbers")
    return start, end


def main():
    """Command-line entry point: parse arguments, prepare /app/output and convert /app/input.

    Steps: empty and recreate the output directory, validate the input file
    and the optional trim range, detect the media type to choose the output
    extension, configure logging, dump the ffprobe metadata to output.json and
    finally run the conversion. Any validation failure logs an error and exits
    with status 1.
    """
    parser = argparse.ArgumentParser(description="Advanced Media Converter using ffmpeg.")
    # The --ext argument specifies the input file extension (e.g. webm, mp4, etc.)
    parser.add_argument("--ext", required=True,
                        help="Input file extension (e.g. webm, mp4, etc.)")
    parser.add_argument("--quality", required=True, choices=["high", "low"],
                        help="Quality mode: high or low")
    parser.add_argument("--custom", nargs='*',
                        help="Additional custom ffmpeg parameters", default=[])
    parser.add_argument("--trim", help="Trim the media between start-end seconds (e.g., 0.5-35)", default=None)
    args = parser.parse_args()

    # Fixed paths
    input_path = "/app/input"
    output_dir = "/app/output"

    # Start from an empty output directory (hidden files included) without
    # going through a shell.
    _clear_directory(output_dir)
    os.makedirs(output_dir, exist_ok=True)

    if not os.path.isfile(input_path):
        logging.error("Input file not found at %s", input_path)
        sys.exit(1)

    # Parse trim argument if provided
    trim_tuple = None
    if args.trim:
        try:
            trim_tuple = _parse_trim(args.trim)
        except ValueError:
            logging.error("Invalid trim format. Use format: start-end (e.g., 0.5-35)")
            sys.exit(1)
        if trim_tuple[0] >= trim_tuple[1]:
            logging.error("Invalid trim range: start time must be less than end time.")
            sys.exit(1)

    # Determine media type
    media_type = MediaDetector.detect_media_type(input_path)

    # Set output extension based on media type and quality
    if media_type == "video":
        out_ext = ".mov" if args.quality == "high" else ".mp4"
    elif media_type == "audio":
        out_ext = ".flac" if args.quality == "high" else ".mp3"
    else:
        logging.error("Unsupported media type: %s", media_type)
        sys.exit(1)
    output_path = f"{output_dir}/output{out_ext}"

    # Setup logging
    setup_logging(args.quality, args.ext, out_ext)
    logging.info("Starting conversion with quality='%s'", args.quality)
    logging.info("Input file extension provided: %s", args.ext)
    logging.info("Detected media type (in main): %s", media_type)
    logging.info("Output file will be: %s", output_path)
    if args.custom:
        logging.info("Custom ffmpeg parameters: %s", args.custom)
    if trim_tuple:
        logging.info("Trimming media from %s to %s seconds", trim_tuple[0], trim_tuple[1])

    # Parse all metadata and save it as output.json in the output directory
    parse_metadata(input_path, output_dir)

    converter = MediaConverter(input_path, output_path, args.quality, args.custom, trim=trim_tuple)
    converter.convert()


if __name__ == "__main__":
    main()