Newer
Older
from typing import Iterator, Callable, Protocol, Literal
from CellFrame.Network import Net, NetID
from CellFrame.Chain import ChainAtomPtr, Ledger, ChainAddr, Mempool
from .datums import CFDatum, CFSubDatum, CFDatumTX, CFDatumEmission
from .types import CFNetState, ticker, ledger_cache_rc
from dataclasses import dataclass
class NetFee:
def __init__(self, net: 'CFNet'):
self.net = net
self.tx_fee = str(self.net._origin_net.txFee)
self.tx_fee_addr = str(self.net._origin_net.txFeeAddr)
self.validator_avg_fee = str(self.net._origin_net.validatorAverageFee)
self.validator_max_fee = str(self.net._origin_net.validatorMaxFee)
self.validator_min_fee = str(self.net._origin_net.validatorMinFee)
self.native_ticker = self.net._origin_net.nativeTicker
class CFNetID:
def __init__(self, str_or_origin : str | NetID):
if isinstance(str_or_origin, NetID):
self._origin_net_id = str_or_origin
if isinstance(str_or_origin, str):
self._origin_net_id = NetID.fromStr(str_or_origin)
@property
def long(self):
return self._origin_net_id.long()
def __init__(self, name_or_id: str | CFNetID ):
if isinstance(name_or_id, str):
self._origin_net = Net.byName(name_or_id)
if isinstance(name_or_id, CFNetID):
self._origin_net = Net.byId(name_or_id._origin_net_id)
if not self._origin_net:
raise RuntimeError(f"No such net: {name_or_id}")
self.name = self._origin_net.getName()
self.main = CFChain(self, 'main')
self.zerochain = CFChain(self, 'zerochain')
@staticmethod
def active_nets() -> list['CFNet']:
return [CFNet(n.getName()) for n in Net.getNets()]
def net_id_from_wallet_str(wallet: str) -> int:
def id(self) -> CFNetID:
return CFNetID(self._origin_net.id)
@property
def chains(self) -> list['CFChain']:
return [self.main, self.zerochain]
def group_alias(self) -> str:
return self._origin_net.gdb_group_alias
def register_gdbsync_notification_callback(self, callback: Callable, *args, **kwargs):
def callback_wrapper(op_code, group, key, value, *other):
callback(self, op_code, group, key, value, *args, net=self, **kwargs)
self._origin_net.addNotify(callback_wrapper, ())
def change_state(self, state: CFNetState):
if state == CFNetState.NET_STATE_OFFLINE:
self._origin_net.stop()
elif state == CFNetState.NET_STATE_ONLINE:
self._origin_net.start()
else:
raise NotImplemented("This state not implemented")
class MempoolCallback(Protocol):
def __call__(self, op_code: Literal["a", "d"], datum: CFDatum, *args, chain: 'CFChain', **kwargs) -> None:
pass
class CFChain:
def __init__(self, net: CFNet, chain_name: str):
self.net = net
self._origin_chain = net._origin_net.getChainByName(chain_name)
if not self._origin_chain:
raise RuntimeError(f"chain with name={chain_name} not found in net with name={net.name}")
self.type = self._origin_chain.getCSName()
self.name = chain_name
def get_atoms(self) -> Iterator[CFBlock | CFEvent]:
iterator = self._origin_chain.createAtomIter(False)
ptr = self._origin_chain.atomIterGetFirst(iterator)
if not ptr:
return []
atom, size = ptr
while atom:
if size <= 0:
atom, size = self._origin_chain.atomIterGetNext(iterator)
continue
if self.type == ChainTypes.esbocs:
yield CFBlock(atom=atom, chain=self)
elif self.type == ChainTypes.dag_poa:
yield CFEvent(atom=atom, chain=self)
else:
raise TypeError(f"Invalid Chain type={self.type}")
atom, size = self._origin_chain.atomIterGetNext(iterator)
def get_datums(self, type : CFSubDatum | None = None) -> Iterator[CFDatum]:
if type is None:
yield datum
elif isinstance(datum, type):
yield datum
else:
continue
def get_transactions(self) -> Iterator[CFDatumTX]:
for datum in self.get_datums():
if datum._origin_datum.isDatumTX():
yield datum.get_sub_datum()
def get_mempool(self) -> 'CFMempool':
return CFMempool(self)
def register_mempool_notification_callback(self, callback: MempoolCallback, *args, **kwargs):
if value:
datum = mempool.get_datum_from_bytes(value)
else:
datum = key
log.message(f"Mempool callback: {op_code=} {key=} {datum=} ")
try:
self._origin_chain.addMempoolNotify(callback_wrapper, ())
except Exception:
def register_atom_notification_callback(self, callback, *args, **kwargs):
def callback_wrapper(atom: ChainAtomPtr, size, *other):
try:
if self.type == ChainTypes.esbocs:
cf_atom = CFBlock(atom, self)
elif self.type == ChainTypes.dag_poa:
cf_atom = CFEvent(atom, self)
else:
raise TypeError(f"Invalid Chain type={self.type}")
callback(cf_atom, size, *args, chain=self, **kwargs)
except Exception:
self._origin_chain.addAtomNotify(callback_wrapper, ())
class CFLedger:
def __init__(self, net: CFNet, ledger: Ledger = None):
self.net = net
self._origin_ledger = ledger or self.net.get_ledger()
def get_tx_ticker(self, datum: CFDatumTX) -> ticker | None:
return self._origin_ledger.txGetMainTickerAndLedgerRc(datum._origin_sub_datum)[0]
def get_tx_ledger_rc(self, datum: CFDatumTX) -> ledger_cache_rc | None:
return self._origin_ledger.txGetMainTickerAndLedgerRc(datum._origin_sub_datum)[1]
def calc_address_balances(self, address: str) -> dict[ticker, str] | dict:
res = {}
chain_addr = ChainAddr.fromStr(address)
tickers = self._origin_ledger.addrGetTokenTickerAll(chain_addr)
for ticker in tickers:
balance = str(self._origin_ledger.calcBalance(chain_addr, ticker))
res[ticker] = balance
return res
def token_auth_signs_pkey_hashes(self, datum: CFDatum) -> list[str]: # list[hashes]
"""
Хэши публичных ключей для подписи эмиссии тикера. cellframe-node-cli ledger list coins -net raiden
"""
pkey_hashes = self._origin_ledger.tokenAuthPkeysHashes(datum.get_sub_datum().ticker)
return [str(pkey_hash) for pkey_hash in pkey_hashes]
def token_auth_signs_valid(self, datum: CFDatum) -> int:
"""
Количество валидных подписей
"""
return self._origin_ledger.tokenAuthSignsValid(datum.get_sub_datum().ticker) or 0
def token_auth_signs_total(self, datum: CFDatum) -> int:
"""
Максимально возможное количество подписей
"""
return self._origin_ledger.tokenAuthSignsTotal(datum.get_sub_datum().ticker) or 0
def has_emission(self, emission: CFDatumEmission) -> bool:
"""
Check Emission in chain
"""
sub_datum = self._origin_ledger.tokenEmissionFind(HashFast.fromString(emission.hash))
if sub_datum is None:
return False
return True
def get_emission(self, emission_hash: str) -> CFDatumEmission:
"""
Get emission from ledger
"""
return CFDatumEmission(None, self._origin_ledger.tokenEmissionFind(HashFast.fromString(emission_hash)),net=self.net)
def tx_by_hash(self, hash: str) -> CFDatumTX:
hf = HashFast.fromString(hash)
tx = CFDatumTX(None, self._origin_ledger.txFindByHash(hf), net=self.net)
def register_ledger_tx_notification_callback(self, callback, *args, **kwargs):
def callback_wrapper(ledger, tx, *other):
callback(ledger, tx, *args, net=self.net, **kwargs)
self._origin_ledger.txAddNotify(callback_wrapper, (args, kwargs))
class CFMempool:
def __init__(self, chain: CFChain):
self.chain = chain
def get_datums(self) -> list[CFDatum]:
data = Mempool.list(self.chain.net._origin_net, self.chain._origin_chain)
return [CFDatum(None, datum, net=self.chain.net) for datum in data.values()]
def get_datum_from_bytes(self, value: bytes) -> CFDatum | None:
datum = Mempool.datumExtract(value)
if datum is None:
return None
# raise ValueError(f"Incorrect bytes={value} to extract to Datum")
return CFDatum(None, datum, self.chain.net)
def valid_signs_table(self):
pass
def reason(self):
pass
def remove(self, datum: CFDatum | CFSubDatum) -> bool:
return Mempool.remove(self.chain._origin_chain, datum.hash)