from typing import Iterator
import traceback

from DAP.Core import logIt
from DAP.Crypto import HashFast
from CellFrame.Network import Net
from CellFrame.Chain import ChainAtomPtr, Ledger, ChainAddr

from .consensus import CFEvent, CFBlock
from .datums import CFDatum, CFDatumTX
from .types import CFNetState, ticker
from ..common.types import ChainTypes


class CFNet:
    """Wrapper around a native ``Net`` object looked up by name.

    Exposes the ``main`` and ``zerochain`` chains of the network as
    :class:`CFChain` instances.
    """

    def __init__(self, name: str):
        """Look up the network called *name* and wrap its two chains.

        Raises:
            RuntimeError: if ``Net.byName`` finds no such network, or if
                either chain cannot be found (raised by :class:`CFChain`).
        """
        self.name = name
        self._origin_net = Net.byName(name)
        if not self._origin_net:
            raise RuntimeError(f"No such net: {name}")
        self.main = CFChain(self, 'main')
        self.zerochain = CFChain(self, 'zerochain')

    @property
    def id(self) -> int:
        """Network id, as returned by the native ``id.long()``."""
        return self._origin_net.id.long()

    @property
    def chains(self) -> list['CFChain']:
        """The wrapped chains, in the order ``[main, zerochain]``."""
        return [self.main, self.zerochain]

    def get_ledger(self):
        """Return a new :class:`CFLedger` wrapping this network's ledger."""
        return CFLedger(self, self._origin_net.getLedger())

    def register_gdbsync_notification_callback(self, callback, *args, **kwargs):
        """Register *callback* through the native ``Net.addNotify``.

        The callback is invoked as
        ``callback(net, op_code, group, key, value, *args, net=net, **kwargs)``.
        """
        def callback_wrapper(op_code, group, key, value, *other):
            # NOTE(review): the net is passed both positionally and as the
            # ``net`` keyword, unlike the other registrars in this module.
            # Kept as-is because existing callbacks may depend on it.
            callback(self, op_code, group, key, value, *args, net=self, **kwargs)

        self._origin_net.addNotify(callback_wrapper, ())

    def change_state(self, state: CFNetState):
        """Start or stop the network according to *state*.

        Raises:
            NotImplementedError: for any state other than
                ``NET_STATE_OFFLINE`` and ``NET_STATE_ONLINE``.
        """
        if state == CFNetState.NET_STATE_OFFLINE:
            self._origin_net.stop()
        elif state == CFNetState.NET_STATE_ONLINE:
            self._origin_net.start()
        else:
            # ``NotImplemented`` is a non-callable constant, not an exception;
            # the exception class is ``NotImplementedError``.
            raise NotImplementedError("This state not implemented")


class CFChain:
    """Wrapper around a native chain object belonging to a :class:`CFNet`."""

    def __init__(self, net: CFNet, chain_name: str):
        """Look up *chain_name* inside *net*.

        Raises:
            RuntimeError: if the network has no chain with that name.
        """
        self.net = net
        self._origin_chain = net._origin_net.getChainByName(chain_name)
        if not self._origin_chain:
            raise RuntimeError(
                f"chain with name={chain_name} not found in net with name={net.name}")
        # Value of getCSName(); compared against ChainTypes members below.
        self.type = self._origin_chain.getCSName()
        self.name = chain_name

    def _wrap_atom(self, atom) -> CFBlock | CFEvent:
        """Wrap a native atom in the class that matches this chain's type.

        Raises:
            TypeError: if the chain type is neither ``esbocs`` nor ``dag_poa``.
        """
        if self.type == ChainTypes.esbocs:
            return CFBlock(atom=atom, chain=self)
        if self.type == ChainTypes.dag_poa:
            return CFEvent(atom=atom, chain=self)
        raise TypeError(f"Invalid Chain type={self.type}")

    def get_atoms(self) -> Iterator[CFBlock | CFEvent]:
        """Yield every atom of the chain as a ``CFBlock`` or ``CFEvent``.

        Atoms reported with a non-positive size are skipped.

        Raises:
            TypeError: if the chain type is neither ``esbocs`` nor ``dag_poa``.
        """
        iterator = self._origin_chain.createAtomIter(False)
        ptr = self._origin_chain.atomIterGetFirst(iterator)
        if not ptr:
            logIt.message("not ptr")
            # This is a generator: a bare return simply ends the iteration.
            return
        atom, size = ptr
        while atom:
            if size > 0:
                yield self._wrap_atom(atom)
            atom, size = self._origin_chain.atomIterGetNext(iterator)

    def get_datums(self) -> Iterator[CFDatum]:
        """Yield the datums of every atom, in atom order."""
        for atom in self.get_atoms():
            yield from atom.get_datums()

    def get_transactions(self) -> Iterator[CFDatumTX]:
        """Yield the sub-datum of every datum that is a transaction."""
        for datum in self.get_datums():
            if datum._origin_datum.isDatumTX():
                yield datum.get_sub_datum()

    def register_mempool_notification_callback(self, callback, *args, **kwargs):
        """Register *callback* through the native ``addMempoolNotify``.

        The callback is invoked as
        ``callback(op_code, group, key, value, *args, chain=self, **kwargs)``.
        Exceptions raised by the callback or by the registration itself are
        logged via ``logIt.error`` and not propagated.
        """
        def callback_wrapper(op_code, group, key, value, *other):
            logIt.message(f"{other=}")
            try:
                callback(op_code, group, key, value, *args, chain=self, **kwargs)
            except Exception:
                logIt.error(f"Error = {traceback.format_exc()}")

        try:
            self._origin_chain.addMempoolNotify(callback_wrapper, ())
        except Exception:
            logIt.error(f"Error = {traceback.format_exc()}")

    def register_atom_notification_callback(self, callback, *args, **kwargs):
        """Register *callback* through the native ``addAtomNotify``.

        The native atom is wrapped according to the chain type and the
        callback is invoked as
        ``callback(cf_atom, size, *args, chain=self, **kwargs)``.
        Any exception, including an unsupported chain type, is logged via
        ``logIt.error`` and not propagated.
        """
        def callback_wrapper(atom: ChainAtomPtr, size, *other):
            try:
                cf_atom = self._wrap_atom(atom)
                callback(cf_atom, size, *args, chain=self, **kwargs)
            except Exception:
                logIt.error(traceback.format_exc())

        self._origin_chain.addAtomNotify(callback_wrapper, ())


class CFLedger:
    """Wrapper around the native ``Ledger`` of a :class:`CFNet`."""

    def __init__(self, net: CFNet, ledger: Ledger | None = None):
        """Wrap *ledger*, or the ledger of *net* when none is supplied."""
        self.net = net
        # Fall back to the *native* ledger. ``net.get_ledger()`` returns a
        # CFLedger wrapper, which must not be stored as the origin object.
        self._origin_ledger = ledger or net._origin_net.getLedger()

    def get_tx_ticker(self, datum: CFDatumTX) -> ticker | None:
        """Return the token ticker the ledger reports for the hash of *datum*."""
        tx_hash = HashFast.fromString(str(datum.hash))
        return self._origin_ledger.txGetTokenTickerByHash(tx_hash)

    def calc_address_balances(self, address: str) -> dict[ticker, str] | dict:
        """Return ``{ticker: balance}`` for every token known for *address*.

        Balances are converted to ``str``. An empty dict is returned when the
        ledger reports no tickers for the address.
        """
        chain_addr = ChainAddr.fromStr(address)
        # Presumably the native call may return None instead of an empty
        # list for an address with no tokens -- TODO confirm.
        tickers = self._origin_ledger.addrGetTokenTickerAll(chain_addr) or []
        return {
            token: str(self._origin_ledger.calcBalance(chain_addr, token))
            for token in tickers
        }

    def register_ledger_tx_notification_callback(self, callback, *args, **kwargs):
        """Register *callback* through the native ``txAddNotify``.

        The callback is invoked as
        ``callback(ledger, tx, *args, net=self.net, **kwargs)``.
        """
        def callback_wrapper(ledger, tx, *other):
            callback(ledger, tx, *args, net=self.net, **kwargs)

        self._origin_ledger.txAddNotify(callback_wrapper, (args, kwargs))