# uploader-bot/app/api/routes/progressive_storage.py


import os
import subprocess
import asyncio
from uuid import uuid4
from datetime import datetime
from mimetypes import guess_type
from base64 import b64decode
import aiofiles
from base58 import b58encode
from sanic import response
from app.core.logger import make_log
from sqlalchemy import select
from app.core.models.node_storage import StoredContent
from app.core._config import UPLOADS_DIR
from app.core.models.content_v3 import ContentDerivative
from app.core._utils.resolve_content import resolve_content
from app.core.network.nodesig import verify_request
from app.core.models.my_network import KnownNode
from sqlalchemy import select as sa_select
import httpx
from app.core._crypto.signer import Signer
from app.core._secrets import hot_seed
from app.core._utils.b58 import b58encode as _b58e, b58decode as _b58d
import json
import time
# POST /api/v1.5/storage
async def s_api_v1_5_storage_post(request):
# Log the receipt of a chunk upload request
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Received chunk upload request", level="INFO")
# Get the provided file name from header and decode it from base64
provided_filename_b64 = request.headers.get("X-File-Name")
if not provided_filename_b64:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Missing X-File-Name header", level="ERROR")
return response.json({"error": "Missing X-File-Name header"}, status=400)
try:
provided_filename = b64decode(provided_filename_b64).decode("utf-8")
except Exception as e:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Invalid X-File-Name header: {e}", level="ERROR")
return response.json({"error": "Invalid X-File-Name header"}, status=400)
# Get X-Chunk-Start header (must be provided) and parse it as integer
chunk_start_header = request.headers.get("X-Chunk-Start")
if chunk_start_header is None:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Missing X-Chunk-Start header", level="ERROR")
return response.json({"error": "Missing X-Chunk-Start header"}, status=400)
try:
chunk_start = int(chunk_start_header)
except Exception as e:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Invalid X-Chunk-Start header: {e}", level="ERROR")
return response.json({"error": "Invalid X-Chunk-Start header"}, status=400)
# Enforce maximum chunk size (80 MB) using Content-Length header if provided
max_chunk_size = 80 * 1024 * 1024 # 80 MB
content_length = request.headers.get("Content-Length")
if content_length is not None:
try:
content_length = int(content_length)
if content_length > max_chunk_size:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Chunk size {content_length} exceeds maximum allowed", level="ERROR")
return response.json({"error": "Chunk size exceeds maximum allowed (80 MB)"}, status=400)
except (TypeError, ValueError):
pass  # ignore a malformed Content-Length header
# Determine if this is a new upload or a continuation (resume)
upload_id = request.headers.get("X-Upload-ID")
is_new_upload = False
if not upload_id:
# New upload session: generate a new uuid
upload_id = str(uuid4())
is_new_upload = True
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Start new upload session id={upload_id}", level="INFO")
else:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Resume upload session id={upload_id}", level="DEBUG")
# Determine the temporary file path based on upload_id
temp_path = os.path.join(UPLOADS_DIR, f"v1.5_upload_{upload_id}")
# Check current size of the temporary file (if it exists)
current_size = 0
if os.path.exists(temp_path):
current_size = os.path.getsize(temp_path)
# If the provided chunk_start is less than current_size, the chunk is already received
if chunk_start < current_size:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Chunk at {chunk_start} already received; size={current_size}", level="DEBUG")
return response.json({"upload_id": upload_id, "current_size": current_size})
elif chunk_start > current_size:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Chunk start {chunk_start} != current size {current_size}", level="ERROR")
return response.json({"error": "Chunk start does not match current file size"}, status=400)
# Append the received chunk to the temporary file
try:
mode = 'wb' if is_new_upload else 'ab'
async with aiofiles.open(temp_path, mode) as out_file:
data = request.body # Get the full body if available
if data:
await out_file.write(data) # Write the whole body at once
else:
async for chunk in request.stream:
await out_file.write(chunk)
new_size = os.path.getsize(temp_path)
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Appended chunk. size={new_size}", level="DEBUG")
except Exception as e:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Error saving chunk: {e}", level="ERROR")
return response.json({"error": "Failed to save chunk"}, status=500)
# The client marks the final chunk of the upload with the X-Last-Chunk header
is_last_chunk = request.headers.get("X-Last-Chunk", "0").strip() == "1"
if is_last_chunk:
# Compute the SHA256 hash of the temporary file using subprocess
try:
proc = await asyncio.create_subprocess_exec(
'sha256sum', temp_path,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE
)
stdout, stderr = await proc.communicate()
if proc.returncode != 0:
error_msg = stderr.decode().strip()
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} sha256sum error: {error_msg}", level="ERROR")
return response.json({"error": "Failed to compute file hash"}, status=500)
computed_hash_hex = stdout.decode().split()[0].strip()
computed_hash_bytes = bytes.fromhex(computed_hash_hex)
computed_hash_b58 = b58encode(computed_hash_bytes).decode()
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Computed hash (base58): {computed_hash_b58}", level="INFO")
except Exception as e:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Error computing file hash: {e}", level="ERROR")
return response.json({"error": "Error computing file hash"}, status=500)
final_path = os.path.join(UPLOADS_DIR, computed_hash_b58)
try:
os.rename(temp_path, final_path)
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Final chunk received. Renamed to: {final_path}", level="INFO")
except Exception as e:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Error renaming file: {e}", level="ERROR")
return response.json({"error": "Failed to finalize file storage"}, status=500)
db_session = request.ctx.db_session
existing = (await db_session.execute(select(StoredContent).where(StoredContent.hash == computed_hash_b58))).scalars().first()
if existing:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} File already exists in DB: {computed_hash_b58}", level="INFO")
serialized_v2 = existing.cid.serialize_v2()
serialized_v1 = existing.cid.serialize_v1()
return response.json({
"upload_id": upload_id,
"content_sha256": computed_hash_b58,
"content_id": serialized_v2,
"content_id_v1": serialized_v1,
"content_url": f"dmy://storage?cid={serialized_v2}",
})
try:
user_id = request.ctx.user.id if request.ctx.user else None
new_content = StoredContent(
type='local/content_bin',
hash=computed_hash_b58,
user_id=user_id,
filename=provided_filename,
key_id=None,
meta={},
created=datetime.utcnow()
)
db_session.add(new_content)
await db_session.commit()
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Stored new file user={user_id} hash={computed_hash_b58}", level="INFO")
except Exception as e:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Database error: {e}", level="ERROR")
return response.json({"error": "Database error"}, status=500)
serialized_v2 = new_content.cid.serialize_v2()
serialized_v1 = new_content.cid.serialize_v1()
return response.json({
"upload_id": upload_id,
"content_sha256": computed_hash_b58,
"content_id": serialized_v2,
"content_id_v1": serialized_v1,
"content_url": f"dmy://storage?cid={serialized_v2}",
})
else:
# Not the final chunk yet; return the current upload status
return response.json({"upload_id": upload_id, "current_size": os.path.getsize(temp_path)})
# GET /api/v1.5/storage/<file_hash>
async def s_api_v1_5_storage_get(request, file_hash):
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Retrieve file hash={file_hash}", level="INFO")
try:
file_hash = b58encode(resolve_content(file_hash)[0].content_hash).decode()
except Exception:
pass  # not a CID; treat file_hash as a raw base58 content hash
final_path = os.path.join(UPLOADS_DIR, file_hash)
if not os.path.exists(final_path):
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} File not found: {final_path}", level="ERROR")
return response.json({"error": "File not found"}, status=404)
db_session = request.ctx.db_session
stored = (await db_session.execute(select(StoredContent).where(StoredContent.hash == file_hash))).scalars().first()
if stored and stored.filename:
filename_for_mime = stored.filename
else:
filename_for_mime = final_path
mime_type, _ = guess_type(filename_for_mime)
if not mime_type:
mime_type = "application/octet-stream"
file_size = os.path.getsize(final_path)
range_header = request.headers.get("Range")
# Update the matching ContentDerivative's last_access_at timestamp, if one exists
try:
cd = (await request.ctx.db_session.execute(select(ContentDerivative).where(ContentDerivative.local_path.like(f"%/{file_hash}")))).scalars().first()
if cd:
cd.last_access_at = datetime.utcnow()
await request.ctx.db_session.commit()
except Exception:
pass  # best-effort update; a failure here must not block the download
if range_header:
make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Processing Range: {range_header}", level="DEBUG")
range_spec = range_header.strip().lower()
if not range_spec.startswith("bytes="):
make_log("uploader_v1.5", f"Invalid Range header: {range_header}", level="ERROR")
return response.json({"error": "Invalid Range header"}, status=400)
range_spec = range_spec[len("bytes="):]
range_parts = [part.strip() for part in range_spec.split(',')]
parsed_ranges = []
try:
for part in range_parts:
if '-' not in part:
raise ValueError("Invalid range format")
start_str, end_str = part.split('-', 1)
if start_str == "":
suffix_length = int(end_str)
start = 0 if suffix_length > file_size else file_size - suffix_length
end = file_size - 1
else:
start = int(start_str)
end = file_size - 1 if end_str == "" else int(end_str)
if start > end or end >= file_size:
raise ValueError("Requested Range Not Satisfiable")
parsed_ranges.append((start, end))
except Exception as e:
make_log("uploader_v1.5", f"Invalid Range header: {range_header} - {e}", level="ERROR")
return response.json({"error": "Invalid Range header"}, status=400)
if len(parsed_ranges) == 1:
# Single range streaming
start, end = parsed_ranges[0]
content_length = end - start + 1
headers = {
"Content-Range": f"bytes {start}-{end}/{file_size}",
"Accept-Ranges": "bytes",
"Content-Length": str(content_length),
"Content-Type": mime_type,
}
# Create response for streaming
stream_response = await request.respond(headers=headers, status=206, content_type=mime_type)
make_log("uploader_v1.5", f"Starting to stream file from byte {start} to {end}", level="INFO")
async with aiofiles.open(final_path, mode='rb') as f:
await f.seek(start)
remaining = content_length
chunk_size = 1024 * 1024 # chunk size in bytes
while remaining > 0:
read_size = min(chunk_size, remaining)
data = await f.read(read_size)
if not data:
break
remaining -= len(data)
await stream_response.send(data)
make_log("uploader_v1.5", f"Finished streaming file: {final_path}", level="INFO")
await stream_response.eof()
return stream_response
else:
# Multipart range streaming
boundary = uuid4().hex
headers = {
"Content-Type": f"multipart/byteranges; boundary={boundary}",
"Accept-Ranges": "bytes",
}
stream_response = await request.respond(headers=headers, status=206)
for start, end in parsed_ranges:
part_header = (
f"--{boundary}\r\n"
f"Content-Type: {mime_type}\r\n"
f"Content-Range: bytes {start}-{end}/{file_size}\r\n"
f"\r\n"
)
await stream_response.send(part_header.encode())
part_length = end - start + 1
async with aiofiles.open(final_path, mode='rb') as f:
await f.seek(start)
remaining = part_length
chunk_size = 1024 * 1024
while remaining > 0:
read_size = min(chunk_size, remaining)
data = await f.read(read_size)
if not data:
break
remaining -= len(data)
await stream_response.send(data)
await stream_response.send(b"\r\n")
await stream_response.send(f"--{boundary}--\r\n".encode())
await stream_response.eof()
return stream_response
else:
make_log("uploader_v1.5", f"Returning full file for video/audio: {final_path}", level="INFO")
return await response.file(final_path, mime_type=mime_type)
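# --- Illustrative sketch (not wired to any route): a client fetching a single byte
# range from the GET handler above. Assumes the httpx dependency already imported in
# this module and a hypothetical base_url; the handler replies with 206 Partial Content
# and a "Content-Range: bytes <start>-<end>/<total>" header, or 400 if the range is
# not satisfiable.
async def _example_fetch_range(file_hash: str, start: int, end: int,
                               base_url: str = "http://localhost:8000") -> bytes:
    """Download bytes [start, end] of a stored file via a Range request."""
    headers = {"Range": f"bytes={start}-{end}"}
    async with httpx.AsyncClient() as client:
        r = await client.get(f"{base_url}/api/v1.5/storage/{file_hash}", headers=headers)
        r.raise_for_status()  # expect 206 Partial Content
        return r.content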
# GET /api/v1/storage.fetch/<file_hash>
# Internal endpoint for inter-node requests (NodeSig). Returns the file if it is present locally.
async def s_api_v1_storage_fetch(request, file_hash):
ok, node_id, reason = verify_request(request, request.app.ctx.memory)
if not ok:
return response.json({"error": reason or "UNAUTHORIZED"}, status=401)
# Trusted nodes only
try:
session = request.ctx.db_session
row = (await session.execute(sa_select(KnownNode).where(KnownNode.public_key == node_id))).scalars().first()
role = (row.meta or {}).get('role') if row and row.meta else None
if role != 'trusted':
return response.json({"error": "DENIED_NOT_TRUSTED"}, status=403)
except Exception:
pass  # if the trust lookup fails, fall through and let the v1.5 handler respond
# Reuse the v1.5 implementation
return await s_api_v1_5_storage_get(request, file_hash)
# GET /api/v1/storage.proxy/<file_hash>
# Proxy for the web client: if the file is not present locally, try to fetch it from trusted nodes via NodeSig
async def s_api_v1_storage_proxy(request, file_hash):
# Require either a valid NodeSig (unlikely for public clients) or a signed access token.
# Token query fields: pub, exp, scope, uid, sig; sig is an Ed25519 signature over the
# compact, key-sorted JSON of {hash, scope, exp, uid}.
def _verify_access_token() -> bool:
try:
pub = (request.args.get('pub') or '').strip()
exp = int(request.args.get('exp') or '0')
scope = (request.args.get('scope') or '').strip()
uid = int(request.args.get('uid') or '0')
sig = (request.args.get('sig') or '').strip()
if not pub or not exp or not scope or not sig:
return False
if exp < int(time.time()):
return False
payload = {
'hash': file_hash,
'scope': scope,
'exp': exp,
'uid': uid,
}
blob = json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
import nacl.signing
vk = nacl.signing.VerifyKey(_b58d(pub))
vk.verify(blob, _b58d(sig))
# Note: we do not require a session-bound user for media fetches;
# the short-lived signature itself is sufficient.
return True
except Exception:
return False
ok_nodesig, _nid, _reason = verify_request(request, request.app.ctx.memory)
if not ok_nodesig and not _verify_access_token():
return response.json({'error': 'UNAUTHORIZED'}, status=401)
# Try to serve the file locally first; do not return 404 yet
try:
from base58 import b58encode as _b58e
try:
# Accept either a raw hash or a CID
from app.core._utils.resolve_content import resolve_content as _res
cid, _ = _res(file_hash)
file_hash = _b58e(cid.content_hash).decode()
except Exception:
pass
final_path = os.path.join(UPLOADS_DIR, file_hash)
if os.path.exists(final_path):
return await s_api_v1_5_storage_get(request, file_hash)
except Exception:
pass
# Not available locally; try trusted nodes
try:
async with request.app.ctx.memory.transaction("storage.proxy"):
# Build the list of trusted nodes
session = request.ctx.db_session
nodes = (await session.execute(sa_select(KnownNode))).scalars().all()
candidates = []
for n in nodes:
role = (n.meta or {}).get('role') if n.meta else None
if role != 'trusted':
continue
host = (n.meta or {}).get('public_host') or (n.ip or '')
if not host:
continue
base = host.rstrip('/')
if not base.startswith('http'):
base = f"http://{base}:{n.port or 80}"
candidates.append(base)
# Proxy the request to a trusted node, forwarding the Range header and streaming the response
range_header = request.headers.get("Range")
timeout = httpx.Timeout(10.0, read=60.0)
for base in candidates:
url = f"{base}/api/v1/storage.fetch/{file_hash}"
try:
# Sign the request headers with NodeSig
from app.core._secrets import hot_seed, hot_pubkey
from app.core.network.nodesig import sign_headers
from app.core._utils.b58 import b58encode as _b58e
pk_b58 = _b58e(hot_pubkey).decode()
headers = sign_headers('GET', f"/api/v1/storage.fetch/{file_hash}", b"", hot_seed, pk_b58)
if range_header:
headers['Range'] = range_header
async with httpx.AsyncClient(timeout=timeout) as client:
r = await client.get(url, headers=headers)
if r.status_code == 404:
continue
if r.status_code not in (200, 206):
continue
# Forward the content-related response headers
resp = await request.respond(status=r.status_code, headers={
k: v for k, v in r.headers.items() if k.lower() in ("content-type", "content-length", "content-range", "accept-ranges")
})
async for chunk in r.aiter_bytes(chunk_size=1024*1024):
await resp.send(chunk)
await resp.eof()
return resp
except Exception:
continue  # try the next trusted node
except Exception:
pass
return response.json({"error": "File not found"}, status=404)