281 lines
13 KiB
Python
281 lines
13 KiB
Python
import asyncio
|
||
from datetime import datetime
|
||
import os
|
||
import uuid
|
||
import json
|
||
import shutil
|
||
import magic # python-magic for MIME detection
|
||
from base58 import b58decode, b58encode
|
||
from sqlalchemy import and_, or_, select
|
||
from app.core.models.node_storage import StoredContent
|
||
from app.core.models._telegram import Wrapped_CBotChat
|
||
from app.core._utils.send_status import send_status
|
||
from app.core.logger import make_log
|
||
from app.core.models.user import User
|
||
from app.core.models import WalletConnection
|
||
from app.core.storage import db_session
|
||
from app.core._config import UPLOADS_DIR, BACKEND_DATA_DIR_HOST, BACKEND_LOGS_DIR_HOST
|
||
from app.core.content.content_id import ContentId
|
||
|
||
|
||
async def convert_loop(memory):
    """Convert one pending ``onchain/content`` record, if there is one.

    Selects the first ``StoredContent`` of type ``"onchain/content"`` whose
    ``btfs_cid`` or ``ipfs_cid`` is still NULL, loads its decrypted
    counterpart (``decrypted_content_id``) from ``UPLOADS_DIR`` and classifies
    it by MIME type (python-magic):

    * neither ``video/*`` nor ``audio/*``: the decrypted file's hash is reused
      for every quality option and for both CIDs;
    * video/audio: the ``media_converter`` docker image is run once per
      quality option. Each result is stored as a ``local/content_bin`` record
      keyed by the base58-encoded SHA-256 of the produced file. The hashes
      are recorded in ``meta['converted_content']``.

    The CIDs are then filled in and the session is committed. If
    ``meta['upload_notify_msg_id']`` is not set yet, the owner is notified
    through the Telegram bot wrapper.

    If any conversion step fails, the error is logged and the function
    returns without setting the CIDs. The selection query only filters on
    the CIDs, so the same record is picked again on the next call.

    :param memory: service state; only ``memory._client_telegram_bot`` is
        used here (for the owner notification).
    """
    async with db_session() as session:
        # First content record that still lacks at least one CID.
        unprocessed_encrypted_content = (await session.execute(select(StoredContent).where(
            and_(
                StoredContent.type == "onchain/content",
                or_(
                    StoredContent.btfs_cid == None,  # noqa: E711 - renders as SQL "IS NULL"
                    StoredContent.ipfs_cid == None,  # noqa: E711
                ),
            )
        ))).scalars().first()
        if not unprocessed_encrypted_content:
            make_log("ConvertProcess", "No content to convert", level="debug")
            return

        # Load the record of the decrypted file.
        decrypted_content = (await session.execute(select(StoredContent).where(
            StoredContent.id == unprocessed_encrypted_content.decrypted_content_id
        ))).scalars().first()
        if not decrypted_content:
            make_log("ConvertProcess", "Decrypted content not found", level="error")
            return

        # Path inside this container (readable by this Python process).
        input_file_container = os.path.join(UPLOADS_DIR, decrypted_content.hash)
        # Host path of the same file: docker resolves -v sources on the host.
        input_file_host = os.path.join(BACKEND_DATA_DIR_HOST, decrypted_content.hash)
        input_ext = _input_extension(unprocessed_encrypted_content.filename)

        content_kind, mime_type = _detect_content_kind(input_file_container)
        make_log("ConvertProcess", f"Detected content_kind={content_kind}, mime={mime_type}", level="info")

        # Non audio/video content: keep the raw decrypted file for every option.
        if content_kind == "other":
            make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Not audio/video, copy just", level="info")
            raw_cid = ContentId(
                version=2, content_hash=b58decode(decrypted_content.hash)
            ).serialize_v2()
            unprocessed_encrypted_content.btfs_cid = raw_cid
            unprocessed_encrypted_content.ipfs_cid = raw_cid
            unprocessed_encrypted_content.meta = {
                **(unprocessed_encrypted_content.meta or {}),
                'converted_content': {
                    option_name: decrypted_content.hash for option_name in ['high', 'low', 'low_preview']
                },
            }
            await session.commit()
            return

        # Video/audio conversion. Static preview interval in seconds;
        # onchain_index 2 gets a 60 s preview instead of 30 s.
        if unprocessed_encrypted_content.onchain_index in [2]:
            preview_interval = [0, 60]
        else:
            preview_interval = [0, 30]

        make_log(
            "ConvertProcess",
            f"Processing content {unprocessed_encrypted_content.id} as {content_kind} with preview interval {preview_interval}",
            level="info"
        )

        if content_kind == "video":
            required_convert_options = ['high', 'low', 'low_preview']
        else:
            required_convert_options = ['high', 'low']  # no preview for audio

        converted_content = {}
        for option in required_convert_options:
            file_hash = await _convert_option(
                session,
                unprocessed_encrypted_content,
                option,
                content_kind,
                input_file_host,
                input_ext,
                preview_interval,
            )
            if file_hash is None:
                # The failure has already been logged; retry on a later call.
                return
            converted_content[option] = file_hash

        # Finalize the original record.
        make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Converted content: {converted_content}", level="info")
        btfs_source_hash = converted_content['high' if content_kind == 'video' else 'low']
        unprocessed_encrypted_content.btfs_cid = ContentId(
            version=2, content_hash=b58decode(btfs_source_hash)
        ).serialize_v2()
        unprocessed_encrypted_content.ipfs_cid = ContentId(
            version=2, content_hash=b58decode(converted_content['low'])
        ).serialize_v2()
        unprocessed_encrypted_content.meta = {
            **(unprocessed_encrypted_content.meta or {}),
            'converted_content': converted_content,
        }
        await session.commit()

        await _notify_owner(memory, session, unprocessed_encrypted_content)


def _input_extension(filename):
    """Return the text after the last '.' of *filename*, or "mp4" if there is no '.'."""
    if filename and '.' in filename:
        return filename.rsplit('.', 1)[-1]
    return "mp4"


def _detect_content_kind(path):
    """Classify the file at *path* as "video", "audio" or "other" by MIME type.

    Returns ``(content_kind, mime_type)``. If python-magic raises, a warning
    is logged and ``mime_type`` is ``""``, which classifies the file as
    "other".
    """
    try:
        mime_type = magic.from_file(path, mime=True)
    except Exception as e:
        make_log("ConvertProcess", f"magic probe failed: {e}", level="warning")
        mime_type = ""

    if mime_type.startswith("video/"):
        return "video", mime_type
    if mime_type.startswith("audio/"):
        return "audio", mime_type
    return "other", mime_type


async def _convert_option(session, content, option, content_kind, input_file_host, input_ext, preview_interval):
    """Run the converter container for one quality *option* and store its output.

    On success the produced file is moved to ``UPLOADS_DIR/<hash>``. A
    ``local/content_bin`` record is added if none with that hash exists. If
    the converter wrote ``output.json`` and ``content.meta`` has no
    ``ffprobe_meta`` yet, its parsed contents are stored there.

    :returns: base58-encoded SHA-256 of the produced file, or ``None`` after
        logging the error when any step fails.

    The per-call output directory is removed in every case, including
    failures.
    """
    # Only the preview is trimmed, and it is rendered at "low" quality.
    if option == "low_preview":
        quality = "low"
        trim_value = f"{preview_interval[0]}-{preview_interval[1]}"
    else:
        quality = option
        trim_value = None

    # Unique output directory. It lives under UPLOADS_DIR, which is mounted
    # from the host, so it is addressed by both a container path and a host path.
    output_uuid = str(uuid.uuid4())
    output_dir_container = os.path.join(UPLOADS_DIR, "converter-output", output_uuid)
    os.makedirs(output_dir_container, exist_ok=True)
    output_dir_host = os.path.join(BACKEND_DATA_DIR_HOST, "converter-output", output_uuid)

    try:
        cmd = [
            "docker", "run", "--rm",
            # Volume sources must be HOST paths: the docker daemon resolves them on the host.
            "-v", f"{input_file_host}:/app/input:ro",
            "-v", f"{output_dir_host}:/app/output",
            "-v", f"{BACKEND_LOGS_DIR_HOST}:/app/logs",
            "media_converter",
            "--ext", input_ext,
            "--quality", quality,
        ]
        if trim_value:
            cmd.extend(["--trim", trim_value])
        if content_kind == "audio":
            cmd.append("--audio-only")  # audio-only flag

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        _, stderr = await process.communicate()
        if process.returncode != 0:
            make_log("ConvertProcess", f"Docker conversion failed for option {option}: {stderr.decode(errors='replace')}", level="error")
            return None

        try:
            files = os.listdir(output_dir_container)
        except Exception as e:
            make_log("ConvertProcess", f"Error reading output directory {output_dir_container}: {e}", level="error")
            return None

        # The converter is expected to emit exactly one media file plus an optional output.json.
        media_files = [f for f in files if f != "output.json"]
        if len(media_files) != 1:
            make_log("ConvertProcess", f"Expected one media file, found {len(media_files)} for option {option}", level="error")
            return None
        output_file = os.path.join(output_dir_container, media_files[0])

        hash_process = await asyncio.create_subprocess_exec(
            "sha256sum", output_file,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        hash_stdout, hash_stderr = await hash_process.communicate()
        if hash_process.returncode != 0:
            make_log("ConvertProcess", f"Error computing sha256sum for option {option}: {hash_stderr.decode(errors='replace')}", level="error")
            return None
        # sha256sum prints "<hex digest>  <path>"; stored hashes are base58 of the raw digest.
        file_hash = b58encode(bytes.fromhex(hash_stdout.decode().split()[0])).decode()

        existing = (await session.execute(
            select(StoredContent).where(StoredContent.hash == file_hash)
        )).scalars().first()
        if not existing:
            session.add(StoredContent(
                type="local/content_bin",
                hash=file_hash,
                user_id=content.user_id,
                filename=media_files[0],
                meta={'encrypted_file_hash': content.hash},
                created=datetime.now(),
            ))
            await session.commit()

        # Replace any previous file stored under the same hash.
        save_path = os.path.join(UPLOADS_DIR, file_hash)
        try:
            os.remove(save_path)
        except FileNotFoundError:
            pass
        try:
            shutil.move(output_file, save_path)
        except Exception as e:
            make_log("ConvertProcess", f"Error moving output file {output_file} to {save_path}: {e}", level="error")
            return None

        # Keep the first ffprobe metadata reported by the converter.
        output_json_path = os.path.join(output_dir_container, "output.json")
        if os.path.exists(output_json_path) and (content.meta or {}).get('ffprobe_meta') is None:
            try:
                with open(output_json_path, "r", encoding="utf-8") as f:
                    ffprobe_meta = json.load(f)
                content.meta = {
                    **(content.meta or {}),
                    'ffprobe_meta': ffprobe_meta,
                }
            except Exception as e:
                make_log("ConvertProcess", f"Error handling output.json for option {option}: {e}", level="error")

        return file_hash
    finally:
        try:
            shutil.rmtree(output_dir_container)
        except Exception as e:
            make_log("ConvertProcess", f"Error removing output dir {output_dir_container}: {e}", level="warning")


async def _notify_owner(memory, session, content):
    """Send *content* to its owner via the Telegram bot wrapper, at most once.

    Does nothing if ``meta['upload_notify_msg_id']`` is already set. Also does
    nothing if there is no ``WalletConnection`` for ``content.owner_address``.
    Otherwise it uses the newest connection's user and stores the value
    returned by ``send_content`` in ``meta['upload_notify_msg_id']``.
    """
    if (content.meta or {}).get('upload_notify_msg_id'):
        return

    wallet_owner_connection = (await session.execute(select(WalletConnection).where(
        WalletConnection.wallet_address == content.owner_address
    ).order_by(WalletConnection.id.desc()))).scalars().first()
    if not wallet_owner_connection:
        return

    wallet_owner_user = wallet_owner_connection.user
    bot = Wrapped_CBotChat(
        memory._client_telegram_bot,
        chat_id=wallet_owner_user.telegram_id,
        user=wallet_owner_user,
        db_session=session,
    )
    notify_msg_id = await bot.send_content(session, content)
    # Reassign rather than mutate in place. An in-place change to a plain JSON
    # column is not tracked by SQLAlchemy (assumed here: no MutableDict), so
    # it would never be flushed and the owner could be notified again.
    content.meta = {
        **(content.meta or {}),
        'upload_notify_msg_id': notify_msg_id,
    }
    await session.commit()
|
||
|
||
|
||
async def main_fn(memory):
    """Run ``convert_loop`` forever as the conversion service's main loop.

    Each iteration does the following:

    * picks a short request id (``rid``), passes it to ``make_log`` and, when
      ``app.core.log_context`` is importable, sets it on ``ctx_rid``;
    * runs one ``convert_loop`` pass;
    * sleeps 5 seconds and reports ``working (seqno=N)`` via ``send_status``.

    Ordinary exceptions are logged, followed by a 3 second pause, and the loop
    continues. ``BaseException``-only exceptions propagate, so the service
    task can actually be cancelled or shut down. These include
    ``asyncio.CancelledError``, ``KeyboardInterrupt`` and ``SystemExit``.

    :param memory: service state object forwarded to ``convert_loop``.
    """
    make_log("ConvertProcess", "Service started", level="info")

    # Optional request-id context variable; logging works without it.
    try:
        from app.core.log_context import ctx_rid
    except Exception:
        ctx_rid = None

    def _set_rid(value):
        # Best effort: a failure to tag the log context must not stop the service.
        if ctx_rid is None:
            return
        try:
            ctx_rid.set(value)
        except Exception:
            pass

    seqno = 0
    while True:
        rid = uuid.uuid4().hex[:8]
        try:
            _set_rid(rid)
            make_log("ConvertProcess", "Service running", level="debug", rid=rid)
            await convert_loop(memory)
            await asyncio.sleep(5)
            await send_status("convert_service", f"working (seqno={seqno})")
            seqno += 1
        except Exception as e:
            make_log("ConvertProcess", f"Error: {e}", level="error", rid=rid)
            await asyncio.sleep(3)
        finally:
            _set_rid(None)