diff --git a/app/api/__init__.py b/app/api/__init__.py index 71582dc..0834226 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -1,6 +1,8 @@ import traceback from sanic import Sanic, response +from uuid import uuid4 +import traceback as _traceback from app.core.logger import make_log @@ -63,18 +65,61 @@ app.add_route(s_api_v1_5_content_list, "/api/v1.5/content.list", methods=["GET", @app.exception(BaseException) async def s_handle_exception(request, exception): - response_buffer = response.json({"error": "An internal server error occurred"}, status=500) + # Correlate error to request + session_id = getattr(request.ctx, 'session_id', None) or uuid4().hex[:16] + error_id = uuid4().hex[:8] + + status = 500 + code = type(exception).__name__ + message = "Internal HTTP Error" + try: raise exception except AssertionError as e: - response_buffer = response.json({"error": str(e)}, status=400) + status = 400 + code = 'AssertionError' + message = str(e) or 'Bad Request' except BaseException as e: - make_log("sanic_exception", f"Exception: {e}" + '\n' + str(traceback.format_exc()), level='error') + # keep default 500, but expose exception message to aid debugging + message = str(e) or message + # Build structured log with full context and traceback + try: + tb = _traceback.format_exc() + user_id = getattr(getattr(request.ctx, 'user', None), 'id', None) + log_ctx = { + 'sid': session_id, + 'eid': error_id, + 'path': request.path, + 'method': request.method, + 'query': dict(request.args) if hasattr(request, 'args') else {}, + 'user_id': user_id, + 'remote': (request.headers.get('X-Forwarded-For') or request.remote_addr or request.ip), + 'code': code, + 'message': message, + 'traceback': tb, + } + make_log('http_exception', 'API exception', level='error', **log_ctx) + except BaseException: + pass + + # Return enriched error response for the client + payload = { + 'error': True, + 'code': code, + 'message': message, + 'session_id': session_id, + 'error_id': error_id, + 
'path': request.path, + 'method': request.method, + } + + response_buffer = response.json(payload, status=status) response_buffer = await close_db_session(request, response_buffer) response_buffer.headers["Access-Control-Allow-Origin"] = "*" response_buffer.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS" - response_buffer.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site" + response_buffer.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-request-id" response_buffer.headers["Access-Control-Allow-Credentials"] = "true" + response_buffer.headers["X-Session-Id"] = session_id + response_buffer.headers["X-Error-Id"] = error_id return response_buffer - diff --git a/app/api/middleware.py b/app/api/middleware.py index 7cdd950..9c3a874 100644 --- a/app/api/middleware.py +++ b/app/api/middleware.py @@ -1,5 +1,6 @@ from base58 import b58decode from sanic import response as sanic_response +from uuid import uuid4 from app.core._crypto.signer import Signer from app.core._secrets import hot_seed @@ -13,11 +14,17 @@ from app.core.storage import new_session from datetime import datetime, timedelta -def attach_headers(response): +def attach_headers(response, request=None): response.headers["Access-Control-Allow-Origin"] = "*" response.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS" - response.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-file-name, x-last-chunk, x-chunk-start, x-upload-id" + response.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-file-name, x-last-chunk, x-chunk-start, x-upload-id, x-request-id" # 
response.headers["Access-Control-Allow-Credentials"] = "true" + try: + sid = getattr(request.ctx, 'session_id', None) if request else None + if sid: + response.headers["X-Session-Id"] = sid + except BaseException: + pass return response @@ -121,7 +128,14 @@ async def save_activity(request): pass try: - activity_meta["headers"] = dict(request.headers) + # Sanitize sensitive headers + headers = dict(request.headers) + for hk in list(headers.keys()): + if str(hk).lower() in [ + 'authorization', 'cookie', 'x-service-signature', 'x-message-hash' + ]: + headers[hk] = '' + activity_meta["headers"] = headers except: pass @@ -130,7 +144,7 @@ async def save_activity(request): meta=activity_meta, user_id=request.ctx.user.id if request.ctx.user else None, user_ip=activity_meta.get("ip", "0.0.0.0"), - created=datetime.now() + created=datetime.utcnow() ) request.ctx.db_session.add(new_user_activity) await request.ctx.db_session.commit() @@ -138,7 +152,7 @@ async def save_activity(request): async def attach_user_to_request(request): if request.method == 'OPTIONS': - return attach_headers(sanic_response.text("OK")) + return attach_headers(sanic_response.text("OK"), request) request.ctx.db_session = new_session() request.ctx.verified_hash = None @@ -146,6 +160,17 @@ async def attach_user_to_request(request): request.ctx.user_key = None request.ctx.user_uploader_wrapper = Wrapped_CBotChat(request.app.ctx.memory._telegram_bot, db_session=request.ctx.db_session) request.ctx.user_client_wrapper = Wrapped_CBotChat(request.app.ctx.memory._client_telegram_bot, db_session=request.ctx.db_session) + # Correlation/session id for this request: prefer proxy-provided X-Request-ID + incoming_req_id = request.headers.get('X-Request-Id') or request.headers.get('X-Request-ID') + request.ctx.session_id = (incoming_req_id or uuid4().hex)[:32] + try: + make_log( + "HTTP", + f"Request start sid={request.ctx.session_id} {request.method} {request.path}", + level='info' + ) + except BaseException: + pass 
await try_authorization(request) await save_activity(request) await try_service_authorization(request) @@ -160,12 +185,21 @@ async def close_request_handler(request, response): except BaseException: pass - response = attach_headers(response) + try: + make_log( + "HTTP", + f"Request end sid={getattr(request.ctx, 'session_id', None)} {request.method} {request.path} status={getattr(response, 'status', None)}", + level='info' + ) + except BaseException: + pass + + response = attach_headers(response, request) return request, response async def close_db_session(request, response): request, response = await close_request_handler(request, response) - response = attach_headers(response) + response = attach_headers(response, request) return response diff --git a/app/api/routes/node_storage.py b/app/api/routes/node_storage.py index 4ef2877..f2e3ab2 100644 --- a/app/api/routes/node_storage.py +++ b/app/api/routes/node_storage.py @@ -100,7 +100,7 @@ async def s_api_v1_storage_post(request): "content_url": f"dmy://storage?cid={new_cid}", }) except BaseException as e: - make_log("Storage", f"Error: {e}" + '\n' + traceback.format_exc(), level="error") + make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Error: {e}" + '\n' + traceback.format_exc(), level="error") return response.json({"error": f"Error: {e}"}, status=500) @@ -121,10 +121,10 @@ async def s_api_v1_storage_get(request, file_hash=None): if not content: return response.json({"error": "File not found"}, status=404) - make_log("Storage", f"File {content_sha256} requested by {request.ctx.user}") + make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} File {content_sha256} requested by user={getattr(getattr(request.ctx, 'user', None), 'id', None)}") file_path = os.path.join(UPLOADS_DIR, content_sha256) if not os.path.exists(file_path): - make_log("Storage", f"File {content_sha256} not found locally", level="error") + make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} File 
{content_sha256} not found locally", level="error") return response.json({"error": "File not found"}, status=404) async with aiofiles.open(file_path, "rb") as file: @@ -187,25 +187,25 @@ async def s_api_v1_storage_get(request, file_hash=None): try: audio = AudioSegment.from_file(file_path) except BaseException as e: - make_log("Storage", f"Error loading audio from file: {e}", level="debug") + make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Error loading audio from file: {e}", level="debug") if not audio: try: audio = AudioSegment(content_file_bin) except BaseException as e: - make_log("Storage", f"Error loading audio from binary: {e}", level="debug") + make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Error loading audio from binary: {e}", level="debug") audio = audio[:seconds_limit * 1000] if seconds_limit else audio audio.export(tempfile_path, format="mp3", cover=cover_tempfile_path) except BaseException as e: - make_log("Storage", f"Error converting audio: {e}" + '\n' + traceback.format_exc(), level="error") + make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Error converting audio: {e}" + '\n' + traceback.format_exc(), level="error") if os.path.exists(tempfile_path): async with aiofiles.open(tempfile_path, "rb") as file: content_file_bin = await file.read() accept_type = 'audio/mpeg' - make_log("Storage", f"Audio {content_sha256} converted successfully") + make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Audio {content_sha256} converted successfully", level='debug') else: tempfile_path = tempfile_path[:-5] @@ -222,13 +222,13 @@ async def s_api_v1_storage_get(request, file_hash=None): break quality -= 5 except BaseException as e: - make_log("Storage", f"Error converting image: {e}" + '\n' + traceback.format_exc(), level="error") + make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Error converting image: {e}" + '\n' + traceback.format_exc(), level="error") if 
os.path.exists(tempfile_path): async with aiofiles.open(tempfile_path, "rb") as file: content_file_bin = await file.read() - make_log("Storage", f"Image {content_sha256} converted successfully") + make_log("Storage", f"sid={getattr(request.ctx, 'session_id', None)} Image {content_sha256} converted successfully", level='debug') accept_type = 'image/jpeg' else: tempfile_path = tempfile_path[:-5] diff --git a/app/api/routes/progressive_storage.py b/app/api/routes/progressive_storage.py index 6650f8b..5ddc0d9 100644 --- a/app/api/routes/progressive_storage.py +++ b/app/api/routes/progressive_storage.py @@ -20,28 +20,28 @@ from app.core._utils.resolve_content import resolve_content # POST /api/v1.5/storage async def s_api_v1_5_storage_post(request): # Log the receipt of a chunk upload request - make_log("uploader_v1.5", "Received chunk upload request", level="INFO") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Received chunk upload request", level="INFO") # Get the provided file name from header and decode it from base64 provided_filename_b64 = request.headers.get("X-File-Name") if not provided_filename_b64: - make_log("uploader_v1.5", "Missing X-File-Name header", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Missing X-File-Name header", level="ERROR") return response.json({"error": "Missing X-File-Name header"}, status=400) try: provided_filename = b64decode(provided_filename_b64).decode("utf-8") except Exception as e: - make_log("uploader_v1.5", f"Invalid X-File-Name header: {e}", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Invalid X-File-Name header: {e}", level="ERROR") return response.json({"error": "Invalid X-File-Name header"}, status=400) # Get X-Chunk-Start header (must be provided) and parse it as integer chunk_start_header = request.headers.get("X-Chunk-Start") if chunk_start_header is None: - make_log("uploader_v1.5", "Missing 
X-Chunk-Start header", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Missing X-Chunk-Start header", level="ERROR") return response.json({"error": "Missing X-Chunk-Start header"}, status=400) try: chunk_start = int(chunk_start_header) except Exception as e: - make_log("uploader_v1.5", f"Invalid X-Chunk-Start header: {e}", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Invalid X-Chunk-Start header: {e}", level="ERROR") return response.json({"error": "Invalid X-Chunk-Start header"}, status=400) # Enforce maximum chunk size (80 MB) using Content-Length header if provided @@ -51,7 +51,7 @@ async def s_api_v1_5_storage_post(request): try: content_length = int(content_length) if content_length > max_chunk_size: - make_log("uploader_v1.5", f"Chunk size {content_length} exceeds maximum allowed", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Chunk size {content_length} exceeds maximum allowed", level="ERROR") return response.json({"error": "Chunk size exceeds maximum allowed (80 MB)"}, status=400) except: pass @@ -63,9 +63,9 @@ async def s_api_v1_5_storage_post(request): # New upload session: generate a new uuid upload_id = str(uuid4()) is_new_upload = True - make_log("uploader_v1.5", f"Starting new upload session with ID: {upload_id}", level="INFO") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Start new upload session id={upload_id}", level="INFO") else: - make_log("uploader_v1.5", f"Resuming upload session with ID: {upload_id}", level="INFO") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Resume upload session id={upload_id}", level="DEBUG") # Determine the temporary file path based on upload_id temp_path = os.path.join(UPLOADS_DIR, f"v1.5_upload_{upload_id}") @@ -77,10 +77,10 @@ async def s_api_v1_5_storage_post(request): # If the provided chunk_start is less than 
current_size, the chunk is already received if chunk_start < current_size: - make_log("uploader_v1.5", f"Chunk starting at {chunk_start} already received, current size: {current_size}", level="INFO") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Chunk at {chunk_start} already received; size={current_size}", level="DEBUG") return response.json({"upload_id": upload_id, "current_size": current_size}) elif chunk_start > current_size: - make_log("uploader_v1.5", f"Chunk start {chunk_start} does not match current file size {current_size}", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Chunk start {chunk_start} != current size {current_size}", level="ERROR") return response.json({"error": "Chunk start does not match current file size"}, status=400) # Append the received chunk to the temporary file @@ -94,9 +94,9 @@ async def s_api_v1_5_storage_post(request): async for chunk in request.stream: await out_file.write(chunk) new_size = os.path.getsize(temp_path) - make_log("uploader_v1.5", f"Appended chunk. New file size: {new_size}", level="INFO") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Appended chunk. 
size={new_size}", level="DEBUG") except Exception as e: - make_log("uploader_v1.5", f"Error saving chunk: {e}", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Error saving chunk: {e}", level="ERROR") return response.json({"error": "Failed to save chunk"}, status=500) # If computed hash matches the provided one, the final chunk has been received @@ -112,28 +112,28 @@ async def s_api_v1_5_storage_post(request): stdout, stderr = await proc.communicate() if proc.returncode != 0: error_msg = stderr.decode().strip() - make_log("uploader_v1.5", f"sha256sum error: {error_msg}", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} sha256sum error: {error_msg}", level="ERROR") return response.json({"error": "Failed to compute file hash"}, status=500) computed_hash_hex = stdout.decode().split()[0].strip() computed_hash_bytes = bytes.fromhex(computed_hash_hex) computed_hash_b58 = b58encode(computed_hash_bytes).decode() - make_log("uploader_v1.5", f"Computed hash (base58): {computed_hash_b58}", level="INFO") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Computed hash (base58): {computed_hash_b58}", level="INFO") except Exception as e: - make_log("uploader_v1.5", f"Error computing file hash: {e}", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Error computing file hash: {e}", level="ERROR") return response.json({"error": "Error computing file hash"}, status=500) final_path = os.path.join(UPLOADS_DIR, f"{computed_hash_b58}") try: os.rename(temp_path, final_path) - make_log("uploader_v1.5", f"Final chunk received. File renamed to: {final_path}", level="INFO") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Final chunk received. 
Renamed to: {final_path}", level="INFO") except Exception as e: - make_log("uploader_v1.5", f"Error renaming file: {e}", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Error renaming file: {e}", level="ERROR") return response.json({"error": "Failed to finalize file storage"}, status=500) db_session = request.ctx.db_session existing = (await db_session.execute(select(StoredContent).where(StoredContent.hash == computed_hash_b58))).scalars().first() if existing: - make_log("uploader_v1.5", f"File with hash {computed_hash_b58} already exists in DB", level="INFO") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} File already exists in DB: {computed_hash_b58}", level="INFO") serialized_v2 = existing.cid.serialize_v2() serialized_v1 = existing.cid.serialize_v1() return response.json({ @@ -158,9 +158,9 @@ async def s_api_v1_5_storage_post(request): ) db_session.add(new_content) await db_session.commit() - make_log("uploader_v1.5", f"New file stored and indexed for user {user_id} with hash {computed_hash_b58}", level="INFO") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Stored new file user={user_id} hash={computed_hash_b58}", level="INFO") except Exception as e: - make_log("uploader_v1.5", f"Database error: {e}", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Database error: {e}", level="ERROR") return response.json({"error": "Database error"}, status=500) serialized_v2 = new_content.cid.serialize_v2() @@ -179,7 +179,7 @@ async def s_api_v1_5_storage_post(request): # GET /api/v1.5/storage/ async def s_api_v1_5_storage_get(request, file_hash): - make_log("uploader_v1.5", f"Received file retrieval request for hash: {file_hash}", level="INFO") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Retrieve file hash={file_hash}", level="INFO") try: file_hash = 
b58encode(resolve_content(file_hash)[0].content_hash).decode() @@ -188,7 +188,7 @@ async def s_api_v1_5_storage_get(request, file_hash): final_path = os.path.join(UPLOADS_DIR, f"{file_hash}") if not os.path.exists(final_path): - make_log("uploader_v1.5", f"File not found: {final_path}", level="ERROR") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} File not found: {final_path}", level="ERROR") return response.json({"error": "File not found"}, status=404) db_session = request.ctx.db_session @@ -206,7 +206,7 @@ async def s_api_v1_5_storage_get(request, file_hash): range_header = request.headers.get("Range") if range_header: - make_log("uploader_v1.5", f"Processing Range header: {range_header}", level="INFO") + make_log("uploader_v1.5", f"sid={getattr(request.ctx, 'session_id', None)} Processing Range: {range_header}", level="DEBUG") range_spec = range_header.strip().lower() if not range_spec.startswith("bytes="): make_log("uploader_v1.5", f"Invalid Range header: {range_header}", level="ERROR") diff --git a/app/core/background/license_service.py b/app/core/background/license_service.py index d2ed681..057f859 100644 --- a/app/core/background/license_service.py +++ b/app/core/background/license_service.py @@ -79,13 +79,13 @@ async def license_index_loop(memory, platform_found: bool, seqno: int) -> [bool, for user in users: user_wallet_address = await user.wallet_address_async(session) if not user_wallet_address: - make_log("LicenseIndex", f"User {user.id} has no wallet address", level="info") + make_log("LicenseIndex", f"User {user.id} has no wallet address", level="debug") continue - make_log("LicenseIndex", f"User {user.id} has wallet address {user_wallet_address}", level="info") + make_log("LicenseIndex", f"User {user.id} has wallet address {user_wallet_address}", level="debug") last_updated_licenses = user.meta.get('last_updated_licenses') must_skip = last_updated_licenses and (datetime.now() - 
datetime.fromisoformat(last_updated_licenses)) < timedelta(minutes=1) - make_log("LicenseIndex", f"User: {user.id}, last_updated_licenses: {last_updated_licenses}, must_skip: {must_skip}", level="info") + make_log("LicenseIndex", f"User: {user.id}, last_updated_licenses: {last_updated_licenses}, must_skip: {must_skip}", level="debug") if must_skip: continue