"""Blockchain indexer service for monitoring transactions and events."""
|
|
|
|
import asyncio
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Set, Any

import redis.asyncio as redis
from sqlalchemy import func, select, update
from sqlalchemy.ext.asyncio import AsyncSession

from app.core.config import get_settings
from app.core.database import db_manager
from app.core.models.blockchain import Transaction, Wallet, BlockchainNFT, BlockchainTokenBalance
from app.core.background.ton_service import TONService
|
|
|
|
# Module-level logger, named after this module per the standard logging convention.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class IndexerService:
    """Service for indexing blockchain transactions and events.

    Spawns a set of long-running asyncio loops (transactions, wallets, NFTs,
    token balances, cache cleanup) that poll the database, refresh state from
    the TON blockchain via ``TONService``, and cache hot values in Redis.

    Lifecycle: call :meth:`start` once (it blocks until the loops exit) and
    :meth:`stop` to cancel the loops and release connections.
    """

    def __init__(self):
        self.settings = get_settings()
        self.ton_service = TONService()
        # Set by start(); every loop assumes it is non-None while is_running.
        self.redis_client: Optional[redis.Redis] = None
        self.is_running = False
        self.tasks: Set[asyncio.Task] = set()

        # Indexing configuration
        self.batch_size = 100  # max rows handled per polling cycle
        self.index_interval = 30  # seconds
        self.confirmation_blocks = 12  # confirmations after which a tx stops being re-checked
        self.max_retries = 3  # NOTE(review): not referenced in this file — confirm intended use

    async def start(self) -> None:
        """Start the indexer service.

        Connects to Redis, spawns all indexing loops, and blocks until the
        loops finish (normally only when they are cancelled via ``stop()``).

        Raises:
            Exception: re-raised after attempting a clean ``stop()`` when
                startup fails (e.g. Redis is unreachable).
        """
        try:
            logger.info("Starting blockchain indexer service")

            # Initialize Redis connection
            self.redis_client = redis.from_url(
                self.settings.redis_url,
                encoding="utf-8",
                decode_responses=True,
                socket_keepalive=True,
                socket_keepalive_options={},
                health_check_interval=30,
            )

            # Test Redis connection
            await self.redis_client.ping()
            logger.info("Redis connection established for indexer")

            # Start indexing tasks
            self.is_running = True

            # Create indexing tasks
            tasks = [
                asyncio.create_task(self._index_transactions_loop()),
                asyncio.create_task(self._index_wallets_loop()),
                asyncio.create_task(self._index_nfts_loop()),
                asyncio.create_task(self._index_token_balances_loop()),
                asyncio.create_task(self._cleanup_cache_loop()),
            ]

            self.tasks.update(tasks)

            # Wait for all tasks; return_exceptions keeps one failed loop
            # from tearing down its siblings.
            await asyncio.gather(*tasks, return_exceptions=True)

        except Exception as e:
            logger.error(f"Error starting indexer service: {e}")
            await self.stop()
            raise

    async def stop(self) -> None:
        """Stop the indexer service.

        Cancels all indexing tasks, waits for them to finish, and closes the
        Redis connection. Safe to call even if ``start()`` failed part-way.
        """
        logger.info("Stopping blockchain indexer service")
        self.is_running = False

        # Cancel all tasks
        for task in self.tasks:
            if not task.done():
                task.cancel()

        # Wait for tasks to complete (exceptions from cancellation are swallowed)
        if self.tasks:
            await asyncio.gather(*self.tasks, return_exceptions=True)

        # Close Redis connection
        if self.redis_client:
            await self.redis_client.close()

        logger.info("Indexer service stopped")

    async def _index_transactions_loop(self) -> None:
        """Main loop for indexing transactions.

        Each cycle indexes pending transactions and refreshes confirmation
        counts, then sleeps for ``index_interval`` seconds. Errors are logged
        and the loop keeps running; CancelledError exits cleanly.
        """
        logger.info("Starting transaction indexing loop")

        while self.is_running:
            try:
                await self._index_pending_transactions()
                await self._update_transaction_confirmations()
                await asyncio.sleep(self.index_interval)

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in transaction indexing loop: {e}")
                await asyncio.sleep(self.index_interval)

    async def _index_wallets_loop(self) -> None:
        """Loop for updating wallet information."""
        logger.info("Starting wallet indexing loop")

        while self.is_running:
            try:
                await self._update_wallet_balances()
                await asyncio.sleep(self.index_interval * 2)  # Less frequent

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in wallet indexing loop: {e}")
                await asyncio.sleep(self.index_interval * 2)

    async def _index_nfts_loop(self) -> None:
        """Loop for indexing NFT collections and transfers."""
        logger.info("Starting NFT indexing loop")

        while self.is_running:
            try:
                await self._index_nft_collections()
                await self._index_nft_transfers()
                await asyncio.sleep(self.index_interval * 4)  # Even less frequent

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in NFT indexing loop: {e}")
                await asyncio.sleep(self.index_interval * 4)

    async def _index_token_balances_loop(self) -> None:
        """Loop for updating token balances."""
        logger.info("Starting token balance indexing loop")

        while self.is_running:
            try:
                await self._update_token_balances()
                await asyncio.sleep(self.index_interval * 3)

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in token balance indexing loop: {e}")
                await asyncio.sleep(self.index_interval * 3)

    async def _cleanup_cache_loop(self) -> None:
        """Loop for cleaning up old cache entries."""
        logger.info("Starting cache cleanup loop")

        while self.is_running:
            try:
                await self._cleanup_old_cache_entries()
                await asyncio.sleep(3600)  # Run every hour

            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Error in cache cleanup loop: {e}")
                await asyncio.sleep(3600)

    async def _index_pending_transactions(self) -> None:
        """Index pending transactions from the database.

        Fetches up to ``batch_size`` transactions with status ``"pending"``,
        refreshes each from the blockchain, and commits the batch. On any
        error the whole batch is rolled back.
        """
        async with db_manager.get_session() as session:
            try:
                # Get pending transactions
                result = await session.execute(
                    select(Transaction)
                    .where(Transaction.status == "pending")
                    .limit(self.batch_size)
                )
                transactions = result.scalars().all()

                if not transactions:
                    return

                logger.info(f"Indexing {len(transactions)} pending transactions")

                # Process each transaction
                for transaction in transactions:
                    await self._process_transaction(session, transaction)

                await session.commit()

            except Exception as e:
                logger.error(f"Error indexing pending transactions: {e}")
                await session.rollback()

    async def _process_transaction(self, session: AsyncSession, transaction: Transaction) -> None:
        """Process a single transaction.

        Looks the transaction up on-chain by hash, copies status/block/gas/
        confirmation fields onto the ORM object (committed by the caller),
        and caches the raw info in Redis for 1 hour. Errors are logged so a
        single bad transaction doesn't abort the batch.
        """
        try:
            # Check transaction status on blockchain
            if transaction.tx_hash:
                tx_info = await self.ton_service.get_transaction_info(transaction.tx_hash)

                if tx_info:
                    # Update transaction with blockchain data
                    transaction.status = tx_info.get("status", "pending")
                    transaction.block_number = tx_info.get("block_number")
                    transaction.gas_used = tx_info.get("gas_used")
                    transaction.gas_price = tx_info.get("gas_price")
                    transaction.confirmations = tx_info.get("confirmations", 0)
                    transaction.updated_at = datetime.utcnow()

                    # Cache transaction info
                    cache_key = f"tx:{transaction.tx_hash}"
                    await self.redis_client.setex(
                        cache_key,
                        3600,  # 1 hour
                        json.dumps(tx_info)
                    )

                    logger.debug(f"Updated transaction {transaction.tx_hash}")

        except Exception as e:
            logger.error(f"Error processing transaction {transaction.id}: {e}")

    async def _update_transaction_confirmations(self) -> None:
        """Update confirmation counts for recent transactions.

        Only re-checks confirmed transactions updated in the last 24 hours
        that have not yet reached ``confirmation_blocks`` confirmations.
        """
        async with db_manager.get_session() as session:
            try:
                # Get recent confirmed transactions
                cutoff_time = datetime.utcnow() - timedelta(hours=24)
                result = await session.execute(
                    select(Transaction)
                    .where(
                        Transaction.status == "confirmed",
                        Transaction.confirmations < self.confirmation_blocks,
                        Transaction.updated_at > cutoff_time
                    )
                    .limit(self.batch_size)
                )
                transactions = result.scalars().all()

                for transaction in transactions:
                    if transaction.tx_hash:
                        try:
                            confirmations = await self.ton_service.get_transaction_confirmations(
                                transaction.tx_hash
                            )

                            # Only touch updated_at when the count actually changed
                            if confirmations != transaction.confirmations:
                                transaction.confirmations = confirmations
                                transaction.updated_at = datetime.utcnow()

                        except Exception as e:
                            logger.error(f"Error updating confirmations for {transaction.tx_hash}: {e}")

                await session.commit()

            except Exception as e:
                logger.error(f"Error updating transaction confirmations: {e}")
                await session.rollback()

    async def _update_wallet_balances(self) -> None:
        """Update wallet balances from the blockchain.

        For each active wallet, fetches the current on-chain balance; when it
        changed, persists it and caches the value in Redis for 5 minutes.
        """
        async with db_manager.get_session() as session:
            try:
                # Get active wallets
                result = await session.execute(
                    select(Wallet)
                    .where(Wallet.is_active == True)
                    .limit(self.batch_size)
                )
                wallets = result.scalars().all()

                for wallet in wallets:
                    try:
                        # Get current balance
                        balance = await self.ton_service.get_wallet_balance(wallet.address)

                        if balance != wallet.balance:
                            wallet.balance = balance
                            wallet.updated_at = datetime.utcnow()

                            # Cache balance
                            cache_key = f"balance:{wallet.address}"
                            await self.redis_client.setex(cache_key, 300, str(balance))  # 5 minutes

                    except Exception as e:
                        logger.error(f"Error updating balance for wallet {wallet.address}: {e}")

                await session.commit()

            except Exception as e:
                logger.error(f"Error updating wallet balances: {e}")
                await session.rollback()

    async def _index_nft_collections(self) -> None:
        """Index NFT collections and metadata.

        Uses a quarter of ``batch_size`` because each wallet may hold many
        NFTs and each one triggers its own DB lookup.
        """
        async with db_manager.get_session() as session:
            try:
                # Get wallets to check for NFTs
                result = await session.execute(
                    select(Wallet)
                    .where(Wallet.is_active == True)
                    .limit(self.batch_size // 4)  # Smaller batch for NFTs
                )
                wallets = result.scalars().all()

                for wallet in wallets:
                    try:
                        # Get NFTs for this wallet
                        nfts = await self.ton_service.get_wallet_nfts(wallet.address)

                        for nft_data in nfts:
                            await self._process_nft(session, wallet, nft_data)

                    except Exception as e:
                        logger.error(f"Error indexing NFTs for wallet {wallet.address}: {e}")

                await session.commit()

            except Exception as e:
                logger.error(f"Error indexing NFT collections: {e}")
                await session.rollback()

    async def _process_nft(self, session: AsyncSession, wallet: Wallet, nft_data: Dict[str, Any]) -> None:
        """Process a single NFT.

        Upserts by (token_id, collection_address): updates owner/metadata on
        an existing row, otherwise inserts a new ``BlockchainNFT``. The
        caller commits. Requires ``token_id`` and ``collection_address`` keys
        in ``nft_data``; a missing key is caught and logged here.
        """
        try:
            # Check if NFT exists
            result = await session.execute(
                select(BlockchainNFT)
                .where(
                    BlockchainNFT.token_id == nft_data["token_id"],
                    BlockchainNFT.collection_address == nft_data["collection_address"]
                )
            )
            existing_nft = result.scalar_one_or_none()

            if existing_nft:
                # Update existing NFT
                existing_nft.owner_address = wallet.address
                existing_nft.metadata = nft_data.get("metadata", {})
                existing_nft.updated_at = datetime.utcnow()
            else:
                # Create new NFT
                new_nft = BlockchainNFT(
                    wallet_id=wallet.id,
                    token_id=nft_data["token_id"],
                    collection_address=nft_data["collection_address"],
                    owner_address=wallet.address,
                    token_uri=nft_data.get("token_uri"),
                    metadata=nft_data.get("metadata", {}),
                    created_at=datetime.utcnow()
                )
                session.add(new_nft)

        except Exception as e:
            logger.error(f"Error processing NFT {nft_data.get('token_id')}: {e}")

    async def _index_nft_transfers(self) -> None:
        """Index NFT transfers."""
        # This would involve checking recent blocks for NFT transfer events
        # Implementation depends on the specific blockchain's event system
        pass

    async def _update_token_balances(self) -> None:
        """Update token balances for wallets."""
        async with db_manager.get_session() as session:
            try:
                # Get wallets with token balances to update
                result = await session.execute(
                    select(Wallet)
                    .where(Wallet.is_active == True)
                    .limit(self.batch_size // 2)
                )
                wallets = result.scalars().all()

                for wallet in wallets:
                    try:
                        # Get token balances
                        token_balances = await self.ton_service.get_wallet_token_balances(wallet.address)

                        for token_data in token_balances:
                            await self._update_token_balance(session, wallet, token_data)

                    except Exception as e:
                        logger.error(f"Error updating token balances for {wallet.address}: {e}")

                await session.commit()

            except Exception as e:
                logger.error(f"Error updating token balances: {e}")
                await session.rollback()

    async def _update_token_balance(
        self,
        session: AsyncSession,
        wallet: Wallet,
        token_data: Dict[str, Any]
    ) -> None:
        """Update a single token balance.

        Upserts by (wallet_id, token_address); the caller commits. Requires
        ``token_address`` and ``balance`` keys in ``token_data``; a missing
        key is caught and logged here.
        """
        try:
            # Check if balance record exists
            result = await session.execute(
                select(BlockchainTokenBalance)
                .where(
                    BlockchainTokenBalance.wallet_id == wallet.id,
                    BlockchainTokenBalance.token_address == token_data["token_address"]
                )
            )
            existing_balance = result.scalar_one_or_none()

            if existing_balance:
                # Update existing balance
                existing_balance.balance = token_data["balance"]
                existing_balance.decimals = token_data.get("decimals", 18)
                existing_balance.updated_at = datetime.utcnow()
            else:
                # Create new balance record
                new_balance = BlockchainTokenBalance(
                    wallet_id=wallet.id,
                    token_address=token_data["token_address"],
                    token_name=token_data.get("name"),
                    token_symbol=token_data.get("symbol"),
                    balance=token_data["balance"],
                    decimals=token_data.get("decimals", 18),
                    created_at=datetime.utcnow()
                )
                session.add(new_balance)

        except Exception as e:
            logger.error(f"Error updating token balance: {e}")

    async def _cleanup_old_cache_entries(self) -> None:
        """Clean up old cache entries.

        Ensures every indexer cache key has a TTL so the cache cannot grow
        without bound. Uses SCAN (non-blocking, incremental) rather than
        KEYS, which blocks the Redis server on large keyspaces.
        """
        try:
            # Key prefixes owned by the indexer
            patterns = ["tx:*", "balance:*", "nft:*", "token:*"]

            for pattern in patterns:
                # scan_iter yields matching keys without blocking the server
                async for key in self.redis_client.scan_iter(match=pattern):
                    ttl = await self.redis_client.ttl(key)
                    if ttl == -1:  # No expiration set
                        await self.redis_client.expire(key, 3600)  # Set 1 hour expiration

            logger.debug("Cache cleanup completed")

        except Exception as e:
            logger.error(f"Error during cache cleanup: {e}")

    async def get_indexing_stats(self) -> Dict[str, Any]:
        """Get indexing statistics.

        Returns:
            Dict with transaction counts by status, active wallet count,
            total NFT count, service run state, live task count, and an ISO
            timestamp — or ``{"error": ...}`` on failure.
        """
        try:
            async with db_manager.get_session() as session:
                # Get transaction stats.
                # Fix: was asyncio.func.count(), which raises AttributeError —
                # the aggregate lives in sqlalchemy.func.
                tx_result = await session.execute(
                    select(Transaction.status, func.count())
                    .group_by(Transaction.status)
                )
                tx_stats = dict(tx_result.fetchall())

                # Get wallet stats
                wallet_result = await session.execute(
                    select(func.count())
                    .select_from(Wallet)
                    .where(Wallet.is_active == True)
                )
                active_wallets = wallet_result.scalar()

                # Get NFT stats
                nft_result = await session.execute(
                    select(func.count())
                    .select_from(BlockchainNFT)
                )
                total_nfts = nft_result.scalar()

                return {
                    "transaction_stats": tx_stats,
                    "active_wallets": active_wallets,
                    "total_nfts": total_nfts,
                    "is_running": self.is_running,
                    "active_tasks": len([t for t in self.tasks if not t.done()]),
                    "last_update": datetime.utcnow().isoformat()
                }

        except Exception as e:
            logger.error(f"Error getting indexing stats: {e}")
            return {"error": str(e)}
|
|
|
|
|
|
# Global indexer instance shared by the application; construction only reads
# settings and builds a TONService — no connections are opened until start().
indexer_service = IndexerService()
|