uploader-bot/app/api/routes/progressive_storage.py
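
"""Progressive storage routes for uploader-bot.

POST /api/v1.5/storage streams the request body to disk, verifies it against
the client-supplied SHA-256 (X-Content-SHA256, hex), deduplicates by hash, and
indexes the file as a StoredContent row. GET /api/v1.5/storage/<file_hash>
serves a stored file with single- and multi-range (multipart/byteranges)
support for progressive playback.

Note: request.stream is only populated when the POST route is registered with
streaming enabled (e.g. stream=True); that registration happens where these
handlers are mounted, outside this file.
"""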


import os
import subprocess
import asyncio
from uuid import uuid4
from datetime import datetime
from mimetypes import guess_type
import aiofiles
from base58 import b58encode
from sanic import response
from app.core.logger import make_log
from app.core.models.node_storage import StoredContent
from app.core._config import UPLOADS_DIR
from app.core._utils.resolve_content import resolve_content

# POST /api/v1.5/storage
async def s_api_v1_5_storage_post(request):
    # Log the start of the file upload process
    make_log("uploader_v1.5", "Received file upload request", level="INFO")

    # Get the provided file hash from header (hex format)
    provided_hash_hex = request.headers.get("X-Content-SHA256")
    if not provided_hash_hex:
        make_log("uploader_v1.5", "Missing X-Content-SHA256 header", level="ERROR")
        return response.json({"error": "Missing X-Content-SHA256 header"}, status=400)
    try:
        provided_hash_bytes = bytes.fromhex(provided_hash_hex)
        provided_hash_b58 = b58encode(provided_hash_bytes).decode()
        make_log("uploader_v1.5", f"Provided hash (base58): {provided_hash_b58}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Invalid X-Content-SHA256 header format: {e}", level="ERROR")
        return response.json({"error": "Invalid X-Content-SHA256 header format"}, status=400)

    provided_filename = request.headers.get("X-File-Name")

    # Check if the file already exists in the database
    db_session = request.ctx.db_session
    existing = db_session.query(StoredContent).filter_by(hash=provided_hash_b58).first()
    if existing:
        make_log("uploader_v1.5", f"File with hash {provided_hash_b58} already exists in DB", level="INFO")
        serialized_v2 = existing.serialize_v2()  # returns a string
        serialized_v1 = existing.serialize_v1()  # returns a string
        return response.json({
            "content_sha256": provided_hash_b58,
            "content_id": serialized_v2,
            "content_id_v1": serialized_v1,
            "content_url": f"dmy://storage?cid={serialized_v2}",
        })

    # Save the uploaded file to a temporary location using streaming
    temp_filename = f"v1.5_upload_{uuid4()}"
    temp_path = os.path.join(UPLOADS_DIR, temp_filename)
    make_log("uploader_v1.5", f"Saving file to temporary location: {temp_path}", level="INFO")
    try:
        async with aiofiles.open(temp_path, 'wb') as out_file:
            async for chunk in request.stream:
                await out_file.write(chunk)
        make_log("uploader_v1.5", f"Finished saving file to temporary location: {temp_path}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Error saving uploaded file: {e}", level="ERROR")
        return response.json({"error": "Failed to save uploaded file"}, status=500)

    # Compute the file's SHA-256 in a subprocess: hashing stays off the event
    # loop and the file is never loaded into memory all at once
    make_log("uploader_v1.5", f"Computing file hash using subprocess for file: {temp_path}", level="INFO")
    try:
        proc = await asyncio.create_subprocess_exec(
            'sha256sum', temp_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            error_msg = stderr.decode().strip()
            make_log("uploader_v1.5", f"sha256sum error: {error_msg}", level="ERROR")
            return response.json({"error": "Failed to compute file hash"}, status=500)
        # Parse output: "<hash>  <filename>"
        computed_hash_hex = stdout.decode().split()[0].strip()
        computed_hash_bytes = bytes.fromhex(computed_hash_hex)
        computed_hash_b58 = b58encode(computed_hash_bytes).decode()
        make_log("uploader_v1.5", f"Computed hash (base58): {computed_hash_b58}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Error computing file hash: {e}", level="ERROR")
        return response.json({"error": "Error computing file hash"}, status=500)

    # Verify that the computed hash matches the provided hash
    if computed_hash_b58 != provided_hash_b58:
        make_log("uploader_v1.5", f"Hash mismatch: provided {provided_hash_b58} vs computed {computed_hash_b58}", level="ERROR")
        try:
            os.remove(temp_path)
            make_log("uploader_v1.5", f"Temporary file removed due to hash mismatch: {temp_path}", level="INFO")
        except Exception as e:
            make_log("uploader_v1.5", f"Error removing temp file: {e}", level="ERROR")
        return response.json({"error": "Hash mismatch"}, status=400)

    # Determine the final file path
    final_path = os.path.join(UPLOADS_DIR, f"v1.5_{computed_hash_b58}")
    if os.path.exists(final_path):
        # Another upload already produced this content; drop the duplicate
        make_log("uploader_v1.5", f"File already exists on disk: {final_path}", level="INFO")
        try:
            os.remove(temp_path)
            make_log("uploader_v1.5", f"Temporary file removed: {temp_path}", level="INFO")
        except Exception as e:
            make_log("uploader_v1.5", f"Error removing temp file: {e}", level="ERROR")
        existing = db_session.query(StoredContent).filter_by(hash=computed_hash_b58).first()
        if existing:
            serialized_v2 = existing.serialize_v2()
            serialized_v1 = existing.serialize_v1()
            return response.json({
                "content_sha256": computed_hash_b58,
                "content_id": serialized_v2,
                "content_id_v1": serialized_v1,
                "content_url": f"dmy://storage?cid={serialized_v2}",
            })
        # File is on disk but not in the DB: fall through and index it
    else:
        try:
            os.rename(temp_path, final_path)
            make_log("uploader_v1.5", f"Renamed temporary file to final location: {final_path}", level="INFO")
        except Exception as e:
            make_log("uploader_v1.5", f"Error renaming file: {e}", level="ERROR")
            return response.json({"error": "Failed to finalize file storage"}, status=500)

    # Create a new StoredContent record (user_id comes from request.ctx.user) and commit it
    try:
        new_content = StoredContent(
            type='local/content_bin',
            hash=computed_hash_b58,
            user_id=request.ctx.user.id,
            filename=provided_filename,
            key_id=None,
            meta={},
            created=datetime.utcnow()
        )
        db_session.add(new_content)
        db_session.commit()
        make_log("uploader_v1.5", f"New file stored and indexed for user {request.ctx.user.id} with hash {computed_hash_b58}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Database error: {e}", level="ERROR")
        return response.json({"error": "Database error"}, status=500)

    serialized_v2 = new_content.serialize_v2()
    serialized_v1 = new_content.serialize_v1()
    return response.json({
        "content_sha256": computed_hash_b58,
        "content_id": serialized_v2,
        "content_id_v1": serialized_v1,
        "content_url": f"dmy://storage?cid={serialized_v2}",
    })
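
# Usage sketch (illustrative, not part of the route). Assumes these handlers
# are mounted at /api/v1.5 on localhost:8000; host, port, and prefix are
# hypothetical:
#
#   import hashlib
#   import requests
#
#   path = "movie.mp4"
#   with open(path, "rb") as f:
#       digest = hashlib.sha256(f.read()).hexdigest()
#   with open(path, "rb") as f:
#       resp = requests.post(
#           "http://localhost:8000/api/v1.5/storage",
#           headers={"X-Content-SHA256": digest, "X-File-Name": path},
#           data=f,  # file object is streamed as the raw request body
#       )
#   print(resp.json()["content_url"])
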
# GET /api/v1.5/storage/<file_hash>
async def s_api_v1_5_storage_get(request, file_hash):
    # Log the file retrieval request
    make_log("uploader_v1.5", f"Received file retrieval request for hash: {file_hash}", level="INFO")

    # Determine the file path based on the provided file_hash
    final_path = os.path.join(UPLOADS_DIR, f"v1.5_{file_hash}")
    if not os.path.exists(final_path):
        make_log("uploader_v1.5", f"File not found: {final_path}", level="ERROR")
        return response.json({"error": "File not found"}, status=404)

    # Retrieve the StoredContent record from the database
    db_session = request.ctx.db_session
    stored = db_session.query(StoredContent).filter_by(hash=file_hash).first()
    if stored and stored.filename:
        filename_for_mime = stored.filename
    else:
        # If the record is not found or has no filename, fall back to the file path
        filename_for_mime = final_path

    # Determine the MIME type from the filename stored in StoredContent
    mime_type, _ = guess_type(filename_for_mime)
    if not mime_type:
        mime_type = "application/octet-stream"

    file_size = os.path.getsize(final_path)
    range_header = request.headers.get("Range")
    if range_header:
        make_log("uploader_v1.5", f"Processing Range header: {range_header}", level="INFO")
        range_spec = range_header.strip().lower()
        if not range_spec.startswith("bytes="):
            make_log("uploader_v1.5", f"Invalid Range header: {range_header}", level="ERROR")
            return response.json({"error": "Invalid Range header"}, status=400)
        range_spec = range_spec[len("bytes="):]

        # Split on commas to handle multiple ranges
        range_parts = [part.strip() for part in range_spec.split(',')]
        parsed_ranges = []
        try:
            for part in range_parts:
                if '-' not in part:
                    raise ValueError("Invalid range format")
                start_str, end_str = part.split('-', 1)
                if start_str == "":
                    # Suffix byte range: last N bytes
                    suffix_length = int(end_str)
                    if suffix_length > file_size:
                        start = 0
                    else:
                        start = file_size - suffix_length
                    end = file_size - 1
                else:
                    start = int(start_str)
                    if end_str == "":
                        end = file_size - 1
                    else:
                        end = int(end_str)
                if start > end or end >= file_size:
                    raise ValueError("Requested Range Not Satisfiable")
                parsed_ranges.append((start, end))
        except Exception as e:
            make_log("uploader_v1.5", f"Invalid Range header: {range_header} - {e}", level="ERROR")
            return response.json({"error": "Invalid Range header"}, status=400)
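        # Examples of accepted Range values (RFC 7233 semantics, as parsed above):
        #   "bytes=0-99"       -> [(0, 99)]                first 100 bytes
        #   "bytes=100-"       -> [(100, file_size - 1)]   from offset 100 to EOF
        #   "bytes=-500"       -> last 500 bytes (suffix range)
        #   "bytes=0-1,10-20"  -> two ranges, answered as multipart/byteranges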
        # If only one range is requested, use a single-range response
        if len(parsed_ranges) == 1:
            start, end = parsed_ranges[0]
            content_length = end - start + 1
            headers = {
                "Content-Range": f"bytes {start}-{end}/{file_size}",
                "Accept-Ranges": "bytes",
                "Content-Length": str(content_length),
                "Content-Type": mime_type,
            }

            async def stream_file_range():
                # Stream the single range's content
                make_log("uploader_v1.5", f"Starting to stream file from byte {start} to {end}", level="INFO")
                async with aiofiles.open(final_path, mode='rb') as f:
                    await f.seek(start)
                    remaining = content_length
                    chunk_size = 1024 * 1024  # 1 MiB chunks
                    while remaining > 0:
                        read_size = min(chunk_size, remaining)
                        data = await f.read(read_size)
                        if not data:
                            break
                        remaining -= len(data)
                        yield data
                make_log("uploader_v1.5", f"Finished streaming file: {final_path}", level="INFO")

            return response.stream(stream_file_range, status=206, headers=headers)
        else:
            # Multiple ranges requested: build a multipart/byteranges response
            boundary = uuid4().hex  # random boundary string
            headers = {
                "Content-Type": f"multipart/byteranges; boundary={boundary}",
                "Accept-Ranges": "bytes",
            }
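            # Each part carries its own headers; the streamed body looks like:
            #   --<boundary>\r\n
            #   Content-Type: <mime_type>\r\n
            #   Content-Range: bytes <start>-<end>/<file_size>\r\n
            #   \r\n
            #   <range bytes>\r\n
            # repeated per range, terminated by --<boundary>--\r\n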
            async def stream_multipart():
                # For each range, yield the boundary, part headers, and the file content
                for start, end in parsed_ranges:
                    part_header = (
                        f"--{boundary}\r\n"
                        f"Content-Type: {mime_type}\r\n"
                        f"Content-Range: bytes {start}-{end}/{file_size}\r\n"
                        f"\r\n"
                    )
                    yield part_header.encode()
                    part_length = end - start + 1
                    async with aiofiles.open(final_path, mode='rb') as f:
                        await f.seek(start)
                        remaining = part_length
                        chunk_size = 1024 * 1024  # 1 MiB chunks
                        while remaining > 0:
                            read_size = min(chunk_size, remaining)
                            data = await f.read(read_size)
                            if not data:
                                break
                            remaining -= len(data)
                            yield data
                    yield b"\r\n"
                # Final boundary marker
                yield f"--{boundary}--\r\n".encode()

            return response.stream(stream_multipart, status=206, headers=headers)
    else:
        # No Range header: return the full file
        make_log("uploader_v1.5", f"Returning full file: {final_path}", level="INFO")
        return await response.file(final_path, mime_type=mime_type)
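
# Usage sketch (illustrative; same hypothetical host and prefix as above):
#
#   url = f"http://localhost:8000/api/v1.5/storage/{content_sha256}"
#
#   # Full download:
#   resp = requests.get(url)
#
#   # Single range -> 206 with a Content-Range header:
#   resp = requests.get(url, headers={"Range": "bytes=0-1023"})
#
#   # Several ranges -> 206 with a multipart/byteranges body:
#   resp = requests.get(url, headers={"Range": "bytes=0-99,200-299"})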