211 lines
8.8 KiB
Python
211 lines
8.8 KiB
Python
import asyncio
|
|
from datetime import datetime
|
|
import os
|
|
import uuid
|
|
import json
|
|
import shutil
|
|
from base58 import b58decode, b58encode
|
|
from sqlalchemy import and_, or_
|
|
from app.core.models.node_storage import StoredContent
|
|
from app.core._utils.send_status import send_status
|
|
from app.core.logger import make_log
|
|
from app.core.storage import db_session
|
|
from app.core._config import UPLOADS_DIR
|
|
from app.core.content.content_id import ContentId
|
|
|
|
|
|
def _to_local_path(storage_path):
    """Rewrite a ``/Storage/storedContent`` path to its ``/app/data`` form.

    NOTE(review): presumably the same volume is visible to the Docker host
    as ``/Storage/storedContent`` and to this process as ``/app/data`` --
    confirm against the deployment configuration.
    """
    return storage_path.replace("/Storage/storedContent", "/app/data")


async def _convert_option(session, encrypted_content, option, input_file_path,
                          input_ext, logs_dir, preview_interval):
    """Run one conversion option and store its output file.

    Args:
        session: Open database session used to look up / insert the
            ``StoredContent`` row for the converted file.
        encrypted_content: The ``onchain/content`` row being processed; its
            ``meta`` gains ``ffprobe_meta`` the first time ``output.json``
            is read successfully.
        option: One of ``'high'``, ``'low'`` or ``'low_preview'``.
        input_file_path: Path of the decrypted input file, as passed to
            ``docker run -v``.
        input_ext: Extension handed to the converter via ``--ext``.
        logs_dir: Directory mounted into the container as ``/app/logs``.
        preview_interval: ``[start, end]`` in seconds, used only for
            ``'low_preview'``.

    Returns:
        The base58-encoded SHA-256 of the converted file, or ``None`` when
        any step failed (the failure has already been logged).
    """
    # Quality parameter and trim option (trim is only used for the preview).
    if option == "low_preview":
        quality = "low"
        trim_value = f"{preview_interval[0]}-{preview_interval[1]}"
    else:
        quality = option  # 'high' or 'low'
        trim_value = None

    # Unique output directory for this container run.
    output_uuid = str(uuid.uuid4())
    output_dir = f"/Storage/storedContent/converter-output/{output_uuid}"
    local_output_dir = _to_local_path(output_dir)

    cmd = [
        "docker", "run", "--rm",
        "-v", f"{input_file_path}:/app/input",
        "-v", f"{output_dir}:/app/output",
        "-v", f"{logs_dir}:/app/logs",
        "media_converter",
        "--ext", input_ext,
        "--quality", quality,
    ]
    if trim_value:
        cmd.extend(["--trim", trim_value])

    try:
        # Run the docker container asynchronously.
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await process.communicate()
        if process.returncode != 0:
            # errors="replace": converter output is not guaranteed to be
            # valid UTF-8 and must not raise while we report the failure.
            make_log("ConvertProcess", f"Docker conversion failed for option {option}: {stderr.decode(errors='replace')}", level="error")
            return None

        try:
            files = os.listdir(local_output_dir)
        except Exception as e:
            make_log("ConvertProcess", f"Error reading output directory {output_dir}: {e}", level="error")
            return None

        # Exclude 'output.json' and expect exactly one media output file.
        media_files = [f for f in files if f != "output.json"]
        if len(media_files) != 1:
            make_log("ConvertProcess", f"Expected one media file, found {len(media_files)} for option {option}", level="error")
            return None

        output_file = os.path.join(local_output_dir, media_files[0])

        # Hash in a subprocess so a large file does not block the event loop.
        hash_process = await asyncio.create_subprocess_exec(
            "sha256sum", output_file,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        hash_stdout, hash_stderr = await hash_process.communicate()
        if hash_process.returncode != 0:
            make_log("ConvertProcess", f"Error computing sha256sum for option {option}: {hash_stderr.decode(errors='replace')}", level="error")
            return None
        file_hash = hash_stdout.decode().split()[0]
        file_hash = b58encode(bytes.fromhex(file_hash)).decode()

        # Register the converted file unless a row with this hash exists.
        already_stored = session.query(StoredContent).filter(
            StoredContent.hash == file_hash
        ).first()
        if not already_stored:
            new_content = StoredContent(
                type="local/content_bin",
                hash=file_hash,
                user_id=encrypted_content.user_id,
                filename=media_files[0],
                meta={
                    'encrypted_file_hash': encrypted_content.hash,
                },
                created=datetime.now(),
            )
            session.add(new_content)
            session.commit()

        # Replace any existing file stored under the same hash.
        save_path = os.path.join(UPLOADS_DIR, file_hash)
        try:
            os.remove(save_path)
        except FileNotFoundError:
            pass

        try:
            shutil.move(output_file, save_path)
        except Exception as e:
            make_log("ConvertProcess", f"Error moving output file {output_file} to {save_path}: {e}", level="error")
            return None

        # Process output.json: read its contents into meta['ffprobe_meta'].
        # "Not found" is reported only when the file is really missing; a
        # file that is present but not needed (ffprobe_meta already stored
        # by an earlier option) is not an error.
        output_json_path = os.path.join(local_output_dir, "output.json")
        if not os.path.exists(output_json_path):
            make_log("ConvertProcess", f"output.json not found for option {option}", level="error")
        elif encrypted_content.meta.get('ffprobe_meta') is None:
            try:
                with open(output_json_path, "r") as f:
                    output_json_content = f.read()
            except Exception as e:
                make_log("ConvertProcess", f"Error reading output.json for option {option}: {e}", level="error")
                return None

            try:
                ffprobe_meta = json.loads(output_json_content)
            except Exception as e:
                make_log("ConvertProcess", f"Error parsing output.json for option {option}: {e}", level="error")
                return None

            encrypted_content.meta = {
                **encrypted_content.meta,
                'ffprobe_meta': ffprobe_meta,
            }

        return file_hash
    finally:
        # Always remove the per-run output directory, including on the
        # failure paths above, so repeated retries do not pile up
        # directories. The isdir guard covers runs where the directory was
        # never created.
        if os.path.isdir(local_output_dir):
            try:
                shutil.rmtree(local_output_dir)
            except Exception as e:
                make_log("ConvertProcess", f"Error removing output directory {output_dir}: {e}", level="error")
                # Continue even if deletion fails


async def convert_loop():
    """Convert one pending ``onchain/content`` row, if any.

    Picks the first ``StoredContent`` row of type ``onchain/content`` whose
    ``btfs_cid`` or ``ipfs_cid`` is NULL, runs the ``media_converter``
    Docker image over its decrypted file once per option (``high``,
    ``low``, ``low_preview``), moves each output into ``UPLOADS_DIR`` under
    its base58 SHA-256 name, and finally records the resulting content ids
    and hashes on the row.

    Returns ``None`` in all cases. Any failing step is logged and aborts
    the run without setting the content ids, so the row is picked up again
    on the next call.
    """
    with db_session() as session:
        # Query for unprocessed encrypted content.
        unprocessed_encrypted_content = session.query(StoredContent).filter(
            and_(
                StoredContent.type == "onchain/content",
                or_(
                    StoredContent.btfs_cid.is_(None),
                    StoredContent.ipfs_cid.is_(None),
                ),
            )
        ).first()
        if not unprocessed_encrypted_content:
            make_log("ConvertProcess", "No content to convert", level="debug")
            return

        make_log("ConvertProcess", f"Processing content {unprocessed_encrypted_content.id}", level="debug")
        decrypted_content = session.query(StoredContent).filter(
            StoredContent.id == unprocessed_encrypted_content.decrypted_content_id
        ).first()
        if not decrypted_content:
            make_log("ConvertProcess", "Decrypted content not found", level="error")
            return

        # Static preview interval in seconds.
        preview_interval = [0, 30]

        # Conversion options, processed in this order.
        REQUIRED_CONVERT_OPTIONS = ['high', 'low', 'low_preview']
        converted_content = {}  # Mapping: option -> hash of output file

        # Input file path and the extension taken from the original filename
        # (falls back to "mp4" when the filename has no dot).
        input_file_path = f"/Storage/storedContent/{decrypted_content.hash}"
        source_filename = unprocessed_encrypted_content.filename
        input_ext = source_filename.split('.')[-1] if '.' in source_filename else "mp4"

        # Directory mounted into the container for converter logs.
        logs_dir = "/Storage/logs/converter"

        for option in REQUIRED_CONVERT_OPTIONS:
            file_hash = await _convert_option(
                session,
                unprocessed_encrypted_content,
                option,
                input_file_path,
                input_ext,
                logs_dir,
                preview_interval,
            )
            if file_hash is None:
                # Failure already logged; leave the row unprocessed.
                return
            converted_content[option] = file_hash

        make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Converted content: {converted_content}", level="info")

        unprocessed_encrypted_content.btfs_cid = ContentId(
            version=2, content_hash=b58decode(converted_content['high'])
        )
        unprocessed_encrypted_content.ipfs_cid = ContentId(
            version=2, content_hash=b58decode(converted_content['low'])
        )
        unprocessed_encrypted_content.meta = {
            **unprocessed_encrypted_content.meta,
            'converted_content': converted_content,
        }
        session.commit()
|
|
|
|
|
|
async def main_fn(memory):
    """Run the conversion service loop until the task is cancelled.

    Each iteration calls ``convert_loop()``, sleeps 5 seconds, and reports
    a ``working (seqno=N)`` status via ``send_status``. Ordinary errors are
    logged and followed by a 3-second pause before the next iteration.

    Args:
        memory: Accepted for the caller's sake; not used by this function.

    Raises:
        asyncio.CancelledError: Propagated so the task can be cancelled.
            ``KeyboardInterrupt`` and ``SystemExit`` also propagate, since
            only ``Exception`` subclasses are handled here.
    """
    make_log("ConvertProcess", "Service started", level="info")
    seqno = 0
    while True:
        try:
            make_log("ConvertProcess", "Service running", level="debug")
            await convert_loop()
            await asyncio.sleep(5)
            await send_status("convert_service", f"working (seqno={seqno})")
            seqno += 1
        except asyncio.CancelledError:
            # Re-raise explicitly: on Python < 3.8 CancelledError derives
            # from Exception and would otherwise be swallowed below.
            raise
        except Exception as e:
            make_log("ConvertProcess", f"Error: {e}", level="error")
            await asyncio.sleep(3)
|