better logs

parent 82758fb11a
commit 695969f015
@@ -1,6 +1,8 @@
 import traceback
 
 from sanic import Sanic, response
+from uuid import uuid4
+import traceback as _traceback
 
 from app.core.logger import make_log
 
@@ -63,18 +65,61 @@ app.add_route(s_api_v1_5_content_list, "/api/v1.5/content.list", methods=["GET",
 
 @app.exception(BaseException)
 async def s_handle_exception(request, exception):
-    response_buffer = response.json({"error": "An internal server error occurred"}, status=500)
+    # Correlate error to request
+    session_id = getattr(request.ctx, 'session_id', None) or uuid4().hex[:16]
+    error_id = uuid4().hex[:8]
+
+    status = 500
+    code = type(exception).__name__
+    message = "Internal HTTP Error"
+
     try:
         raise exception
     except AssertionError as e:
-        response_buffer = response.json({"error": str(e)}, status=400)
+        status = 400
+        code = 'AssertionError'
+        message = str(e) or 'Bad Request'
     except BaseException as e:
-        make_log("sanic_exception", f"Exception: {e}" + '\n' + str(traceback.format_exc()), level='error')
+        # keep default 500, but expose exception message to aid debugging
+        message = str(e) or message
+
+    # Build structured log with full context and traceback
+    try:
+        tb = _traceback.format_exc()
+        user_id = getattr(getattr(request.ctx, 'user', None), 'id', None)
+        log_ctx = {
+            'sid': session_id,
+            'eid': error_id,
+            'path': request.path,
+            'method': request.method,
+            'query': dict(request.args) if hasattr(request, 'args') else {},
+            'user_id': user_id,
+            'remote': (request.headers.get('X-Forwarded-For') or request.remote_addr or request.ip),
+            'code': code,
+            'message': message,
+            'traceback': tb,
+        }
+        make_log('http_exception', 'API exception', level='error', **log_ctx)
+    except BaseException:
+        pass
+
+    # Return enriched error response for the client
+    payload = {
+        'error': True,
+        'code': code,
+        'message': message,
+        'session_id': session_id,
+        'error_id': error_id,
+        'path': request.path,
+        'method': request.method,
+    }
+
+    response_buffer = response.json(payload, status=status)
     response_buffer = await close_db_session(request, response_buffer)
     response_buffer.headers["Access-Control-Allow-Origin"] = "*"
     response_buffer.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
-    response_buffer.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site"
+    response_buffer.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-request-id"
     response_buffer.headers["Access-Control-Allow-Credentials"] = "true"
+    response_buffer.headers["X-Session-Id"] = session_id
+    response_buffer.headers["X-Error-Id"] = error_id
     return response_buffer
 
 
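
Note: the handler above now tags every failure with the request's session id and a fresh error id, and echoes both back in the `X-Session-Id` / `X-Error-Id` response headers and in the JSON body. A minimal sketch of how a caller (or an integration test) could use this to find the matching `http_exception` log record — the base URL and the use of `requests` are illustrative assumptions, not part of this change:

```python
# Illustrative correlation check; assumes the API from this commit runs at BASE_URL.
import uuid
import requests

BASE_URL = "http://localhost:8000"  # hypothetical local instance

req_id = uuid.uuid4().hex  # the request middleware prefers a caller-supplied X-Request-Id
resp = requests.get(f"{BASE_URL}/api/v1.5/content.list", headers={"X-Request-Id": req_id})

if resp.status_code >= 400:
    body = resp.json()
    # session_id is the X-Request-Id we sent (truncated to 32 chars);
    # error_id pinpoints the single 'http_exception' log entry for this failure.
    print("session:", resp.headers.get("X-Session-Id"), body.get("session_id"))
    print("error:  ", resp.headers.get("X-Error-Id"), body.get("error_id"))
    # grep the server log for sid=<session_id> / eid=<error_id> to see the full traceback
```
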
@@ -1,5 +1,6 @@
 from base58 import b58decode
 from sanic import response as sanic_response
+from uuid import uuid4
 
 from app.core._crypto.signer import Signer
 from app.core._secrets import hot_seed
@@ -13,11 +14,17 @@ from app.core.storage import new_session
 from datetime import datetime, timedelta
 
 
-def attach_headers(response):
+def attach_headers(response, request=None):
     response.headers["Access-Control-Allow-Origin"] = "*"
     response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS"
-    response.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-file-name, x-last-chunk, x-chunk-start, x-upload-id"
+    response.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-file-name, x-last-chunk, x-chunk-start, x-upload-id, x-request-id"
     # response.headers["Access-Control-Allow-Credentials"] = "true"
+    try:
+        sid = getattr(request.ctx, 'session_id', None) if request else None
+        if sid:
+            response.headers["X-Session-Id"] = sid
+    except BaseException:
+        pass
     return response
 
 
@@ -121,7 +128,14 @@ async def save_activity(request):
         pass
 
     try:
-        activity_meta["headers"] = dict(request.headers)
+        # Sanitize sensitive headers
+        headers = dict(request.headers)
+        for hk in list(headers.keys()):
+            if str(hk).lower() in [
+                'authorization', 'cookie', 'x-service-signature', 'x-message-hash'
+            ]:
+                headers[hk] = '<redacted>'
+        activity_meta["headers"] = headers
     except:
         pass
 
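
Note: as a standalone illustration of the same redaction policy applied above (the helper name and the set literal are ours, not the repository's), this is the shape of what ends up in activity metadata:

```python
# Standalone sketch of the header-redaction policy used in save_activity.
SENSITIVE = {'authorization', 'cookie', 'x-service-signature', 'x-message-hash'}

def redact_headers(headers: dict) -> dict:
    """Return a copy of the headers that is safe to persist."""
    return {k: ('<redacted>' if str(k).lower() in SENSITIVE else v) for k, v in headers.items()}

print(redact_headers({"Authorization": "Bearer abc", "Accept": "application/json"}))
# -> {'Authorization': '<redacted>', 'Accept': 'application/json'}
```
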
@@ -130,7 +144,7 @@ async def save_activity(request):
         meta=activity_meta,
         user_id=request.ctx.user.id if request.ctx.user else None,
         user_ip=activity_meta.get("ip", "0.0.0.0"),
-        created=datetime.now()
+        created=datetime.utcnow()
     )
     request.ctx.db_session.add(new_user_activity)
     await request.ctx.db_session.commit()
@@ -138,7 +152,7 @@ async def save_activity(request):
 
 async def attach_user_to_request(request):
     if request.method == 'OPTIONS':
-        return attach_headers(sanic_response.text("OK"))
+        return attach_headers(sanic_response.text("OK"), request)
 
     request.ctx.db_session = new_session()
     request.ctx.verified_hash = None
@@ -146,6 +160,17 @@ async def attach_user_to_request(request):
     request.ctx.user_key = None
     request.ctx.user_uploader_wrapper = Wrapped_CBotChat(request.app.ctx.memory._telegram_bot, db_session=request.ctx.db_session)
     request.ctx.user_client_wrapper = Wrapped_CBotChat(request.app.ctx.memory._client_telegram_bot, db_session=request.ctx.db_session)
+    # Correlation/session id for this request: prefer proxy-provided X-Request-ID
+    incoming_req_id = request.headers.get('X-Request-Id') or request.headers.get('X-Request-ID')
+    request.ctx.session_id = (incoming_req_id or uuid4().hex)[:32]
+    try:
+        make_log(
+            "HTTP",
+            f"Request start sid={request.ctx.session_id} {request.method} {request.path}",
+            level='info'
+        )
+    except BaseException:
+        pass
     await try_authorization(request)
     await save_activity(request)
     await try_service_authorization(request)
@@ -160,12 +185,21 @@ async def close_request_handler(request, response):
     except BaseException:
         pass
 
-    response = attach_headers(response)
+    try:
+        make_log(
+            "HTTP",
+            f"Request end sid={getattr(request.ctx, 'session_id', None)} {request.method} {request.path} status={getattr(response, 'status', None)}",
+            level='info'
+        )
+    except BaseException:
+        pass
+
+    response = attach_headers(response, request)
+
     return request, response
 
 
 async def close_db_session(request, response):
     request, response = await close_request_handler(request, response)
-    response = attach_headers(response)
+    response = attach_headers(response, request)
     return response
 
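
Note: this file only changes the hooks themselves; the diff does not show where they are attached to the Sanic app. For orientation, a typical wiring for request/response hooks of this shape looks roughly like the sketch below — the module path, app name, and wrapper functions are assumptions, not code from this repository:

```python
# Hypothetical wiring sketch; the real registration lives elsewhere in the app.
from sanic import Sanic
# from app.api.middleware import attach_user_to_request, close_request_handler  # assumed import path

app = Sanic("example")

@app.middleware("request")
async def _on_request(request):
    # attach_user_to_request opens the DB session, sets request.ctx.session_id
    # from X-Request-Id (or a fresh uuid4) and logs "Request start sid=...";
    # returning its result lets the OPTIONS short-circuit response pass through.
    return await attach_user_to_request(request)

@app.middleware("response")
async def _on_response(request, response):
    # close_request_handler logs "Request end sid=... status=..." and attaches
    # CORS plus X-Session-Id headers via attach_headers(response, request).
    await close_request_handler(request, response)
```
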
@@ -100,7 +100,7 @@ async def s_api_v1_storage_post(request):
             "content_url": f"dmy://storage?cid={new_cid}",
         })
     except BaseException as e:
-        make_log("Storage", f"Error: {e}" + '\n' + traceback.format_exc(), level="error")
+        make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Error: {e}" + '\n' + traceback.format_exc(), level="error")
         return response.json({"error": f"Error: {e}"}, status=500)
 
 
@@ -121,10 +121,10 @@ async def s_api_v1_storage_get(request, file_hash=None):
     if not content:
         return response.json({"error": "File not found"}, status=404)
 
-    make_log("Storage", f"File {content_sha256} requested by {request.ctx.user}")
+    make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} File {content_sha256} requested by user={getattr(getattr(request.ctx, 'user', None), 'id', None)}")
     file_path = os.path.join(UPLOADS_DIR, content_sha256)
     if not os.path.exists(file_path):
-        make_log("Storage", f"File {content_sha256} not found locally", level="error")
+        make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} File {content_sha256} not found locally", level="error")
         return response.json({"error": "File not found"}, status=404)
 
     async with aiofiles.open(file_path, "rb") as file:
@@ -187,25 +187,25 @@ async def s_api_v1_storage_get(request, file_hash=None):
         try:
             audio = AudioSegment.from_file(file_path)
         except BaseException as e:
-            make_log("Storage", f"Error loading audio from file: {e}", level="debug")
+            make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Error loading audio from file: {e}", level="debug")
 
         if not audio:
             try:
                 audio = AudioSegment(content_file_bin)
             except BaseException as e:
-                make_log("Storage", f"Error loading audio from binary: {e}", level="debug")
+                make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Error loading audio from binary: {e}", level="debug")
 
         audio = audio[:seconds_limit * 1000] if seconds_limit else audio
         audio.export(tempfile_path, format="mp3", cover=cover_tempfile_path)
     except BaseException as e:
-        make_log("Storage", f"Error converting audio: {e}" + '\n' + traceback.format_exc(), level="error")
+        make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Error converting audio: {e}" + '\n' + traceback.format_exc(), level="error")
 
     if os.path.exists(tempfile_path):
         async with aiofiles.open(tempfile_path, "rb") as file:
             content_file_bin = await file.read()
 
         accept_type = 'audio/mpeg'
-        make_log("Storage", f"Audio {content_sha256} converted successfully")
+        make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Audio {content_sha256} converted successfully", level='debug')
     else:
         tempfile_path = tempfile_path[:-5]
 
@@ -222,13 +222,13 @@ async def s_api_v1_storage_get(request, file_hash=None):
                 break
             quality -= 5
     except BaseException as e:
-        make_log("Storage", f"Error converting image: {e}" + '\n' + traceback.format_exc(), level="error")
+        make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Error converting image: {e}" + '\n' + traceback.format_exc(), level="error")
 
     if os.path.exists(tempfile_path):
         async with aiofiles.open(tempfile_path, "rb") as file:
             content_file_bin = await file.read()
 
-        make_log("Storage", f"Image {content_sha256} converted successfully")
+        make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Image {content_sha256} converted successfully", level='debug')
         accept_type = 'image/jpeg'
     else:
         tempfile_path = tempfile_path[:-5]
 
@@ -20,28 +20,28 @@ from app.core._utils.resolve_content import resolve_content
 # POST /api/v1.5/storage
 async def s_api_v1_5_storage_post(request):
     # Log the receipt of a chunk upload request
-    make_log("uploader_v1.5", "Received chunk upload request", level="INFO")
+    make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Received chunk upload request", level="INFO")
 
     # Get the provided file name from header and decode it from base64
     provided_filename_b64 = request.headers.get("X-File-Name")
     if not provided_filename_b64:
-        make_log("uploader_v1.5", "Missing X-File-Name header", level="ERROR")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Missing X-File-Name header", level="ERROR")
         return response.json({"error": "Missing X-File-Name header"}, status=400)
     try:
         provided_filename = b64decode(provided_filename_b64).decode("utf-8")
     except Exception as e:
-        make_log("uploader_v1.5", f"Invalid X-File-Name header: {e}", level="ERROR")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Invalid X-File-Name header: {e}", level="ERROR")
         return response.json({"error": "Invalid X-File-Name header"}, status=400)
 
     # Get X-Chunk-Start header (must be provided) and parse it as integer
     chunk_start_header = request.headers.get("X-Chunk-Start")
     if chunk_start_header is None:
-        make_log("uploader_v1.5", "Missing X-Chunk-Start header", level="ERROR")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Missing X-Chunk-Start header", level="ERROR")
         return response.json({"error": "Missing X-Chunk-Start header"}, status=400)
     try:
         chunk_start = int(chunk_start_header)
     except Exception as e:
-        make_log("uploader_v1.5", f"Invalid X-Chunk-Start header: {e}", level="ERROR")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Invalid X-Chunk-Start header: {e}", level="ERROR")
         return response.json({"error": "Invalid X-Chunk-Start header"}, status=400)
 
     # Enforce maximum chunk size (80 MB) using Content-Length header if provided
@@ -51,7 +51,7 @@ async def s_api_v1_5_storage_post(request):
     try:
         content_length = int(content_length)
         if content_length > max_chunk_size:
-            make_log("uploader_v1.5", f"Chunk size {content_length} exceeds maximum allowed", level="ERROR")
+            make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Chunk size {content_length} exceeds maximum allowed", level="ERROR")
             return response.json({"error": "Chunk size exceeds maximum allowed (80 MB)"}, status=400)
     except:
         pass
@@ -63,9 +63,9 @@ async def s_api_v1_5_storage_post(request):
         # New upload session: generate a new uuid
         upload_id = str(uuid4())
         is_new_upload = True
-        make_log("uploader_v1.5", f"Starting new upload session with ID: {upload_id}", level="INFO")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Start new upload session id={upload_id}", level="INFO")
     else:
-        make_log("uploader_v1.5", f"Resuming upload session with ID: {upload_id}", level="INFO")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Resume upload session id={upload_id}", level="DEBUG")
 
     # Determine the temporary file path based on upload_id
     temp_path = os.path.join(UPLOADS_DIR, f"v1.5_upload_{upload_id}")
@@ -77,10 +77,10 @@ async def s_api_v1_5_storage_post(request):
 
     # If the provided chunk_start is less than current_size, the chunk is already received
     if chunk_start < current_size:
-        make_log("uploader_v1.5", f"Chunk starting at {chunk_start} already received, current size: {current_size}", level="INFO")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Chunk at {chunk_start} already received; size={current_size}", level="DEBUG")
         return response.json({"upload_id": upload_id, "current_size": current_size})
     elif chunk_start > current_size:
-        make_log("uploader_v1.5", f"Chunk start {chunk_start} does not match current file size {current_size}", level="ERROR")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Chunk start {chunk_start} != current size {current_size}", level="ERROR")
         return response.json({"error": "Chunk start does not match current file size"}, status=400)
 
     # Append the received chunk to the temporary file
@@ -94,9 +94,9 @@ async def s_api_v1_5_storage_post(request):
             async for chunk in request.stream:
                 await out_file.write(chunk)
         new_size = os.path.getsize(temp_path)
-        make_log("uploader_v1.5", f"Appended chunk. New file size: {new_size}", level="INFO")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Appended chunk. size={new_size}", level="DEBUG")
     except Exception as e:
-        make_log("uploader_v1.5", f"Error saving chunk: {e}", level="ERROR")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Error saving chunk: {e}", level="ERROR")
         return response.json({"error": "Failed to save chunk"}, status=500)
 
     # If computed hash matches the provided one, the final chunk has been received
@@ -112,28 +112,28 @@ async def s_api_v1_5_storage_post(request):
         stdout, stderr = await proc.communicate()
         if proc.returncode != 0:
             error_msg = stderr.decode().strip()
-            make_log("uploader_v1.5", f"sha256sum error: {error_msg}", level="ERROR")
+            make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} sha256sum error: {error_msg}", level="ERROR")
             return response.json({"error": "Failed to compute file hash"}, status=500)
         computed_hash_hex = stdout.decode().split()[0].strip()
         computed_hash_bytes = bytes.fromhex(computed_hash_hex)
         computed_hash_b58 = b58encode(computed_hash_bytes).decode()
-        make_log("uploader_v1.5", f"Computed hash (base58): {computed_hash_b58}", level="INFO")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Computed hash (base58): {computed_hash_b58}", level="INFO")
     except Exception as e:
-        make_log("uploader_v1.5", f"Error computing file hash: {e}", level="ERROR")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Error computing file hash: {e}", level="ERROR")
         return response.json({"error": "Error computing file hash"}, status=500)
 
     final_path = os.path.join(UPLOADS_DIR, f"{computed_hash_b58}")
     try:
         os.rename(temp_path, final_path)
-        make_log("uploader_v1.5", f"Final chunk received. File renamed to: {final_path}", level="INFO")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Final chunk received. Renamed to: {final_path}", level="INFO")
     except Exception as e:
-        make_log("uploader_v1.5", f"Error renaming file: {e}", level="ERROR")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Error renaming file: {e}", level="ERROR")
         return response.json({"error": "Failed to finalize file storage"}, status=500)
 
     db_session = request.ctx.db_session
     existing = (await db_session.execute(select(StoredContent).where(StoredContent.hash == computed_hash_b58))).scalars().first()
     if existing:
-        make_log("uploader_v1.5", f"File with hash {computed_hash_b58} already exists in DB", level="INFO")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} File already exists in DB: {computed_hash_b58}", level="INFO")
         serialized_v2 = existing.cid.serialize_v2()
         serialized_v1 = existing.cid.serialize_v1()
         return response.json({
@@ -158,9 +158,9 @@ async def s_api_v1_5_storage_post(request):
         )
         db_session.add(new_content)
         await db_session.commit()
-        make_log("uploader_v1.5", f"New file stored and indexed for user {user_id} with hash {computed_hash_b58}", level="INFO")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Stored new file user={user_id} hash={computed_hash_b58}", level="INFO")
     except Exception as e:
-        make_log("uploader_v1.5", f"Database error: {e}", level="ERROR")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Database error: {e}", level="ERROR")
         return response.json({"error": "Database error"}, status=500)
 
     serialized_v2 = new_content.cid.serialize_v2()
@@ -179,7 +179,7 @@ async def s_api_v1_5_storage_post(request):
 
 # GET /api/v1.5/storage/<file_hash>
 async def s_api_v1_5_storage_get(request, file_hash):
-    make_log("uploader_v1.5", f"Received file retrieval request for hash: {file_hash}", level="INFO")
+    make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Retrieve file hash={file_hash}", level="INFO")
 
     try:
         file_hash = b58encode(resolve_content(file_hash)[0].content_hash).decode()
@@ -188,7 +188,7 @@ async def s_api_v1_5_storage_get(request, file_hash):
 
     final_path = os.path.join(UPLOADS_DIR, f"{file_hash}")
     if not os.path.exists(final_path):
-        make_log("uploader_v1.5", f"File not found: {final_path}", level="ERROR")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} File not found: {final_path}", level="ERROR")
         return response.json({"error": "File not found"}, status=404)
 
     db_session = request.ctx.db_session
@@ -206,7 +206,7 @@ async def s_api_v1_5_storage_get(request, file_hash):
     range_header = request.headers.get("Range")
 
     if range_header:
-        make_log("uploader_v1.5", f"Processing Range header: {range_header}", level="INFO")
+        make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Processing Range: {range_header}", level="DEBUG")
         range_spec = range_header.strip().lower()
         if not range_spec.startswith("bytes="):
             make_log("uploader_v1.5", f"Invalid Range header: {range_header}", level="ERROR")
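
Note: taken together, the v1.5 upload endpoint expects a base64-encoded `X-File-Name`, an `X-Chunk-Start` equal to the number of bytes already stored, and an upload id when resuming; it answers with `upload_id` and `current_size`. A hedged client-side sketch of that protocol follows — the base URL, chunk size, the `X-Upload-Id` header name (inferred from the CORS allow-list in this commit) and the use of `requests` are assumptions; finalization details such as `x-last-chunk` and the provided hash are omitted:

```python
# Illustrative resumable-upload client for the v1.5 storage endpoint above.
import os
from base64 import b64encode
import requests

BASE_URL = "http://localhost:8000"   # hypothetical
CHUNK = 8 * 1024 * 1024              # well under the 80 MB per-chunk limit

def upload(path: str) -> dict:
    filename_b64 = b64encode(os.path.basename(path).encode()).decode()
    upload_id, sent = None, 0
    size = os.path.getsize(path)
    body = {}
    with open(path, "rb") as f:
        while sent < size:
            f.seek(sent)
            chunk = f.read(CHUNK)
            headers = {"X-File-Name": filename_b64, "X-Chunk-Start": str(sent)}
            if upload_id:
                headers["X-Upload-Id"] = upload_id  # assumed resume header
            r = requests.post(f"{BASE_URL}/api/v1.5/storage", data=chunk, headers=headers)
            r.raise_for_status()
            body = r.json()
            upload_id = body.get("upload_id", upload_id)
            # the server reports how many bytes it already has; resume from there
            sent = body.get("current_size", sent + len(chunk))
    return body  # final-chunk handling (hash check, x-last-chunk) not shown
```
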
@@ -79,13 +79,13 @@ async def license_index_loop(memory, platform_found: bool, seqno: int) -> [bool,
     for user in users:
         user_wallet_address = await user.wallet_address_async(session)
         if not user_wallet_address:
-            make_log("LicenseIndex", f"User {user.id} has no wallet address", level="info")
+            make_log("LicenseIndex", f"User {user.id} has no wallet address", level="debug")
             continue
 
-        make_log("LicenseIndex", f"User {user.id} has wallet address {user_wallet_address}", level="info")
+        make_log("LicenseIndex", f"User {user.id} has wallet address {user_wallet_address}", level="debug")
         last_updated_licenses = user.meta.get('last_updated_licenses')
         must_skip = last_updated_licenses and (datetime.now() - datetime.fromisoformat(last_updated_licenses)) < timedelta(minutes=1)
-        make_log("LicenseIndex", f"User: {user.id}, last_updated_licenses: {last_updated_licenses}, must_skip: {must_skip}", level="info")
+        make_log("LicenseIndex", f"User: {user.id}, last_updated_licenses: {last_updated_licenses}, must_skip: {must_skip}", level="debug")
         if must_skip:
             continue
 