uploader-bot/app/core/background/convert_service.py

222 lines
8.7 KiB
Python

import asyncio
from datetime import datetime
import os
import uuid
import json
import shutil
import magic # python-magic
from base58 import b58decode, b58encode
from sqlalchemy import and_, or_
from app.core.models.node_storage import StoredContent
from app.core.models._telegram import Wrapped_CBotChat
from app.core._utils.send_status import send_status
from app.core.logger import make_log
from app.core.models import WalletConnection
from app.core.storage import db_session
from app.core._config import UPLOADS_DIR
from app.core.content.content_id import ContentId
import converter_module # наш модуль для конвертации
async def convert_loop(memory):
with db_session() as session:
# 1) Найти несанкционированный контент
item = session.query(StoredContent).filter(
and_(
StoredContent.type == "onchain/content",
or_(
StoredContent.btfs_cid == None,
StoredContent.ipfs_cid == None,
)
)
).first()
if not item:
make_log("ConvertProcess", "No content to convert", level="debug")
return
# 2) Достать расшифрованный файл
decrypted = session.query(StoredContent).filter(
StoredContent.id == item.decrypted_content_id
).first()
if not decrypted:
make_log("ConvertProcess", "Decrypted content not found", level="error")
return
input_path = f"/Storage/storedContent/{decrypted.hash}"
filename = item.filename
ext = filename.split('.')[-1] if '.' in filename else ""
# 3) Определяем MIME-тип через python-magic
try:
mime = magic.from_file(input_path, mime=True)
except Exception as e:
make_log("ConvertProcess", f"magic probe failed: {e}", level="warning")
mime = ""
if mime.startswith("video/"):
kind = "video"
elif mime.startswith("audio/"):
kind = "audio"
else:
kind = "other"
make_log("ConvertProcess", f"Detected kind={kind}, mime={mime}", level="info")
# 4) Если не видео и не аудио — сохраняем raw copy и выходим
if kind == "other":
raw_hash = item.hash
raw = StoredContent(
type="local/content_raw",
hash=raw_hash,
user_id=item.user_id,
filename=filename,
meta={'source': 'raw_copy'},
created=datetime.now(),
)
session.add(raw)
session.commit()
# Копируем файл в UPLOADS_DIR
dst = os.path.join(UPLOADS_DIR, raw_hash)
try:
os.remove(dst)
except FileNotFoundError:
pass
shutil.copy2(input_path, dst)
# Обновляем оригинальный объект
item.btfs_cid = ContentId(version=2, content_hash=b58decode(raw_hash)).serialize_v2()
item.ipfs_cid = ContentId(version=2, content_hash=b58decode(raw_hash)).serialize_v2()
item.meta = {**item.meta, 'converted_content': {'raw': raw_hash}}
session.commit()
make_log("ConvertProcess", f"Raw content saved and CIDs set for {raw_hash}", level="info")
return
# 5) Задаём опции конвертации для видео/аудио
if kind == "video":
options = ['high', 'low', 'low_preview']
else: # audio
options = ['high_audio', 'low_audio']
# Preview interval
preview_interval = [0, 30]
if item.onchain_index == 2:
preview_interval = [0, 60]
converted = {}
for opt in options:
# quality и trim
if opt.endswith("_preview"):
quality = opt.replace("_preview", "")
trim = preview_interval
else:
quality = opt.replace("_audio", "")
trim = None
# уникальная папка вывода
uid = str(uuid.uuid4())
out_dir = f"/Storage/storedContent/converter-output/{uid}"
os.makedirs(out_dir, exist_ok=True)
# ==== Вызов конвертера ====
# converter_module.convert(input_path, out_dir, ext=ext,
# quality=quality, trim=trim, audio_only=(kind=="audio"))
# Здесь предполагаем, что convert возвращает имя файла результата.
try:
result_fname = await converter_module.convert(
input=input_path,
output_dir=out_dir,
ext=ext,
quality=quality,
trim=trim,
audio_only=(kind == "audio")
)
except Exception as e:
make_log("ConvertProcess", f"Conversion failed {opt}: {e}", level="error")
return
out_file = os.path.join(out_dir, result_fname)
# 6) Считаем sha256 и b58
proc = await asyncio.create_subprocess_exec(
"sha256sum", out_file,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
so, se = await proc.communicate()
if proc.returncode != 0:
make_log("ConvertProcess", f"sha256sum error: {se.decode()}", level="error")
return
sha_hex = so.decode().split()[0]
h58 = b58encode(bytes.fromhex(sha_hex)).decode()
# 7) Добавляем StoredContent, если нужно
if not session.query(StoredContent).filter(StoredContent.hash == h58).first():
new = StoredContent(
type="local/content_bin",
hash=h58,
user_id=item.user_id,
filename=result_fname,
meta={'encrypted_file_hash': item.hash},
created=datetime.now(),
)
session.add(new)
session.commit()
# 8) Перемещаем в UPLOADS_DIR
dst = os.path.join(UPLOADS_DIR, h58)
try:
os.remove(dst)
except FileNotFoundError:
pass
shutil.move(out_file, dst)
converted[opt] = h58
# 9) optional: metadata from converter_module
meta_info = converter_module.get_meta(out_dir) # допустим, так
if meta_info and 'ffprobe_meta' not in item.meta:
item.meta['ffprobe_meta'] = meta_info
# 10) очистка
shutil.rmtree(out_dir, ignore_errors=True)
# 11) Обновляем оригинальный объект после всех опций
make_log("ConvertProcess", f"Converted: {converted}", level="info")
main_high = 'high' if kind=='video' else 'high_audio'
main_low = 'low' if kind=='video' else 'low_audio'
item.btfs_cid = ContentId(version=2,
content_hash=b58decode(converted[main_high])).serialize_v2()
item.ipfs_cid = ContentId(version=2,
content_hash=b58decode(converted[main_low])).serialize_v2()
item.meta = {**item.meta, 'converted_content': converted}
session.commit()
# 12) Отправляем уведомление пользователю
if not item.meta.get('upload_notify_msg_id'):
wc = session.query(WalletConnection).filter(
WalletConnection.wallet_address == item.owner_address
).order_by(WalletConnection.id.desc()).first()
if wc:
bot = Wrapped_CBotChat(memory._client_telegram_bot,
chat_id=wc.user.telegram_id,
user=wc.user, db_session=session)
item.meta['upload_notify_msg_id'] = await bot.send_content(session, item)
session.commit()
async def main_fn(memory):
make_log("ConvertProcess", "Service started", level="info")
seqno = 0
while True:
try:
make_log("ConvertProcess", "Service running", level="debug")
await convert_loop(memory)
await asyncio.sleep(5)
await send_status("convert_service", f"working (seqno={seqno})")
seqno += 1
except BaseException as e:
make_log("ConvertProcess", f"Error: {e}", level="error")
await asyncio.sleep(3)