import os
import asyncio
from uuid import uuid4
from datetime import datetime
from mimetypes import guess_type

import aiofiles
from base58 import b58encode
from base64 import b64decode
from sanic import response

from app.core.logger import make_log
from app.core.models.node_storage import StoredContent
from app.core._config import UPLOADS_DIR
from app.core._utils.resolve_content import resolve_content
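

# Illustrative sketch, not wired into the handlers below: the POST handler
# shells out to `sha256sum` to hash uploads without holding them in memory.
# A pure-Python alternative with the same streaming property, using only
# hashlib and the aiofiles import above (the helper name is hypothetical):
async def _sha256_hex_streaming(path, chunk_size=1024 * 1024):
    import hashlib
    digest = hashlib.sha256()
    async with aiofiles.open(path, 'rb') as f:
        while True:
            chunk = await f.read(chunk_size)
            if not chunk:
                break
            digest.update(chunk)
    return digest.hexdigest()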


# POST /api/v1.5/storage
async def s_api_v1_5_storage_post(request):
    # Log the start of the file upload process
    make_log("uploader_v1.5", "Received file upload request", level="INFO")

    # Get the provided file hash from header (hex format)
    provided_hash_hex = request.headers.get("X-Content-SHA256")
    if not provided_hash_hex:
        make_log("uploader_v1.5", "Missing X-Content-SHA256 header", level="ERROR")
        return response.json({"error": "Missing X-Content-SHA256 header"}, status=400)
    try:
        provided_hash_bytes = bytes.fromhex(provided_hash_hex)
        provided_hash_b58 = b58encode(provided_hash_bytes).decode()
        make_log("uploader_v1.5", f"Provided hash (base58): {provided_hash_b58}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Invalid X-Content-SHA256 header format: {e}", level="ERROR")
        return response.json({"error": "Invalid X-Content-SHA256 header format"}, status=400)

    # Decode the base64-encoded filename header; reject the request if the
    # header is missing or malformed instead of letting b64decode raise
    provided_filename = request.headers.get("X-File-Name")
    try:
        provided_filename = b64decode(provided_filename).decode("utf-8")
    except Exception:
        make_log("uploader_v1.5", "Missing or invalid X-File-Name header", level="ERROR")
        return response.json({"error": "Missing or invalid X-File-Name header"}, status=400)

    # Check if the file already exists in the database
    db_session = request.ctx.db_session
    existing = db_session.query(StoredContent).filter_by(hash=provided_hash_b58).first()
    if existing:
        make_log("uploader_v1.5", f"File with hash {provided_hash_b58} already exists in DB", level="INFO")
        serialized_v2 = existing.serialize_v2()  # returns a string
        serialized_v1 = existing.serialize_v1()  # returns a string
        return response.json({
            "content_sha256": provided_hash_b58,
            "content_id": serialized_v2,
            "content_id_v1": serialized_v1,
            "content_url": f"dmy://storage?cid={serialized_v2}",
        })

    # Save uploaded file to a temporary location using streaming
    temp_filename = f"v1.5_upload_{uuid4()}"
    temp_path = os.path.join(UPLOADS_DIR, temp_filename)
    make_log("uploader_v1.5", f"Saving file to temporary location: {temp_path}", level="INFO")
    try:
        async with aiofiles.open(temp_path, 'wb') as out_file:
            async for chunk in request.stream:
                await out_file.write(chunk)
        make_log("uploader_v1.5", f"Finished saving file to temporary location: {temp_path}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Error saving uploaded file: {e}", level="ERROR")
        return response.json({"error": "Failed to save uploaded file"}, status=500)

    # Compute file SHA256 using a subprocess to avoid loading the file into memory
    make_log("uploader_v1.5", f"Computing file hash using subprocess for file: {temp_path}", level="INFO")
    try:
        proc = await asyncio.create_subprocess_exec(
            'sha256sum', temp_path,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )
        stdout, stderr = await proc.communicate()
        if proc.returncode != 0:
            error_msg = stderr.decode().strip()
            make_log("uploader_v1.5", f"sha256sum error: {error_msg}", level="ERROR")
            return response.json({"error": "Failed to compute file hash"}, status=500)
        # Parse output: "<hash> <filename>"
        computed_hash_hex = stdout.decode().split()[0].strip()
        computed_hash_bytes = bytes.fromhex(computed_hash_hex)
        computed_hash_b58 = b58encode(computed_hash_bytes).decode()
        make_log("uploader_v1.5", f"Computed hash (base58): {computed_hash_b58}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Error computing file hash: {e}", level="ERROR")
        return response.json({"error": "Error computing file hash"}, status=500)

    # Verify that the computed hash matches the provided hash
    if computed_hash_b58 != provided_hash_b58:
        make_log("uploader_v1.5", f"Hash mismatch: provided {provided_hash_b58} vs computed {computed_hash_b58}", level="ERROR")
        try:
            os.remove(temp_path)
            make_log("uploader_v1.5", f"Temporary file removed due to hash mismatch: {temp_path}", level="INFO")
        except Exception as e:
            make_log("uploader_v1.5", f"Error removing temp file: {e}", level="ERROR")
        return response.json({"error": "Hash mismatch"}, status=400)

    # Determine the final file path
    final_path = os.path.join(UPLOADS_DIR, f"v1.5_{computed_hash_b58}")
    if os.path.exists(final_path):
        make_log("uploader_v1.5", f"File already exists on disk: {final_path}", level="INFO")
        try:
            os.remove(temp_path)
            make_log("uploader_v1.5", f"Temporary file removed: {temp_path}", level="INFO")
        except Exception as e:
            make_log("uploader_v1.5", f"Error removing temp file: {e}", level="ERROR")
        existing = db_session.query(StoredContent).filter_by(hash=computed_hash_b58).first()
        if existing:
            serialized_v2 = existing.serialize_v2()
            serialized_v1 = existing.serialize_v1()
            return response.json({
                "content_sha256": computed_hash_b58,
                "content_id": serialized_v2,
                "content_id_v1": serialized_v1,
                "content_url": f"dmy://storage?cid={serialized_v2}",
            })
    else:
        try:
            os.rename(temp_path, final_path)
            make_log("uploader_v1.5", f"Renamed temporary file to final location: {final_path}", level="INFO")
        except Exception as e:
            make_log("uploader_v1.5", f"Error renaming file: {e}", level="ERROR")
            return response.json({"error": "Failed to finalize file storage"}, status=500)

    # Create a new StoredContent record with user_id from request.ctx.user and commit to DB
    try:
        new_content = StoredContent(
            type='local/content_bin',
            hash=computed_hash_b58,
            user_id=request.ctx.user.id,  # 'user_id' is added to StoredContent
            filename=provided_filename,
            key_id=None,
            meta={},
            created=datetime.utcnow()
        )
        db_session.add(new_content)
        db_session.commit()
        make_log("uploader_v1.5", f"New file stored and indexed for user {request.ctx.user.id} with hash {computed_hash_b58}", level="INFO")
    except Exception as e:
        make_log("uploader_v1.5", f"Database error: {e}", level="ERROR")
        return response.json({"error": "Database error"}, status=500)

    serialized_v2 = new_content.serialize_v2()
    serialized_v1 = new_content.serialize_v1()
    return response.json({
        "content_sha256": computed_hash_b58,
        "content_id": serialized_v2,
        "content_id_v1": serialized_v1,
        "content_url": f"dmy://storage?cid={serialized_v2}",
    })
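

# Example client-side preparation for the POST endpoint above. This is a
# sketch: the helper name is hypothetical and the HTTP client is left to the
# caller. The handler expects the file's SHA-256 as hex in X-Content-SHA256
# and the UTF-8 filename base64-encoded in X-File-Name; the request body is
# the raw file stream.
def _build_upload_headers(path):
    import hashlib
    from base64 import b64encode
    digest = hashlib.sha256()
    with open(path, 'rb') as fh:
        # Hash in 1MB chunks so arbitrarily large files stay out of memory
        for chunk in iter(lambda: fh.read(1024 * 1024), b''):
            digest.update(chunk)
    return {
        "X-Content-SHA256": digest.hexdigest(),
        "X-File-Name": b64encode(os.path.basename(path).encode("utf-8")).decode("ascii"),
    }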


# GET /api/v1.5/storage/<file_hash>
async def s_api_v1_5_storage_get(request, file_hash):
    # Log the file retrieval request
    make_log("uploader_v1.5", f"Received file retrieval request for hash: {file_hash}", level="INFO")

    # Determine the file path based on the provided file_hash
    final_path = os.path.join(UPLOADS_DIR, f"v1.5_{file_hash}")
    if not os.path.exists(final_path):
        make_log("uploader_v1.5", f"File not found: {final_path}", level="ERROR")
        return response.json({"error": "File not found"}, status=404)

    # Retrieve the StoredContent record from the database
    db_session = request.ctx.db_session
    stored = db_session.query(StoredContent).filter_by(hash=file_hash).first()
    if stored and stored.filename:
        filename_for_mime = stored.filename
    else:
        # If the record is not found or the filename is not set, fall back to the file path
        filename_for_mime = final_path

    # Determine the MIME type using the filename from StoredContent
    mime_type, _ = guess_type(filename_for_mime)
    if not mime_type:
        mime_type = "application/octet-stream"

    file_size = os.path.getsize(final_path)
    range_header = request.headers.get("Range")

    if range_header:
        make_log("uploader_v1.5", f"Processing Range header: {range_header}", level="INFO")
        range_spec = range_header.strip().lower()
        if not range_spec.startswith("bytes="):
            make_log("uploader_v1.5", f"Invalid Range header: {range_header}", level="ERROR")
            return response.json({"error": "Invalid Range header"}, status=400)
        range_spec = range_spec[len("bytes="):]
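        # RFC 7233 byte-range forms handled by the parser below:
        #   "bytes=0-499"  -> the first 500 bytes
        #   "bytes=500-"   -> byte 500 through the end of the file
        #   "bytes=-500"   -> the final 500 bytes (suffix range)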
        # Split by comma to handle multiple ranges
        range_parts = [part.strip() for part in range_spec.split(',')]
        parsed_ranges = []
        try:
            for part in range_parts:
                if '-' not in part:
                    raise ValueError("Invalid range format")
                start_str, end_str = part.split('-', 1)
                if start_str == "":
                    # Suffix byte range: last N bytes
                    suffix_length = int(end_str)
                    if suffix_length > file_size:
                        start = 0
                    else:
                        start = file_size - suffix_length
                    end = file_size - 1
                else:
                    start = int(start_str)
                    if end_str == "":
                        end = file_size - 1
                    else:
                        end = int(end_str)
                if start > end or end >= file_size:
                    raise ValueError("Requested Range Not Satisfiable")
                parsed_ranges.append((start, end))
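                # Worked example: "bytes=-500" on a 1000-byte file parses to
                # (500, 999); "bytes=0-499,500-" parses to [(0, 499), (500, 999)]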
        except Exception as e:
            make_log("uploader_v1.5", f"Invalid Range header: {range_header} - {e}", level="ERROR")
            return response.json({"error": "Invalid Range header"}, status=400)

        # If only one range is requested, use a single range response
        if len(parsed_ranges) == 1:
            start, end = parsed_ranges[0]
            content_length = end - start + 1
            headers = {
                "Content-Range": f"bytes {start}-{end}/{file_size}",
                "Accept-Ranges": "bytes",
                "Content-Length": str(content_length),
                "Content-Type": mime_type,
            }

            async def stream_file_range():
                # Stream single range content
                make_log("uploader_v1.5", f"Starting to stream file from byte {start} to {end}", level="INFO")
                async with aiofiles.open(final_path, mode='rb') as f:
                    await f.seek(start)
                    remaining = content_length
                    chunk_size = 1024 * 1024  # 1MB chunks
                    while remaining > 0:
                        read_size = min(chunk_size, remaining)
                        data = await f.read(read_size)
                        if not data:
                            break
                        remaining -= len(data)
                        yield data
                make_log("uploader_v1.5", f"Finished streaming file: {final_path}", level="INFO")

            return response.stream(stream_file_range, status=206, headers=headers)
        else:
            # Multiple ranges requested: create a multipart/byteranges response
            boundary = uuid4().hex  # Generate a random boundary string
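            # Wire format per RFC 7233: each part opens with "--{boundary}",
            # carries its own Content-Type and Content-Range headers, and the
            # whole body is terminated by "--{boundary}--"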
            headers = {
                "Content-Type": f"multipart/byteranges; boundary={boundary}",
                "Accept-Ranges": "bytes",
            }

            async def stream_multipart():
                # For each range, yield the boundary, part headers, and the file content
                for start, end in parsed_ranges:
                    part_header = (
                        f"--{boundary}\r\n"
                        f"Content-Type: {mime_type}\r\n"
                        f"Content-Range: bytes {start}-{end}/{file_size}\r\n"
                        f"\r\n"
                    )
                    yield part_header.encode()
                    part_length = end - start + 1
                    async with aiofiles.open(final_path, mode='rb') as f:
                        await f.seek(start)
                        remaining = part_length
                        chunk_size = 1024 * 1024  # 1MB chunks
                        while remaining > 0:
                            read_size = min(chunk_size, remaining)
                            data = await f.read(read_size)
                            if not data:
                                break
                            remaining -= len(data)
                            yield data
                    yield b"\r\n"
                # Final boundary marker
                yield f"--{boundary}--\r\n".encode()

            return response.stream(stream_multipart, status=206, headers=headers)
    else:
        # No Range header: return the full file
        make_log("uploader_v1.5", f"Returning full file: {final_path}", level="INFO")
        return await response.file(final_path, mime_type=mime_type)
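

# Example client-side range request against the GET endpoint above (a sketch;
# the helper name and URL are placeholders). urllib is stdlib; a 206 response
# carries the requested slice along with a Content-Range header.
def _fetch_first_kib(url):
    from urllib.request import Request, urlopen
    req = Request(url, headers={"Range": "bytes=0-1023"})
    with urlopen(req) as resp:
        # resp.status is 206 for a partial response, 200 for a full file
        return resp.read()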