from typing import Iterator, Callable, Protocol, Literal import traceback from pycfhelpers.logger import log from DAP.Crypto import HashFast, Cert from CellFrame.Network import Net from CellFrame.Chain import ChainAtomPtr, Ledger, ChainAddr, Mempool from .consensus import CFEvent, CFBlock from .datums import CFDatum, CFSubDatum, CFDatumTX, CFDatumEmission from .types import CFNetState, ticker, ledger_cache_rc from ..common.types import ChainTypes from dataclasses import dataclass class NetFee: def __init__(self, net: 'CFNet'): self.net = net self.tx_fee = str(self.net._origin_net.txFee) self.tx_fee_addr = str(self.net._origin_net.txFeeAddr) self.validator_avg_fee = str(self.net._origin_net.validatorAverageFee) self.validator_max_fee = str(self.net._origin_net.validatorMaxFee) self.validator_min_fee = str(self.net._origin_net.validatorMinFee) self.native_ticker = self.net._origin_net.nativeTicker class CFNet: def __init__(self, name: str): self.name = name self._origin_net = Net.byName(name) if not self._origin_net: raise RuntimeError(f"No such net: {name}") self.address = str(self._origin_net.getCurAddr()) self.main = CFChain(self, 'main') self.zerochain = CFChain(self, 'zerochain') self.fee_data = NetFee(self) @staticmethod def active_nets() -> list['CFNet']: return [CFNet(n.getName()) for n in Net.getNets()] @staticmethod def net_id_from_wallet(wallet) -> int: return ChainAddr.fromStr(str(wallet)).getNetId().long() @property def id(self) -> int: return self._origin_net.id.long() @property def chains(self) -> list['CFChain']: return [self.main, self.zerochain] @property def group_alias(self) -> str: return self._origin_net.gdb_group_alias def get_ledger(self) -> 'CFLedger': return CFLedger(self, self._origin_net.getLedger()) def register_gdbsync_notification_callback(self, callback: Callable, *args, **kwargs): def callback_wrapper(op_code, group, key, value, *other): callback(self, op_code, group, key, value, *args, net=self, **kwargs) self._origin_net.addNotify(callback_wrapper, ()) def change_state(self, state: CFNetState): if state == CFNetState.NET_STATE_OFFLINE: self._origin_net.stop() elif state == CFNetState.NET_STATE_ONLINE: self._origin_net.start() else: raise NotImplemented("This state not implemented") class MempoolCallback(Protocol): def __call__(self, op_code: Literal["a", "d"], datum: CFDatum, *args, chain: 'CFChain', **kwargs) -> None: pass class CFChain: def __init__(self, net: CFNet, chain_name: str): self.net = net self._origin_chain = net._origin_net.getChainByName(chain_name) if not self._origin_chain: raise RuntimeError(f"chain with name={chain_name} not found in net with name={net.name}") self.type = self._origin_chain.getCSName() self.name = chain_name def get_atoms(self) -> Iterator[CFBlock | CFEvent]: iterator = self._origin_chain.createAtomIter(False) ptr = self._origin_chain.atomIterGetFirst(iterator) if not ptr: log.message("not ptr") return [] atom, size = ptr while atom: if size <= 0: atom, size = self._origin_chain.atomIterGetNext(iterator) continue if self.type == ChainTypes.esbocs: yield CFBlock(atom=atom, chain=self) elif self.type == ChainTypes.dag_poa: yield CFEvent(atom=atom, chain=self) else: raise TypeError(f"Invalid Chain type={self.type}") atom, size = self._origin_chain.atomIterGetNext(iterator) def get_datums(self) -> Iterator[CFDatum]: for atom in self.get_atoms(): for datum in atom.get_datums(): yield datum def get_transactions(self) -> Iterator[CFDatumTX]: for datum in self.get_datums(): if datum._origin_datum.isDatumTX(): yield datum.get_sub_datum() def get_mempool(self) -> 'CFMempool': return CFMempool(self) def register_mempool_notification_callback(self, callback: MempoolCallback, *args, **kwargs): def callback_wrapper(op_code, group, key, value, *other): try: mempool = CFMempool(self) if value: datum = mempool.get_datum_from_bytes(value) else: datum = key log.message(f"Mempool callback: {op_code=} {key=} {datum=} ") callback(op_code, datum, *args, chain=self, **kwargs) except Exception: log.error(f"Error = {traceback.format_exc()}") try: self._origin_chain.addMempoolNotify(callback_wrapper, ()) except Exception: log.error(f"Error = {traceback.format_exc()}") def register_atom_notification_callback(self, callback, *args, **kwargs): def callback_wrapper(atom: ChainAtomPtr, size, *other): try: if self.type == ChainTypes.esbocs: cf_atom = CFBlock(atom, self) elif self.type == ChainTypes.dag_poa: cf_atom = CFEvent(atom, self) else: raise TypeError(f"Invalid Chain type={self.type}") callback(cf_atom, size, *args, chain=self, **kwargs) except Exception: log.error(traceback.format_exc()) self._origin_chain.addAtomNotify(callback_wrapper, ()) class CFLedger: def __init__(self, net: CFNet, ledger: Ledger = None): self.net = net self._origin_ledger = ledger or self.net.get_ledger() def get_tx_ticker(self, datum: CFDatumTX) -> ticker | None: return self._origin_ledger.txGetMainTickerAndLedgerRc(datum._origin_sub_datum)[0] def get_tx_ledger_rc(self, datum: CFDatumTX) -> ledger_cache_rc | None: return self._origin_ledger.txGetMainTickerAndLedgerRc(datum._origin_sub_datum)[1] def calc_address_balances(self, address: str) -> dict[ticker, str] | dict: res = {} chain_addr = ChainAddr.fromStr(address) tickers = self._origin_ledger.addrGetTokenTickerAll(chain_addr) for ticker in tickers: balance = str(self._origin_ledger.calcBalance(chain_addr, ticker)) res[ticker] = balance return res def token_auth_signs_pkey_hashes(self, datum: CFDatum) -> list[str]: # list[hashes] """ Хэши публичных ключей для подписи эмиссии тикера. cellframe-node-cli ledger list coins -net raiden """ pkey_hashes = self._origin_ledger.tokenAuthPkeysHashes(datum.get_sub_datum().ticker) return [str(pkey_hash) for pkey_hash in pkey_hashes] def token_auth_signs_valid(self, datum: CFDatum) -> int: """ Количество валидных подписей """ return self._origin_ledger.tokenAuthSignsValid(datum.get_sub_datum().ticker) or 0 def token_auth_signs_total(self, datum: CFDatum) -> int: """ Максимально возможное количество подписей """ return self._origin_ledger.tokenAuthSignsTotal(datum.get_sub_datum().ticker) or 0 def has_emission(self, emission: CFDatumEmission) -> bool: """ Check Emission in chain """ sub_datum = self._origin_ledger.tokenEmissionFind(HashFast.fromString(emission.hash)) if sub_datum is None: return False return True def tx_by_hash(self, hash: str) -> CFDatumTX: hf = HashFast.fromString(hash) tx = CFDatumTX(self._origin_ledger.txFindByHash(hf)) return tx def register_ledger_tx_notification_callback(self, callback, *args, **kwargs): def callback_wrapper(ledger, tx, *other): callback(ledger, tx, *args, net=self.net, **kwargs) self._origin_ledger.txAddNotify(callback_wrapper, (args, kwargs)) class CFMempool: def __init__(self, chain: CFChain): self.chain = chain def get_datums(self) -> list[CFDatum]: data = Mempool.list(self.chain.net._origin_net, self.chain._origin_chain) return [CFDatum(None, datum, net=self.chain.net) for datum in data.values()] def get_datum_from_bytes(self, value: bytes) -> CFDatum | None: datum = Mempool.datumExtract(value) if datum is None: return None # raise ValueError(f"Incorrect bytes={value} to extract to Datum") return CFDatum(None, datum, self.chain.net) def valid_signs_table(self): pass def reason(self): pass def remove(self, datum: CFDatum | CFSubDatum) -> bool: return Mempool.remove(self.chain._origin_chain, datum.hash)