first working version

This commit is contained in:
user 2025-02-23 15:16:24 +03:00
commit f1bb2ba031
4 changed files with 328 additions and 0 deletions

5
.gitignore vendored Normal file
View File

@ -0,0 +1,5 @@
.DS_Store
.note
logs
output
in_examples

12
README.md Normal file
View File

@ -0,0 +1,12 @@
## Usage
```bash
docker build -t media_converter .
docker run --rm \
-v /path/to/your/input_file.ext:/app/input \
-v /path/to/your/output_dir:/app/output \
-v /your/local/logs:/app/logs \
media_converter --ext mp4 --quality high
```

13
converter/Dockerfile Normal file
View File

@ -0,0 +1,13 @@
FROM python:3.9-slim
# Install ffmpeg
RUN apt-get update && apt-get install -y ffmpeg && rm -rf /var/lib/apt/lists/*
WORKDIR /app
# Create necessary directories
RUN mkdir -p logs output
COPY converter.py /app/
ENTRYPOINT ["python", "converter.py"]

298
converter/converter.py Normal file
View File

@ -0,0 +1,298 @@
#!/usr/bin/env python3
import argparse
import os
import sys
import subprocess
import logging
from datetime import datetime
import json
# =============================================================================
# Utility Functions
# =============================================================================
def sanitize_filename(name):
"""Keep only alphanumeric characters, dash, underscore and dot in filename."""
base = os.path.basename(name)
return "".join(c if c.isalnum() or c in ('-', '_', '.') else '_' for c in base)
def setup_logging(quality, in_ext, out_ext):
"""
Sets up logging to both console and a log file in the logs/ directory.
Log filename format: YYYYMMDD-HHMMSS-input_<in_ext>-output_<out_ext>-<quality>.log
"""
os.makedirs("logs", exist_ok=True)
current_time = datetime.now().strftime("%Y%m%d-%H%M%S")
log_filename = f"logs/{current_time}-input_{sanitize_filename(in_ext)}-output_{sanitize_filename(out_ext)}-{quality}.log"
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter(fmt="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S")
file_handler = logging.FileHandler(log_filename)
file_handler.setFormatter(formatter)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
if logger.hasHandlers():
logger.handlers.clear()
logger.addHandler(file_handler)
logger.addHandler(console_handler)
logging.info("Logging initialized. Log file: %s", log_filename)
def parse_metadata(input_file, output_dir):
"""
Parse all metadata from the input file using ffprobe and save them as JSON.
The metadata is saved to 'output.json' in the specified output directory.
"""
try:
cmd = [
"ffprobe", "-v", "quiet",
"-print_format", "json",
"-show_format",
"-show_streams",
input_file
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
metadata_json = result.stdout
output_file = os.path.join(output_dir, "output.json")
with open(output_file, "w", encoding="utf-8") as f:
f.write(metadata_json)
logging.info("Metadata saved to %s", output_file)
except subprocess.CalledProcessError as e:
logging.error("Error parsing metadata: %s", e.stderr)
sys.exit(1)
# =============================================================================
# Media Detection Module
# =============================================================================
class MediaDetector:
"""Detect media type (video or audio) using ffprobe and JSON parsing."""
@staticmethod
def detect_media_type(input_file):
"""
Uses ffprobe with JSON output to analyze streams and decide if file is video or audio.
Heuristic:
- If there is exactly one video stream with codec 'mjpeg' and at least one audio stream, treat as audio.
- Otherwise, if there's any video stream that is not an attached picture, treat as video.
- Else, if audio stream exists, treat as audio.
- Otherwise, return unknown.
"""
try:
cmd = [
"ffprobe", "-v", "error",
"-print_format", "json",
"-show_entries", "stream=index,codec_type,codec_name,disposition,tags",
input_file
]
result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, check=True)
data = json.loads(result.stdout)
streams = data.get("streams", [])
video_streams = []
audio_count = 0
for stream in streams:
codec_type = stream.get("codec_type", "")
if codec_type == "video":
video_streams.append(stream)
elif codec_type == "audio":
audio_count += 1
# If there is exactly one video stream with codec 'mjpeg' and audio is present, treat as audio
if len(video_streams) == 1 and audio_count > 0:
codec_name = video_streams[0].get("codec_name", "")
if codec_name.lower() == "mjpeg":
return "audio"
# Otherwise, if any video stream is not an attached picture, treat as video
for stream in video_streams:
disposition = stream.get("disposition", {})
try:
if int(disposition.get("attached_pic", 0)) != 1:
return "video"
except ValueError:
return "video"
if audio_count > 0:
return "audio"
else:
return "unknown"
except subprocess.CalledProcessError as e:
logging.error("Error running ffprobe: %s", e.stderr)
sys.exit(1)
# =============================================================================
# Conversion Strategy Base Class and Implementations
# =============================================================================
class ConversionStrategy:
"""Abstract base class for conversion strategies."""
def convert(self, input_file, output_file):
raise NotImplementedError("Subclasses must implement the convert method.")
def execute_command(self, cmd):
"""Execute the provided ffmpeg command, capturing stdout/stderr into logs."""
logging.info("Executing command: %s", " ".join(cmd))
process = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
for line in process.stdout:
logging.info(line.strip())
process.stdout.close()
return_code = process.wait()
if return_code != 0:
logging.error("Error during conversion. Return code: %d", return_code)
sys.exit(1)
logging.info("Conversion completed successfully.")
class VideoHighStrategy(ConversionStrategy):
"""High quality video conversion: Apple ProRes (video) + PCM (audio)."""
def convert(self, input_file, output_file):
cmd = [
"ffmpeg", "-y", "-i", input_file,
"-c:v", "prores_ks", "-profile:v", "3",
"-c:a", "pcm_s16le"
]
if hasattr(self, "custom_params"):
cmd.extend(self.custom_params)
cmd.append(output_file)
self.execute_command(cmd)
class VideoLowStrategy(ConversionStrategy):
"""Low quality video conversion: H.264 (video) + AAC (audio)."""
def convert(self, input_file, output_file):
cmd = [
"ffmpeg", "-y", "-i", input_file,
"-c:v", "libx264", "-preset", "fast", "-crf", "23",
"-c:a", "aac", "-b:a", "192k"
]
if hasattr(self, "custom_params"):
cmd.extend(self.custom_params)
cmd.append(output_file)
self.execute_command(cmd)
class AudioHighStrategy(ConversionStrategy):
"""High quality audio conversion using FLAC."""
def convert(self, input_file, output_file):
# Using -vn and -map to select only audio stream(s)
cmd = [
"ffmpeg", "-y", "-i", input_file,
"-vn",
"-map", "0:a",
"-c:a", "flac"
]
if hasattr(self, "custom_params"):
cmd.extend(self.custom_params)
cmd.append(output_file)
self.execute_command(cmd)
class AudioLowStrategy(ConversionStrategy):
"""Low quality audio conversion using MP3 (libmp3lame)."""
def convert(self, input_file, output_file):
# Using -vn and -map to select only audio stream(s)
cmd = [
"ffmpeg", "-y", "-i", input_file,
"-vn",
"-map", "0:a",
"-c:a", "libmp3lame", "-b:a", "192k"
]
if hasattr(self, "custom_params"):
cmd.extend(self.custom_params)
cmd.append(output_file)
self.execute_command(cmd)
# =============================================================================
# Converter Class: Core Logic
# =============================================================================
class MediaConverter:
"""
Main class that selects the appropriate conversion strategy based on media type
and quality, then performs the conversion.
"""
def __init__(self, input_file, output_file, quality, custom_params=None):
self.input_file = input_file
self.output_file = output_file
self.quality = quality
self.custom_params = custom_params or []
self.media_type = MediaDetector.detect_media_type(self.input_file)
logging.info("Detected media type: %s", self.media_type)
self.strategy = self.get_strategy()
def get_strategy(self):
"""Selects and returns the proper conversion strategy."""
strategy_map = {
("video", "high"): VideoHighStrategy(),
("video", "low"): VideoLowStrategy(),
("audio", "high"): AudioHighStrategy(),
("audio", "low"): AudioLowStrategy(),
}
key = (self.media_type, self.quality)
strategy = strategy_map.get(key)
if strategy is None:
logging.error("No conversion strategy defined for media type '%s' with quality '%s'.",
self.media_type, self.quality)
sys.exit(1)
if self.custom_params:
strategy.custom_params = self.custom_params
return strategy
def convert(self):
"""Executes the conversion using the selected strategy."""
self.strategy.convert(self.input_file, self.output_file)
# =============================================================================
# Main Function and Argument Parsing
# =============================================================================
def main():
parser = argparse.ArgumentParser(description="Advanced Media Converter using ffmpeg.")
# The --ext argument specifies the input file extension (e.g. webm, mp4, etc.)
parser.add_argument("--ext", required=True,
help="Input file extension (e.g. webm, mp4, etc.)")
parser.add_argument("--quality", required=True, choices=["high", "low"],
help="Quality mode: high or low")
parser.add_argument("--custom", nargs='*',
help="Additional custom ffmpeg parameters", default=[])
args = parser.parse_args()
# Fixed paths
input_path = "/app/input"
os.makedirs("/app/output", exist_ok=True)
if not os.path.isfile(input_path):
logging.error("Input file not found at %s", input_path)
sys.exit(1)
# Determine media type
media_type = MediaDetector.detect_media_type(input_path)
# Set output extension based on media type and quality
if media_type == "video":
out_ext = ".mov" if args.quality == "high" else ".mp4"
elif media_type == "audio":
out_ext = ".flac" if args.quality == "high" else ".mp3"
else:
logging.error("Unsupported media type: %s", media_type)
sys.exit(1)
output_path = f"/app/output/output{out_ext}"
# Setup logging
setup_logging(args.quality, args.ext, out_ext)
logging.info("Starting conversion with quality='%s'", args.quality)
logging.info("Input file extension provided: %s", args.ext)
logging.info("Detected media type (in main): %s", media_type)
logging.info("Output file will be: %s", output_path)
if args.custom:
logging.info("Custom ffmpeg parameters: %s", args.custom)
# Parse all metadata and save it as output.json in the output directory
parse_metadata(input_path, os.path.dirname(output_path))
converter = MediaConverter(input_path, output_path, args.quality, args.custom)
converter.convert()
if __name__ == "__main__":
main()