# uploader-bot/app/core/background/convert_service.py
# (originally pasted from a repository web view: 288 lines, 12 KiB, Python)

import asyncio
from datetime import datetime
import os
import uuid
import json
import shutil
import magic # python-magic for MIME detection
from base58 import b58decode, b58encode
from sqlalchemy import and_, or_
from app.core.models.node_storage import StoredContent
from app.core.models._telegram import Wrapped_CBotChat
from app.core._utils.send_status import send_status
from app.core.logger import make_log
from app.core.models.user import User
from app.core.models import WalletConnection
from app.core.storage import db_session
from app.core._config import UPLOADS_DIR
from app.core.content.content_id import ContentId
# `docker run -v` mounts below are given paths under _HOST_STORAGE_DIR, while
# every file operation performed by this process goes through the same volume
# under _LOCAL_STORAGE_DIR.  NOTE(review): presumably the Docker daemon is
# reached via a shared socket and resolves mount sources on the host — confirm.
_HOST_STORAGE_DIR = "/Storage/storedContent"
_LOCAL_STORAGE_DIR = "/app/data"
_CONVERTER_LOGS_DIR = "/Storage/logs/converter"


def _to_local_path(host_path: str) -> str:
    """Map a host-side storage path to the path readable by this process."""
    return host_path.replace(_HOST_STORAGE_DIR, _LOCAL_STORAGE_DIR)


def _merge_meta(content, **updates) -> None:
    """Set ``content.meta`` to a new dict with ``updates`` merged in.

    The dict is reassigned rather than mutated in place because a plain JSON
    column is not change-tracked for in-place edits (unless wrapped in
    MutableDict), so an in-place edit would not be flushed on commit.
    A ``None`` meta is treated as empty.
    """
    content.meta = {**(content.meta or {}), **updates}


def _detect_content_kind(local_path: str):
    """Classify a file as ``"video"``, ``"audio"`` or ``"other"`` by MIME sniffing.

    Returns ``(content_kind, mime_type)``; ``mime_type`` is ``""`` when the
    probe fails (which classifies the file as ``"other"``).
    """
    try:
        mime_type = magic.from_file(local_path, mime=True) or ""
    except Exception as e:
        make_log("ConvertProcess", f"magic probe failed: {e}", level="warning")
        mime_type = ""
    if mime_type.startswith("video/"):
        return "video", mime_type
    if mime_type.startswith("audio/"):
        return "audio", mime_type
    return "other", mime_type


def _ensure_stored_content(session, file_hash, content_type, user_id, filename, meta) -> None:
    """Insert (and commit) a StoredContent row for ``file_hash`` unless one exists."""
    if session.query(StoredContent).filter(StoredContent.hash == file_hash).first():
        return
    session.add(StoredContent(
        type=content_type,
        hash=file_hash,
        user_id=user_id,
        filename=filename,
        meta=meta,
        created=datetime.now(),
    ))
    session.commit()


def _store_raw_copy(session, encrypted, decrypted, local_input_path: str) -> None:
    """Publish a non-media decrypted file as-is and finalize ``encrypted``.

    The published hash is the decrypted file's own hash, so the CID matches
    the bytes stored under it. Both CIDs of ``encrypted`` are set to that
    hash and ``meta['converted_content'] = {'raw': <hash>}`` is recorded.
    """
    # Presumably decrypted.hash is the base58 SHA-256 of the decrypted bytes
    # (the same scheme _sha256_b58 uses for converter output) — confirm.
    raw_hash = decrypted.hash
    dst = os.path.join(UPLOADS_DIR, raw_hash)
    # If UPLOADS_DIR already holds the decrypted file under this name, removing
    # dst would delete the source itself, so skip the copy in that case.
    already_in_place = os.path.exists(dst) and os.path.samefile(local_input_path, dst)
    if not already_in_place:
        try:
            os.remove(dst)
        except FileNotFoundError:
            pass
        shutil.copy2(local_input_path, dst)
    # The file is written before the row so a failed copy leaves no dangling row.
    _ensure_stored_content(
        session,
        raw_hash,
        "local/content_raw",
        encrypted.user_id,
        encrypted.filename,
        {'source': 'raw_copy'},
    )
    cid = ContentId(version=2, content_hash=b58decode(raw_hash)).serialize_v2()
    encrypted.btfs_cid = cid
    encrypted.ipfs_cid = cid
    _merge_meta(encrypted, converted_content={'raw': raw_hash})
    session.commit()
    make_log("ConvertProcess", f"Raw content saved for {raw_hash}", level="info")


async def _sha256_b58(path: str, option: str):
    """Return the base58-encoded SHA-256 of ``path`` via ``sha256sum``, or None on failure."""
    proc = await asyncio.create_subprocess_exec(
        "sha256sum", path,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await proc.communicate()
    if proc.returncode != 0:
        make_log(
            "ConvertProcess",
            f"Error computing sha256sum for option {option}: {err.decode(errors='replace')}",
            level="error",
        )
        return None
    hex_digest = out.decode().split()[0]
    return b58encode(bytes.fromhex(hex_digest)).decode()


def _merge_ffprobe_meta(encrypted, local_output_dir: str, option: str) -> None:
    """Copy the converter's ``output.json`` into ``meta['ffprobe_meta']`` once.

    Does nothing if the file is absent or ``ffprobe_meta`` is already set.
    """
    output_json_path = os.path.join(local_output_dir, "output.json")
    if not os.path.exists(output_json_path):
        return
    if (encrypted.meta or {}).get('ffprobe_meta') is not None:
        return
    try:
        with open(output_json_path, "r") as f:
            ffprobe_meta = json.load(f)
    except Exception as e:
        make_log("ConvertProcess", f"Error handling output.json for option {option}: {e}", level="error")
        return
    _merge_meta(encrypted, ffprobe_meta=ffprobe_meta)


async def _convert_option(session, encrypted, input_file_path, input_ext, content_kind, option, preview_interval):
    """Run the ``media_converter`` container for one quality option.

    On success the single media file it produces is moved to
    ``UPLOADS_DIR/<hash>``, a ``local/content_bin`` row is ensured for it, and
    its base58 SHA-256 hash is returned. On any failure an error is logged and
    None is returned. The per-run output directory is always removed.
    """
    # Only the preview option trims; it is rendered at "low" quality.
    if option == "low_preview":
        quality = "low"
        trim_value = f"{preview_interval[0]}-{preview_interval[1]}"
    else:
        quality = option
        trim_value = None

    # output_dir is the host path handed to docker; local_output_dir is the
    # same directory as seen by this process.
    output_dir = f"/Storage/storedContent/converter-output/{uuid.uuid4()}"
    local_output_dir = _to_local_path(output_dir)
    cmd = [
        "docker", "run", "--rm",
        "-v", f"{input_file_path}:/app/input",
        "-v", f"{output_dir}:/app/output",
        "-v", f"{_CONVERTER_LOGS_DIR}:/app/logs",
        "media_converter",
        "--ext", input_ext,
        "--quality", quality,
    ]
    if trim_value:
        cmd.extend(["--trim", trim_value])
    if content_kind == "audio":
        cmd.append("--audio-only")  # audio-only flag

    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await process.communicate()
        if process.returncode != 0:
            make_log(
                "ConvertProcess",
                f"Docker conversion failed for option {option}: {stderr.decode(errors='replace')}",
                level="error",
            )
            return None

        try:
            files = os.listdir(local_output_dir)
        except Exception as e:
            make_log("ConvertProcess", f"Error reading output directory {output_dir}: {e}", level="error")
            return None
        media_files = [f for f in files if f != "output.json"]
        if len(media_files) != 1:
            make_log("ConvertProcess", f"Expected one media file, found {len(media_files)} for option {option}", level="error")
            return None
        output_file = os.path.join(local_output_dir, media_files[0])

        file_hash = await _sha256_b58(output_file, option)
        if file_hash is None:
            return None

        save_path = os.path.join(UPLOADS_DIR, file_hash)
        try:
            os.remove(save_path)
        except FileNotFoundError:
            pass
        try:
            shutil.move(output_file, save_path)
        except Exception as e:
            make_log("ConvertProcess", f"Error moving output file {output_file} to {save_path}: {e}", level="error")
            return None

        # Register the row only once the file is actually in place.
        _ensure_stored_content(
            session,
            file_hash,
            "local/content_bin",
            encrypted.user_id,
            media_files[0],
            {'encrypted_file_hash': encrypted.hash},
        )
        _merge_ffprobe_meta(encrypted, local_output_dir, option)
        return file_hash
    finally:
        # Clean up on every path, including failures; the directory may never
        # have been created if docker failed early.
        try:
            shutil.rmtree(local_output_dir)
        except FileNotFoundError:
            pass
        except Exception as e:
            make_log("ConvertProcess", f"Error removing output dir {output_dir}: {e}", level="warning")


async def _notify_owner(memory, session, encrypted) -> None:
    """Send the content to the owner's Telegram chat once and store the message id.

    The owner is resolved through the most recent WalletConnection for
    ``encrypted.owner_address``; nothing is sent if a notification id is
    already recorded or no connected user is found.
    """
    if (encrypted.meta or {}).get('upload_notify_msg_id'):
        return
    wallet_owner_connection = session.query(WalletConnection).filter(
        WalletConnection.wallet_address == encrypted.owner_address
    ).order_by(WalletConnection.id.desc()).first()
    if not wallet_owner_connection:
        return
    wallet_owner_user = wallet_owner_connection.user
    if wallet_owner_user is None:
        return
    bot = Wrapped_CBotChat(
        memory._client_telegram_bot,
        chat_id=wallet_owner_user.telegram_id,
        user=wallet_owner_user,
        db_session=session,
    )
    msg_id = await bot.send_content(session, encrypted)
    # Reassign (not mutate) meta so the change is actually persisted.
    _merge_meta(encrypted, upload_notify_msg_id=msg_id)
    session.commit()


async def convert_loop(memory):
    """Process at most one pending on-chain content item.

    Picks the first ``onchain/content`` row whose BTFS or IPFS CID is unset and
    loads its decrypted file. Then:

    * video/audio (by MIME sniffing): run the converter for each required
      quality option, publish each output under UPLOADS_DIR, and set the CIDs
      from the converted hashes;
    * anything else: publish the decrypted file unchanged.

    Finally the owning wallet's Telegram user is notified (once).

    NOTE(review): every failure returns early without marking the row, so the
    same item is selected again on the next call; a permanently failing item
    blocks all later ones.

    :param memory: service state object; only ``memory._client_telegram_bot``
        is read here.
    """
    with db_session() as session:
        encrypted = session.query(StoredContent).filter(
            and_(
                StoredContent.type == "onchain/content",
                or_(
                    StoredContent.btfs_cid == None,  # noqa: E711 - SQL "IS NULL"
                    StoredContent.ipfs_cid == None,  # noqa: E711
                ),
            )
        ).first()
        if not encrypted:
            make_log("ConvertProcess", "No content to convert", level="debug")
            return

        # Load the decrypted file record.
        decrypted = session.query(StoredContent).filter(
            StoredContent.id == encrypted.decrypted_content_id
        ).first()
        if not decrypted:
            make_log("ConvertProcess", "Decrypted content not found", level="error")
            return

        # The host path is what docker mounts; local reads must use the local path.
        input_file_path = f"/Storage/storedContent/{decrypted.hash}"
        local_input_path = _to_local_path(input_file_path)
        filename = encrypted.filename or ""
        input_ext = filename.split('.')[-1] if '.' in filename else "mp4"

        content_kind, mime_type = _detect_content_kind(local_input_path)
        make_log("ConvertProcess", f"Detected content_kind={content_kind}, mime={mime_type}", level="info")

        if content_kind == "other":
            _store_raw_copy(session, encrypted, decrypted, local_input_path)
            return

        # Static preview interval in seconds.
        preview_interval = [0, 60] if encrypted.onchain_index in [2] else [0, 30]
        make_log(
            "ConvertProcess",
            f"Processing content {encrypted.id} as {content_kind} with preview interval {preview_interval}",
            level="info"
        )

        if content_kind == "video":
            required_options = ['high', 'low', 'low_preview']
        else:
            required_options = ['high', 'low']  # no preview for audio

        converted_content = {}
        for option in required_options:
            file_hash = await _convert_option(
                session, encrypted, input_file_path, input_ext,
                content_kind, option, preview_interval,
            )
            if file_hash is None:
                return
            converted_content[option] = file_hash

        make_log("ConvertProcess", f"Content {encrypted.id} processed. Converted content: {converted_content}", level="info")
        # BTFS gets the high-quality video (audio: the low rendition); IPFS always gets 'low'.
        encrypted.btfs_cid = ContentId(
            version=2, content_hash=b58decode(converted_content['high' if content_kind == 'video' else 'low'])
        ).serialize_v2()
        encrypted.ipfs_cid = ContentId(
            version=2, content_hash=b58decode(converted_content['low'])
        ).serialize_v2()
        _merge_meta(encrypted, converted_content=converted_content)
        session.commit()

        await _notify_owner(memory, session, encrypted)
async def main_fn(memory):
    """Run ``convert_loop`` forever, reporting liveness after each pass.

    After a successful pass it sleeps 5 s and then sends a ``working
    (seqno=N)`` status; after an ordinary error it logs and sleeps 3 s.
    Task cancellation and interpreter-exit exceptions are re-raised so the
    service can be stopped.

    :param memory: service state object forwarded to ``convert_loop``.
    """
    make_log("ConvertProcess", "Service started", level="info")
    seqno = 0
    while True:
        try:
            make_log("ConvertProcess", "Service running", level="debug")
            await convert_loop(memory)
            await asyncio.sleep(5)
            await send_status("convert_service", f"working (seqno={seqno})")
            seqno += 1
        except asyncio.CancelledError:
            # Must propagate: swallowing it makes the task impossible to cancel.
            # (Listed explicitly because on Python < 3.8 it subclasses Exception.)
            raise
        except Exception as e:
            # Exception, not BaseException, so KeyboardInterrupt/SystemExit also escape.
            make_log("ConvertProcess", f"Error: {e}", level="error")
            await asyncio.sleep(3)