from __future__ import annotations

from typing import TypeVar, TYPE_CHECKING
import json
import binascii
from datetime import datetime

from pycfhelpers.logger import log
from .crypto import CFCertificate
from CellFrame.Common import Datum, DatumTx, DatumToken, DatumDecree, DatumAnchor, DatumEmission

if TYPE_CHECKING:
    # Only referenced inside annotations, which are lazy thanks to the
    # ``from __future__ import annotations`` line above.
    from .net import CFNet
    from .consensus import CFBlock, CFEvent
# The names below are used at runtime (TypeVar bound, CFSign construction,
# TSD iteration, ledger response comparison), so they are imported eagerly.
from .items import CFItem, CFTxOut
from .crypto import CFSign
from .types import ticker, TSD, CFLedgerCacheResponse


class CFDatum:
    """Wrapper around an origin ``Datum`` object.

    Attributes:
        hash: String form of ``datum.hash``.
        type: Value returned by ``datum.getTypeStr()``.
        version: Value of ``datum.versionStr``.
        size: Value returned by ``datum.getSize()``.
        created_at: ``datum.tsCreated``, or the epoch (timestamp 0) when
            reading it raises ``OSError``.
        atom: Parent block/event, or ``None`` for a datum in the mempool.
        net: Network passed by the caller (may be ``None`` when ``atom`` is set).
    """

    # __slots__ = ["_origin_datum", "hash", "type", "created_at", "atom", "size", "version", "net"]

    def __init__(self, atom: CFBlock | CFEvent | None, datum: Datum, net: CFNet | None = None):
        """Read the basic properties of ``datum``.

        Args:
            atom: Parent atom, or ``None`` when the datum has no parent atom.
            datum: Origin datum object to wrap.
            net: Network of the datum; required when ``atom`` is ``None``.

        Raises:
            AttributeError: If both ``atom`` and ``net`` are ``None``.
        """
        self._origin_datum = datum
        self.hash = str(datum.hash)
        self.type = datum.getTypeStr()
        self.version = datum.versionStr
        self.size = datum.getSize()
        try:
            self.created_at = datum.tsCreated
        except OSError:
            # Best effort: keep the datum usable and report the bad timestamp.
            self.created_at = datetime.fromtimestamp(0)
            log.error(f"Datum [{datum.hash}] has invalid timestamp!")
        if atom is None and net is None:
            raise AttributeError("A datum without a parent atom requires a net")
        self.atom = atom  # atom == None - datum in mempool
        self.net = net

    def get_sub_datum(
            self) -> CFDatumTX | CFDatumToken | CFDatumEmission | CFDatumAnchor | CFDatumDecree | CFDatumCustom:
        """Build the typed sub-datum wrapper that corresponds to ``self.type``."""
        # Imported lazily; presumably to avoid a circular import with
        # ``.mappings`` -- TODO confirm.
        from .mappings import CFSubDatumBuilder
        return CFSubDatumBuilder(self.type).build(self)

    def serialize(self) -> bytes:
        """Return the ``raw`` attribute of the origin datum."""
        return self._origin_datum.raw

    def __repr__(self):
        return f"{self.type}:{self.hash}"


class CFSubDatum:
    """Common base of the typed sub-datum wrappers.

    Attributes:
        hash: String form of the sub-datum hash. Falls back to the parent
            datum hash when the sub-datum is missing or carries no ``hash``
            attribute (raw ``bytes``); ``None`` if there is no parent either.
        type: ``"DEFAULT"``, or ``"CORRUPTED"`` when ``sub_datum`` is ``None``.
            Subclasses may overwrite it with a more specific value.
    """

    def __init__(self, parent_datum: CFDatum | None,
                 sub_datum: DatumTx | DatumToken | DatumEmission | DatumDecree | DatumAnchor | bytes | None,
                 net: 'CFNet' | None = None):
        """Store the origin objects and resolve the network.

        Args:
            parent_datum: Datum that contains this sub-datum, or ``None`` for
                a sub-datum created without a parent.
            sub_datum: Origin sub-datum object, raw bytes, or ``None`` when
                the sub-datum could not be obtained.
            net: Network to use when there is no parent datum.
        """
        self._parent_datum = parent_datum
        self._origin_sub_datum = sub_datum
        self.type = "DEFAULT"

        if parent_datum is not None and parent_datum.atom is None:
            # mempool datum
            self._net = parent_datum.net
        elif parent_datum is not None:
            # from chains
            self._net = parent_datum.atom.chain.net
        else:
            # some custom created emission with no parent.
            self._net = net

        parent_hash = parent_datum.hash if parent_datum is not None else None

        # The None check must come before any attribute access on sub_datum,
        # otherwise the corrupted case raises instead of being reported.
        if sub_datum is None:
            self.hash = parent_hash
            self.type = "CORRUPTED"
            if parent_datum is not None:
                log.error(f"Datum type:{parent_datum.type} hash:{parent_datum.hash} is CORRUPTED")
            else:
                log.error("Sub-datum without a parent datum is CORRUPTED")
            return

        # Raw ``bytes`` payloads (custom datums) have no ``hash`` attribute.
        origin_hash = getattr(sub_datum, "hash", None)
        self.hash = str(origin_hash) if origin_hash is not None else parent_hash

    @property
    def net(self) -> 'CFNet':
        """Network resolved in ``__init__``."""
        return self._net


T = TypeVar('T', bound=CFItem)


class CFDatumTX(CFSubDatum):
    """Transaction sub-datum.

    Attributes (not set when ``type == "CORRUPTED"``):
        created_at: ``sub_datum.dateCreated``.
        ticker: Result of ``ledger.get_tx_ticker(self)``.
        ledger_rc: Result of ``ledger.get_tx_ledger_rc(self)``.
        accepted: ``True`` when ``ledger_rc`` equals
            ``DAP_LEDGER_TX_ALREADY_CACHED`` or ``DAP_LEDGER_TX_CHECK_OK``.
    """

    def __init__(self, parent_datum: CFDatum, sub_datum: DatumTx, net: 'CFNet' | None = None):
        super().__init__(parent_datum, sub_datum, net=net)
        if self.type == "CORRUPTED":
            return
        self.created_at = sub_datum.dateCreated
        ledger = self.net.get_ledger()
        self.ticker = ledger.get_tx_ticker(self)
        self.ledger_rc = ledger.get_tx_ledger_rc(self)
        self.accepted = (self.ledger_rc == CFLedgerCacheResponse.DAP_LEDGER_TX_ALREADY_CACHED) or (
                self.ledger_rc == CFLedgerCacheResponse.DAP_LEDGER_TX_CHECK_OK)

    @property
    def items(self) -> list[CFItem]:
        """All items of the transaction (same as ``get_items()``)."""
        return self.get_items()

    def get_items(self, filter_type: type[T] | None = None) -> list[T]:
        """Return the transaction items, optionally restricted to one type.

        Args:
            filter_type: When given, only items that are instances of this
                class are returned.

        Returns:
            Items built by ``CFItemMapper`` from every non-``None`` origin item.
        """
        from .mappings import CFItemMapper
        all_items = [CFItemMapper.build(item, self.net)
                     for item in self._origin_sub_datum.getItems() if item is not None]
        if filter_type is None:
            return all_items
        return [item for item in all_items if isinstance(item, filter_type)]


class CFDatumToken(CFSubDatum):
    """Token declaration sub-datum.

    Attributes (not set when the base class marked it ``"CORRUPTED"``):
        ticker: ``sub_datum.ticker``.
        type: ``sub_datum.typeStr``.
        data: ``sub_datum.data``.
        signs: ``CFSign`` wrappers of ``sub_datum.signs``; empty list when
            building them raises ``AttributeError``.
    """

    def __init__(self, parent_datum: CFDatum, sub_datum: DatumToken, net: 'CFNet' | None = None):
        super().__init__(parent_datum, sub_datum, net=net)
        if self.type == "CORRUPTED":
            return
        self.ticker = sub_datum.ticker
        self.type = sub_datum.typeStr
        self.data = sub_datum.data
        try:
            self.signs = [CFSign(sign, self.net) for sign in sub_datum.signs]
        except AttributeError:
            self.signs = []


class CFDatumEmission(CFSubDatum):
    """Token emission sub-datum.

    Attributes (not set when the base class marked it ``"CORRUPTED"``):
        version: ``sub_datum.version``.
        type: ``sub_datum.typeStr``.
        ticker: ``sub_datum.ticker``.
        address: String form of ``sub_datum.addr``.
        value: ``sub_datum.value``.
        data: ``CFSign`` list for ``TOKEN_EMISSION_TYPE_AUTH`` emissions,
            otherwise ``sub_datum.data``.
        tsd: Mapping of TSD type name to decoded TSD content.
    """

    def __init__(self, parent_datum: CFDatum, sub_datum: DatumEmission, net: 'CFNet' | None = None):
        super().__init__(parent_datum, sub_datum, net=net)
        if self.type == "CORRUPTED":
            return
        self.version = sub_datum.version
        self.type = sub_datum.typeStr
        self.ticker = sub_datum.ticker
        self.address = str(sub_datum.addr)
        # TODO: Math --> CFMath
        self.value = sub_datum.value
        if self.type == "TOKEN_EMISSION_TYPE_AUTH":
            self.data = [CFSign(sign, self.net) for sign in sub_datum.signs]
        else:
            self.data = sub_datum.data
        self.tsd = self._collect_tsd()

    def _collect_tsd(self) -> dict:
        """Decode every TSD section present in the origin emission."""
        collected = {}
        for tsd_type in TSD:
            tsd = self._origin_sub_datum.getTSD(tsd_type.value)
            if tsd is None:
                continue
            try:
                tsd_data = tsd.decode('utf-8')
            except UnicodeDecodeError:
                # Not text: keep the payload as a hex string instead.
                tsd_data = binascii.hexlify(tsd).decode('utf-8')
            except Exception:
                # Still best effort, but no longer swallows
                # KeyboardInterrupt/SystemExit like a bare ``except``.
                log.error(f"Incorrect TSD data. Skip TSD with type={tsd_type}")
                continue
            if tsd_type == TSD.TYPE_DATA:
                try:
                    tsd_data = json.loads(tsd_data)
                except (ValueError, RecursionError):
                    # Not JSON: keep the decoded string as-is.
                    pass
            collected[tsd_type.name] = tsd_data
        return collected

    @property
    def signs(self) -> list[CFSign]:
        """Fresh ``CFSign`` wrappers for the current origin signs."""
        return [CFSign(sign, self.net) for sign in self._origin_sub_datum.signs]

    def __repr__(self):
        return f"{self.type}:{self.hash}"

    @property
    def valid_sign_hashes(self) -> list[str]:
        """Public-key hashes of signs that the ledger lists for the parent datum."""
        token_auth_signs_pkey_hashes = self.net.get_ledger().token_auth_signs_pkey_hashes(self._parent_datum)
        return [sign.pkey_hash for sign in self.signs if sign.pkey_hash in token_auth_signs_pkey_hashes]

    def add_sign(self, certificate: CFCertificate) -> CFSign:
        """Sign the emission with ``certificate`` and return the last sign.

        Args:
            certificate: Certificate whose origin object is passed to ``addSign``.

        Returns:
            The last element of ``self.signs`` after signing.
        """
        self._origin_sub_datum.addSign(certificate._origin_certificate)
        return self.signs[-1]


class CFDatumDecree(CFSubDatum):
    """Decree sub-datum.

    Attributes (not set when the base class marked it ``"CORRUPTED"``):
        created_at: ``sub_datum.tsCreated``.
        type: ``sub_datum.typeStr``.
        subtype: ``sub_datum.subtypeStr``.
        signs: ``CFSign`` wrappers of ``sub_datum.signs``.
    """

    def __init__(self, parent_datum: CFDatum, sub_datum: DatumDecree, net: 'CFNet' | None = None):
        super().__init__(parent_datum, sub_datum, net=net)
        if self.type == "CORRUPTED":
            return
        self.created_at = sub_datum.tsCreated
        self.type = sub_datum.typeStr
        self.subtype = sub_datum.subtypeStr
        self.signs = [CFSign(sign, self.net) for sign in sub_datum.signs]


class CFDatumAnchor(CFSubDatum):
    """Anchor sub-datum.

    Attributes (not set when ``type == "CORRUPTED"``):
        created_at: ``sub_datum.created``.
        signs: ``CFSign`` wrappers of ``sub_datum.signs``.
    """

    def __init__(self, parent_datum: CFDatum, sub_datum: DatumAnchor, net: 'CFNet' | None = None):
        super().__init__(parent_datum, sub_datum, net=net)
        if self.type == "CORRUPTED":
            return
        self.created_at = sub_datum.created
        self.signs = [CFSign(sign, self.net) for sign in sub_datum.signs]


class CFDatumCustom(CFSubDatum):
    """Custom sub-datum holding raw bytes (get data from dataRaw()).

    Attributes (not set when ``type == "CORRUPTED"``):
        data: Hex string of the raw payload.
    """

    def __init__(self, parent_datum: CFDatum, sub_datum: bytes, net: 'CFNet' | None = None):
        super().__init__(parent_datum, sub_datum, net=net)
        if self.type == "CORRUPTED":
            return
        # Only the hex form is kept; the raw bytes reference is dropped.
        self._origin_sub_datum = None
        self.data = sub_datum.hex()