diff --git a/Dockerfile b/Dockerfile index 7a2bd96..5722c24 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,24 @@ FROM python:3.9 WORKDIR /app + +# Copy and install Python dependencies COPY requirements.txt . RUN pip install -r requirements.txt + COPY . . -RUN apt-get update && apt-get install -y ffmpeg + +# Install required packages, add Docker's official GPG key and repository, then install Docker CLI +RUN apt-get update && apt-get install -y \ + ca-certificates \ + curl \ + gnupg \ + lsb-release && \ + install -m 0755 -d /etc/apt/keyrings && \ + curl -fsSL https://download.docker.com/linux/ubuntu/gpg -o /etc/apt/keyrings/docker.asc && \ + chmod a+r /etc/apt/keyrings/docker.asc && \ + echo "deb [arch=$(dpkg --print-architecture) signed-by=/etc/apt/keyrings/docker.asc] https://download.docker.com/linux/ubuntu \ + $(. /etc/os-release && echo \"$VERSION_CODENAME\") stable" | tee /etc/apt/sources.list.d/docker.list > /dev/null && \ + apt-get update && \ + apt-get install -y docker-ce-cli + CMD ["python", "app"] diff --git a/app/__main__.py b/app/__main__.py index ac72b54..af88a25 100644 --- a/app/__main__.py +++ b/app/__main__.py @@ -109,6 +109,9 @@ if __name__ == '__main__': elif startup_target == 'license_index': from app.core.background.license_service import main_fn as target_fn time.sleep(7) + elif startup_target == 'convert_service': + from app.core.background.convert_service import main_fn as target_fn + time.sleep(9) startup_fn = startup_fn or target_fn assert startup_fn diff --git a/app/api/routes/_blockchain.py b/app/api/routes/_blockchain.py index ecd2571..2526dac 100644 --- a/app/api/routes/_blockchain.py +++ b/app/api/routes/_blockchain.py @@ -52,17 +52,17 @@ async def s_api_v1_blockchain_send_new_content_message(request): assert field_key in request.json, f"No {field_key} provided" assert field_value(request.json[field_key]), f"Invalid {field_key} provided" - wallet_connection = request.ctx.user.wallet_connection(request.ctx.db_session) - assert wallet_connection, "No wallet connection found" - decrypted_content_cid, err = resolve_content(request.json['content']) assert not err, f"Invalid content CID" + + # Поиск исходного файла загруженного decrypted_content = request.ctx.db_session.query(StoredContent).filter( StoredContent.hash == decrypted_content_cid.content_hash_b58 ).first() assert decrypted_content, "No content locally found" assert decrypted_content.type == "local/content_bin", "Invalid content type" + # Создание фиктивного encrypted_content. Не шифруем для производительности, тк зашифрованная нигде дальше не используется encrypted_content = await create_encrypted_content(request.ctx.db_session, decrypted_content) encrypted_content_cid = encrypted_content.cid @@ -83,7 +83,7 @@ async def s_api_v1_blockchain_send_new_content_message(request): metadata_content = await create_metadata_for_item( request.ctx.db_session, title=content_title, - cover_url=f"{PROJECT_HOST}/api/v1/storage/{image_content_cid.serialize_v2()}" if image_content_cid else None, + cover_url=f"{PROJECT_HOST}/api/v1.5/storage/{image_content_cid.serialize_v2()}" if image_content_cid else None, authors=request.json['authors'], hashtags=request.json['hashtags'] ) @@ -107,7 +107,6 @@ async def s_api_v1_blockchain_send_new_content_message(request): 'hint_type': 'uploadContentTxRequested' } ) - return response.json({ 'address': platform.address.to_string(1, 1, 1), @@ -133,7 +132,7 @@ async def s_api_v1_blockchain_send_new_content_message(request): begin_cell() .store_ref( begin_cell() - .store_bytes(f"{PROJECT_HOST}/api/v1/storage/{metadata_content.cid.serialize_v2(include_accept_type=True)}".encode()) + .store_bytes(f"{PROJECT_HOST}/api/v1.5/storage/{metadata_content.cid.serialize_v2(include_accept_type=True)}".encode()) .end_cell() ) .store_ref( diff --git a/app/api/routes/progressive_storage.py b/app/api/routes/progressive_storage.py index d490743..9605a00 100644 --- a/app/api/routes/progressive_storage.py +++ b/app/api/routes/progressive_storage.py @@ -133,7 +133,7 @@ async def s_api_v1_5_storage_post(request): # If computed hash matches the provided one, the final chunk has been received if computed_hash_b58 == provided_hash_b58: - final_path = os.path.join(UPLOADS_DIR, f"v1.5_{computed_hash_b58}") + final_path = os.path.join(UPLOADS_DIR, f"{computed_hash_b58}") try: os.rename(temp_path, final_path) make_log("uploader_v1.5", f"Final chunk received. File renamed to: {final_path}", level="INFO") @@ -192,11 +192,16 @@ async def s_api_v1_5_storage_post(request): async def s_api_v1_5_storage_get(request, file_hash): make_log("uploader_v1.5", f"Received file retrieval request for hash: {file_hash}", level="INFO") - final_path = os.path.join(UPLOADS_DIR, f"v1.5_{file_hash}") + try: + file_hash = resolve_content(file_hash)[0].content_hash + except: + pass + + final_path = os.path.join(UPLOADS_DIR, f"{file_hash}") if not os.path.exists(final_path): make_log("uploader_v1.5", f"File not found: {final_path}", level="ERROR") return response.json({"error": "File not found"}, status=404) - + db_session = request.ctx.db_session stored = db_session.query(StoredContent).filter_by(hash=file_hash).first() if stored and stored.filename: diff --git a/app/core/_crypto/content.py b/app/core/_crypto/content.py index b779ce1..6a5b446 100644 --- a/app/core/_crypto/content.py +++ b/app/core/_crypto/content.py @@ -11,6 +11,7 @@ from app.core._crypto.cipher import AESCipher from app.core.models.keys import KnownKey from app.core.models.node_storage import StoredContent from app.core.logger import make_log +from base58 import b58decode async def create_new_encryption_key(db_session, user_id: int = None) -> KnownKey: @@ -63,8 +64,7 @@ async def create_encrypted_content( assert decrypted_content.key_id, "Key not assigned" decrypted_path = os.path.join(UPLOADS_DIR, decrypted_content.hash) - async with aiofiles.open(decrypted_path, mode='rb') as file: - decrypted_bin = await file.read() + decrypted_bin = b58decode(decrypted_content.hash) key = decrypted_content.key cipher = AESCipher(key.seed_bin) diff --git a/app/core/background/convert_service.py b/app/core/background/convert_service.py new file mode 100644 index 0000000..0112a24 --- /dev/null +++ b/app/core/background/convert_service.py @@ -0,0 +1,201 @@ +import asyncio +from datetime import datetime +import os +import uuid +import json +import shutil +from base58 import b58encode +from sqlalchemy import and_, or_ +from app.core.models.node_storage import StoredContent +from app.core._utils.send_status import send_status +from app.core.logger import make_log +from app.core.storage import db_session +from app.core._config import UPLOADS_DIR + + +async def convert_loop(): + with db_session() as session: + # Query for unprocessed encrypted content + unprocessed_encrypted_content = session.query(StoredContent).filter( + and_( + StoredContent.type == "onchain/content", + or_( + StoredContent.btfs_cid == None, + StoredContent.ipfs_cid == None, + ) + ) + ).first() + if not unprocessed_encrypted_content: + make_log("ConvertProcess", "No content to convert", level="debug") + return + + make_log("ConvertProcess", f"Processing content {unprocessed_encrypted_content.id}", level="debug") + decrypted_content = session.query(StoredContent).filter( + StoredContent.id == unprocessed_encrypted_content.decrypted_content_id + ).first() + if not decrypted_content: + make_log("ConvertProcess", "Decrypted content not found", level="error") + return + + # Static preview interval in seconds + preview_interval = [0, 30] + + # List of conversion options to process + REQUIRED_CONVERT_OPTIONS = ['high', 'low', 'low_preview'] + converted_content = {} # Mapping: option -> sha256 hash of output file + + # Define input file path and extract its extension from filename + input_file_path = f"/Storage/storedContent/{unprocessed_encrypted_content.hash}" + input_ext = unprocessed_encrypted_content.filename.split('.')[-1] if '.' in unprocessed_encrypted_content.filename else "mp4" + + # Logs directory mapping + logs_dir = "/Storage/logs/converter" + + # Process each conversion option in sequence + for option in REQUIRED_CONVERT_OPTIONS: + # Set quality parameter and trim option (only for preview) + if option == "low_preview": + quality = "low" + trim_value = f"{preview_interval[0]}-{preview_interval[1]}" + else: + quality = option # 'high' or 'low' + trim_value = None + + # Generate a unique output directory for docker container + output_uuid = str(uuid.uuid4()) + output_dir = f"/Storage/storedContent/converter-output/{output_uuid}" + + # Build the docker command with appropriate volume mounts and parameters + cmd = [ + "docker", "run", "--rm", + "-v", f"{input_file_path}:/app/input", + "-v", f"{output_dir}:/app/output", + "-v", f"{logs_dir}:/app/logs", + "media_converter", + "--ext", input_ext, + "--quality", quality + ] + if trim_value: + cmd.extend(["--trim", trim_value]) + + # Run the docker container asynchronously + process = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, stderr = await process.communicate() + if process.returncode != 0: + make_log("ConvertProcess", f"Docker conversion failed for option {option}: {stderr.decode()}", level="error") + return + + # List files in the output directory + try: + files = os.listdir(output_dir.replace("/Storage/storedContent", "/app/data")) + except Exception as e: + make_log("ConvertProcess", f"Error reading output directory {output_dir}: {e}", level="error") + return + + # Exclude 'output.json' and expect exactly one media output file + media_files = [f for f in files if f != "output.json"] + if len(media_files) != 1: + make_log("ConvertProcess", f"Expected one media file, found {len(media_files)} for option {option}", level="error") + return + + output_file = os.path.join(output_dir.replace("/Storage/storedContent", "/app/data"), media_files[0]) + + # Compute SHA256 hash of the output file using async subprocess + hash_process = await asyncio.create_subprocess_exec( + "sha256sum", output_file, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + hash_stdout, hash_stderr = await hash_process.communicate() + if hash_process.returncode != 0: + make_log("ConvertProcess", f"Error computing sha256sum for option {option}: {hash_stderr.decode()}", level="error") + return + file_hash = hash_stdout.decode().split()[0] + file_hash = b58encode(bytes.fromhex(file_hash)).decode() + + if not db_session.query(StoredContent).filter( + StoredContent.hash == file_hash + ).first(): + new_content = StoredContent( + type="local/content_bin", + hash=file_hash, + user_id=unprocessed_encrypted_content.user_id, + filename=media_files[0], + meta={ + 'encrypted_file_hash': unprocessed_encrypted_content.hash, + }, + created=datetime.now(), + ) + session.add(new_content) + session.commit() + + save_path = os.path.join(UPLOADS_DIR, file_hash) + try: + os.remove(save_path) + except FileNotFoundError: + pass + + try: + shutil.move(output_file, save_path) + except Exception as e: + make_log("ConvertProcess", f"Error moving output file {output_file} to {save_path}: {e}", level="error") + return + + converted_content[option] = file_hash + + # Process output.json: read its contents and update meta['ffprobe_meta'] + output_json_path = os.path.join(output_dir.replace("/Storage/storedContent", "/app/data"), "output.json") + if os.path.exists(output_json_path) and unprocessed_encrypted_content.meta.get('ffprobe_meta') is None: + try: + with open(output_json_path, "r") as f: + output_json_content = f.read() + except Exception as e: + make_log("ConvertProcess", f"Error reading output.json for option {option}: {e}", level="error") + return + + try: + ffprobe_meta = json.loads(output_json_content) + except Exception as e: + make_log("ConvertProcess", f"Error parsing output.json for option {option}: {e}", level="error") + return + + unprocessed_encrypted_content.meta = { + **unprocessed_encrypted_content.meta, + 'ffprobe_meta': ffprobe_meta + } + else: + make_log("ConvertProcess", f"output.json not found for option {option}", level="error") + + # Remove the output directory after processing + try: + shutil.rmtree(output_dir.replace("/Storage/storedContent", "/app/data")) + except Exception as e: + make_log("ConvertProcess", f"Error removing output directory {output_dir}: {e}", level="error") + # Continue even if deletion fails + + make_log("ConvertProcess", f"Content {unprocessed_encrypted_content.id} processed. Converted content: {converted_content}", level="info") + + unprocessed_encrypted_content.meta = { + **unprocessed_encrypted_content.meta, + 'converted_content': converted_content + } + session.commit() + + +async def main_fn(memory): + make_log("ConvertProcess", "Service started", level="info") + seqno = 0 + while True: + try: + make_log("ConvertProcess", "Service running", level="debug") + await convert_loop() + await asyncio.sleep(5) + await send_status("convert_service", f"working (seqno={seqno})") + seqno += 1 + except BaseException as e: + make_log("ConvertProcess", f"Error: {e}", level="error") + await asyncio.sleep(3) diff --git a/app/core/models/memory.py b/app/core/models/memory.py index 81ca825..61a8ed0 100644 --- a/app/core/models/memory.py +++ b/app/core/models/memory.py @@ -32,6 +32,10 @@ class Memory: "ton_daemon": { "status": "no status", "timestamp": None + }, + 'convert_service': { + 'status': 'no status', + 'timestamp': None } } diff --git a/app/core/models/node_storage.py b/app/core/models/node_storage.py index fb78d56..eb1e5d4 100644 --- a/app/core/models/node_storage.py +++ b/app/core/models/node_storage.py @@ -30,8 +30,8 @@ class StoredContent(AlchemyBase, AudioContentMixin): user_id = Column(Integer, ForeignKey('users.id'), nullable=True) owner_address = Column(String(1024), nullable=True) - btfs_cid = Column(String(1024), nullable=True) - ipfs_cid = Column(String(1024), nullable=True) + btfs_cid = Column(String(1024), nullable=True) # На самом деле это CID контента в High качестве + ipfs_cid = Column(String(1024), nullable=True) # На самом деле это CID контента в Low качестве telegram_cid = Column(String(1024), nullable=True) codebase_version = Column(Integer, nullable=True) diff --git a/docker-compose.yml b/docker-compose.yml index 0409315..218c559 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -80,3 +80,19 @@ services: maria_db: condition: service_healthy + convert_process: + build: + context: . + dockerfile: Dockerfile + command: python -m app convert_process + env_file: + - .env + links: + - maria_db + volumes: + - /Storage/logs:/app/logs + - /Storage/storedContent:/app/data + depends_on: + maria_db: + condition: service_healthy +