uploader-bot/app/api/routes/_system.py

67 lines
2.4 KiB
Python

from sanic import response
from base58 import b58encode, b58decode
from app.core._secrets import hot_pubkey, service_wallet, hot_seed
from app.core._blockchain.ton.platform import platform
from datetime import datetime, timedelta
from app.core.logger import make_log
from app.core._crypto.signer import Signer
import subprocess
import json
def get_git_info():
    """Return the current git branch name and HEAD commit hash.

    Both values come from running ``git`` as a subprocess; each command's
    output is decoded as UTF-8 and stripped of surrounding whitespace.

    Returns:
        A ``(branch_name, commit_hash)`` tuple of strings.

    Raises:
        subprocess.CalledProcessError: if a git command exits non-zero.
    """
    def _git_output(*args):
        # Run one git sub-command and return its cleaned-up stdout.
        raw = subprocess.check_output(["git", *args])
        return raw.decode('utf-8').strip()

    current_branch = _git_output("branch", "--show-current")
    head_commit = _git_output("rev-parse", "HEAD")
    return current_branch, head_commit
async def s_api_v1_node(request):  # /api/v1/node
    """Return this node's identity and per-service health as JSON.

    The response contains:
      * ``id`` - base58 encoding of ``hot_pubkey``;
      * ``node_address`` / ``master_address`` - string forms of the service
        wallet and platform addresses;
      * ``indexer_height`` - currently hard-coded to 0;
      * ``services`` - one entry per key in
        ``request.app.ctx.memory.known_states`` with:
          - ``status``: the last reported status if the report is less than
            30 seconds old, otherwise ``'not working: timeout'``;
          - ``delay``: seconds since the last report (rounded to 3 decimals),
            or ``-1`` when the service has no timestamp.

    Args:
        request: the incoming Sanic request.

    Returns:
        A Sanic JSON response.
    """
    timeout_seconds = 30
    # Take one snapshot of "now" so status and delay are computed from the
    # same instant for every service in this response.
    now = datetime.now()

    def _service_entry(service):
        last_seen = service['timestamp']
        if not last_seen:
            # No report has been recorded yet: there is no age to compute, so
            # treat the service as timed out instead of subtracting from a
            # missing timestamp (which would raise TypeError).
            return {'status': 'not working: timeout', 'delay': -1}
        age = (now - last_seen).total_seconds()
        return {
            'status': service['status'] if age < timeout_seconds else 'not working: timeout',
            'delay': round(age, 3),
        }

    return response.json({
        'id': b58encode(hot_pubkey).decode(),
        'node_address': service_wallet.address.to_string(1, 1, 1),
        'master_address': platform.address.to_string(1, 1, 1),
        'indexer_height': 0,
        'services': {
            service_key: _service_entry(service)
            for service_key, service in request.app.ctx.memory.known_states.items()
        },
    })
async def s_api_system_send_status(request):
    """Accept a signed status report from a service and record it.

    The JSON body must carry two fields:
      * ``message`` - base58-encoded JSON object with ``service`` and
        ``status`` keys;
      * ``signature`` - signature over the decoded message bytes, verified
        with a ``Signer`` built from ``hot_seed``.

    On success the entry for ``service`` in
    ``request.app.ctx.memory.known_states`` is replaced with the reported
    status and the current time, and the report is logged.

    Args:
        request: the incoming Sanic request.

    Returns:
        ``{'message': 'Status received'}`` on success, or a JSON
        ``{'error': ...}`` response with HTTP 400 when the body is missing or
        malformed, the signature is invalid, or the service is unknown.
    """
    body = request.json
    if not body:
        return response.json({'error': 'No data'}, status=400)
    if not isinstance(body, dict):
        # A JSON array/scalar has no .get(); reject instead of crashing.
        return response.json({'error': 'Invalid data'}, status=400)

    message = body.get('message', '')
    signature = body.get('signature', '')
    if not message or not signature:
        return response.json({'error': 'No message or signature'}, status=400)

    if not isinstance(message, str):
        return response.json({'error': 'Invalid message encoding'}, status=400)
    try:
        raw_message = b58decode(message)
    except ValueError:
        # Characters outside the base58 alphabet (or non-ASCII text).
        return response.json({'error': 'Invalid message encoding'}, status=400)

    # Verify the signature over the raw bytes before parsing them.
    signer = Signer(hot_seed)
    if not signer.verify(raw_message, signature):
        return response.json({'error': 'Invalid signature'}, status=400)

    try:
        payload = json.loads(raw_message)
    except ValueError:
        # Covers json.JSONDecodeError and UnicodeDecodeError.
        return response.json({'error': 'Invalid message payload'}, status=400)
    if not isinstance(payload, dict) or 'status' not in payload:
        return response.json({'error': 'Invalid message payload'}, status=400)

    known_states = request.app.ctx.memory.known_states
    service = payload.get('service')
    # Explicit check rather than `assert`: assertions are stripped under -O
    # and would otherwise surface as a 500 instead of a client error.
    if service not in known_states:
        return response.json({'error': 'Unknown service'}, status=400)

    known_states[service] = {
        'status': payload['status'],
        'timestamp': datetime.now(),
    }
    make_log("Health", f"Service {payload['service']} status: {payload['status']}", level='info')
    return response.json({'message': 'Status received'})
async def s_api_system_version(request):
    """Report the git branch and commit hash of the running codebase.

    ``get_git_info`` runs blocking ``subprocess.check_output`` calls, so it is
    executed in the event loop's default executor to keep the loop free to
    serve other requests while git runs.

    Args:
        request: the incoming Sanic request (unused).

    Returns:
        A Sanic JSON response with ``codebase_hash`` and ``codebase_branch``.
    """
    # Function-scope import so this handler does not rely on a module-level
    # asyncio import being present.
    import asyncio

    loop = asyncio.get_running_loop()
    branch_name, commit_hash = await loop.run_in_executor(None, get_git_info)
    return response.json({
        "codebase_hash": commit_hash,
        "codebase_branch": branch_name,
    })