import os
import asyncio
from uuid import uuid4
from datetime import datetime
from mimetypes import guess_type
from base64 import b64decode

import aiofiles
from base58 import b58encode
from sanic import response

from app.core.logger import make_log
from app.core.models.node_storage import StoredContent
from app.core._config import UPLOADS_DIR
from app.core._utils.resolve_content import resolve_content


# POST /api/v1.5/storage
async def s_api_v1_5_storage_post(request):
    # Log the receipt of a chunk upload request
    make_log("uploader_v1.5", "Received chunk upload request", level="INFO")

    # Get the provided file hash from header (hex format)
    provided_hash_hex = request.headers.get("X-Content-SHA256")
    if not provided_hash_hex:
        make_log("uploader_v1.5", "Missing X-Content-SHA256 header", level="ERROR")
        return response.json({"error": "Missing X-Content-SHA256 header"}, status=400)
    try:
        provided_hash_bytes = bytes.fromhex(provided_hash_hex)
        provided_hash_b58 = b58encode(provided_hash_bytes).decode()
        make_log("uploader_v1.5", f"Provided hash (base58): {provided_hash_b58}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Invalid X-Content-SHA256 header format: {e}", level="ERROR")
        return response.json({"error": "Invalid X-Content-SHA256 header format"}, status=400)

    # Get the provided file name from header and decode it from base64
    provided_filename_b64 = request.headers.get("X-File-Name")
    if not provided_filename_b64:
        make_log("uploader_v1.5", "Missing X-File-Name header", level="ERROR")
        return response.json({"error": "Missing X-File-Name header"}, status=400)
    try:
        provided_filename = b64decode(provided_filename_b64).decode("utf-8")
    except Exception as e:
        make_log("uploader_v1.5", f"Invalid X-File-Name header: {e}", level="ERROR")
        return response.json({"error": "Invalid X-File-Name header"}, status=400)

    # Get X-Chunk-Start header (must be provided) and parse it as an integer
    chunk_start_header = request.headers.get("X-Chunk-Start")
    if chunk_start_header is None:
        make_log("uploader_v1.5", "Missing X-Chunk-Start header", level="ERROR")
        return response.json({"error": "Missing X-Chunk-Start header"}, status=400)
    try:
        chunk_start = int(chunk_start_header)
    except Exception as e:
        make_log("uploader_v1.5", f"Invalid X-Chunk-Start header: {e}", level="ERROR")
        return response.json({"error": "Invalid X-Chunk-Start header"}, status=400)

    # Enforce maximum chunk size (80 MB) using the Content-Length header if provided
    max_chunk_size = 80 * 1024 * 1024  # 80 MB
    content_length = request.headers.get("Content-Length")
    if content_length is not None:
        try:
            content_length = int(content_length)
            if content_length > max_chunk_size:
                make_log("uploader_v1.5", f"Chunk size {content_length} exceeds maximum allowed", level="ERROR")
                return response.json({"error": "Chunk size exceeds maximum allowed (80 MB)"}, status=400)
        except ValueError:
            pass  # Malformed Content-Length; fall through and accept the stream as-is

    # Determine if this is a new upload or a continuation (resume)
    upload_id = request.headers.get("X-Upload-ID")
    is_new_upload = False
    if not upload_id:
        # New upload session: generate a new uuid
        upload_id = str(uuid4())
        is_new_upload = True
        make_log("uploader_v1.5", f"Starting new upload session with ID: {upload_id}", level="INFO")
    else:
        make_log("uploader_v1.5", f"Resuming upload session with ID: {upload_id}", level="INFO")

    # Determine the temporary file path based on upload_id
    temp_path = os.path.join(UPLOADS_DIR, f"v1.5_upload_{upload_id}")
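
    # Resume contract: a client may retry a chunk at any time. For a chunk it
    # already holds (chunk_start below current_size) the handler replies with
    # the stored offset instead of rewriting bytes, and a chunk starting past
    # current_size is rejected, since appending it would leave a hole.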
    # Check current size of the temporary file (if it exists)
    current_size = 0
    if os.path.exists(temp_path):
        current_size = os.path.getsize(temp_path)

    # If the provided chunk_start is less than current_size, the chunk is already received
    if chunk_start < current_size:
        make_log("uploader_v1.5", f"Chunk starting at {chunk_start} already received, current size: {current_size}", level="INFO")
        return response.json({"upload_id": upload_id, "current_size": current_size})
    elif chunk_start > current_size:
        make_log("uploader_v1.5", f"Chunk start {chunk_start} does not match current file size {current_size}", level="ERROR")
        return response.json({"error": "Chunk start does not match current file size"}, status=400)

    # Append the received chunk to the temporary file
    try:
        mode = 'wb' if is_new_upload else 'ab'
        async with aiofiles.open(temp_path, mode) as out_file:
            data = request.body  # Get the full body if available
            if data:
                await out_file.write(data)  # Write the whole body at once
            else:
                async for chunk in request.stream:
                    await out_file.write(chunk)
        new_size = os.path.getsize(temp_path)
        make_log("uploader_v1.5", f"Appended chunk. New file size: {new_size}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Error saving chunk: {e}", level="ERROR")
        return response.json({"error": "Failed to save chunk"}, status=500)

    # Compute the SHA256 hash of the temporary file using a subprocess
    try:
        proc = await asyncio.create_subprocess_exec(
            'sha256sum', temp_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            error_msg = stderr.decode().strip()
            make_log("uploader_v1.5", f"sha256sum error: {error_msg}", level="ERROR")
            return response.json({"error": "Failed to compute file hash"}, status=500)
        computed_hash_hex = stdout.decode().split()[0].strip()
        computed_hash_bytes = bytes.fromhex(computed_hash_hex)
        computed_hash_b58 = b58encode(computed_hash_bytes).decode()
        make_log("uploader_v1.5", f"Computed hash (base58): {computed_hash_b58}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Error computing file hash: {e}", level="ERROR")
        return response.json({"error": "Error computing file hash"}, status=500)
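
    # Design note: the digest is recomputed over the whole temporary file after
    # every chunk, and the upload is considered complete exactly when it matches
    # the client-supplied hash. This trades a full re-read per chunk for a
    # protocol with no explicit "finish" request; an in-process hashlib loop
    # would be an alternative to the sha256sum subprocess dependency.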
    # If computed hash matches the provided one, the final chunk has been received
    if computed_hash_b58 == provided_hash_b58:
        final_path = os.path.join(UPLOADS_DIR, f"v1.5_{computed_hash_b58}")
        try:
            os.rename(temp_path, final_path)
            make_log("uploader_v1.5", f"Final chunk received. File renamed to: {final_path}", level="INFO")
        except Exception as e:
            make_log("uploader_v1.5", f"Error renaming file: {e}", level="ERROR")
            return response.json({"error": "Failed to finalize file storage"}, status=500)

        db_session = request.ctx.db_session
        existing = db_session.query(StoredContent).filter_by(hash=computed_hash_b58).first()
        if existing:
            make_log("uploader_v1.5", f"File with hash {computed_hash_b58} already exists in DB", level="INFO")
            serialized_v2 = existing.serialize_v2()
            serialized_v1 = existing.serialize_v1()
            return response.json({
                "upload_id": upload_id,
                "content_sha256": computed_hash_b58,
                "content_id": serialized_v2,
                "content_id_v1": serialized_v1,
                "content_url": f"dmy://storage?cid={serialized_v2}",
            })

        try:
            user_id = request.ctx.user.id if request.ctx.user else None
            new_content = StoredContent(
                type='local/content_bin',
                hash=computed_hash_b58,
                user_id=user_id,
                filename=provided_filename,
                key_id=None,
                meta={},
                created=datetime.utcnow()
            )
            db_session.add(new_content)
            db_session.commit()
            make_log("uploader_v1.5", f"New file stored and indexed for user {user_id} with hash {computed_hash_b58}", level="INFO")
        except Exception as e:
            make_log("uploader_v1.5", f"Database error: {e}", level="ERROR")
            return response.json({"error": "Database error"}, status=500)

        serialized_v2 = new_content.serialize_v2()
        serialized_v1 = new_content.serialize_v1()
        return response.json({
            "upload_id": upload_id,
            "content_sha256": computed_hash_b58,
            "content_id": serialized_v2,
            "content_id_v1": serialized_v1,
            "content_url": f"dmy://storage?cid={serialized_v2}",
        })
    else:
        # Not the final chunk yet – return current upload status
        return response.json({"upload_id": upload_id, "current_size": os.path.getsize(temp_path)})


# GET /api/v1.5/storage/<file_hash>
async def s_api_v1_5_storage_get(request, file_hash):
    # Log the file retrieval request
    make_log("uploader_v1.5", f"Received file retrieval request for hash: {file_hash}", level="INFO")

    final_path = os.path.join(UPLOADS_DIR, f"v1.5_{file_hash}")
    if not os.path.exists(final_path):
        make_log("uploader_v1.5", f"File not found: {final_path}", level="ERROR")
        return response.json({"error": "File not found"}, status=404)

    # Prefer the stored filename for MIME detection; fall back to the on-disk path
    db_session = request.ctx.db_session
    stored = db_session.query(StoredContent).filter_by(hash=file_hash).first()
    if stored and stored.filename:
        filename_for_mime = stored.filename
    else:
        filename_for_mime = final_path
    mime_type, _ = guess_type(filename_for_mime)
    if not mime_type:
        mime_type = "application/octet-stream"

    file_size = os.path.getsize(final_path)
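
    # Range handling below follows RFC 7233 semantics: a single range is
    # answered with 206 Partial Content and a Content-Range header, while
    # multiple ranges are served as a multipart/byteranges body with one
    # part per requested range.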
make_log("uploader_v1.5", f"Invalid Range header: {range_header} - {e}", level="ERROR") return response.json({"error": "Invalid Range header"}, status=400) if len(parsed_ranges) == 1: start, end = parsed_ranges[0] content_length = end - start + 1 headers = { "Content-Range": f"bytes {start}-{end}/{file_size}", "Accept-Ranges": "bytes", "Content-Length": str(content_length), "Content-Type": mime_type, } async def stream_file_range(): make_log("uploader_v1.5", f"Starting to stream file from byte {start} to {end}", level="INFO") async with aiofiles.open(final_path, mode='rb') as f: await f.seek(start) remaining = content_length chunk_size = 1024 * 1024 while remaining > 0: read_size = min(chunk_size, remaining) data = await f.read(read_size) if not data: break remaining -= len(data) yield data make_log("uploader_v1.5", f"Finished streaming file: {final_path}", level="INFO") return response.stream(stream_file_range, status=206, headers=headers) else: boundary = uuid4().hex headers = { "Content-Type": f"multipart/byteranges; boundary={boundary}", "Accept-Ranges": "bytes", } async def stream_multipart(): for start, end in parsed_ranges: part_header = ( f"--{boundary}\r\n" f"Content-Type: {mime_type}\r\n" f"Content-Range: bytes {start}-{end}/{file_size}\r\n" f"\r\n" ) yield part_header.encode() part_length = end - start + 1 async with aiofiles.open(final_path, mode='rb') as f: await f.seek(start) remaining = part_length chunk_size = 1024 * 1024 while remaining > 0: read_size = min(chunk_size, remaining) data = await f.read(read_size) if not data: break remaining -= len(data) yield data yield b"\r\n" yield f"--{boundary}--\r\n".encode() return response.stream(stream_multipart, status=206, headers=headers) else: make_log("uploader_v1.5", f"Returning full file for video/audio: {final_path}", level="INFO") return await response.file(final_path, mime_type=mime_type)