diff --git a/app/api/middleware.py b/app/api/middleware.py
index b29e3f4..4837e85 100644
--- a/app/api/middleware.py
+++ b/app/api/middleware.py
@@ -15,7 +15,7 @@ from datetime import datetime, timedelta
 def attach_headers(response):
     response.headers["Access-Control-Allow-Origin"] = "*"
     response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
-    response.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-file-name, x-content-sha256"
+    response.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-file-name, x-content-sha256, x-chunk-start, x-upload-id"
     response.headers["Access-Control-Allow-Credentials"] = "true"
     return response
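The two header names added above (x-chunk-start, x-upload-id) must be in this allowlist, otherwise the browser's CORS preflight rejects the chunked requests that uploader_test.html issues below. A minimal way to check the preflight by hand, as a sketch assuming the requests package and the test node URL used later in uploader_test.html:

    import requests

    # Preflight for a chunked POST; the URL is the test node from uploader_test.html
    resp = requests.options(
        "https://my-public-node-1.projscale.dev/api/v1.5/storage",
        headers={
            "Origin": "https://example.com",
            "Access-Control-Request-Method": "POST",
            "Access-Control-Request-Headers": "x-chunk-start, x-upload-id",
        },
    )
    # attach_headers() returns a static allowlist, so both names should appear
    allowed = resp.headers.get("Access-Control-Allow-Headers", "").lower()
    assert "x-chunk-start" in allowed and "x-upload-id" in allowed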
diff --git a/app/api/routes/progressive_storage.py b/app/api/routes/progressive_storage.py
index 72b239b..9521095 100644
--- a/app/api/routes/progressive_storage.py
+++ b/app/api/routes/progressive_storage.py
@@ -4,10 +4,10 @@ import asyncio
 from uuid import uuid4
 from datetime import datetime
 from mimetypes import guess_type
+from base64 import b64decode
 
 import aiofiles
 from base58 import b58encode
-from base64 import b64decode
 from sanic import response
 
 from app.core.logger import make_log
@@ -18,8 +18,8 @@ from app.core._utils.resolve_content import resolve_content
 
 # POST /api/v1.5/storage
 async def s_api_v1_5_storage_post(request):
-    # Log the start of the file upload process
-    make_log("uploader_v1.5", "Received file upload request", level="INFO")
+    # Log the receipt of a chunk upload request
+    make_log("uploader_v1.5", "Received chunk upload request", level="INFO")
 
     # Get the provided file hash from header (hex format)
     provided_hash_hex = request.headers.get("X-Content-SHA256")
@@ -34,38 +34,80 @@ async def s_api_v1_5_storage_post(request):
         make_log("uploader_v1.5", f"Invalid X-Content-SHA256 header format: {e}", level="ERROR")
         return response.json({"error": "Invalid X-Content-SHA256 header format"}, status=400)
 
-    provided_filename = request.headers.get("X-File-Name")
-    provided_filename = b64decode(provided_filename).decode("utf-8")
-
-    # Check if the file already exists in the database
-    db_session = request.ctx.db_session
-    existing = db_session.query(StoredContent).filter_by(hash=provided_hash_b58).first()
-    if existing:
-        make_log("uploader_v1.5", f"File with hash {provided_hash_b58} already exists in DB", level="INFO")
-        serialized_v2 = existing.serialize_v2()  # returns a string
-        serialized_v1 = existing.serialize_v1()  # returns a string
-        return response.json({
-            "content_sha256": provided_hash_b58,
-            "content_id": serialized_v2,
-            "content_id_v1": serialized_v1,
-            "content_url": f"dmy://storage?cid={serialized_v2}",
-        })
-
-    # Save uploaded file to a temporary location using streaming
-    temp_filename = f"v1.5_upload_{uuid4()}"
-    temp_path = os.path.join(UPLOADS_DIR, temp_filename)
-    make_log("uploader_v1.5", f"Saving file to temporary location: {temp_path}", level="INFO")
+    # Get the provided file name from header and decode it from base64
+    provided_filename_b64 = request.headers.get("X-File-Name")
+    if not provided_filename_b64:
+        make_log("uploader_v1.5", "Missing X-File-Name header", level="ERROR")
+        return response.json({"error": "Missing X-File-Name header"}, status=400)
     try:
-        async with aiofiles.open(temp_path, 'wb') as out_file:
+        provided_filename = b64decode(provided_filename_b64).decode("utf-8")
+    except Exception as e:
+        make_log("uploader_v1.5", f"Invalid X-File-Name header: {e}", level="ERROR")
+        return response.json({"error": "Invalid X-File-Name header"}, status=400)
+
+    # Get the X-Chunk-Start header (must be provided) and parse it as an integer
+    chunk_start_header = request.headers.get("X-Chunk-Start")
+    if chunk_start_header is None:
+        make_log("uploader_v1.5", "Missing X-Chunk-Start header", level="ERROR")
+        return response.json({"error": "Missing X-Chunk-Start header"}, status=400)
+    try:
+        chunk_start = int(chunk_start_header)
+    except Exception as e:
+        make_log("uploader_v1.5", f"Invalid X-Chunk-Start header: {e}", level="ERROR")
+        return response.json({"error": "Invalid X-Chunk-Start header"}, status=400)
+
+    # Enforce the maximum chunk size (80 MB) using the Content-Length header if provided
+    max_chunk_size = 80 * 1024 * 1024  # 80 MB
+    content_length = request.headers.get("Content-Length")
+    if content_length is not None:
+        try:
+            content_length = int(content_length)
+            if content_length > max_chunk_size:
+                make_log("uploader_v1.5", f"Chunk size {content_length} exceeds maximum allowed", level="ERROR")
+                return response.json({"error": "Chunk size exceeds maximum allowed (80 MB)"}, status=400)
+        except ValueError:
+            pass
+
+    # Determine whether this is a new upload or a continuation (resume)
+    upload_id = request.headers.get("X-Upload-ID")
+    is_new_upload = False
+    if not upload_id:
+        # New upload session: generate a new uuid
+        upload_id = str(uuid4())
+        is_new_upload = True
+        make_log("uploader_v1.5", f"Starting new upload session with ID: {upload_id}", level="INFO")
+    else:
+        make_log("uploader_v1.5", f"Resuming upload session with ID: {upload_id}", level="INFO")
+
+    # Determine the temporary file path based on upload_id
+    temp_path = os.path.join(UPLOADS_DIR, f"temp_{upload_id}")
+
+    # Check the current size of the temporary file (if it exists)
+    current_size = 0
+    if os.path.exists(temp_path):
+        current_size = os.path.getsize(temp_path)
+
+    # If the provided chunk_start is less than current_size, the chunk was already received
+    if chunk_start < current_size:
+        make_log("uploader_v1.5", f"Chunk starting at {chunk_start} already received, current size: {current_size}", level="INFO")
+        return response.json({"upload_id": upload_id, "current_size": current_size})
+    elif chunk_start > current_size:
+        make_log("uploader_v1.5", f"Chunk start {chunk_start} does not match current file size {current_size}", level="ERROR")
+        return response.json({"error": "Chunk start does not match current file size"}, status=400)
+
+    # Append the received chunk to the temporary file
+    try:
+        mode = 'wb' if is_new_upload else 'ab'
+        async with aiofiles.open(temp_path, mode) as out_file:
             async for chunk in request.stream:
                 await out_file.write(chunk)
-        make_log("uploader_v1.5", f"Finished saving file to temporary location: {temp_path}", level="INFO")
+        new_size = os.path.getsize(temp_path)
+        make_log("uploader_v1.5", f"Appended chunk. New file size: {new_size}", level="INFO")
     except Exception as e:
-        make_log("uploader_v1.5", f"Error saving uploaded file: {e}", level="ERROR")
-        return response.json({"error": "Failed to save uploaded file"}, status=500)
-
-    # Compute file SHA256 using subprocess to avoid loading the file into memory
-    make_log("uploader_v1.5", f"Computing file hash using subprocess for file: {temp_path}", level="INFO")
+        make_log("uploader_v1.5", f"Error saving chunk: {e}", level="ERROR")
+        return response.json({"error": "Failed to save chunk"}, status=500)
+
+    # Compute the SHA256 hash of the temporary file using subprocess
     try:
         proc = await asyncio.create_subprocess_exec(
             'sha256sum', temp_path,
@@ -77,7 +119,6 @@ async def s_api_v1_5_storage_post(request):
             error_msg = stderr.decode().strip()
             make_log("uploader_v1.5", f"sha256sum error: {error_msg}", level="ERROR")
             return response.json({"error": "Failed to compute file hash"}, status=500)
-        # Parse output: "<hash>  <filename>"
         computed_hash_hex = stdout.decode().split()[0].strip()
         computed_hash_bytes = bytes.fromhex(computed_hash_hex)
         computed_hash_b58 = b58encode(computed_hash_bytes).decode()
@@ -85,70 +126,60 @@
     except Exception as e:
         make_log("uploader_v1.5", f"Error computing file hash: {e}", level="ERROR")
         return response.json({"error": "Error computing file hash"}, status=500)
-
-    # Verify that the computed hash matches the provided hash
-    if computed_hash_b58 != provided_hash_b58:
-        make_log("uploader_v1.5", f"Hash mismatch: provided {provided_hash_b58} vs computed {computed_hash_b58}", level="ERROR")
+
+    # If the computed hash matches the provided one, the final chunk has been received
+    if computed_hash_b58 == provided_hash_b58:
+        final_path = os.path.join(UPLOADS_DIR, f"v1.5_{computed_hash_b58}")
         try:
-            os.remove(temp_path)
-            make_log("uploader_v1.5", f"Temporary file removed due to hash mismatch: {temp_path}", level="INFO")
+            os.rename(temp_path, final_path)
+            make_log("uploader_v1.5", f"Final chunk received. File renamed to: {final_path}", level="INFO")
         except Exception as e:
-            make_log("uploader_v1.5", f"Error removing temp file: {e}", level="ERROR")
-        return response.json({"error": "Hash mismatch"}, status=400)
-
-    # Determine the final file path
-    final_path = os.path.join(UPLOADS_DIR, f"v1.5_{computed_hash_b58}")
-    if os.path.exists(final_path):
-        make_log("uploader_v1.5", f"File already exists on disk: {final_path}", level="INFO")
-        try:
-            os.remove(temp_path)
-            make_log("uploader_v1.5", f"Temporary file removed: {temp_path}", level="INFO")
-        except Exception as e:
-            make_log("uploader_v1.5", f"Error removing temp file: {e}", level="ERROR")
+            make_log("uploader_v1.5", f"Error renaming file: {e}", level="ERROR")
+            return response.json({"error": "Failed to finalize file storage"}, status=500)
+
+        db_session = request.ctx.db_session
         existing = db_session.query(StoredContent).filter_by(hash=computed_hash_b58).first()
         if existing:
+            make_log("uploader_v1.5", f"File with hash {computed_hash_b58} already exists in DB", level="INFO")
             serialized_v2 = existing.serialize_v2()
             serialized_v1 = existing.serialize_v1()
             return response.json({
+                "upload_id": upload_id,
                 "content_sha256": computed_hash_b58,
                 "content_id": serialized_v2,
                 "content_id_v1": serialized_v1,
                 "content_url": f"dmy://storage?cid={serialized_v2}",
             })
-    else:
+
         try:
-            os.rename(temp_path, final_path)
-            make_log("uploader_v1.5", f"Renamed temporary file to final location: {final_path}", level="INFO")
+            new_content = StoredContent(
+                type='local/content_bin',
+                hash=computed_hash_b58,
+                user_id=request.ctx.user.id,
+                filename=provided_filename,
+                key_id=None,
+                meta={},
+                created=datetime.utcnow()
+            )
+            db_session.add(new_content)
+            db_session.commit()
+            make_log("uploader_v1.5", f"New file stored and indexed for user {request.ctx.user.id} with hash {computed_hash_b58}", level="INFO")
         except Exception as e:
-            make_log("uploader_v1.5", f"Error renaming file: {e}", level="ERROR")
-            return response.json({"error": "Failed to finalize file storage"}, status=500)
-
-    # Create a new StoredContent record with user_id from request.ctx.user and commit to DB
-    try:
-        new_content = StoredContent(
-            type='local/content_bin',
-            hash=computed_hash_b58,
-            user_id=request.ctx.user.id,  # 'user_id' is added to StoredContent
-            filename=provided_filename,
-            key_id=None,
-            meta={},
-            created=datetime.utcnow()
-        )
-        db_session.add(new_content)
-        db_session.commit()
-        make_log("uploader_v1.5", f"New file stored and indexed for user {request.ctx.user.id} with hash {computed_hash_b58}", level="INFO")
-    except Exception as e:
-        make_log("uploader_v1.5", f"Database error: {e}", level="ERROR")
-        return response.json({"error": "Database error"}, status=500)
-
-    serialized_v2 = new_content.serialize_v2()
-    serialized_v1 = new_content.serialize_v1()
-    return response.json({
-        "content_sha256": computed_hash_b58,
-        "content_id": serialized_v2,
-        "content_id_v1": serialized_v1,
-        "content_url": f"dmy://storage?cid={serialized_v2}",
-    })
+            make_log("uploader_v1.5", f"Database error: {e}", level="ERROR")
+            return response.json({"error": "Database error"}, status=500)
+
+        serialized_v2 = new_content.serialize_v2()
+        serialized_v1 = new_content.serialize_v1()
+        return response.json({
+            "upload_id": upload_id,
+            "content_sha256": computed_hash_b58,
+            "content_id": serialized_v2,
+            "content_id_v1": serialized_v1,
+            "content_url": f"dmy://storage?cid={serialized_v2}",
+        })
+    else:
+        # Not the final chunk yet – return the current upload status
+        return response.json({"upload_id": upload_id, "current_size": os.path.getsize(temp_path)})
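The handler above defines an append-only contract: each POST must start exactly at the server's current temporary file size, every response carries upload_id and current_size, and the upload is complete once the whole-file hash matches and content_id appears. A minimal Python client sketch of that contract, mirroring the JS uploadFileInChunks added below (the requests dependency, the node URL, and the omission of any auth are assumptions of the sketch, not part of this PR):

    import base64, hashlib, os, requests

    URL = "https://my-public-node-1.projscale.dev/api/v1.5/storage"
    MAX_CHUNK = 80 * 1024 * 1024  # matches the server-side limit

    def upload(path):
        # Hash the whole file up front; the server uses this to detect the final chunk
        sha = hashlib.sha256()
        with open(path, "rb") as f:
            for block in iter(lambda: f.read(1 << 20), b""):
                sha.update(block)
        headers = {
            "X-Content-SHA256": sha.hexdigest(),
            "X-File-Name": base64.b64encode(os.path.basename(path).encode("utf-8")).decode("ascii"),
        }
        upload_id, offset, size = None, 0, os.path.getsize(path)
        with open(path, "rb") as f:
            while offset < size:
                f.seek(offset)
                headers["X-Chunk-Start"] = str(offset)
                if upload_id:
                    headers["X-Upload-ID"] = upload_id
                r = requests.post(URL, headers=headers, data=f.read(MAX_CHUNK))
                r.raise_for_status()
                body = r.json()
                upload_id = body.get("upload_id", upload_id)
                if "content_id" in body:       # final chunk: hashes matched
                    return body
                offset = body["current_size"]  # resume from the server-reported size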
upload_id, "current_size": os.path.getsize(temp_path)}) # GET /api/v1.5/storage/ @@ -156,22 +187,18 @@ async def s_api_v1_5_storage_get(request, file_hash): # Log the file retrieval request make_log("uploader_v1.5", f"Received file retrieval request for hash: {file_hash}", level="INFO") - # Determine the file path based on the provided file_hash final_path = os.path.join(UPLOADS_DIR, f"v1.5_{file_hash}") if not os.path.exists(final_path): make_log("uploader_v1.5", f"File not found: {final_path}", level="ERROR") return response.json({"error": "File not found"}, status=404) - # Retrieve the StoredContent record from the database db_session = request.ctx.db_session stored = db_session.query(StoredContent).filter_by(hash=file_hash).first() if stored and stored.filename: filename_for_mime = stored.filename else: - # If the record is not found or filename is not set, fallback to the file path filename_for_mime = final_path - # Determine MIME type using filename from StoredContent mime_type, _ = guess_type(filename_for_mime) if not mime_type: mime_type = "application/octet-stream" @@ -186,7 +213,6 @@ async def s_api_v1_5_storage_get(request, file_hash): make_log("uploader_v1.5", f"Invalid Range header: {range_header}", level="ERROR") return response.json({"error": "Invalid Range header"}, status=400) range_spec = range_spec[len("bytes="):] - # Split by comma to handle multiple ranges range_parts = [part.strip() for part in range_spec.split(',')] parsed_ranges = [] try: @@ -195,7 +221,6 @@ async def s_api_v1_5_storage_get(request, file_hash): raise ValueError("Invalid range format") start_str, end_str = part.split('-', 1) if start_str == "": - # Suffix byte range: last N bytes suffix_length = int(end_str) if suffix_length > file_size: start = 0 @@ -215,7 +240,6 @@ async def s_api_v1_5_storage_get(request, file_hash): make_log("uploader_v1.5", f"Invalid Range header: {range_header} - {e}", level="ERROR") return response.json({"error": "Invalid Range header"}, status=400) - # If only one range is requested, use single range response if len(parsed_ranges) == 1: start, end = parsed_ranges[0] content_length = end - start + 1 @@ -226,12 +250,11 @@ async def s_api_v1_5_storage_get(request, file_hash): "Content-Type": mime_type, } async def stream_file_range(): - # Stream single range content make_log("uploader_v1.5", f"Starting to stream file from byte {start} to {end}", level="INFO") async with aiofiles.open(final_path, mode='rb') as f: await f.seek(start) remaining = content_length - chunk_size = 1024 * 1024 # 1MB chunks + chunk_size = 1024 * 1024 while remaining > 0: read_size = min(chunk_size, remaining) data = await f.read(read_size) @@ -242,14 +265,12 @@ async def s_api_v1_5_storage_get(request, file_hash): make_log("uploader_v1.5", f"Finished streaming file: {final_path}", level="INFO") return response.stream(stream_file_range, status=206, headers=headers) else: - # Multiple ranges requested: create a multipart/byteranges response - boundary = uuid4().hex # Generate a random boundary string + boundary = uuid4().hex headers = { "Content-Type": f"multipart/byteranges; boundary={boundary}", "Accept-Ranges": "bytes", } async def stream_multipart(): - # For each range, yield the boundary, part headers, and the file content for start, end in parsed_ranges: part_header = ( f"--{boundary}\r\n" @@ -262,7 +283,7 @@ async def s_api_v1_5_storage_get(request, file_hash): async with aiofiles.open(final_path, mode='rb') as f: await f.seek(start) remaining = part_length - chunk_size = 1024 * 1024 # 1MB chunks + 
                         while remaining > 0:
                             read_size = min(chunk_size, remaining)
                             data = await f.read(read_size)
@@ -271,10 +292,8 @@ async def s_api_v1_5_storage_get(request, file_hash):
                             remaining -= len(data)
                             yield data
                     yield b"\r\n"
-                # Final boundary marker
                 yield f"--{boundary}--\r\n".encode()
             return response.stream(stream_multipart, status=206, headers=headers)
     else:
-        # No Range header: return full file
         make_log("uploader_v1.5", f"Returning full file for video/audio: {final_path}", level="INFO")
         return await response.file(final_path, mime_type=mime_type)
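The GET handler's range behavior is unchanged by this PR: one range yields a plain 206 with Content-Range, and several comma-separated ranges (including -N suffix ranges) yield a multipart/byteranges body. A rough sketch for exercising both paths, with requests and a placeholder hash assumed:

    import requests

    BASE = "https://my-public-node-1.projscale.dev/api/v1.5/storage"
    file_hash = "..."  # the content_sha256 value returned by the upload

    # Single range: plain 206 partial content
    r = requests.get(f"{BASE}/{file_hash}", headers={"Range": "bytes=0-1023"})
    print(r.status_code, r.headers.get("Content-Range"))  # 206, bytes 0-1023/<size>

    # Two ranges, one of them a suffix range: multipart/byteranges body
    r = requests.get(f"{BASE}/{file_hash}", headers={"Range": "bytes=0-99, -100"})
    print(r.headers.get("Content-Type"))  # multipart/byteranges; boundary=<hex>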
diff --git a/uploader_test.html b/uploader_test.html
index b0ed2d1..87b5bb6 100644
--- a/uploader_test.html
+++ b/uploader_test.html
@@ -43,7 +43,7 @@
         // Base URL for endpoints
         const BASE_URL = "https://my-public-node-1.projscale.dev/api/v1.5/storage";
 
-        // Function to append messages to log div
+        // Append log message to log div
         function appendLog(message) {
             const logDiv = document.getElementById('log');
             const p = document.createElement('p');
@@ -51,7 +51,7 @@
             logDiv.appendChild(p);
         }
 
-        // Function to compute SHA-256 hash of a file in hex using jsSHA library with incremental reading
+        // Compute SHA-256 hash of a file using jsSHA library (incremental reading)
         function computeSHA256(file) {
             return new Promise((resolve, reject) => {
                 const chunkSize = 2097152; // 2MB per chunk
@@ -60,7 +60,7 @@
                 const shaObj = new jsSHA("SHA-256", "ARRAYBUFFER");
 
                 reader.onload = function(e) {
-                    // Update the hash object with the current chunk data
+                    // Update hash with current chunk data
                     shaObj.update(e.target.result);
                     offset += chunkSize;
                     appendLog(`Processed ${Math.min(offset, file.size)} of ${file.size} bytes`);
@@ -89,6 +89,70 @@
             });
         }
 
+        // Upload file in chunks (max 80 MB per chunk)
+        async function uploadFileInChunks(file) {
+            const maxChunkSize = 80 * 1024 * 1024; // 80 MB
+            let offset = 0;
+            let uploadId = null;
+            try {
+                appendLog("Starting hash computation...");
+                const hashHex = await computeSHA256(file);
+                appendLog(`Computed SHA-256 hash: ${hashHex}`);
+
+                while (offset < file.size) {
+                    const chunk = file.slice(offset, Math.min(offset + maxChunkSize, file.size));
+                    appendLog(`Uploading chunk starting at byte ${offset}`);
+
+                    // Prepare headers for the chunk upload
+                    const headers = {
+                        "X-Content-SHA256": hashHex,
+                        "X-File-Name": btoa(unescape(encodeURIComponent(file.name))), // File name in base64
+                        "X-Chunk-Start": offset.toString(),
+                        "Content-Type": file.type || "application/octet-stream"
+                    };
+                    if (uploadId) {
+                        headers["X-Upload-ID"] = uploadId;
+                    }
+
+                    const response = await fetch(BASE_URL, {
+                        method: "POST",
+                        headers: headers,
+                        body: chunk
+                    });
+
+                    if (!response.ok) {
+                        const errorData = await response.json();
+                        appendLog(`Chunk upload failed: ${errorData.error}`);
+                        throw new Error(`Upload failed at offset ${offset}: ${errorData.error}`);
+                    }
+
+                    const resultData = await response.json();
+                    // Save uploadId from the first response if not yet set
+                    if (!uploadId && resultData.upload_id) {
+                        uploadId = resultData.upload_id;
+                    }
+
+                    // If the response contains content_id, the upload is complete
+                    if (resultData.content_id) {
+                        appendLog(`Upload complete. File ID: ${resultData.content_id}`);
+                        return resultData;
+                    }
+
+                    // Update offset based on the server-reported current size
+                    if (resultData.current_size !== undefined) {
+                        offset = resultData.current_size;
+                        appendLog(`Server reports current_size: ${offset}`);
+                    } else {
+                        appendLog("Unexpected response from server, missing current_size.");
+                        throw new Error("Missing current_size in response");
+                    }
+                }
+            } catch (err) {
+                appendLog(`Error during upload: ${err}`);
+                throw err;
+            }
+        }
+
         // Upload button event listener
         document.getElementById('uploadBtn').addEventListener('click', async () => {
             const fileInput = document.getElementById('uploadFile');
@@ -101,42 +165,15 @@
             }
             const file = fileInput.files[0];
 
-            appendLog("Starting hash computation...");
             try {
-                // Compute SHA-256 hash of the file in hex format
-                const hashHex = await computeSHA256(file);
-                appendLog(`Computed SHA-256 hash: ${hashHex}`);
-
-                // Prepare the POST request with file as body and additional header for filename
-                const encodedFileName = btoa(unescape(encodeURIComponent(file.name)));
-                const response = await fetch(BASE_URL, {
-                    method: "POST",
-                    headers: {
-                        "X-Content-SHA256": hashHex,
-                        "X-File-Name": encodedFileName, // NEW: pass the file name in base64
-                        "Content-Type": file.type || "application/octet-stream"
-                    },
-                    body: file
-                });
-
-                if (!response.ok) {
-                    const errorData = await response.json();
-                    uploadResult.textContent = `Error: ${errorData.error}`;
-                    appendLog(`Upload failed: ${errorData.error}`);
-                    return;
-                }
-
-                const resultData = await response.json();
+                const resultData = await uploadFileInChunks(file);
                 uploadResult.textContent = `File uploaded successfully. content_sha256: ${resultData.content_sha256}`;
-                appendLog(`Upload successful. Response: ${JSON.stringify(resultData)}`);
             } catch (err) {
                 uploadResult.textContent = "Error while uploading the file.";
-                appendLog(`Error during upload: ${err}`);
             }
         });
 
-        // Load file button event listener for streaming
+        // Load file for streaming (remains unchanged)
         document.getElementById('loadFileBtn').addEventListener('click', async () => {
             const fileHash = document.getElementById('fileHashInput').value.trim();
             const mediaContainer = document.getElementById('mediaContainer');
@@ -147,12 +184,10 @@
                 return;
             }
 
-            // Construct file URL
             const fileUrl = `${BASE_URL}/${fileHash}`;
             appendLog(`Fetching file info for hash: ${fileHash}`);
 
             try {
-                // Perform a HEAD request to determine Content-Type
                 const headResponse = await fetch(fileUrl, { method: "HEAD" });
                 if (!headResponse.ok) {
                     mediaContainer.textContent = "File not found.";
@@ -164,7 +199,6 @@
                 appendLog(`Content-Type: ${contentType}`);
 
                 let mediaElement;
-                // Create appropriate element based on Content-Type
                 if (contentType.startsWith("image/")) {
                     mediaElement = document.createElement("img");
                     mediaElement.style.maxWidth = "100%";
@@ -176,12 +210,10 @@
                     mediaElement = document.createElement("audio");
                     mediaElement.controls = true;
                 } else {
-                    // For other types, create a download link
                     mediaElement = document.createElement("a");
                     mediaElement.textContent = "Download file";
                 }
 
-                // Set the src or href attribute to stream the file
                 if (mediaElement.tagName === "A") {
                     mediaElement.href = fileUrl;
                     mediaElement.download = "";
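One encoding detail worth noting: btoa(unescape(encodeURIComponent(file.name))) base64-encodes the UTF-8 bytes of the file name, which is exactly what the server-side b64decode(...).decode("utf-8") expects. A tiny sanity check of that round trip (illustrative only, with a hypothetical non-ASCII name):

    import base64

    name = "отчёт.pdf"  # hypothetical non-ASCII filename
    header_value = base64.b64encode(name.encode("utf-8")).decode("ascii")
    assert base64.b64decode(header_value).decode("utf-8") == name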