Newer
Older
from typing import Iterator
import traceback
from DAP.Core import logIt
from DAP.Crypto import HashFast
from CellFrame.Network import Net
from CellFrame.Chain import ChainAtomPtr, Ledger, ChainAddr, Mempool
from .consensus import CFEvent, CFBlock
from .datums import CFDatum, CFDatumTX
from .types import CFNetState, ticker
from ..common.types import ChainTypes
from dataclasses import dataclass
class NetFee:
def __init__(self, net: 'CFNet'):
self.net = net
self.tx_fee = str(self.net._origin_net.txFee)
self.tx_fee_addr = str(self.net._origin_net.txFeeAddr)
self.validator_avg_fee = str(self.net._origin_net.validatorAverageFee)
self.validator_max_fee = str(self.net._origin_net.validatorMaxFee)
self.validator_min_fee = str(self.net._origin_net.validatorMinFee)
self.native_ticker = self.net._origin_net.nativeTicker
class CFNet:
def __init__(self, name: str):
self.name = name
self._origin_net = Net.byName(name)
if not self._origin_net:
raise RuntimeError(f"No such net: {name}")
self.main = CFChain(self, 'main')
self.zerochain = CFChain(self, 'zerochain')
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
@property
def id(self) -> int:
return self._origin_net.id.long()
@property
def chains(self) -> list['CFChain']:
return [self.main, self.zerochain]
def get_ledger(self):
return CFLedger(self, self._origin_net.getLedger())
def register_gdbsync_notification_callback(self, callback, *args, **kwargs):
def callback_wrapper(op_code, group, key, value, *other):
callback(self, op_code, group, key, value, *args, net=self, **kwargs)
self._origin_net.addNotify(callback_wrapper, ())
def change_state(self, state: CFNetState):
if state == CFNetState.NET_STATE_OFFLINE:
self._origin_net.stop()
elif state == CFNetState.NET_STATE_ONLINE:
self._origin_net.start()
else:
raise NotImplemented("This state not implemented")
class CFChain:
def __init__(self, net: CFNet, chain_name: str):
self.net = net
self._origin_chain = net._origin_net.getChainByName(chain_name)
if not self._origin_chain:
raise RuntimeError(f"chain with name={chain_name} not found in net with name={net.name}")
self.type = self._origin_chain.getCSName()
self.name = chain_name
def get_atoms(self) -> Iterator[CFBlock | CFEvent]:
iterator = self._origin_chain.createAtomIter(False)
ptr = self._origin_chain.atomIterGetFirst(iterator)
if not ptr:
logIt.message("not ptr")
return []
atom, size = ptr
while atom:
if size <= 0:
atom, size = self._origin_chain.atomIterGetNext(iterator)
continue
if self.type == ChainTypes.esbocs:
yield CFBlock(atom=atom, chain=self)
elif self.type == ChainTypes.dag_poa:
yield CFEvent(atom=atom, chain=self)
else:
raise TypeError(f"Invalid Chain type={self.type}")
atom, size = self._origin_chain.atomIterGetNext(iterator)
def get_datums(self) -> Iterator[CFDatum]:
for atom in self.get_atoms():
for datum in atom.get_datums():
yield datum
def get_transactions(self) -> Iterator[CFDatumTX]:
for datum in self.get_datums():
if datum._origin_datum.isDatumTX():
yield datum.get_sub_datum()
def get_mempool(self) -> 'CFMempool':
return CFMempool(self)
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
def register_mempool_notification_callback(self, callback, *args, **kwargs):
def callback_wrapper(op_code, group, key, value, *other):
logIt.message(f"{other=}")
try:
callback(op_code, group, key, value, *args, chain=self, **kwargs)
except Exception:
logIt.error(f"Error = {traceback.format_exc()}")
try:
self._origin_chain.addMempoolNotify(callback_wrapper, ())
except Exception:
logIt.error(f"Error = {traceback.format_exc()}")
def register_atom_notification_callback(self, callback, *args, **kwargs):
def callback_wrapper(atom: ChainAtomPtr, size, *other):
try:
if self.type == ChainTypes.esbocs:
cf_atom = CFBlock(atom, self)
elif self.type == ChainTypes.dag_poa:
cf_atom = CFEvent(atom, self)
else:
raise TypeError(f"Invalid Chain type={self.type}")
callback(cf_atom, size, *args, chain=self, **kwargs)
except Exception:
logIt.error(traceback.format_exc())
self._origin_chain.addAtomNotify(callback_wrapper, ())
class CFLedger:
def __init__(self, net: CFNet, ledger: Ledger = None):
self.net = net
self._origin_ledger = ledger or self.net.get_ledger()
def get_tx_ticker(self, datum: CFDatumTX) -> ticker | None:
return self._origin_ledger.txGetTokenTickerByHash(HashFast.fromString(str(datum.hash)))
def calc_address_balances(self, address: str) -> dict[ticker, str] | dict:
res = {}
chain_addr = ChainAddr.fromStr(address)
tickers = self._origin_ledger.addrGetTokenTickerAll(chain_addr)
for ticker in tickers:
balance = str(self._origin_ledger.calcBalance(chain_addr, ticker))
res[ticker] = balance
return res
def register_ledger_tx_notification_callback(self, callback, *args, **kwargs):
def callback_wrapper(ledger, tx, *other):
callback(ledger, tx, *args, net=self.net, **kwargs)
self._origin_ledger.txAddNotify(callback_wrapper, (args, kwargs))
class CFMempool:
def __init__(self, chain: CFChain):
self.chain = chain
def get_datums(self) -> list[CFDatum]:
data = Mempool.list(self.chain.net._origin_net, self.chain._origin_chain)
logIt.message(f"{data.keys()=}")
return [CFDatum(None, datum, net=self.chain.net) for datum in data.values()]
# def emissionExtract