diff --git a/app/api/__init__.py b/app/api/__init__.py index 0834226..64750c5 100644 --- a/app/api/__init__.py +++ b/app/api/__init__.py @@ -118,7 +118,7 @@ async def s_handle_exception(request, exception): response_buffer = await close_db_session(request, response_buffer) response_buffer.headers["Access-Control-Allow-Origin"] = "*" response_buffer.headers["Access-Control-Allow-Methods"] = "GET, POST, OPTIONS" - response_buffer.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-request-id"] + response_buffer.headers["Access-Control-Allow-Headers"] = "Origin, Content-Type, Accept, Authorization, Referer, User-Agent, Sec-Fetch-Dest, Sec-Fetch-Mode, Sec-Fetch-Site, x-request-id" response_buffer.headers["Access-Control-Allow-Credentials"] = "true" response_buffer.headers["X-Session-Id"] = session_id response_buffer.headers["X-Error-Id"] = error_id diff --git a/app/api/middleware.py b/app/api/middleware.py index 9c3a874..34cbbae 100644 --- a/app/api/middleware.py +++ b/app/api/middleware.py @@ -12,6 +12,9 @@ from app.core.models.user import User from sqlalchemy import select from app.core.storage import new_session from datetime import datetime, timedelta +from app.core.log_context import ( + ctx_session_id, ctx_user_id, ctx_method, ctx_path, ctx_remote +) def attach_headers(response, request=None): @@ -163,6 +166,17 @@ async def attach_user_to_request(request): # Correlation/session id for this request: prefer proxy-provided X-Request-ID incoming_req_id = request.headers.get('X-Request-Id') or request.headers.get('X-Request-ID') request.ctx.session_id = (incoming_req_id or uuid4().hex)[:32] + # Populate contextvars for automatic logging context + try: + ctx_session_id.set(request.ctx.session_id) + ctx_method.set(request.method) + ctx_path.set(request.path) + _remote = (request.headers.get('X-Forwarded-For') or request.remote_addr or request.ip) + if _remote and isinstance(_remote, str) and ',' in _remote: + _remote = _remote.split(',')[0].strip() + ctx_remote.set(_remote) + except BaseException: + pass try: make_log( "HTTP", @@ -172,6 +186,12 @@ async def attach_user_to_request(request): except BaseException: pass await try_authorization(request) + # Update user_id in context after auth + try: + if request.ctx.user and request.ctx.user.id: + ctx_user_id.set(request.ctx.user.id) + except BaseException: + pass await save_activity(request) await try_service_authorization(request) @@ -202,4 +222,13 @@ async def close_request_handler(request, response): async def close_db_session(request, response): request, response = await close_request_handler(request, response) response = attach_headers(response, request) + # Clear contextvars + try: + ctx_session_id.set(None) + ctx_user_id.set(None) + ctx_method.set(None) + ctx_path.set(None) + ctx_remote.set(None) + except BaseException: + pass return response diff --git a/app/core/background/convert_service.py b/app/core/background/convert_service.py index 692bb12..0a3c783 100644 --- a/app/core/background/convert_service.py +++ b/app/core/background/convert_service.py @@ -255,11 +255,23 @@ async def main_fn(memory): seqno = 0 while True: try: - make_log("ConvertProcess", "Service running", level="debug") + rid = __import__('uuid').uuid4().hex[:8] + try: + from app.core.log_context import ctx_rid + ctx_rid.set(rid) + except BaseException: + pass + make_log("ConvertProcess", "Service running", level="debug", rid=rid) await convert_loop(memory) await asyncio.sleep(5) await send_status("convert_service", f"working (seqno={seqno})") seqno += 1 except BaseException as e: - make_log("ConvertProcess", f"Error: {e}", level="error") + make_log("ConvertProcess", f"Error: {e}", level="error", rid=locals().get('rid')) await asyncio.sleep(3) + finally: + try: + from app.core.log_context import ctx_rid + ctx_rid.set(None) + except BaseException: + pass diff --git a/app/core/background/indexer_service.py b/app/core/background/indexer_service.py index 1931cc1..47c075a 100644 --- a/app/core/background/indexer_service.py +++ b/app/core/background/indexer_service.py @@ -298,15 +298,27 @@ async def main_fn(memory, ): seqno = 0 while True: try: + rid = __import__('uuid').uuid4().hex[:8] + try: + from app.core.log_context import ctx_rid + ctx_rid.set(rid) + except BaseException: + pass + make_log("Indexer", f"Loop start", level="debug", rid=rid) platform_found, seqno = await indexer_loop(memory, platform_found, seqno) except BaseException as e: - make_log("Indexer", f"Error: {e}" + '\n' + traceback.format_exc(), level="error") + make_log("Indexer", f"Error: {e}" + '\n' + traceback.format_exc(), level="error", rid=locals().get('rid')) if platform_found: await send_status("indexer", f"working (seqno={seqno})") await asyncio.sleep(5) seqno += 1 + try: + from app.core.log_context import ctx_rid + ctx_rid.set(None) + except BaseException: + pass diff --git a/app/core/background/license_service.py b/app/core/background/license_service.py index 057f859..67cc9a0 100644 --- a/app/core/background/license_service.py +++ b/app/core/background/license_service.py @@ -122,14 +122,26 @@ async def main_fn(memory, ): seqno = 0 while True: try: + rid = __import__('uuid').uuid4().hex[:8] + try: + from app.core.log_context import ctx_rid + ctx_rid.set(rid) + except BaseException: + pass + make_log("LicenseIndex", f"Loop start", level="debug", rid=rid) platform_found, seqno = await license_index_loop(memory, platform_found, seqno) if platform_found: await send_status("licenses", f"working (seqno={seqno})") except BaseException as e: - make_log("LicenseIndex", f"Error: {e}" + '\n' + traceback.format_exc(), level="error") + make_log("LicenseIndex", f"Error: {e}" + '\n' + traceback.format_exc(), level="error", rid=locals().get('rid')) await asyncio.sleep(1) seqno += 1 + try: + from app.core.log_context import ctx_rid + ctx_rid.set(None) + except BaseException: + pass # if __name__ == '__main__': # loop = asyncio.get_event_loop() diff --git a/app/core/background/ton_service.py b/app/core/background/ton_service.py index 2814b84..d3d4d11 100644 --- a/app/core/background/ton_service.py +++ b/app/core/background/ton_service.py @@ -122,8 +122,14 @@ async def main_fn(memory): while True: try: + rid = __import__('uuid').uuid4().hex[:8] + try: + from app.core.log_context import ctx_rid + ctx_rid.set(rid) + except BaseException: + pass sw_seqno_value = await get_sw_seqno() - make_log("TON", f"Service running ({sw_seqno_value})", level="debug") + make_log("TON", f"Service running ({sw_seqno_value})", level="debug", rid=rid) async with db_session() as session: # Проверка отправленных сообщений @@ -166,7 +172,7 @@ async def main_fn(memory): try: await process_incoming_message(blockchain_message) except BaseException as e: - pass # make_log("TON_Daemon", f"Error while processing incoming message: {e}" + '\n' + traceback.format_exc(), level='debug') + pass # make_log("TON_Daemon", f"Error while processing incoming message: {e}" + '\n' + traceback.format_exc(), level='debug', rid=rid) try: sw_transactions = await toncenter.get_transactions(highload_wallet.address.to_string(1, 1, 1), limit=100) @@ -174,9 +180,9 @@ async def main_fn(memory): try: await process_incoming_transaction(sw_transaction) except BaseException as e: - make_log("TON_Daemon", f"Error while processing incoming transaction: {e}", level="debug") + make_log("TON_Daemon", f"Error while processing incoming transaction: {e}", level="debug", rid=rid) except BaseException as e: - make_log("TON_Daemon", f"Error while getting service wallet transactions: {e}", level="ERROR") + make_log("TON_Daemon", f"Error while getting service wallet transactions: {e}", level="ERROR", rid=rid) await send_status("ton_daemon", f"working: processing out-txs (seqno={sw_seqno_value})") # Отправка подписанных сообщений @@ -185,7 +191,7 @@ async def main_fn(memory): BlockchainTask.status == 'processing' ).order_by(BlockchainTask.updated.asc()))).scalars().all() for blockchain_task in _processing: - make_log("TON_Daemon", f"Processing task (processing) {blockchain_task.id}") + make_log("TON_Daemon", f"Processing task (processing) {blockchain_task.id}", rid=rid) query_boc = bytes.fromhex(blockchain_task.meta['signed_message']) errors_list = [] @@ -261,7 +267,7 @@ async def main_fn(memory): ) query_boc = query['message'].to_boc(False) except BaseException as e: - make_log("TON", f"Error creating transfer message: {e}", level="error") + make_log("TON", f"Error creating transfer message: {e}", level="error", rid=rid) query_boc = begin_cell().end_cell().to_boc(False) blockchain_task.meta = { @@ -270,9 +276,9 @@ async def main_fn(memory): 'signed_message': query_boc.hex(), } await session.commit() - make_log("TON", f"Created signed message for task {blockchain_task.id}" + '\n' + traceback.format_exc(), level="info") + make_log("TON", f"Created signed message for task {blockchain_task.id}" + '\n' + traceback.format_exc(), level="info", rid=rid) except BaseException as e: - make_log("TON", f"Error processing task {blockchain_task.id}: {e}" + '\n' + traceback.format_exc(), level="error") + make_log("TON", f"Error processing task {blockchain_task.id}: {e}" + '\n' + traceback.format_exc(), level="error", rid=rid) continue await asyncio.sleep(1) @@ -280,13 +286,17 @@ async def main_fn(memory): await asyncio.sleep(1) await send_status("ton_daemon", f"working (seqno={sw_seqno_value})") except BaseException as e: - make_log("TON", f"Error: {e}", level="error") + make_log("TON", f"Error: {e}", level="error", rid=locals().get('rid')) await asyncio.sleep(3) + finally: + try: + from app.core.log_context import ctx_rid + ctx_rid.set(None) + except BaseException: + pass # if __name__ == '__main__': # loop = asyncio.get_event_loop() # loop.run_until_complete(main()) # loop.close() - - diff --git a/app/core/background/uploader_service.py b/app/core/background/uploader_service.py index d1c1303..cc4fc0c 100644 --- a/app/core/background/uploader_service.py +++ b/app/core/background/uploader_service.py @@ -13,14 +13,26 @@ async def main_fn(memory): seqno = 0 while True: try: - make_log("Uploader", "Service running", level="debug") + rid = __import__('uuid').uuid4().hex[:8] + try: + from app.core.log_context import ctx_rid + ctx_rid.set(rid) + except BaseException: + pass + make_log("Uploader", f"Service running", level="debug", rid=rid) await uploader_loop() await asyncio.sleep(5) await send_status("uploader_daemon", f"working (seqno={seqno})") seqno += 1 except BaseException as e: - make_log("Uploader", f"Error: {e}", level="error") + make_log("Uploader", f"Error: {e}", level="error", rid=locals().get('rid')) await asyncio.sleep(3) + finally: + try: + from app.core.log_context import ctx_rid + ctx_rid.set(None) + except BaseException: + pass # if __name__ == '__main__': # loop = asyncio.get_event_loop() @@ -28,5 +40,3 @@ async def main_fn(memory): # loop.close() - - diff --git a/app/core/log_context.py b/app/core/log_context.py new file mode 100644 index 0000000..c14d5f4 --- /dev/null +++ b/app/core/log_context.py @@ -0,0 +1,12 @@ +from contextvars import ContextVar + +# Correlation for HTTP requests +ctx_session_id: ContextVar[str | None] = ContextVar('ctx_session_id', default=None) +ctx_user_id: ContextVar[int | None] = ContextVar('ctx_user_id', default=None) +ctx_method: ContextVar[str | None] = ContextVar('ctx_method', default=None) +ctx_path: ContextVar[str | None] = ContextVar('ctx_path', default=None) +ctx_remote: ContextVar[str | None] = ContextVar('ctx_remote', default=None) + +# Correlation for background loop iterations +ctx_rid: ContextVar[str | None] = ContextVar('ctx_rid', default=None) + diff --git a/app/core/logger.py b/app/core/logger.py index 6c39b2b..e87f110 100644 --- a/app/core/logger.py +++ b/app/core/logger.py @@ -1,5 +1,6 @@ import os from app.core.projscale_logger import logger +from app.core.log_context import ctx_session_id, ctx_user_id, ctx_method, ctx_path, ctx_remote, ctx_rid import logging LOG_LEVELS = { @@ -17,6 +18,23 @@ def make_log(issuer, message, *args, level='INFO', **kwargs): assert level.upper() in LOG_LEVELS.keys(), f"Unknown log level" _log = getattr(logger, level.lower()) + # Merge context variables if not explicitly provided + context_fields = { + 'sid': kwargs.get('sid') or kwargs.get('session_id') or ctx_session_id.get(), + 'user_id': kwargs.get('user_id') or ctx_user_id.get(), + 'method': kwargs.get('method') or ctx_method.get(), + 'path': kwargs.get('path') or ctx_path.get(), + 'remote': kwargs.get('remote') or ctx_remote.get(), + 'rid': kwargs.get('rid') or ctx_rid.get(), + } + # Only include non-empty context + for k, v in list(context_fields.items()): + if v is None: + context_fields.pop(k) + # Do not override provided kwargs; merge missing only + for k, v in context_fields.items(): + kwargs.setdefault(k, v) + log_buffer = f"[{issuer if not (issuer is None) else 'System'}] {message}" if args: log_buffer += f" | {args}"