Newer
Older
from __future__ import annotations
from typing import TypeVar, TYPE_CHECKING
import json
import binascii
from datetime import datetime
from CellFrame.Common import Datum, DatumTx, DatumToken, DatumDecree, DatumAnchor, DatumEmission
if TYPE_CHECKING:
from .net import CFNet
from .consensus import CFBlock, CFEvent
from .items import CFItem, CFTxOut
from .types import ticker, TSD, CFLedgerCacheResponse
# __slots__ = ["_origin_datum", "hash", "type", "created_at", "atom", "size", "version", "net"]
def __init__(self, atom: CFBlock | CFEvent | None, datum: Datum, net: CFNet | None = None):
    """Bind a datum to its parent atom and/or its network.

    Args:
        atom: Parent block or event; ``None`` means the datum is in the mempool.
        datum: Underlying datum object; ``tsCreated`` (and ``hash`` on the
            error path) are read from it here.
        net: Network of the datum; required when ``atom`` is ``None``.

    Raises:
        AttributeError: If both ``atom`` and ``net`` are ``None``.
    """
    try:
        self.created_at = datum.tsCreated
    except OSError:
        # Reading the timestamp failed: store timestamp 0 instead and report it.
        self.created_at = datetime.fromtimestamp(0)
        log.error(f"Datum [{datum.hash}] has invalid timestamp!")

    has_context = atom is not None or net is not None
    if not has_context:
        raise AttributeError("A datum without a parent atom requires a net")

    # atom is None for a datum that is still in the mempool
    self.atom, self.net = atom, net
def get_sub_datum(
        self) -> CFDatumTX | CFDatumToken | CFDatumEmission | CFDatumAnchor | CFDatumDecree | CFDatumCustom:
    """Build the sub-datum wrapper selected by ``self.type``.

    Returns:
        Whatever ``CFSubDatumBuilder(self.type).build(self)`` produces.
    """
    # Imported inside the method — presumably to avoid a circular import; TODO confirm.
    from .mappings import CFSubDatumBuilder

    builder = CFSubDatumBuilder(self.type)
    return builder.build(self)
def __repr__(self):
    """Return the ``<type>:<hash>`` representation of this object."""
    kind, digest = self.type, self.hash
    return f"{kind}:{digest}"
sub_datum: DatumTx | DatumToken | DatumEmission | DatumDecree | DatumAnchor | bytes,
net : 'CFNet' | None = None):
# Resolve the network this sub-datum belongs to, depending on where the
# parent datum comes from.
if self._parent_datum and self._parent_datum.atom is None: # mempool datum
# A parent without an atom carries the net itself.
self._net = self._parent_datum.net
elif self._parent_datum: #from chains
# Otherwise the net is reached through the parent's atom and its chain.
self._net = self._parent_datum.atom.chain.net
else: # some custom created emission with no parent.
# No parent at all: use the explicitly supplied net.
self._net = net
# A missing sub-datum is recorded as "CORRUPTED": the parent's hash is
# reused as this object's hash and the problem is logged.
# NOTE(review): parent_datum is dereferenced here although the branch above
# allows for a missing parent — confirm both are never None together.
if sub_datum is None:
self.hash = parent_datum.hash
self.type = "CORRUPTED"
log.error(f"Datum type:{parent_datum.type} hash:{parent_datum.hash} is CORRUPTED")
return self._net
def __init__(self, parent_datum: CFDatum, sub_datum: DatumTx, net: CFNet | None = None):
    """Wrap a transaction sub-datum and record its ledger status.

    Args:
        parent_datum: Datum that carries this transaction.
        sub_datum: Underlying transaction object; ``dateCreated`` is read from it.
        net: Network forwarded to the base initializer.

    Sets ``created_at``, ``ticker``, ``ledger_rc`` and ``accepted``. None of
    them are set when the sub-datum was flagged as ``"CORRUPTED"``.
    """
    super().__init__(parent_datum, sub_datum, net=net)
    # A missing sub-datum is flagged with type "CORRUPTED"; stop here, as the
    # token/decree/anchor initializers in this file do, instead of
    # dereferencing a None sub-datum below.
    if self.type == "CORRUPTED":
        return
    self.created_at = sub_datum.dateCreated
    ledger = self.net.get_ledger()
    self.ticker = ledger.get_tx_ticker(self)
    self.ledger_rc = ledger.get_tx_ledger_rc(self)
    # Accepted: the ledger already holds the transaction, or its check passed.
    self.accepted = (
        self.ledger_rc == CFLedgerCacheResponse.DAP_LEDGER_TX_ALREADY_CACHED
        or self.ledger_rc == CFLedgerCacheResponse.DAP_LEDGER_TX_CHECK_OK
    )
@property
def items(self) -> list[CFItem]:
    """All items of this transaction, as returned by ``get_items()`` without a filter."""
    every_item = self.get_items()
    return every_item
def get_items(self, filter_type: type[T] | None = None) -> list[T]:
    """Return the wrapped items of the transaction, optionally filtered by class.

    Args:
        filter_type: When given (truthy), only items that are instances of
            this class are returned.

    Returns:
        A list of items built by ``CFItemMapper.build``; ``None`` entries
        reported by the underlying sub-datum are skipped.
    """
    from .mappings import CFItemMapper

    wrapped = []
    for raw_item in self._origin_sub_datum.getItems():
        if raw_item is None:
            continue
        wrapped.append(CFItemMapper.build(raw_item, self.net))

    if not filter_type:
        return wrapped
    return [entry for entry in wrapped if isinstance(entry, filter_type)]
def __init__(self, parent_datum: CFDatum, sub_datum: DatumToken):
    """Wrap a token sub-datum.

    Args:
        parent_datum: Datum that carries this token declaration.
        sub_datum: Underlying token object; ``ticker``, ``typeStr``, ``data``
            and ``signs`` are read from it.

    Nothing beyond the base initialization happens when the sub-datum was
    flagged as ``"CORRUPTED"``.
    """
    super().__init__(parent_datum, sub_datum)
    if self.type == "CORRUPTED":
        return

    self.ticker = sub_datum.ticker
    self.type = sub_datum.typeStr
    self.data = sub_datum.data

    # Signatures are best-effort: an AttributeError while collecting them
    # leaves an empty list.
    try:
        collected = [CFSign(raw_sign, self.net) for raw_sign in sub_datum.signs]
    except AttributeError:
        collected = []
    self.signs = collected
def __init__(self, parent_datum: CFDatum, sub_datum: DatumEmission, net: CFNet | None = None):
    """Wrap a token-emission sub-datum and decode its TSD sections.

    Args:
        parent_datum: Datum that carries this emission.
        sub_datum: Underlying emission object; ``version``, ``typeStr``,
            ``ticker``, ``addr``, ``value`` and ``signs``/``data`` are read.
        net: Network forwarded to the base initializer.

    ``self.tsd`` maps the name of every ``TSD`` member present in the emission
    to its payload: UTF-8 text when decodable, otherwise a hex string. For
    ``TSD.TYPE_DATA`` the text is additionally parsed as JSON when possible.
    Nothing beyond the base initialization happens when the sub-datum was
    flagged as ``"CORRUPTED"``.
    """
    super().__init__(parent_datum, sub_datum, net=net)
    # A missing sub-datum is flagged with type "CORRUPTED"; stop here, as the
    # token/decree/anchor initializers in this file do, instead of
    # dereferencing a None sub-datum below.
    if self.type == "CORRUPTED":
        return
    self.version = sub_datum.version
    self.type = sub_datum.typeStr
    self.ticker = sub_datum.ticker
    self.address = str(sub_datum.addr)
    # TODO: Math --> CFMath
    self.value = sub_datum.value
    if self.type == "TOKEN_EMISSION_TYPE_AUTH":
        self.data = [CFSign(sign, self.net) for sign in sub_datum.signs]
    else:
        self.data = sub_datum.data

    self.tsd = {}
    for tsd_type in TSD:
        tsd = self._origin_sub_datum.getTSD(tsd_type.value)
        if tsd is None:
            continue
        try:
            tsd_data = tsd.decode('utf-8')
        except UnicodeDecodeError:
            # Not valid UTF-8: keep the payload as a hex string.
            tsd_data = binascii.hexlify(tsd).decode('utf-8')
        except Exception:
            # Still best-effort, but no longer swallows KeyboardInterrupt/SystemExit.
            log.error(f"Incorrect TSD data. Skip TSD with type={tsd_type}")
            continue
        if tsd_type == TSD.TYPE_DATA:
            try:
                tsd_data = json.loads(tsd_data)
            except (ValueError, RecursionError):
                # Payload is not (sane) JSON: keep the raw text.
                pass
        self.tsd[tsd_type.name] = tsd_data
@property
def signs(self) -> list[CFSign]:
    """Signatures of the underlying sub-datum, each wrapped in ``CFSign``."""
    wrapped = []
    for raw_sign in self._origin_sub_datum.signs:
        wrapped.append(CFSign(raw_sign, self.net))
    return wrapped
def __repr__(self):
    """Return the ``<type>:<hash>`` representation of this object."""
    kind, digest = self.type, self.hash
    return f"{kind}:{digest}"
@property
def valid_sign_hashes(self) -> list[str]:
    """Public-key hashes of this object's signatures that the ledger lists for the parent datum.

    The reference list comes from the ledger's
    ``token_auth_signs_pkey_hashes(self._parent_datum)``; the result keeps
    the order of ``self.signs``.
    """
    ledger = self.net.get_ledger()
    authorized = ledger.token_auth_signs_pkey_hashes(self._parent_datum)

    matching = []
    for sign in self.signs:
        if sign.pkey_hash in authorized:
            matching.append(sign.pkey_hash)
    return matching
def add_sign(self, certificate: CFCertificate) -> str:
def __init__(self, parent_datum: CFDatum, sub_datum: DatumDecree):
    """Wrap a decree sub-datum.

    Args:
        parent_datum: Datum that carries this decree.
        sub_datum: Underlying decree object; ``tsCreated``, ``typeStr``,
            ``subtypeStr`` and ``signs`` are read from it.

    Nothing beyond the base initialization happens when the sub-datum was
    flagged as ``"CORRUPTED"``.
    """
    super().__init__(parent_datum, sub_datum)
    if self.type == "CORRUPTED":
        return

    self.created_at = sub_datum.tsCreated
    self.type = sub_datum.typeStr
    self.subtype = sub_datum.subtypeStr

    signatures = []
    for raw_sign in sub_datum.signs:
        signatures.append(CFSign(raw_sign, self.net))
    self.signs = signatures
def __init__(self, parent_datum: CFDatum, sub_datum: DatumAnchor):
    """Wrap an anchor sub-datum.

    Args:
        parent_datum: Datum that carries this anchor.
        sub_datum: Underlying anchor object; ``created`` and ``signs`` are
            read from it.

    Nothing beyond the base initialization happens when the sub-datum was
    flagged as ``"CORRUPTED"``.
    """
    super().__init__(parent_datum, sub_datum)
    if self.type == "CORRUPTED":
        return

    self.created_at = sub_datum.created

    signatures = []
    for raw_sign in sub_datum.signs:
        signatures.append(CFSign(raw_sign, self.net))
    self.signs = signatures