Skip to content
Snippets Groups Projects
net.py 9.95 KiB
Newer Older
boo's avatar
boo committed
from typing import Iterator, Callable, Protocol, Literal
boo's avatar
boo committed

import traceback

boo's avatar
boo committed
from pycfhelpers.logger import log
boo's avatar
boo committed
from DAP.Crypto import HashFast, Cert
from CellFrame.Network import Net, NetID
boo's avatar
boo committed
from CellFrame.Chain import ChainAtomPtr, Ledger, ChainAddr, Mempool
boo's avatar
boo committed

from .consensus import CFEvent, CFBlock
Dmitry Puzyrkov's avatar
Dmitry Puzyrkov committed
from .datums import CFDatum, CFSubDatum,  CFDatumTX, CFDatumEmission
from .types import CFNetState, ticker, ledger_cache_rc
boo's avatar
boo committed
from ..common.types import ChainTypes
boo's avatar
boo committed
from dataclasses import dataclass


class NetFee:
    """Fee parameters of a network, captured when the object is created.

    The values are read from ``net._origin_net`` once, in ``__init__``.
    Each fee value is stored as the ``str()`` of the corresponding attribute
    of the wrapped net; ``native_ticker`` is stored unchanged.

    Attributes:
        net: The ``CFNet`` the values were read from.
        tx_fee: ``str(txFee)``.
        tx_fee_addr: ``str(txFeeAddr)``.
        validator_avg_fee: ``str(validatorAverageFee)``.
        validator_max_fee: ``str(validatorMaxFee)``.
        validator_min_fee: ``str(validatorMinFee)``.
        native_ticker: ``nativeTicker`` as returned by the wrapped net.
    """

    def __init__(self, net: 'CFNet'):
        self.net = net
        origin = net._origin_net

        # (attribute set on this object, attribute read from the wrapped net)
        stringified = (
            ("tx_fee", "txFee"),
            ("tx_fee_addr", "txFeeAddr"),
            ("validator_avg_fee", "validatorAverageFee"),
            ("validator_max_fee", "validatorMaxFee"),
            ("validator_min_fee", "validatorMinFee"),
        )
        for own_name, origin_name in stringified:
            setattr(self, own_name, str(getattr(origin, origin_name)))

        # The ticker is the only value kept without a str() conversion.
        self.native_ticker = origin.nativeTicker
boo's avatar
boo committed

class CFNetID:
    """Wrapper around a ``NetID`` object.

    Args:
        str_or_origin: Either an existing ``NetID`` instance, which is stored
            as is, or a string that is converted with ``NetID.fromStr``.

    Raises:
        TypeError: If ``str_or_origin`` is neither a ``NetID`` nor a ``str``.
    """

    def __init__(self, str_or_origin: str | NetID):
        if isinstance(str_or_origin, NetID):
            self._origin_net_id = str_or_origin
        elif isinstance(str_or_origin, str):
            self._origin_net_id = NetID.fromStr(str_or_origin)
        else:
            # Fail here instead of leaving ``_origin_net_id`` unset, which
            # would only surface later as an AttributeError in ``long``.
            raise TypeError(
                f"str_or_origin must be str or NetID, got {type(str_or_origin).__name__}"
            )

    @property
    def long(self):
        """Result of calling ``long()`` on the wrapped ``NetID``."""
        return self._origin_net_id.long()
boo's avatar
boo committed

class CFNet:
    """Wrapper around a CellFrame ``Net`` object.

    Attributes:
        name: Value of ``getName()`` on the wrapped net.
        address: ``str()`` of the wrapped net's ``getCurAddr()``.
        main: ``CFChain`` for the chain named ``'main'``.
        zerochain: ``CFChain`` for the chain named ``'zerochain'``.
        fee_data: ``NetFee`` built from this net.
    """

    def __init__(self, name_or_id: str | CFNetID):
        """Look the net up by name or by id.

        Args:
            name_or_id: Net name (resolved with ``Net.byName``) or a
                ``CFNetID`` (resolved with ``Net.byId``).

        Raises:
            TypeError: If ``name_or_id`` is neither ``str`` nor ``CFNetID``.
            RuntimeError: If the lookup returns a falsy result.
        """
        if isinstance(name_or_id, str):
            self._origin_net = Net.byName(name_or_id)
        elif isinstance(name_or_id, CFNetID):
            self._origin_net = Net.byId(name_or_id._origin_net_id)
        else:
            # Without this branch ``_origin_net`` would stay unset and the
            # check below would fail with an unrelated AttributeError.
            raise TypeError(
                f"name_or_id must be str or CFNetID, got {type(name_or_id).__name__}"
            )

        if not self._origin_net:
            raise RuntimeError(f"No such net: {name_or_id}")

        self.name = self._origin_net.getName()
        self.address = str(self._origin_net.getCurAddr())
        self.main = CFChain(self, 'main')
        self.zerochain = CFChain(self, 'zerochain')
        self.fee_data = NetFee(self)

    @staticmethod
    def active_nets() -> list['CFNet']:
        """Return a ``CFNet`` for every net reported by ``Net.getNets()``."""
        return [CFNet(n.getName()) for n in Net.getNets()]

    @staticmethod
    def net_id_from_wallet_str(wallet: str) -> int:
        """Return the ``long()`` net id parsed from a wallet address string."""
        return ChainAddr.fromStr(str(wallet)).getNetId().long()

    @property
    def id(self) -> CFNetID:
        """The wrapped net's ``id`` as a ``CFNetID``."""
        return CFNetID(self._origin_net.id)

    @property
    def chains(self) -> list['CFChain']:
        """The ``main`` and ``zerochain`` chains, in that order."""
        return [self.main, self.zerochain]

    @property
    def group_alias(self) -> str:
        """The wrapped net's ``gdb_group_alias``."""
        return self._origin_net.gdb_group_alias

    def get_ledger(self) -> 'CFLedger':
        """Return a new ``CFLedger`` around the wrapped net's ledger."""
        return CFLedger(self, self._origin_net.getLedger())

    def register_gdbsync_notification_callback(self, callback: Callable, *args, **kwargs):
        """Register ``callback`` through the wrapped net's ``addNotify``.

        The callback is invoked as
        ``callback(self, op_code, group, key, value, *args, net=self, **kwargs)``;
        this net is passed both as the first positional argument and as the
        ``net`` keyword. Positional arguments the notifier supplies after
        ``value`` are ignored.
        """
        def callback_wrapper(op_code, group, key, value, *other):
            callback(self, op_code, group, key, value, *args, net=self, **kwargs)

        self._origin_net.addNotify(callback_wrapper, ())

    def change_state(self, state: CFNetState):
        """Stop or start the wrapped net.

        Args:
            state: ``NET_STATE_OFFLINE`` calls ``stop()``;
                ``NET_STATE_ONLINE`` calls ``start()``.

        Raises:
            NotImplementedError: For any other state.
        """
        if state == CFNetState.NET_STATE_OFFLINE:
            self._origin_net.stop()
        elif state == CFNetState.NET_STATE_ONLINE:
            self._origin_net.start()
        else:
            # ``NotImplemented`` is a comparison sentinel, not an exception;
            # calling it raises a TypeError. The exception class is
            # ``NotImplementedError``.
            raise NotImplementedError("This state not implemented")


boo's avatar
boo committed
class MempoolCallback(Protocol):
    """Call signature for mempool notification callbacks.

    ``op_code`` is the literal ``"a"`` or ``"d"``
    (presumably "add" / "delete" -- TODO confirm against the node API).
    ``datum`` is the datum the notification refers to and ``chain`` is passed
    as a keyword argument. Extra positional and keyword arguments are
    accepted.
    """

    def __call__(self, op_code: Literal["a", "d"], datum: CFDatum, *args, chain: 'CFChain', **kwargs) -> None:
        ...


boo's avatar
boo committed
class CFChain:
    """Wrapper around one named chain of a ``CFNet``.

    Attributes:
        net: The owning ``CFNet``.
        type: Value of ``getCSName()`` on the wrapped chain; compared against
            ``ChainTypes`` members to pick the atom wrapper class.
        name: The chain name the wrapper was created with.
    """

    def __init__(self, net: CFNet, chain_name: str):
        """Resolve ``chain_name`` inside ``net``.

        Raises:
            RuntimeError: If the net has no chain with that name.
        """
        self.net = net
        self._origin_chain = net._origin_net.getChainByName(chain_name)
        if not self._origin_chain:
            raise RuntimeError(f"chain with name={chain_name} not found in net with name={net.name}")
        self.type = self._origin_chain.getCSName()
        self.name = chain_name

    def _wrap_atom(self, atom) -> CFBlock | CFEvent:
        """Wrap a raw atom in the class matching this chain's type.

        Raises:
            TypeError: If ``self.type`` is neither ``ChainTypes.esbocs`` nor
                ``ChainTypes.dag_poa``.
        """
        if self.type == ChainTypes.esbocs:
            return CFBlock(atom=atom, chain=self)
        if self.type == ChainTypes.dag_poa:
            return CFEvent(atom=atom, chain=self)
        raise TypeError(f"Invalid Chain type={self.type}")

    def get_atoms(self) -> Iterator[CFBlock | CFEvent]:
        """Iterate over the chain's atoms as ``CFBlock`` / ``CFEvent`` objects.

        Atoms reported with a size ``<= 0`` are skipped. Iteration ends when
        the underlying iterator returns a falsy result or a falsy atom.

        Raises:
            TypeError: When an atom has to be wrapped and the chain type is
                not supported.
        """
        iterator = self._origin_chain.createAtomIter(False)
        ptr = self._origin_chain.atomIterGetFirst(iterator)

        if not ptr:
            log.message("not ptr")
            # Bare return: this is a generator, a returned value is discarded.
            return

        # The same falsy check that guards the first result is applied to
        # every following one, so a ``None`` at the end of iteration does not
        # blow up on tuple unpacking.
        while ptr:
            atom, size = ptr
            if not atom:
                break
            if size > 0:
                yield self._wrap_atom(atom)
            ptr = self._origin_chain.atomIterGetNext(iterator)

    def get_datums(self, type: CFSubDatum | None = None) -> Iterator[CFDatum]:
        """Iterate over the datums of every atom in the chain.

        Args:
            type: When given, only datums for which
                ``isinstance(datum, type)`` holds are yielded.
        """
        for atom in self.get_atoms():
            for datum in atom.get_datums():
                if type is None or isinstance(datum, type):
                    yield datum

    def get_transactions(self) -> Iterator[CFDatumTX]:
        """Yield the sub-datum of every datum whose origin reports ``isDatumTX()``."""
        for datum in self.get_datums():
            if datum._origin_datum.isDatumTX():
                yield datum.get_sub_datum()

    def get_mempool(self) -> 'CFMempool':
        """Return a new ``CFMempool`` bound to this chain."""
        return CFMempool(self)

    def register_mempool_notification_callback(self, callback: MempoolCallback, *args, **kwargs):
        """Register ``callback`` through the chain's ``addMempoolNotify``.

        On each notification the callback is invoked as
        ``callback(op_code, datum, *args, chain=self, **kwargs)``. ``datum``
        is extracted from ``value`` when ``value`` is truthy; otherwise the
        notification ``key`` is passed in its place. Exceptions raised while
        handling a notification, or while registering, are logged and not
        propagated.
        """
        def callback_wrapper(op_code, group, key, value, *other):
            try:
                datum = CFMempool(self).get_datum_from_bytes(value) if value else key

                log.message(f"Mempool callback: {op_code=} {key=} {datum=}  ")
                callback(op_code, datum, *args, chain=self, **kwargs)
            except Exception:
                log.error(f"Error = {traceback.format_exc()}")

        try:
            self._origin_chain.addMempoolNotify(callback_wrapper, ())
        except Exception:
            log.error(f"Error = {traceback.format_exc()}")

    def register_atom_notification_callback(self, callback, *args, **kwargs):
        """Register ``callback`` through the chain's ``addAtomNotify``.

        On each notification the atom is wrapped according to the chain type
        and the callback is invoked as
        ``callback(cf_atom, size, *args, chain=self, **kwargs)``. Exceptions
        raised while handling a notification (including an unsupported chain
        type) are logged and not propagated.
        """
        def callback_wrapper(atom: ChainAtomPtr, size, *other):
            try:
                callback(self._wrap_atom(atom), size, *args, chain=self, **kwargs)
            except Exception:
                log.error(traceback.format_exc())

        self._origin_chain.addAtomNotify(callback_wrapper, ())


class CFLedger:
    """Wrapper around the ledger object of a ``CFNet``.

    Attributes:
        net: The ``CFNet`` this ledger belongs to.
    """

    def __init__(self, net: CFNet, ledger: Ledger | None = None):
        """Store the net and the underlying ledger.

        Args:
            net: The owning net.
            ledger: The underlying ledger object. When omitted it is taken
                from ``net._origin_net.getLedger()``.
        """
        self.net = net
        if ledger is None:
            # ``net.get_ledger()`` returns a CFLedger wrapper, not the raw
            # ledger, so the raw object is fetched from the wrapped net.
            ledger = net._origin_net.getLedger()
        self._origin_ledger = ledger

    def get_tx_ticker(self, datum: CFDatumTX) -> ticker | None:
        """Return element 0 of ``txGetMainTickerAndLedgerRc`` for the transaction."""
        return self._origin_ledger.txGetMainTickerAndLedgerRc(datum._origin_sub_datum)[0]

    def get_tx_ledger_rc(self, datum: CFDatumTX) -> ledger_cache_rc | None:
        """Return element 1 of ``txGetMainTickerAndLedgerRc`` for the transaction."""
        return self._origin_ledger.txGetMainTickerAndLedgerRc(datum._origin_sub_datum)[1]

    def calc_address_balances(self, address: str) -> dict[ticker, str] | dict:
        """Return ``{ticker: balance}`` for every ticker the ledger lists for ``address``.

        Balances are the ``str()`` of the ledger's ``calcBalance`` result.
        """
        chain_addr = ChainAddr.fromStr(address)
        return {
            token: str(self._origin_ledger.calcBalance(chain_addr, token))
            for token in self._origin_ledger.addrGetTokenTickerAll(chain_addr)
        }

    def token_auth_signs_pkey_hashes(self, datum: CFDatum) -> list[str]:  # list[hashes]
        """
        Public key hashes used for signing emissions of the datum's ticker.
        CLI counterpart: cellframe-node-cli ledger list coins -net raiden
        """
        pkey_hashes = self._origin_ledger.tokenAuthPkeysHashes(datum.get_sub_datum().ticker)
        return [str(pkey_hash) for pkey_hash in pkey_hashes]

    def token_auth_signs_valid(self, datum: CFDatum) -> int:
        """
        Number of valid signatures (0 when the ledger returns a falsy value).
        """
        return self._origin_ledger.tokenAuthSignsValid(datum.get_sub_datum().ticker) or 0

    def token_auth_signs_total(self, datum: CFDatum) -> int:
        """
        Maximum possible number of signatures (0 when the ledger returns a falsy value).
        """
        return self._origin_ledger.tokenAuthSignsTotal(datum.get_sub_datum().ticker) or 0

    def has_emission(self, emission: CFDatumEmission) -> bool:
        """
        Return True when ``tokenEmissionFind`` returns something other than None for the emission's hash.
        """
        found = self._origin_ledger.tokenEmissionFind(HashFast.fromString(emission.hash))
        return found is not None

    def get_emission(self, emission_hash: str) -> CFDatumEmission:
        """
        Wrap the ledger's ``tokenEmissionFind`` result for ``emission_hash`` in a ``CFDatumEmission``.
        """
        origin_emission = self._origin_ledger.tokenEmissionFind(HashFast.fromString(emission_hash))
        return CFDatumEmission(None, origin_emission, net=self.net)

    def tx_by_hash(self, hash: str) -> CFDatumTX:
        """Wrap the ledger's ``txFindByHash`` result for ``hash`` in a ``CFDatumTX``."""
        origin_tx = self._origin_ledger.txFindByHash(HashFast.fromString(hash))
        return CFDatumTX(None, origin_tx, net=self.net)

    def register_ledger_tx_notification_callback(self, callback, *args, **kwargs):
        """Register ``callback`` through the ledger's ``txAddNotify``.

        The callback is invoked as
        ``callback(ledger, tx, *args, net=self.net, **kwargs)``. Positional
        arguments the notifier supplies after ``tx`` are ignored.
        """
        def callback_wrapper(ledger, tx, *other):
            callback(ledger, tx, *args, net=self.net, **kwargs)

        self._origin_ledger.txAddNotify(callback_wrapper, (args, kwargs))
boo's avatar
boo committed


class CFMempool:
    """Access to the mempool of one chain.

    Attributes:
        chain: The ``CFChain`` whose mempool is accessed.
    """

    def __init__(self, chain: CFChain):
        self.chain = chain

    def get_datums(self) -> list[CFDatum]:
        """Return every datum from ``Mempool.list`` for this chain, wrapped in ``CFDatum``."""
        data = Mempool.list(self.chain.net._origin_net, self.chain._origin_chain)
        return [CFDatum(None, datum, net=self.chain.net) for datum in data.values()]

    def get_datum_from_bytes(self, value: bytes) -> CFDatum | None:
        """Extract a datum from raw bytes.

        Returns:
            The wrapped datum, or ``None`` when ``Mempool.datumExtract``
            returns ``None``.
        """
        datum = Mempool.datumExtract(value)
        if datum is None:
            return None
        # ``net`` is passed by keyword, like every other datum construction
        # in this module.
        return CFDatum(None, datum, net=self.chain.net)

    def valid_signs_table(self):
        """Not implemented; returns ``None``."""
        pass

    def reason(self):
        """Not implemented; returns ``None``."""
        pass

    def remove(self, datum: CFDatum | CFSubDatum) -> bool:
        """Pass the chain and ``datum.hash`` to ``Mempool.remove`` and return its result."""
        return Mempool.remove(self.chain._origin_chain, datum.hash)