from __future__ import annotations from typing import TypeVar, TYPE_CHECKING import json import binascii from datetime import datetime from DAP.Core import logIt from CellFrame.Common import Datum, DatumTx, DatumToken, DatumDecree, DatumAnchor, DatumEmission if TYPE_CHECKING: from .net import CFNet from .consensus import CFBlock, CFEvent from .items import CFItem from .crypto import CFSign from .types import ticker, TSD class CFDatum: # __slots__ = ["_origin_datum", "hash", "type", "created_at", "atom", "size", "version", "net"] def __init__(self, atom: CFBlock | CFEvent | None, datum: Datum, net: CFNet | None = None): self._origin_datum = datum self.hash = str(datum.hash) self.type = datum.getTypeStr() self.version = datum.versionStr self.size = datum.getSize() try: self.created_at = datum.tsCreated except OSError: self.created_at = datetime.fromtimestamp(0) logIt.error(f"Datum [{datum.hash}] has invalid timestamp!") if atom is None and net is None: raise AttributeError("A datum without a parent atom requires a net") self.atom = atom # atom == None - datum in mempool self.net = net def get_sub_datum( self) -> CFDatumTX | CFDatumToken | CFDatumEmission | CFDatumAnchor | CFDatumDecree | CFDatumCustom: from .mappings import CFSubDatumBuilder sub_datum = CFSubDatumBuilder(self.type).build(self) return sub_datum class SubDatum: def __init__(self, parent_datum: CFDatum, sub_datum): self._parent_datum = parent_datum self._origin_sub_datum = sub_datum self.type = "DEFAULT" if sub_datum is None: self.hash = parent_datum.hash self.type = "CORRUPTED" logIt.error(f"Datum type:{parent_datum.type} hash:{parent_datum.hash} is CORRUPTED") @property def net(self) -> 'CFNet': if self._parent_datum.atom is None: # mempool datum return self._parent_datum.net return self._parent_datum.atom.chain.net T = TypeVar('T', bound=CFItem) class CFDatumTX(SubDatum): def __init__(self, parent_datum: CFDatum, sub_datum: DatumTx): super().__init__(parent_datum, sub_datum) if self.type == "CORRUPTED": return self.hash = str(sub_datum.hash) self.created_at = sub_datum.dateCreated ledger = self.net.get_ledger() self.ticker = ledger.get_tx_ticker(self) self.accepted = bool(ticker) @property def items(self) -> list[CFItem]: return self.get_items() def get_items(self, filter_type: type[T] | None = None) -> list[T]: from .mappings import CFItemMapper all_items = [CFItemMapper.build(item, self.net) for item in self._origin_sub_datum.getItems() if item is not None] if filter_type: return [item for item in all_items if isinstance(item, filter_type)] else: return all_items class CFDatumToken(SubDatum): def __init__(self, parent_datum: CFDatum, sub_datum: DatumToken): super().__init__(parent_datum, sub_datum) if self.type == "CORRUPTED": return self.ticker = sub_datum.ticker self.type = sub_datum.typeStr self.data = sub_datum.data try: self.signs = [CFSign(sign, self.net) for sign in sub_datum.signs] except AttributeError as e: self.signs = [] class CFDatumEmission(SubDatum): def __init__(self, parent_datum: CFDatum, sub_datum: DatumEmission): super().__init__(parent_datum, sub_datum) if self.type == "CORRUPTED": return self.hash = str(sub_datum.hash) self.version = sub_datum.version self.type = sub_datum.typeStr self.ticker = sub_datum.ticker self.address = str(sub_datum.addr) # TODO: Math --> CFMath self.value = sub_datum.value.coins if self.type == "TOKEN_EMISSION_TYPE_AUTH": self.data = [CFSign(sign, self.net) for sign in sub_datum.signs] else: self.data = sub_datum.data self.signs = [CFSign(sign, self.net) for sign in sub_datum.signs] self.tsd = {} for tsd_type in TSD: tsd = self._origin_sub_datum.getTSD(tsd_type.value) if tsd is None: continue try: tsd_data = tsd.decode('utf-8') except UnicodeDecodeError: tsd_data = binascii.hexlify(tsd).decode('utf-8') except: logIt.error(f"Incorrect TSD data. Skip TSD with type={tsd_type}") continue if tsd_type == TSD.TYPE_DATA: try: tsd_data = json.loads(tsd_data) except: pass self.tsd[tsd_type.name] = tsd_data class CFDatumDecree(SubDatum): def __init__(self, parent_datum: CFDatum, sub_datum: DatumDecree): super().__init__(parent_datum, sub_datum) if self.type == "CORRUPTED": return self.hash = str(sub_datum.hash) self.created_at = sub_datum.tsCreated self.type = sub_datum.typeStr self.subtype = sub_datum.subtypeStr self.signs = [CFSign(sign, self.net) for sign in sub_datum.signs] class CFDatumAnchor(SubDatum): def __init__(self, parent_datum: CFDatum, sub_datum: DatumAnchor): super().__init__(parent_datum, sub_datum) if self.type == "CORRUPTED": return self.hash = str(sub_datum.hash) self.created_at = sub_datum.created self.signs = [CFSign(sign, self.net) for sign in sub_datum.signs] class CFDatumCustom(SubDatum): """ get data from dataRaw() """ def __init__(self, parent_datum: CFDatum, sub_datum: bytes): super().__init__(parent_datum, sub_datum) if self.type == "CORRUPTED": return self._origin_sub_datum = None self.data = sub_datum.hex()