# uploader-bot/app/api/routes/progressive_storage.py
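"""Progressive (chunked) storage upload API.

Defines two Sanic route handlers:

* ``POST /api/v1.5/storage`` -- receives a file as sequential chunks, supports
  resuming via the ``X-Upload-ID`` header, and indexes the completed file as a
  ``StoredContent`` row once the whole-file SHA256 matches ``X-Content-SHA256``.
* ``GET /api/v1.5/storage/<file_hash>`` -- serves a stored file, honouring HTTP
  ``Range`` requests with single-range (206) and ``multipart/byteranges`` replies.
"""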

import os
import asyncio
from uuid import uuid4
from datetime import datetime
from mimetypes import guess_type
from base64 import b64decode

import aiofiles
from base58 import b58encode
from sanic import response

from app.core.logger import make_log
from app.core.models.node_storage import StoredContent
from app.core._config import UPLOADS_DIR
from app.core._utils.resolve_content import resolve_content


# POST /api/v1.5/storage
async def s_api_v1_5_storage_post(request):
    # Log the receipt of a chunk upload request
    make_log("uploader_v1.5", "Received chunk upload request", level="INFO")

    # Get the provided file hash from header (hex format)
    provided_hash_hex = request.headers.get("X-Content-SHA256")
    if not provided_hash_hex:
        make_log("uploader_v1.5", "Missing X-Content-SHA256 header", level="ERROR")
        return response.json({"error": "Missing X-Content-SHA256 header"}, status=400)
    try:
        provided_hash_bytes = bytes.fromhex(provided_hash_hex)
        provided_hash_b58 = b58encode(provided_hash_bytes).decode()
        make_log("uploader_v1.5", f"Provided hash (base58): {provided_hash_b58}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Invalid X-Content-SHA256 header format: {e}", level="ERROR")
        return response.json({"error": "Invalid X-Content-SHA256 header format"}, status=400)

    existing = request.ctx.db_session.query(StoredContent).filter_by(hash=provided_hash_b58).first()
    if existing:
        make_log("uploader_v1.5", f"File with hash {provided_hash_b58} already exists in DB", level="INFO")
        serialized_v2 = existing.cid.serialize_v2()  # Get v2 content id
        serialized_v1 = existing.cid.serialize_v1()  # Get v1 content id
        # Return early since the file is already stored
        return response.json({
            "upload_id": request.headers.get("X-Upload-ID", str(uuid4())),  # Use provided or generate a new upload_id
            "content_sha256": provided_hash_b58,
            "content_id": serialized_v2,
            "content_id_v1": serialized_v1,
            "content_url": f"dmy://storage?cid={serialized_v2}",
        })
    # Get the provided file name from header and decode it from base64
    provided_filename_b64 = request.headers.get("X-File-Name")
    if not provided_filename_b64:
        make_log("uploader_v1.5", "Missing X-File-Name header", level="ERROR")
        return response.json({"error": "Missing X-File-Name header"}, status=400)
    try:
        provided_filename = b64decode(provided_filename_b64).decode("utf-8")
    except Exception as e:
        make_log("uploader_v1.5", f"Invalid X-File-Name header: {e}", level="ERROR")
        return response.json({"error": "Invalid X-File-Name header"}, status=400)

    # Get the X-Chunk-Start header (must be provided) and parse it as an integer
    chunk_start_header = request.headers.get("X-Chunk-Start")
    if chunk_start_header is None:
        make_log("uploader_v1.5", "Missing X-Chunk-Start header", level="ERROR")
        return response.json({"error": "Missing X-Chunk-Start header"}, status=400)
    try:
        chunk_start = int(chunk_start_header)
    except Exception as e:
        make_log("uploader_v1.5", f"Invalid X-Chunk-Start header: {e}", level="ERROR")
        return response.json({"error": "Invalid X-Chunk-Start header"}, status=400)

    # Enforce maximum chunk size (80 MB) using the Content-Length header if provided
    max_chunk_size = 80 * 1024 * 1024  # 80 MB
    content_length = request.headers.get("Content-Length")
    if content_length is not None:
        try:
            content_length = int(content_length)
            if content_length > max_chunk_size:
                make_log("uploader_v1.5", f"Chunk size {content_length} exceeds maximum allowed", level="ERROR")
                return response.json({"error": "Chunk size exceeds maximum allowed (80 MB)"}, status=400)
        except ValueError:
            # A malformed Content-Length is ignored; the chunk is accepted as-is
            pass
    # Determine if this is a new upload or a continuation (resume)
    upload_id = request.headers.get("X-Upload-ID")
    is_new_upload = False
    if not upload_id:
        # New upload session: generate a new uuid
        upload_id = str(uuid4())
        is_new_upload = True
        make_log("uploader_v1.5", f"Starting new upload session with ID: {upload_id}", level="INFO")
    else:
        make_log("uploader_v1.5", f"Resuming upload session with ID: {upload_id}", level="INFO")

    # Determine the temporary file path based on upload_id
    temp_path = os.path.join(UPLOADS_DIR, f"v1.5_upload_{upload_id}")

    # Check the current size of the temporary file (if it exists)
    current_size = 0
    if os.path.exists(temp_path):
        current_size = os.path.getsize(temp_path)

    # If the provided chunk_start is less than current_size, the chunk is already received
    if chunk_start < current_size:
        make_log("uploader_v1.5", f"Chunk starting at {chunk_start} already received, current size: {current_size}", level="INFO")
        return response.json({"upload_id": upload_id, "current_size": current_size})
    elif chunk_start > current_size:
        make_log("uploader_v1.5", f"Chunk start {chunk_start} does not match current file size {current_size}", level="ERROR")
        return response.json({"error": "Chunk start does not match current file size"}, status=400)

    # Append the received chunk to the temporary file
    try:
        mode = 'wb' if is_new_upload else 'ab'
        async with aiofiles.open(temp_path, mode) as out_file:
            data = request.body  # Get the full body if available
            if data:
                await out_file.write(data)  # Write the whole body at once
            else:
                async for chunk in request.stream:
                    await out_file.write(chunk)
        new_size = os.path.getsize(temp_path)
        make_log("uploader_v1.5", f"Appended chunk. New file size: {new_size}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Error saving chunk: {e}", level="ERROR")
        return response.json({"error": "Failed to save chunk"}, status=500)

    # Compute the SHA256 hash of the temporary file using an external sha256sum subprocess
    try:
        proc = await asyncio.create_subprocess_exec(
            'sha256sum', temp_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            error_msg = stderr.decode().strip()
            make_log("uploader_v1.5", f"sha256sum error: {error_msg}", level="ERROR")
            return response.json({"error": "Failed to compute file hash"}, status=500)
        computed_hash_hex = stdout.decode().split()[0].strip()
        computed_hash_bytes = bytes.fromhex(computed_hash_hex)
        computed_hash_b58 = b58encode(computed_hash_bytes).decode()
        make_log("uploader_v1.5", f"Computed hash (base58): {computed_hash_b58}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Error computing file hash: {e}", level="ERROR")
        return response.json({"error": "Error computing file hash"}, status=500)
    # If the computed hash matches the provided one, the final chunk has been received
    if computed_hash_b58 == provided_hash_b58:
        final_path = os.path.join(UPLOADS_DIR, f"{computed_hash_b58}")
        try:
            os.rename(temp_path, final_path)
            make_log("uploader_v1.5", f"Final chunk received. File renamed to: {final_path}", level="INFO")
        except Exception as e:
            make_log("uploader_v1.5", f"Error renaming file: {e}", level="ERROR")
            return response.json({"error": "Failed to finalize file storage"}, status=500)

        db_session = request.ctx.db_session
        existing = db_session.query(StoredContent).filter_by(hash=computed_hash_b58).first()
        if existing:
            make_log("uploader_v1.5", f"File with hash {computed_hash_b58} already exists in DB", level="INFO")
            serialized_v2 = existing.cid.serialize_v2()
            serialized_v1 = existing.cid.serialize_v1()
            return response.json({
                "upload_id": upload_id,
                "content_sha256": computed_hash_b58,
                "content_id": serialized_v2,
                "content_id_v1": serialized_v1,
                "content_url": f"dmy://storage?cid={serialized_v2}",
            })

        try:
            user_id = request.ctx.user.id if request.ctx.user else None
            new_content = StoredContent(
                type='local/content_bin',
                hash=computed_hash_b58,
                user_id=user_id,
                filename=provided_filename,
                key_id=None,
                meta={},
                created=datetime.utcnow()
            )
            db_session.add(new_content)
            db_session.commit()
            make_log("uploader_v1.5", f"New file stored and indexed for user {user_id} with hash {computed_hash_b58}", level="INFO")
        except Exception as e:
            make_log("uploader_v1.5", f"Database error: {e}", level="ERROR")
            return response.json({"error": "Database error"}, status=500)

        serialized_v2 = new_content.cid.serialize_v2()
        serialized_v1 = new_content.cid.serialize_v1()
        return response.json({
            "upload_id": upload_id,
            "content_sha256": computed_hash_b58,
            "content_id": serialized_v2,
            "content_id_v1": serialized_v1,
            "content_url": f"dmy://storage?cid={serialized_v2}",
        })
    else:
        # Not the final chunk yet; return the current upload status
        return response.json({"upload_id": upload_id, "current_size": os.path.getsize(temp_path)})
# GET /api/v1.5/storage/<file_hash>
async def s_api_v1_5_storage_get(request, file_hash):
    make_log("uploader_v1.5", f"Received file retrieval request for hash: {file_hash}", level="INFO")

    # Try to resolve a content id into its base58 hash; on failure keep the raw value
    try:
        file_hash = b58encode(resolve_content(file_hash)[0].content_hash).decode()
    except Exception:
        pass

    final_path = os.path.join(UPLOADS_DIR, f"{file_hash}")
    if not os.path.exists(final_path):
        make_log("uploader_v1.5", f"File not found: {final_path}", level="ERROR")
        return response.json({"error": "File not found"}, status=404)

    # Guess the MIME type from the stored filename if available, otherwise from the path
    db_session = request.ctx.db_session
    stored = db_session.query(StoredContent).filter_by(hash=file_hash).first()
    if stored and stored.filename:
        filename_for_mime = stored.filename
    else:
        filename_for_mime = final_path
    mime_type, _ = guess_type(filename_for_mime)
    if not mime_type:
        mime_type = "application/octet-stream"

    file_size = os.path.getsize(final_path)
    range_header = request.headers.get("Range")
    if range_header:
        make_log("uploader_v1.5", f"Processing Range header: {range_header}", level="INFO")
        range_spec = range_header.strip().lower()
        if not range_spec.startswith("bytes="):
            make_log("uploader_v1.5", f"Invalid Range header: {range_header}", level="ERROR")
            return response.json({"error": "Invalid Range header"}, status=400)
        range_spec = range_spec[len("bytes="):]
        range_parts = [part.strip() for part in range_spec.split(',')]
        parsed_ranges = []
        try:
            for part in range_parts:
                if '-' not in part:
                    raise ValueError("Invalid range format")
                start_str, end_str = part.split('-', 1)
                if start_str == "":
                    # Suffix range: serve the last N bytes of the file
                    suffix_length = int(end_str)
                    start = 0 if suffix_length > file_size else file_size - suffix_length
                    end = file_size - 1
                else:
                    start = int(start_str)
                    end = file_size - 1 if end_str == "" else int(end_str)
                if start > end or end >= file_size:
                    raise ValueError("Requested Range Not Satisfiable")
                parsed_ranges.append((start, end))
        except Exception as e:
            make_log("uploader_v1.5", f"Invalid Range header: {range_header} - {e}", level="ERROR")
            return response.json({"error": "Invalid Range header"}, status=400)
        if len(parsed_ranges) == 1:
            # Single range streaming
            start, end = parsed_ranges[0]
            content_length = end - start + 1
            headers = {
                "Content-Range": f"bytes {start}-{end}/{file_size}",
                "Accept-Ranges": "bytes",
                "Content-Length": str(content_length),
                "Content-Type": mime_type,
            }
            # Create response for streaming
            stream_response = await request.respond(headers=headers, status=206, content_type=mime_type)
            make_log("uploader_v1.5", f"Starting to stream file from byte {start} to {end}", level="INFO")
            async with aiofiles.open(final_path, mode='rb') as f:
                await f.seek(start)
                remaining = content_length
                chunk_size = 1024 * 1024  # chunk size in bytes
                while remaining > 0:
                    read_size = min(chunk_size, remaining)
                    data = await f.read(read_size)
                    if not data:
                        break
                    remaining -= len(data)
                    await stream_response.send(data)
            make_log("uploader_v1.5", f"Finished streaming file: {final_path}", level="INFO")
            await stream_response.eof()
            return stream_response
        else:
            # Multipart range streaming
            boundary = uuid4().hex
            headers = {
                "Content-Type": f"multipart/byteranges; boundary={boundary}",
                "Accept-Ranges": "bytes",
            }
            stream_response = await request.respond(headers=headers, status=206)
            for start, end in parsed_ranges:
                part_header = (
                    f"--{boundary}\r\n"
                    f"Content-Type: {mime_type}\r\n"
                    f"Content-Range: bytes {start}-{end}/{file_size}\r\n"
                    f"\r\n"
                )
                await stream_response.send(part_header.encode())
                part_length = end - start + 1
                async with aiofiles.open(final_path, mode='rb') as f:
                    await f.seek(start)
                    remaining = part_length
                    chunk_size = 1024 * 1024
                    while remaining > 0:
                        read_size = min(chunk_size, remaining)
                        data = await f.read(read_size)
                        if not data:
                            break
                        remaining -= len(data)
                        await stream_response.send(data)
                await stream_response.send(b"\r\n")
            await stream_response.send(f"--{boundary}--\r\n".encode())
            await stream_response.eof()
            return stream_response
    else:
        make_log("uploader_v1.5", f"Returning full file for video/audio: {final_path}", level="INFO")
        return await response.file(final_path, mime_type=mime_type)
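
# ---------------------------------------------------------------------------
# Illustrative sketch (assumed base URL, `requests` library): fetching part of a
# stored file through the Range-aware GET handler above. `file_hash` is the
# base58 content hash (or any id that resolve_content can resolve).
#
#     import requests
#
#     file_hash = "..."                                        # placeholder
#     r = requests.get(f"http://localhost:8000/api/v1.5/storage/{file_hash}",
#                      headers={"Range": "bytes=0-1048575"})   # first 1 MiB
#     assert r.status_code == 206                              # partial content
#     first_mib = r.content
# ---------------------------------------------------------------------------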