uploader-bot/app/core/background/convert_service.py

280 lines
13 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio
from datetime import datetime
import os
import uuid
import json
import shutil
import magic # python-magic for MIME detection
from base58 import b58decode, b58encode
from sqlalchemy import and_, or_, select
from app.core.models.node_storage import StoredContent
from app.core.models._telegram import Wrapped_CBotChat
from app.core._utils.send_status import send_status
from app.core.logger import make_log
from app.core.models.user import User
from app.core.models import WalletConnection
from app.core.storage import db_session
from app.core._config import UPLOADS_DIR, BACKEND_DATA_DIR_HOST, BACKEND_LOGS_DIR_HOST
from app.core.content.content_id import ContentId
def _content_kind_from_mime(mime_type):
    """Map a MIME type string to ``"video"``, ``"audio"`` or ``"other"``."""
    if mime_type.startswith("video/"):
        return "video"
    if mime_type.startswith("audio/"):
        return "audio"
    return "other"


def _decode_output(data):
    """Decode subprocess output for logging without raising on non-UTF-8 bytes."""
    return data.decode(errors="replace")


async def _convert_option(session, encrypted_content, option, quality, trim_value,
                          input_file_host, input_ext):
    """Run the ``media_converter`` container for one option and store its result.

    Args:
        session: Open DB session used to look up / insert the ``StoredContent`` row.
        encrypted_content: The ``onchain/content`` record being converted; its
            ``meta`` may be updated with ``ffprobe_meta`` from ``output.json``.
        option: Option name used in log messages (``high``/``low``/``low_preview``).
        quality: Value passed to the converter's ``--quality`` flag.
        trim_value: ``"start-end"`` passed as ``--trim``, or ``None`` for no trim.
        input_file_host: Host path of the decrypted input file (docker ``-v`` source).
        input_ext: Input file extension passed as ``--ext``.

    Returns:
        The base58-encoded SHA-256 of the converted file, which is moved to
        ``UPLOADS_DIR/<hash>``. Returns ``None`` if any step failed (the
        failure is logged). The per-run output directory is removed on every
        exit path.
    """
    output_uuid = str(uuid.uuid4())
    # Output directory as seen by this process (under UPLOADS_DIR, which is mounted from the host).
    output_dir_container = os.path.join(UPLOADS_DIR, "converter-output", output_uuid)
    # The same directory as seen from the host: the docker daemon resolves -v sources on the host.
    output_dir_host = os.path.join(BACKEND_DATA_DIR_HOST, "converter-output", output_uuid)
    os.makedirs(output_dir_container, exist_ok=True)
    try:
        cmd = [
            "docker", "run", "--rm",
            # Mount sources are HOST paths, because the docker daemon needs them on the host.
            "-v", f"{input_file_host}:/app/input:ro",
            "-v", f"{output_dir_host}:/app/output",
            "-v", f"{BACKEND_LOGS_DIR_HOST}:/app/logs",
            "media_converter",
            "--ext", input_ext,
            "--quality", quality,
        ]
        if trim_value:
            cmd.extend(["--trim", trim_value])
        # The converter auto-detects audio/video; no explicit flag is required.
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await process.communicate()
        if process.returncode != 0:
            make_log("ConvertProcess", f"Docker conversion failed for option {option}: {_decode_output(stderr)}", level="error")
            return None

        try:
            files = os.listdir(output_dir_container)
        except Exception as e:
            make_log("ConvertProcess", f"Error reading output directory {output_dir_container}: {e}", level="error")
            return None
        # The converter writes exactly one media file plus an optional output.json.
        media_files = [f for f in files if f != "output.json"]
        if len(media_files) != 1:
            make_log("ConvertProcess", f"Expected one media file, found {len(media_files)} for option {option}", level="error")
            return None
        output_file = os.path.join(output_dir_container, media_files[0])

        hash_process = await asyncio.create_subprocess_exec(
            "sha256sum", output_file,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        hash_stdout, hash_stderr = await hash_process.communicate()
        if hash_process.returncode != 0:
            make_log("ConvertProcess", f"Error computing sha256sum for option {option}: {_decode_output(hash_stderr)}", level="error")
            return None
        # sha256sum prints "<hex digest>  <path>"; the digest is stored base58-encoded.
        file_hash = b58encode(bytes.fromhex(hash_stdout.decode().split()[0])).decode()

        existing = (await session.execute(
            select(StoredContent).where(StoredContent.hash == file_hash)
        )).scalars().first()
        if not existing:
            session.add(StoredContent(
                type="local/content_bin",
                hash=file_hash,
                user_id=encrypted_content.user_id,
                filename=media_files[0],
                meta={'encrypted_file_hash': encrypted_content.hash},
                created=datetime.now(),
            ))
            await session.commit()

        save_path = os.path.join(UPLOADS_DIR, file_hash)
        try:
            os.remove(save_path)
        except FileNotFoundError:
            pass
        try:
            shutil.move(output_file, save_path)
        except Exception as e:
            make_log("ConvertProcess", f"Error moving output file {output_file} to {save_path}: {e}", level="error")
            return None

        # Keep the first ffprobe report produced by any option run.
        output_json_path = os.path.join(output_dir_container, "output.json")
        if os.path.exists(output_json_path) and (encrypted_content.meta or {}).get('ffprobe_meta') is None:
            try:
                with open(output_json_path, "r", encoding="utf-8") as f:
                    ffprobe_meta = json.load(f)
                encrypted_content.meta = {
                    **(encrypted_content.meta or {}),
                    'ffprobe_meta': ffprobe_meta,
                }
            except Exception as e:
                make_log("ConvertProcess", f"Error handling output.json for option {option}: {e}", level="error")
        return file_hash
    finally:
        # Always remove the per-run directory, including on failure paths.
        try:
            shutil.rmtree(output_dir_container)
        except Exception as e:
            make_log("ConvertProcess", f"Error removing output dir {output_dir_container}: {e}", level="warning")


async def convert_loop(memory):
    """Process at most one pending ``onchain/content`` record.

    The function selects the first ``StoredContent`` of type ``onchain/content``
    whose ``btfs_cid`` or ``ipfs_cid`` is unset. It then loads the record
    referenced by ``decrypted_content_id`` and sniffs the MIME type of that
    file in ``UPLOADS_DIR``.

    * Non-audio/video content: both CIDs point at the decrypted file itself,
      and every option in ``meta['converted_content']`` maps to its hash.
    * Video: converted to ``high``, ``low`` and a trimmed ``low_preview``.
    * Audio: converted to ``high`` and ``low`` (no preview).

    For audio/video, ``btfs_cid`` is set from ``high`` (video) or ``low``
    (audio), and ``ipfs_cid`` from ``low``. Afterwards, the latest wallet
    connection of ``owner_address`` is notified through the Telegram bot,
    unless ``meta['upload_notify_msg_id']`` is already set.

    Args:
        memory: Service state object; only ``memory._client_telegram_bot`` is used.

    NOTE(review): the selection has no ORDER BY, and every failure returns
    without setting the CIDs. A record that fails permanently is therefore
    selected again on every call and can starve other records. Consider
    marking failures.
    """
    async with db_session() as session:
        unprocessed_encrypted_content = (await session.execute(select(StoredContent).where(
            and_(
                StoredContent.type == "onchain/content",
                or_(
                    # "== None" is intentional: SQLAlchemy renders it as "IS NULL".
                    StoredContent.btfs_cid == None,  # noqa: E711
                    StoredContent.ipfs_cid == None,  # noqa: E711
                )
            )
        ))).scalars().first()
        if not unprocessed_encrypted_content:
            make_log("ConvertProcess", "No content to convert", level="debug")
            return

        # Fetch the decrypted counterpart of the on-chain record.
        decrypted_content = (await session.execute(select(StoredContent).where(
            StoredContent.id == unprocessed_encrypted_content.decrypted_content_id
        ))).scalars().first()
        if not decrypted_content:
            make_log("ConvertProcess", "Decrypted content not found", level="error")
            return

        # Path inside this container (readable by this Python process).
        input_file_container = os.path.join(UPLOADS_DIR, decrypted_content.hash)
        # Host path (needed for the docker -v mapping when launching the converter).
        input_file_host = os.path.join(BACKEND_DATA_DIR_HOST, decrypted_content.hash)
        filename = unprocessed_encrypted_content.filename or ""
        input_ext = filename.split('.')[-1] if '.' in filename else "mp4"

        # Detect the MIME type with python-magic. An unreadable file is classified as "other".
        try:
            mime_type = magic.from_file(input_file_container, mime=True)
        except Exception as e:
            make_log("ConvertProcess", f"magic probe failed: {e}", level="warning")
            mime_type = ""
        content_kind = _content_kind_from_mime(mime_type)
        make_log("ConvertProcess", f"Detected content_kind={content_kind}, mime={mime_type}", level="info")

        # Non-media content: reference the raw decrypted file for every option and stop.
        # NOTE(review): unlike the audio/video path, this branch sends no upload notification — confirm intended.
        if content_kind == "other":
            make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Not audio/video, copy just", level="info")
            raw_cid = ContentId(
                version=2, content_hash=b58decode(decrypted_content.hash)
            ).serialize_v2()
            unprocessed_encrypted_content.btfs_cid = raw_cid
            unprocessed_encrypted_content.ipfs_cid = raw_cid
            unprocessed_encrypted_content.meta = {
                **(unprocessed_encrypted_content.meta or {}),
                'converted_content': {
                    option_name: decrypted_content.hash for option_name in ['high', 'low', 'low_preview']
                }
            }
            await session.commit()
            return

        # Static preview window in seconds; onchain_index 2 gets a longer one.
        preview_interval = [0, 60] if unprocessed_encrypted_content.onchain_index in [2] else [0, 30]
        make_log(
            "ConvertProcess",
            f"Processing content {unprocessed_encrypted_content.id} as {content_kind} with preview interval {preview_interval}",
            level="info"
        )

        # Audio gets no preview rendition.
        required_options = ['high', 'low', 'low_preview'] if content_kind == "video" else ['high', 'low']

        converted_content = {}
        for option in required_options:
            if option == "low_preview":
                quality = "low"
                trim_value = f"{preview_interval[0]}-{preview_interval[1]}"
            else:
                quality = option
                trim_value = None
            file_hash = await _convert_option(
                session, unprocessed_encrypted_content, option, quality, trim_value,
                input_file_host, input_ext,
            )
            if file_hash is None:
                # Already logged. The CIDs stay unset, so the record matches the selection query again.
                return
            converted_content[option] = file_hash

        # Finalize the on-chain record.
        make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Converted content: {converted_content}", level="info")
        unprocessed_encrypted_content.btfs_cid = ContentId(
            version=2, content_hash=b58decode(converted_content['high' if content_kind == 'video' else 'low'])
        ).serialize_v2()
        unprocessed_encrypted_content.ipfs_cid = ContentId(
            version=2, content_hash=b58decode(converted_content['low'])
        ).serialize_v2()
        unprocessed_encrypted_content.meta = {
            **(unprocessed_encrypted_content.meta or {}),
            'converted_content': converted_content,
        }
        await session.commit()

        # Notify the owner once, via their most recent wallet connection.
        if not (unprocessed_encrypted_content.meta or {}).get('upload_notify_msg_id'):
            wallet_owner_connection = (await session.execute(select(WalletConnection).where(
                WalletConnection.wallet_address == unprocessed_encrypted_content.owner_address
            ).order_by(WalletConnection.id.desc()))).scalars().first()
            if wallet_owner_connection:
                wallet_owner_user = wallet_owner_connection.user
                bot = Wrapped_CBotChat(
                    memory._client_telegram_bot,
                    chat_id=wallet_owner_user.telegram_id,
                    user=wallet_owner_user,
                    db_session=session
                )
                msg_id = await bot.send_content(session, unprocessed_encrypted_content)
                # Reassign instead of mutating in place. SQLAlchemy does not detect
                # in-place changes to a JSON column unless it is wrapped in a Mutable
                # type, and the model definition is not visible here.
                unprocessed_encrypted_content.meta = {
                    **(unprocessed_encrypted_content.meta or {}),
                    'upload_notify_msg_id': msg_id,
                }
                await session.commit()
async def main_fn(memory):
    """Run ``convert_loop`` forever, reporting liveness after each pass.

    Each pass gets a short random request id. The id is passed to
    ``make_log`` and, on a best-effort basis, stored in
    ``app.core.log_context.ctx_rid`` for the duration of the pass. After a
    successful pass the loop sleeps 5 s, reports status with an increasing
    ``seqno``, and continues.

    Ordinary exceptions are logged and retried after 3 s. Cancellation
    (``asyncio.CancelledError``) and interpreter-exit exceptions are NOT
    caught, so the task can be stopped.

    Args:
        memory: Service state object forwarded to ``convert_loop``.
    """

    def _set_rid(value):
        # Best effort: the request-id log context is optional.
        try:
            from app.core.log_context import ctx_rid
            ctx_rid.set(value)
        except Exception:
            pass

    make_log("ConvertProcess", "Service started", level="info")
    seqno = 0
    while True:
        rid = uuid.uuid4().hex[:8]
        try:
            _set_rid(rid)
            make_log("ConvertProcess", "Service running", level="debug", rid=rid)
            await convert_loop(memory)
            await asyncio.sleep(5)
            await send_status("convert_service", f"working (seqno={seqno})")
            seqno += 1
        except Exception as e:
            # Catch Exception, not BaseException, so CancelledError/KeyboardInterrupt propagate.
            make_log("ConvertProcess", f"Error: {e}", level="error", rid=rid)
            await asyncio.sleep(3)
        finally:
            _set_rid(None)