add converter v1
This commit is contained in:
parent 080f000fd2
commit 5c8d9aa864

Dockerfile | 19
@@ -1,7 +1,24 @@
 FROM python:3.9
 WORKDIR /app
+
+# Copy and install Python dependencies
 COPY requirements.txt .
 RUN pip install -r requirements.txt
+
 COPY . .
 RUN apt-get update && apt-get install -y ffmpeg
+
+# Install required packages, add Docker's official GPG key and repository, then install Docker CLI.
+# python:3.9 is Debian-based, so the Debian repository matches $VERSION_CODENAME here.
+RUN apt-get update && apt-get install -y \
+    ca-certificates \
+    curl \
+    gnupg \
+    lsb-release && \
+    install -m 0755 -d /etc/apt/keyrings && \
+    curl -fsSL https://download.docker.com/linux/debian/gpg -o /etc/apt/keyrings/docker.asc && \
+    chmod a+r /etc/apt/keyrings/docker.asc && \
+    echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/debian \
+    $(. /etc/os-release && echo \"$VERSION_CODENAME\") stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null && \
+    apt-get update && \
+    apt-get install -y docker-ce-cli
+
 CMD ["python", "app"]
@@ -109,6 +109,9 @@ if __name__ == '__main__':
     elif startup_target == 'license_index':
         from app.core.background.license_service import main_fn as target_fn
         time.sleep(7)
+    elif startup_target == 'convert_service':
+        from app.core.background.convert_service import main_fn as target_fn
+        time.sleep(9)
 
     startup_fn = startup_fn or target_fn
     assert startup_fn
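The compose file below launches this entry point with `python -m app convert_service`, which selects the branch added here. A stripped-down, runnable sketch of the dispatch pattern — the stand-in function and table are illustrative, not the project's code:

import asyncio
import sys
import time

async def convert_main(memory=None):
    """Stand-in for app.core.background.convert_service.main_fn."""
    print("convert service running")

# target name -> (main coroutine, startup delay in seconds)
TARGETS = {'convert_service': (convert_main, 9)}

startup_target = sys.argv[1] if len(sys.argv) > 1 else 'convert_service'
target_fn, delay = TARGETS[startup_target]
time.sleep(delay)  # stagger startup, mirroring the time.sleep(9) above
asyncio.run(target_fn(None))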
@@ -52,17 +52,17 @@ async def s_api_v1_blockchain_send_new_content_message(request):
         assert field_key in request.json, f"No {field_key} provided"
         assert field_value(request.json[field_key]), f"Invalid {field_key} provided"
 
     wallet_connection = request.ctx.user.wallet_connection(request.ctx.db_session)
     assert wallet_connection, "No wallet connection found"
 
     decrypted_content_cid, err = resolve_content(request.json['content'])
     assert not err, "Invalid content CID"
 
     # Look up the originally uploaded source file
     decrypted_content = request.ctx.db_session.query(StoredContent).filter(
         StoredContent.hash == decrypted_content_cid.content_hash_b58
     ).first()
     assert decrypted_content, "No content locally found"
     assert decrypted_content.type == "local/content_bin", "Invalid content type"
 
     # Create a dummy encrypted_content. We skip real encryption for performance, since the encrypted copy is never used anywhere downstream
     encrypted_content = await create_encrypted_content(request.ctx.db_session, decrypted_content)
     encrypted_content_cid = encrypted_content.cid
@@ -83,7 +83,7 @@ async def s_api_v1_blockchain_send_new_content_message(request):
     metadata_content = await create_metadata_for_item(
         request.ctx.db_session,
         title=content_title,
-        cover_url=f"{PROJECT_HOST}/api/v1/storage/{image_content_cid.serialize_v2()}" if image_content_cid else None,
+        cover_url=f"{PROJECT_HOST}/api/v1.5/storage/{image_content_cid.serialize_v2()}" if image_content_cid else None,
         authors=request.json['authors'],
         hashtags=request.json['hashtags']
     )
@@ -108,7 +108,6 @@ async def s_api_v1_blockchain_send_new_content_message(request):
         }
     )
 
-
     return response.json({
         'address': platform.address.to_string(1, 1, 1),
         'amount': str(int(0.15 * 10 ** 9)),
@@ -133,7 +132,7 @@ async def s_api_v1_blockchain_send_new_content_message(request):
         begin_cell()
         .store_ref(
             begin_cell()
-            .store_bytes(f"{PROJECT_HOST}/api/v1/storage/{metadata_content.cid.serialize_v2(include_accept_type=True)}".encode())
+            .store_bytes(f"{PROJECT_HOST}/api/v1.5/storage/{metadata_content.cid.serialize_v2(include_accept_type=True)}".encode())
             .end_cell()
         )
         .store_ref(
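The cell built above stores the metadata URL as raw bytes inside a child (ref) cell — the usual off-chain content layout on TON. A minimal sketch of that layout, assuming a tonsdk-style builder; the import and URL are illustrative assumptions, not the project's API:

from tonsdk.boc import begin_cell

metadata_url = "https://example.org/api/v1.5/storage/SOME_CID"  # hypothetical URL

content_cell = (
    begin_cell()
    .store_ref(
        begin_cell()
        .store_bytes(metadata_url.encode())  # URL bytes live in the child cell
        .end_cell()
    )
    .end_cell()
)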
@@ -133,7 +133,7 @@ async def s_api_v1_5_storage_post(request):
 
     # If computed hash matches the provided one, the final chunk has been received
     if computed_hash_b58 == provided_hash_b58:
-        final_path = os.path.join(UPLOADS_DIR, f"v1.5_{computed_hash_b58}")
+        final_path = os.path.join(UPLOADS_DIR, f"{computed_hash_b58}")
         try:
             os.rename(temp_path, final_path)
             make_log("uploader_v1.5", f"Final chunk received. File renamed to: {final_path}", level="INFO")
@@ -192,7 +192,12 @@ async def s_api_v1_5_storage_post(request):
 async def s_api_v1_5_storage_get(request, file_hash):
     make_log("uploader_v1.5", f"Received file retrieval request for hash: {file_hash}", level="INFO")
 
-    final_path = os.path.join(UPLOADS_DIR, f"v1.5_{file_hash}")
+    try:
+        file_hash = resolve_content(file_hash)[0].content_hash
+    except Exception:
+        pass
+
+    final_path = os.path.join(UPLOADS_DIR, f"{file_hash}")
     if not os.path.exists(final_path):
         make_log("uploader_v1.5", f"File not found: {final_path}", level="ERROR")
         return response.json({"error": "File not found"}, status=404)
@@ -11,6 +11,7 @@ from app.core._crypto.cipher import AESCipher
 from app.core.models.keys import KnownKey
 from app.core.models.node_storage import StoredContent
 from app.core.logger import make_log
+from base58 import b58decode
 
 
 async def create_new_encryption_key(db_session, user_id: int = None) -> KnownKey:
@@ -63,8 +64,7 @@ async def create_encrypted_content(
     assert decrypted_content.key_id, "Key not assigned"
 
     decrypted_path = os.path.join(UPLOADS_DIR, decrypted_content.hash)
-    async with aiofiles.open(decrypted_path, mode='rb') as file:
-        decrypted_bin = await file.read()
+    decrypted_bin = b58decode(decrypted_content.hash)
 
     key = decrypted_content.key
     cipher = AESCipher(key.seed_bin)
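With this change, create_encrypted_content no longer reads the media file: it feeds the cipher the 32 bytes obtained by base58-decoding the content hash, which is what makes the "dummy" encryption in the handler above cheap. A self-contained illustration (the file bytes are made up for the example):

import hashlib
from base58 import b58decode, b58encode

file_bytes = b"...many megabytes of media..."  # stand-in for the real file
hash_b58 = b58encode(hashlib.sha256(file_bytes).digest()).decode()

placeholder = b58decode(hash_b58)  # what gets "encrypted" now
assert len(placeholder) == 32      # fixed size, regardless of file size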
app/core/background/convert_service.py (new file, per the import in the entry point above; all lines below are additions)
@@ -0,0 +1,201 @@
import asyncio
from datetime import datetime
import os
import uuid
import json
import shutil
from base58 import b58encode
from sqlalchemy import and_, or_
from app.core.models.node_storage import StoredContent
from app.core._utils.send_status import send_status
from app.core.logger import make_log
from app.core.storage import db_session
from app.core._config import UPLOADS_DIR


async def convert_loop():
    with db_session() as session:
        # Query for unprocessed encrypted content
        unprocessed_encrypted_content = session.query(StoredContent).filter(
            and_(
                StoredContent.type == "onchain/content",
                or_(
                    StoredContent.btfs_cid == None,
                    StoredContent.ipfs_cid == None,
                )
            )
        ).first()
        if not unprocessed_encrypted_content:
            make_log("ConvertProcess", "No content to convert", level="debug")
            return

        make_log("ConvertProcess", f"Processing content {unprocessed_encrypted_content.id}", level="debug")
        decrypted_content = session.query(StoredContent).filter(
            StoredContent.id == unprocessed_encrypted_content.decrypted_content_id
        ).first()
        if not decrypted_content:
            make_log("ConvertProcess", "Decrypted content not found", level="error")
            return

        # Static preview interval in seconds
        preview_interval = [0, 30]

        # List of conversion options to process
        REQUIRED_CONVERT_OPTIONS = ['high', 'low', 'low_preview']
        converted_content = {}  # Mapping: option -> base58-encoded sha256 of the output file

        # Define input file path (as the host sees it) and extract the extension from the filename
        input_file_path = f"/Storage/storedContent/{unprocessed_encrypted_content.hash}"
        input_ext = unprocessed_encrypted_content.filename.split('.')[-1] if '.' in unprocessed_encrypted_content.filename else "mp4"

        # Logs directory mapping
        logs_dir = "/Storage/logs/converter"

        # Process each conversion option in sequence
        for option in REQUIRED_CONVERT_OPTIONS:
            # Set quality parameter and trim option (only for preview)
            if option == "low_preview":
                quality = "low"
                trim_value = f"{preview_interval[0]}-{preview_interval[1]}"
            else:
                quality = option  # 'high' or 'low'
                trim_value = None

            # Generate a unique output directory for the docker container
            output_uuid = str(uuid.uuid4())
            output_dir = f"/Storage/storedContent/converter-output/{output_uuid}"

            # Build the docker command with appropriate volume mounts and parameters
            cmd = [
                "docker", "run", "--rm",
                "-v", f"{input_file_path}:/app/input",
                "-v", f"{output_dir}:/app/output",
                "-v", f"{logs_dir}:/app/logs",
                "media_converter",
                "--ext", input_ext,
                "--quality", quality
            ]
            if trim_value:
                cmd.extend(["--trim", trim_value])

            # Run the docker container asynchronously
            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            stdout, stderr = await process.communicate()
            if process.returncode != 0:
                make_log("ConvertProcess", f"Docker conversion failed for option {option}: {stderr.decode()}", level="error")
                return

            # List files in the output directory (host path mapped into this container at /app/data)
            try:
                files = os.listdir(output_dir.replace("/Storage/storedContent", "/app/data"))
            except Exception as e:
                make_log("ConvertProcess", f"Error reading output directory {output_dir}: {e}", level="error")
                return

            # Exclude 'output.json' and expect exactly one media output file
            media_files = [f for f in files if f != "output.json"]
            if len(media_files) != 1:
                make_log("ConvertProcess", f"Expected one media file, found {len(media_files)} for option {option}", level="error")
                return

            output_file = os.path.join(output_dir.replace("/Storage/storedContent", "/app/data"), media_files[0])

            # Compute SHA256 hash of the output file using async subprocess
            hash_process = await asyncio.create_subprocess_exec(
                "sha256sum", output_file,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE
            )
            hash_stdout, hash_stderr = await hash_process.communicate()
            if hash_process.returncode != 0:
                make_log("ConvertProcess", f"Error computing sha256sum for option {option}: {hash_stderr.decode()}", level="error")
                return
            file_hash = hash_stdout.decode().split()[0]
            file_hash = b58encode(bytes.fromhex(file_hash)).decode()

            if not session.query(StoredContent).filter(
                StoredContent.hash == file_hash
            ).first():
                new_content = StoredContent(
                    type="local/content_bin",
                    hash=file_hash,
                    user_id=unprocessed_encrypted_content.user_id,
                    filename=media_files[0],
                    meta={
                        'encrypted_file_hash': unprocessed_encrypted_content.hash,
                    },
                    created=datetime.now(),
                )
                session.add(new_content)
                session.commit()

            save_path = os.path.join(UPLOADS_DIR, file_hash)
            try:
                os.remove(save_path)
            except FileNotFoundError:
                pass

            try:
                shutil.move(output_file, save_path)
            except Exception as e:
                make_log("ConvertProcess", f"Error moving output file {output_file} to {save_path}: {e}", level="error")
                return

            converted_content[option] = file_hash

            # Process output.json: read its contents and update meta['ffprobe_meta']
            output_json_path = os.path.join(output_dir.replace("/Storage/storedContent", "/app/data"), "output.json")
            if os.path.exists(output_json_path):
                if unprocessed_encrypted_content.meta.get('ffprobe_meta') is None:
                    try:
                        with open(output_json_path, "r") as f:
                            output_json_content = f.read()
                    except Exception as e:
                        make_log("ConvertProcess", f"Error reading output.json for option {option}: {e}", level="error")
                        return

                    try:
                        ffprobe_meta = json.loads(output_json_content)
                    except Exception as e:
                        make_log("ConvertProcess", f"Error parsing output.json for option {option}: {e}", level="error")
                        return

                    unprocessed_encrypted_content.meta = {
                        **unprocessed_encrypted_content.meta,
                        'ffprobe_meta': ffprobe_meta
                    }
            else:
                make_log("ConvertProcess", f"output.json not found for option {option}", level="error")

            # Remove the output directory after processing
            try:
                shutil.rmtree(output_dir.replace("/Storage/storedContent", "/app/data"))
            except Exception as e:
                make_log("ConvertProcess", f"Error removing output directory {output_dir}: {e}", level="error")
                # Continue even if deletion fails

        make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Converted content: {converted_content}", level="info")

        unprocessed_encrypted_content.meta = {
            **unprocessed_encrypted_content.meta,
            'converted_content': converted_content
        }
        session.commit()


async def main_fn(memory):
    make_log("ConvertProcess", "Service started", level="info")
    seqno = 0
    while True:
        try:
            make_log("ConvertProcess", "Service running", level="debug")
            await convert_loop()
            await asyncio.sleep(5)
            await send_status("convert_service", f"working (seqno={seqno})")
            seqno += 1
        except BaseException as e:
            make_log("ConvertProcess", f"Error: {e}", level="error")
            await asyncio.sleep(3)
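The repeated `.replace("/Storage/storedContent", "/app/data")` calls above translate host paths into this container's view of the same directory: the docker CLI talks to the host daemon, so the `-v` mounts must use host paths, while os.listdir and shutil run inside the app container, where the compose file below mounts /Storage/storedContent at /app/data. A sketch of that translation as a helper (the helper name is ours, not the commit's):

# Mirrors the compose volume /Storage/storedContent:/app/data
HOST_STORAGE = "/Storage/storedContent"  # path as the host daemon sees it
LOCAL_STORAGE = "/app/data"              # same directory inside this container

def to_local(host_path: str) -> str:
    """Translate a host storage path into this container's mount point."""
    if not host_path.startswith(HOST_STORAGE):
        raise ValueError(f"Not under host storage root: {host_path}")
    return LOCAL_STORAGE + host_path[len(HOST_STORAGE):]

assert to_local("/Storage/storedContent/converter-output/x") == "/app/data/converter-output/x"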
@@ -32,6 +32,10 @@ class Memory:
             "ton_daemon": {
                 "status": "no status",
                 "timestamp": None
             },
+            'convert_service': {
+                'status': 'no status',
+                'timestamp': None
+            }
         }
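The convert service's send_status("convert_service", ...) heartbeat presumably lands in this registry. A hypothetical sketch of that update — the real send_status implementation is not shown in this commit:

from datetime import datetime

status_registry = {
    'convert_service': {'status': 'no status', 'timestamp': None},
}

async def send_status(service: str, status: str):
    # Hypothetical: record the latest status string and when it arrived
    status_registry[service] = {
        'status': status,
        'timestamp': datetime.now().timestamp(),
    }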
@@ -30,8 +30,8 @@ class StoredContent(AlchemyBase, AudioContentMixin):
     user_id = Column(Integer, ForeignKey('users.id'), nullable=True)
     owner_address = Column(String(1024), nullable=True)
 
-    btfs_cid = Column(String(1024), nullable=True)
-    ipfs_cid = Column(String(1024), nullable=True)
+    btfs_cid = Column(String(1024), nullable=True)  # Actually holds the CID of the high-quality version of the content
+    ipfs_cid = Column(String(1024), nullable=True)  # Actually holds the CID of the low-quality version of the content
     telegram_cid = Column(String(1024), nullable=True)
 
     codebase_version = Column(Integer, nullable=True)
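Given the comments above, readable aliases could make call sites self-documenting; this mixin is purely illustrative and not part of the commit:

class StoredContentAliases:
    # Hypothetical aliases for the repurposed columns on StoredContent
    @property
    def high_quality_cid(self):
        return self.btfs_cid

    @property
    def low_quality_cid(self):
        return self.ipfs_cid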
@@ -80,3 +80,19 @@ services:
       maria_db:
         condition: service_healthy
 
+  convert_process:
+    build:
+      context: .
+      dockerfile: Dockerfile
+    command: python -m app convert_service
+    env_file:
+      - .env
+    links:
+      - maria_db
+    volumes:
+      - /Storage/logs:/app/logs
+      - /Storage/storedContent:/app/data
+    depends_on:
+      maria_db:
+        condition: service_healthy