wallet-v3cr3/scripts/_core.py

182 lines
6.3 KiB
Python

import os
import pickle
from decimal import Decimal
from time import sleep
from Crypto.Cipher import AES
from Crypto.Protocol.KDF import PBKDF2
from tonsdk.boc import begin_cell
from tonsdk.contract import Contract
from tonsdk.contract.token.ft import JettonWallet
from tonsdk.contract.token.nft import NFTItem
from tonsdk.utils import Address
from toncenter import TonCenter
from wallet_contract import WalletV3CR3
# Human-readable labels for the numeric action type codes; used when
# printing the list of selected actions.
ACTION_TYPES_DESC = {
    1: "Send TON",
    2: "Send Jetton",
    3: "Send NFT",
    4: "Update wallet contract",
    7: "New decentralized note",
}
def unpack_wallet(wallet_settings):
    """Build a ``WalletV3CR3`` object from a wallet settings mapping.

    Args:
        wallet_settings: Mapping providing ``custom_address`` (falsy to
            omit), hex-encoded ``public_key`` and ``secret_key``, and
            ``subwallet_id``.

    Returns:
        The constructed ``WalletV3CR3`` instance.
    """
    # Only pass an explicit address when the settings carry a truthy one.
    custom_address = wallet_settings['custom_address']
    extra = {'address': Address(custom_address)} if custom_address else {}

    return WalletV3CR3(
        public_key=bytes.fromhex(wallet_settings['public_key']),
        private_key=bytes.fromhex(wallet_settings['secret_key']),
        subwallet_id=wallet_settings['subwallet_id'],
        trusted_hashpart=0,
        **extra,
    )
def generate_key(password, salt, iterations=100000):
    """Derive a 32-byte key from *password* and *salt* using PBKDF2.

    Args:
        password: Secret the key is derived from.
        salt: Salt mixed into the derivation.
        iterations: PBKDF2 iteration count.

    Returns:
        The derived key, as returned by ``PBKDF2`` with ``dkLen=32``.
    """
    return PBKDF2(password, salt, dkLen=32, count=iterations)
def encrypt_data(data: bytes, password: bytes) -> bytes:
    """Encrypt *data* with AES-GCM under a key derived from *password*.

    A fresh 16-byte random salt is drawn for every call and fed to
    ``generate_key`` together with the password.

    Returns:
        A pickled list ``[salt, ciphertext, tag, nonce]`` -- the envelope
        that ``decrypt_data`` unpacks.
    """
    salt = os.urandom(16)
    cipher = AES.new(generate_key(password, salt), AES.MODE_GCM)
    ciphertext, tag = cipher.encrypt_and_digest(data)
    # The nonce is read back from the cipher object; none was supplied above.
    envelope = [salt, ciphertext, tag, cipher.nonce]
    return pickle.dumps(envelope)
class _NoGlobalsUnpickler(pickle.Unpickler):
    """Unpickler that refuses to resolve any global (class or function)."""

    def find_class(self, module, name):
        # The encrypted envelope is a plain list of bytes objects, which
        # needs no globals; anything asking for one is not a legitimate
        # envelope and could be a code-execution payload.
        raise pickle.UnpicklingError(
            f"Refusing to unpickle global {module}.{name}"
        )


def decrypt_data(data: bytes, password: bytes) -> bytes:
    """Decrypt an envelope produced by ``encrypt_data``.

    SECURITY: the envelope is a pickle, and it is parsed *before* the
    AES-GCM tag can authenticate anything.  A plain ``pickle.loads`` would
    therefore let a tampered wallet file run arbitrary code.  The envelope
    is loaded through a restricted unpickler that rejects every global, so
    only plain containers and byte strings can be read.

    Args:
        data: Pickled ``[salt, ciphertext, tag, nonce]`` sequence.
        password: Secret passed to ``generate_key`` with the stored salt.

    Returns:
        The plaintext returned by ``decrypt_and_verify``.

    Raises:
        pickle.UnpicklingError: If *data* references any global object.
        Other exceptions raised by unpacking the envelope or by
        ``decrypt_and_verify`` propagate unchanged.
    """
    import io  # local import: only needed to wrap `data` for the Unpickler

    salt, ciphertext, tag, nonce = _NoGlobalsUnpickler(io.BytesIO(data)).load()
    key = generate_key(password, salt)
    cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
    return cipher.decrypt_and_verify(ciphertext, tag)
async def perform_action(wallet_settings, _commands):
    """Sign ``_commands`` with the configured wallet and broadcast them.

    Steps, in order:
      1. Build the wallet via ``unpack_wallet`` and a ``TonCenter`` client
         (testnet unless ``wallet_settings["is_testnet"]`` says otherwise).
      2. Query the wallet's ``seqno`` get-method.  A non-zero ``exit_code``
         is treated as seqno 0.
      3. If the seqno is below 1, broadcast an init message built with
         seqno 0 and an empty referenced cell, then wait 7 seconds and
         continue with seqno + 1.
         NOTE(review): this assumes the init message was accepted; the
         seqno is not re-queried -- confirm this is intended.
      4. Ask the user for confirmation on stdin, then sign and broadcast
         the message carrying ``_commands`` as a reference.

    Args:
        wallet_settings: Mapping consumed by ``unpack_wallet``; may also
            carry ``is_testnet``.
        _commands: Cell stored as a reference of the signed message.

    Raises:
        AssertionError: If the user does not answer ``yes`` to the
            broadcast prompt.
    """
    import asyncio  # local import; used for non-blocking pauses below

    wallet = unpack_wallet(wallet_settings)
    toncenter = TonCenter(testnet=wallet_settings.get("is_testnet", True))

    async def get_wallet_seqno():
        result = await toncenter.run_get_method(wallet.address.to_string(1, 1, 1), 'seqno')
        if result.get('exit_code', -1) != 0:
            seqno = 0
        else:
            # The first stack entry's value is parsed as a hexadecimal string.
            seqno = int(result['stack'][0][1], 16)
        # Pause between API calls without blocking the event loop.
        await asyncio.sleep(1)
        return seqno

    seqno = -100
    _new_deploy = False
    while seqno < 0:
        seqno = await get_wallet_seqno()
        if seqno < 1:
            _new_deploy = True
            result = await toncenter.send_boc(
                wallet.create_external_message(
                    begin_cell()
                    .store_cell(wallet.create_signing_message(0, 60))
                    .store_ref(begin_cell().end_cell())
                    .end_cell(), 0
                )['message'].to_boc(False)
            )
            print("Init transaction broadcast result:", result)
            # Give the init message time to be processed (non-blocking).
            await asyncio.sleep(7)
    if _new_deploy:
        seqno += 1

    print(f"=== Current seqno: {seqno} ===")
    # Explicit raise instead of `assert`: asserts are stripped under
    # `python -O`, which would broadcast without the user's confirmation.
    if input("Broadcast transaction? (yes/NO): ").lower() != 'yes':
        raise AssertionError("Transaction broadcast was not confirmed")

    signing_message = begin_cell().store_cell(wallet.create_signing_message(seqno, 60))
    signing_message = signing_message.store_ref(_commands)
    query = wallet.create_external_message(
        signing_message.end_cell(), seqno, False
    )
    print("Raw signed transaction:", query['message'].to_boc(False).hex())
    result = await toncenter.send_boc(query['message'].to_boc(False))
    print("Transaction broadcast result:", result)
def serialize_command(action, response_address: Address = None):
    """Serialize one action description into a command cell.

    ``action`` is a mapping with a numeric ``type`` and an ``args`` mapping.
    Supported types:

    * 1 -- message to ``destination`` carrying ``amount``, with optional
      ``state_init`` and ``payload_cell``.
    * 2 -- jetton transfer: message to ``jetton_wallet`` whose body is a
      ``JettonWallet`` transfer of ``amount`` to ``recipient``; a truthy
      ``comment`` is prefixed with four zero bytes and used as the
      forward payload.
    * 3 -- NFT transfer: message to ``nft_address`` whose body is an
      ``NFTItem`` transfer to ``recipient``.
    * 4 -- cell tagged ``0xAAA0`` referencing ``new_code`` and ``new_data``.

    Types 1-3 produce a cell tagged ``0xAAC1`` followed by an 8-bit
    ``send_mode`` (default 1) and a reference to the message.  Types 2 and
    3 attach a fixed value of ``1e8`` and forward ``1e7``.
    NOTE(review): presumably these amounts are in nanotons -- confirm.

    Args:
        action: The action mapping described above.
        response_address: Passed through to the jetton / NFT transfer body.

    Raises:
        Exception: If ``action['type']`` is not one of the supported types.
    """

    def send_prefix(args):
        # Common prefix of every "send message" command: tag + send mode.
        return (
            begin_cell()
            .store_uint(0xAAC1, 32)
            .store_uint(args.get('send_mode', 1), 8)
        )

    kind = action['type']

    if kind == 1:
        args = action['args']
        builder = send_prefix(args)
        header = Contract.create_internal_message_header(
            Address(args['destination']), Decimal(args['amount'])
        )
        message = Contract.create_common_msg_info(
            header, args.get('state_init'), args.get('payload_cell')
        )
        return builder.store_ref(message).end_cell()

    if kind == 2:
        args = action['args']
        builder = send_prefix(args)
        header = Contract.create_internal_message_header(
            Address(args['jetton_wallet']), Decimal(1e8)
        )
        transfer_body = JettonWallet().create_transfer_body(
            Address(args['recipient']),
            args['amount'],
            forward_amount=1e7,
            forward_payload=(
                (bytes(4) + args['comment']) if args.get('comment') else None
            ),
            response_address=response_address,
        )
        message = Contract.create_common_msg_info(header, None, transfer_body)
        return builder.store_ref(message).end_cell()

    if kind == 3:
        args = action['args']
        builder = send_prefix(args)
        header = Contract.create_internal_message_header(
            Address(args['nft_address']), Decimal(1e8)
        )
        transfer_body = NFTItem().create_transfer_body(
            Address(args['recipient']),
            response_address=response_address,
            forward_amount=1e7,
        )
        message = Contract.create_common_msg_info(header, None, transfer_body)
        return builder.store_ref(message).end_cell()

    if kind == 4:
        args = action['args']
        builder = begin_cell().store_uint(0xAAA0, 32)
        builder = builder.store_ref(args['new_code'])
        builder = builder.store_ref(args['new_data'])
        return builder.end_cell()

    raise Exception('Unsupported action type')
def print_actions(actions, prefix='\n' + "=== {actions_count} actions selected"):
    """Print a summary of the selected actions to stdout.

    Nothing is printed when *actions* is empty or otherwise falsy.

    Args:
        actions: Sequence of action mappings with ``type`` and ``args``.
        prefix: Header template; ``{actions_count}`` is replaced with the
            number of actions.
    """
    if actions:
        print(prefix.format(actions_count=len(actions)))
        for entry in actions:
            label = ACTION_TYPES_DESC[entry['type']]
            print(f"Action {label}: {entry['args']}")
        print("===")