Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
P
pycfhelpers
Manage
Activity
Members
Labels
Plan
Issues
0
Issue boards
Milestones
Wiki
Code
Merge requests
1
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Container Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
cellframe
python-cellframe-modules
pycfhelpers
Commits
ed0cede0
Commit
ed0cede0
authored
1 year ago
by
dpuzyrkov
Browse files
Options
Downloads
Patches
Plain Diff
[+] initial
parent
eb4f3070
No related branches found
Branches containing commit
No related tags found
No related merge requests found
Changes
4
Hide whitespace changes
Inline
Side-by-side
Showing
4 changed files
__init__.py
+0
-0
0 additions, 0 deletions
__init__.py
cellframenet.py
+439
-0
439 additions, 0 deletions
cellframenet.py
contract.py
+154
-0
154 additions, 0 deletions
contract.py
helpers.py
+54
-0
54 additions, 0 deletions
helpers.py
with
647 additions
and
0 deletions
__init__.py
0 → 100644
+
0
−
0
View file @
ed0cede0
This diff is collapsed.
Click to expand it.
cellframenet.py
0 → 100644
+
439
−
0
View file @
ed0cede0
import
CellFrame
from
CellFrame.Network
import
Net
from
CellFrame.Common
import
DatumEmission
from
DAP
import
Crypto
from
DAP.Crypto
import
Cert
,
Sign
from
CellFrame.Chain
import
Mempool
from
CellFrame.Consensus
import
DAG
,
Block
from
CellFrame.Common
import
Datum
,
DatumTx
,
TxOut
,
TxIn
,
TxToken
,
TxSig
,
TxOutCondSubtypeSrvStakeLock
,
TxInCond
from
DAP.Crypto
import
HashFast
from
DAP.Core
import
logIt
from
datetime
import
datetime
import
hashlib
import
json
from
cfhelpers.helpers
import
json_dump
,
find_tx_out
class
TSD
:
TYPE_UNKNOWN
=
0x0000
TYPE_TIMESTAMP
=
0x0001
TYPE_ADDRESS
=
0x0002
TYPE_VALUE
=
0x0003
TYPE_CONTRACT
=
0x0004
TYPE_NET_ID
=
0x0005
TYPE_BLOCK_NUM
=
0x0006
TYPE_TOKEN_SYM
=
0x0007
TYPE_OUTER_TX_HASH
=
0x0008
TYPE_SOURCE
=
0x0009
TYPE_SOURCE_SUBTYPE
=
0x000A
TYPE_DATA
=
0x000B
TYPE_SENDER
=
0x000C
TYPE_TOKEN_ADDRESS
=
0x000D
TYPE_SIGNATURS
=
0x000E
TYPE_UNIQUE_ID
=
0x000F
TYPE_BASE_TX_HASH
=
0x0010
TYPE_EMISSION_CENTER_UID
=
0x0011
TYPE_EMISSION_CENTER_VER
=
0x0012
class
CellframeEmission
:
def
__init__
(
self
,
datum
):
self
.
datum
=
datum
self
.
hash
=
str
(
self
.
datum
.
hash
)
m
=
hashlib
.
sha256
()
addr
=
datum
.
getTSD
(
TSD
.
TYPE_ADDRESS
)
btx
=
datum
.
getTSD
(
TSD
.
TYPE_BASE_TX_HASH
)
otx
=
datum
.
getTSD
(
TSD
.
TYPE_OUTER_TX_HASH
)
src
=
datum
.
getTSD
(
TSD
.
TYPE_SOURCE
)
stp
=
datum
.
getTSD
(
TSD
.
TYPE_SOURCE_SUBTYPE
)
ts
=
datum
.
getTSD
(
TSD
.
TYPE_TIMESTAMP
)
data
=
datum
.
getTSD
(
TSD
.
TYPE_DATA
)
uid
=
datum
.
getTSD
(
TSD
.
TYPE_UNIQUE_ID
)
m
.
update
(
str
(
addr
).
encode
(
"
utf-8
"
))
m
.
update
(
str
(
btx
).
encode
(
"
utf-8
"
))
m
.
update
(
str
(
otx
).
encode
(
"
utf-8
"
))
m
.
update
(
str
(
src
).
encode
(
"
utf-8
"
))
m
.
update
(
str
(
stp
).
encode
(
"
utf-8
"
))
m
.
update
(
str
(
data
).
encode
(
"
utf-8
"
))
m
.
update
(
str
(
ts
).
encode
(
"
utf-8
"
))
m
.
update
(
str
(
uid
).
encode
(
"
utf-8
"
))
m
.
update
(
str
(
data
).
encode
(
"
utf-8
"
))
self
.
uid
=
m
.
hexdigest
()
def
getTSD
(
self
,
type
):
tsd
=
self
.
datum
.
getTSD
(
type
)
if
tsd
:
try
:
return
tsd
.
decode
(
"
utf-8
"
)
except
:
pass
return
None
def
setTSD
(
self
,
type
,
data
):
self
.
datum
.
addTSD
(
type
,
data
)
#if datum not base-tx - exception
class
CellframeBaseTransactionDatum
:
def
__init__
(
self
,
datum
,
net
,
block
=
None
):
self
.
block
=
block
self
.
datum
=
datum
self
.
hash
=
str
(
datum
.
hash
)
self
.
created
=
datum
.
dateCreated
self
.
net
=
net
#base tx : has txToken item
if
not
self
.
tx_token
():
raise
RuntimeError
(
"
Datum {} not base tx
"
.
format
(
self
.
datum
))
self
.
to_address
=
str
(
self
.
tx_out
().
addr
)
self
.
amount
=
self
.
tx_out
().
value
self
.
emission_hash
=
str
(
self
.
tx_token
().
tokenEmissionHash
)
self
.
emission_
=
None
def
tx_out
(
self
):
try
:
return
next
(
filter
(
lambda
x
:
isinstance
(
x
,
TxOut
),
self
.
datum
.
getItems
()))
except
:
return
None
def
tx_in
(
self
):
try
:
return
next
(
filter
(
lambda
x
:
isinstance
(
x
,
TxIn
),
self
.
datum
.
getItems
()))
except
:
return
None
def
tx_token
(
self
):
try
:
return
next
(
filter
(
lambda
x
:
isinstance
(
x
,
TxToken
),
self
.
datum
.
getItems
()))
except
:
return
None
def
tx_sig
(
self
):
try
:
return
next
(
filter
(
lambda
x
:
isinstance
(
x
,
TxSig
),
self
.
datum
.
getItems
()))
except
:
return
None
def
emission
(
self
):
if
not
self
.
emission_
:
tiker
=
str
(
self
.
tx_token
().
ticker
)
hf
=
HashFast
.
fromString
(
str
(
self
.
tx_token
().
tokenEmissionHash
))
ledger
=
self
.
net
.
getLedger
()
ems
=
ledger
.
tokenEmissionFind
(
tiker
,
hf
)
if
not
ems
:
return
None
self
.
emission_
=
CellframeEmission
(
ems
)
return
self
.
emission_
class
CellframeNetwork
:
def
__init__
(
self
,
name
,
chains
,
group_alias
=
None
):
self
.
name
=
name
self
.
net
=
Net
.
byName
(
name
)
self
.
group_alias
=
group_alias
or
name
if
not
self
.
net
:
raise
RuntimeError
(
"
No such net: {}
"
.
format
(
name
))
for
chain
in
chains
:
setattr
(
self
,
chain
,
self
.
net
.
getChainByName
(
chain
))
def
set_mempool_notification_callback
(
self
,
chain
,
callback
):
callback_name
=
"
{}
"
.
format
(
self
.
name
)
logIt
.
notice
(
"
New mempool notifier for {}
"
.
format
(
callback_name
))
def
callback_wraper
(
op_code
,
group
,
key
,
value
,
net_name
):
callback
(
op_code
,
group
,
key
,
value
,
net_name
,
self
,
chain
)
chain
.
addMempoolNotify
(
callback_wraper
,
callback_name
)
def
set_gdbsync_notification_callback
(
self
,
callback
):
callback_name
=
"
{}
"
.
format
(
self
.
name
)
logIt
.
notice
(
"
New gdb notifier for {}
"
.
format
(
callback_name
))
def
callback_wraper
(
op_code
,
group
,
key
,
value
,
net_name
):
callback
(
self
,
op_code
,
group
,
key
,
value
,
net_name
)
self
.
net
.
addNotify
(
callback_wraper
,
self
.
name
)
def
set_atom_notification_callback
(
self
,
chain
,
callback
):
callback_name
=
"
{}
"
.
format
(
self
.
name
)
logIt
.
notice
(
"
New atom notifier for {}
"
.
format
(
callback_name
))
def
callback_wraper
(
atom
,
size
,
callback_name
):
callback
(
atom
,
size
,
callback_name
,
self
,
chain
)
chain
.
addAtomNotify
(
callback_wraper
,
callback_name
)
def
set_ledger_tx_notification_callback
(
self
,
callback
):
ledger
=
self
.
net
.
getLedger
()
def
callback_wrapper
(
ledger
,
tx
,
argv
):
callback
(
ledger
,
tx
,
argv
,
self
)
ledger
.
txAddNotify
(
callback_wrapper
,
self
.
net
)
def
load_cert
(
certname
):
return
Crypto
.
Cert
.
load
(
certname
)
def
extract_emission_from_mempool_nofitication
(
self
,
chain
,
value
):
ems
=
Mempool
.
emissionExtract
(
chain
,
value
)
if
ems
:
return
CellframeEmission
(
ems
)
else
:
return
None
def
create_base_transaction
(
self
,
emission
,
certs
):
return
Mempool
.
baseTxCreate
(
self
.
main
,
emission
.
datum
.
hash
,
self
.
zerochain
,
emission
.
datum
.
value
,
emission
.
datum
.
ticker
,
emission
.
datum
.
addr
,
certs
)
def
get_emission_by_tsd
(
self
,
tsd_dict
):
atom_count
=
self
.
zerochain
.
countAtom
()
atoms
=
self
.
zerochain
.
getAtoms
(
atom_count
,
1
,
True
)
emissions
=
{}
for
atom
in
atoms
:
event
=
DAG
.
fromAtom
(
atom
[
0
],
atom
[
1
])
if
not
event
.
datum
.
isDatumTokenEmission
():
continue
token_emission
=
event
.
datum
.
getDatumTokenEmission
()
if
not
token_emission
:
continue
results
=
[]
for
key
,
value
in
tsd_dict
.
items
():
tsd
=
token_emission
.
getTSD
(
key
)
if
not
tsd
and
value
==
None
:
results
.
append
(
True
)
continue
try
:
if
tsd
and
tsd
.
decode
(
"
utf-8
"
)
==
value
:
results
.
append
(
True
)
continue
except
:
pass
results
.
append
(
False
)
if
all
(
results
):
emissions
[
str
(
event
.
datum
.
hash
)]
=
CellframeEmission
(
token_emission
)
return
emissions
def
get_emission_from_mempool_by_tsd
(
self
,
tsd_dict
):
datums
=
Mempool
.
list
(
self
.
net
,
self
.
zerochain
).
values
()
emissions
=
{}
for
datum
in
datums
:
if
not
datum
.
isDatumTokenEmission
():
continue
token_emission
=
datum
.
getDatumTokenEmission
()
if
not
token_emission
:
continue
results
=
[]
for
key
,
value
in
tsd_dict
.
items
():
tsd
=
token_emission
.
getTSD
(
key
)
if
not
tsd
and
value
==
None
:
results
.
append
(
True
)
continue
try
:
if
tsd
and
tsd
.
decode
(
"
utf-8
"
)
==
value
:
results
.
append
(
True
)
continue
except
:
pass
results
.
append
(
False
)
if
all
(
results
):
emissions
[
str
(
datum
.
hash
)]
=
CellframeEmission
(
token_emission
)
return
emissions
def
base_transactions_from_blocks
(
self
,
emission_hash
=
None
):
iterator
=
self
.
main
.
createAtomItem
(
False
)
ptr
=
self
.
main
.
atomIterGetFirst
(
iterator
)
if
not
ptr
:
logIt
.
error
(
"
Can
'
t iterate over blocks in {}!
"
.
format
(
self
.
name
))
return
[]
aptr
,
size
=
ptr
#iterate over blocks: atom-pointer should not be none, and size shoud be >0
while
aptr
:
if
size
<=
0
:
#skip such blocks
aptr
,
size
=
self
.
main
.
atomIterGetNext
(
iter
)
continue
block
=
Block
.
fromAtom
(
aptr
,
size
)
if
not
block
.
datums
:
aptr
,
size
=
self
.
main
.
atomIterGetNext
(
iterator
)
continue
for
datum
in
block
.
datums
:
if
datum
.
isDatumTX
():
try
:
#if emshash provided - filter items
basedatum
=
CellframeBaseTransactionDatum
(
datum
.
getDatumTX
(),
block
)
if
emission_hash
:
if
basedatum
.
emission_hash
==
emission_hash
:
yield
basedatum
else
:
continue
else
:
yield
basedatum
except
:
continue
aptr
,
size
=
self
.
main
.
atomIterGetNext
(
iterator
)
def
get_transactions_to_wallet_from_blocks
(
self
,
address
):
def
nextBlockDatums
():
iterator
=
self
.
createAtomItem
(
False
)
aptr
,
size
=
self
.
main
.
atomIterGetFirst
(
iterator
)
#iterate over blocks: atom-pointer should not be none, and size shoud be >0
while
aptr
:
if
size
<=
0
:
#skip such blocks
aptr
,
size
=
self
.
main
.
atomIterGetNext
(
iter
)
continue
block
=
Block
.
fromAtom
(
aptr
,
size
)
if
block
.
datums
:
yield
block
,
block
.
datums
aptr
,
size
=
self
.
main
.
atomIterGetNext
(
iterator
)
def
isDatumToAddress
(
datum_with_block
):
try
:
txn_out
=
next
(
filter
(
lambda
x
:
isinstance
(
x
,
TxOut
),
datum_with_block
.
datum
.
getItems
()))
return
str
(
txn_out
.
addr
)
==
address
except
Exception
as
e
:
return
False
transactions_to_wallet
=
[]
class
DatumWithBlock
:
def
__init__
(
self
,
datum
,
block
):
self
.
datum
=
datum
self
.
block
=
block
for
block
,
datums
in
nextBlockDatums
():
tx_datums
=
[
DatumWithBlock
(
datum
.
getDatumTX
(),
block
)
for
datum
in
filter
(
lambda
datum
:
datum
.
isDatumTX
(),
datums
)]
transactions_to_wallet
.
extend
(
list
(
filter
(
isDatumToAddress
,
tx_datums
)))
return
transactions_to_wallet
def
create_emission
(
self
,
wallet
,
token_symbol
,
value
,
tsd
):
addr
=
CellFrame
.
Chain
.
ChainAddr
.
fromStr
(
wallet
)
ems
=
DatumEmission
(
str
(
value
),
token_symbol
,
addr
)
for
key
,
value
in
tsd
.
items
():
if
isinstance
(
value
,
dict
):
ems
.
addTSD
(
key
,
json_dump
(
value
).
encode
(
"
utf-8
"
))
elif
isinstance
(
value
,
list
):
ems
.
addTSD
(
key
,
json_dump
(
value
).
encode
(
"
utf-8
"
))
else
:
ems
.
addTSD
(
key
,
str
(
value
).
encode
(
"
utf-8
"
))
return
CellframeEmission
(
ems
)
def
place_emission
(
self
,
ems
,
chain
):
return
Mempool
.
emissionPlace
(
chain
,
ems
.
datum
)
def
place_datum
(
self
,
datum
,
chain
):
return
Mempool
.
addDatum
(
chain
,
datum
)
def
remove_key_from_mempool
(
self
,
key
,
chain
):
Mempool
.
remove
(
chain
,
key
)
def
mempool_list
(
self
,
chain
):
return
Mempool
.
list
(
self
.
net
,
chain
)
def
mempool_get_emission
(
self
,
key
):
return
Mempool
.
emissionGet
(
self
.
zerochain
,
key
)
def
mempool_proc
(
self
,
hash
,
chain
):
Mempool
.
proc
(
hash
,
chain
)
def
all_tx_from_ledger
(
self
):
res
=
[]
legder
=
self
.
net
.
getLedger
()
count
=
legder
.
count
()
txs
=
legder
.
getTransactions
(
count
,
1
,
False
)
if
not
txs
:
return
[],
legder
return
txs
,
legder
def
all_tx_from_blocks
(
self
):
iterator
=
self
.
main
.
createAtomItem
(
False
)
ptr
=
self
.
main
.
atomIterGetFirst
(
iterator
)
if
not
ptr
:
logIt
.
error
(
"
Can
'
t iterate over blocks in {}!
"
.
format
(
self
.
name
))
return
[]
aptr
,
size
=
ptr
#iterate over blocks: atom-pointer should not be none, and size shoud be >0
while
aptr
:
if
size
<=
0
:
#skip such blocks
aptr
,
size
=
self
.
main
.
atomIterGetNext
(
iter
)
continue
block
=
Block
.
fromAtom
(
aptr
,
size
)
if
not
block
.
datums
:
aptr
,
size
=
self
.
main
.
atomIterGetNext
(
iterator
)
continue
for
datum
in
block
.
datums
:
if
datum
.
isDatumTX
():
yield
datum
.
getDatumTX
()
else
:
continue
aptr
,
size
=
self
.
main
.
atomIterGetNext
(
iterator
)
This diff is collapsed.
Click to expand it.
contract.py
0 → 100644
+
154
−
0
View file @
ed0cede0
from
web3
import
Web3
,
HTTPProvider
from
web3.middleware
import
geth_poa_middleware
from
web3.auto
import
w3
from
eth_account.messages
import
encode_defunct
from
web3.exceptions
import
TransactionNotFound
from
requests.exceptions
import
ConnectionError
from
web3.datastructures
import
AttributeDict
class
ContractProvider
:
def
__init__
(
self
,
name
,
provider_urls
,
contract_address
,
abi
,
commision_wallet
=
None
,
cfnet
=
None
,
chain_name
=
None
,
event_keys_map
=
None
,
native_token_index
=
None
,
pair_token_index
=
None
,
nft_contract
=
False
,
nft_token
=
None
,
network_id
=
None
):
self
.
name
=
name
self
.
net
=
cfnet
self
.
commision_wallet
=
commision_wallet
self
.
contract_address
=
contract_address
self
.
__last_attempt_block
=
"
latest
"
self
.
abi
=
abi
self
.
urls
=
provider_urls
self
.
chain_name
=
chain_name
self
.
event_keys_map
=
event_keys_map
self
.
native_token_index
=
native_token_index
self
.
pair_token_index
=
pair_token_index
self
.
__reserves
=
None
self
.
__supply
=
None
self
.
__tokens
=
None
self
.
__token_symbols
=
None
self
.
is_nft
=
nft_contract
self
.
nft_token
=
nft_token
self
.
network_id
=
network_id
@property
def
w3
(
self
):
for
url
in
self
.
urls
:
web3
=
Web3
(
HTTPProvider
(
url
))
if
web3
.
isConnected
():
web3
.
middleware_onion
.
inject
(
geth_poa_middleware
,
layer
=
0
)
return
web3
raise
ConnectionError
(
"
No valid links provided - failed to connect
"
)
@property
def
contract
(
self
):
contract
=
self
.
w3
.
eth
.
contract
(
address
=
self
.
contract_address
,
abi
=
self
.
abi
)
return
contract
@property
def
functions
(
self
):
return
self
.
contract
.
functions
@property
def
reserves
(
self
):
if
self
.
is_nft
:
return
0
return
self
.
functions
.
getReserves
().
call
()
@property
def
supply
(
self
):
if
self
.
is_nft
:
return
0
return
self
.
functions
.
totalSupply
().
call
()
@property
def
native_token_ticker
(
self
):
#just nft - native ticker form ERC20 abi
if
self
.
is_nft
:
self
.
__token_symbols
=
[
self
.
contract
.
functions
.
symbol
().
call
(),
"
Unknown
"
]
return
self
.
__token_symbols
[
self
.
native_token_index
]
if
not
self
.
__tokens
:
self
.
__tokens
=
[
self
.
functions
.
token0
().
call
(),
self
.
functions
.
token1
().
call
()]
if
not
self
.
__token_symbols
:
t0provider
=
ContractProvider
(
name
=
"
Token0-lp
"
,
provider_urls
=
self
.
urls
,
contract_address
=
self
.
__tokens
[
0
],
abi
=
self
.
abi
)
t1provider
=
ContractProvider
(
name
=
"
Token1-lp
"
,
provider_urls
=
self
.
urls
,
contract_address
=
self
.
__tokens
[
1
],
abi
=
self
.
abi
)
t1
=
t0provider
.
functions
.
symbol
().
call
()
t2
=
t1provider
.
functions
.
symbol
().
call
()
self
.
__token_symbols
=
[
t1
,
t2
]
return
self
.
__token_symbols
[
self
.
native_token_index
]
@property
def
pair_token_ticker
(
self
):
if
self
.
is_nft
:
return
"
NFT
"
if
not
self
.
__tokens
:
self
.
__tokens
=
[
self
.
functions
.
token0
().
call
(),
self
.
functions
.
token1
().
call
()]
if
not
self
.
__token_symbols
:
t0provider
=
ContractProvider
(
name
=
"
Token0-lp
"
,
provider_urls
=
self
.
urls
,
contract_address
=
self
.
__tokens
[
0
],
abi
=
self
.
abi
)
t1provider
=
ContractProvider
(
name
=
"
Token1-lp
"
,
provider_urls
=
self
.
urls
,
contract_address
=
self
.
__tokens
[
1
],
abi
=
self
.
abi
)
t1
=
t0provider
.
functions
.
symbol
().
call
()
t2
=
t1provider
.
functions
.
symbol
().
call
()
self
.
__token_symbols
=
[
t1
,
t2
]
return
self
.
__token_symbols
[
self
.
pair_token_index
]
def
all_events
(
self
,
event_name
,
**
kwargs
):
return
self
.
contract
.
events
[
event_name
].
createFilter
(
**
kwargs
).
get_all_entries
()
def
all_events_from_last_try
(
self
,
event_name
):
block
=
self
.
w3
.
eth
.
blockNumber
events
=
self
.
contract
.
events
[
event_name
].
createFilter
(
fromBlock
=
self
.
__last_attempt_block
).
get_all_entries
()
#logIt.notice("{} events from {} to {} block: {}".format(self.name, self.__last_attempt_block, block, len(events)))
self
.
__last_attempt_block
=
block
return
events
def
event_from_tx_logs
(
self
,
tx_hash
,
event_name
):
receipt
=
self
.
w3
.
eth
.
get_transaction_receipt
(
tx_hash
)
return
self
.
contract
.
events
[
event_name
]().
processReceipt
(
receipt
)
def
get_transaction
(
self
,
txhash
):
tx
=
self
.
w3
.
eth
.
get_transaction
(
txhash
)
contract_data
=
self
.
contract
.
decode_function_input
(
tx
.
input
)[
1
]
return
tx
,
contract_data
def
normalized_event_data
(
self
,
ev
):
if
not
self
.
event_keys_map
:
raise
RuntimeError
(
"
Can
'
t normilize event data, no mapping
"
)
result
=
{}
for
key
in
self
.
event_keys_map
.
keys
():
result
[
key
]
=
ev
[
"
args
"
][
self
.
event_keys_map
[
key
]]
if
not
"
_block
"
in
ev
[
"
args
"
]:
result
[
"
_block
"
]
=
self
.
w3
.
eth
.
blockNumber
else
:
result
[
"
_block
"
]
=
ev
[
"
args
"
][
"
_block
"
]
return
result
This diff is collapsed.
Click to expand it.
helpers.py
0 → 100644
+
54
−
0
View file @
ed0cede0
import
json
def
json_dump
(
data
):
return
json
.
dumps
(
data
)
def
json_load
(
data
):
try
:
return
json
.
loads
(
data
)
except
:
return
None
def
find_tx_out
(
tx
,
out_type
):
try
:
return
next
(
filter
(
lambda
x
:
isinstance
(
x
,
out_type
),
tx
.
getItems
()))
except
:
return
None
def
get_tx_items
(
tx
,
out_type
):
try
:
return
list
(
filter
(
lambda
x
:
isinstance
(
x
,
out_type
),
tx
.
getItems
()))
except
:
return
None
def
get_tsd_data
(
tsd_list
,
type_tsd
):
try
:
return
next
(
filter
(
lambda
x
:
x
.
type
==
type_tsd
,
tsd_list
)).
data
except
:
return
None
def
get_tx_outs
(
tx
,
out_type
):
try
:
return
list
(
filter
(
lambda
x
:
isinstance
(
x
,
out_type
),
tx
.
getItems
()))
except
:
return
None
class
ListCache
:
def
__init__
(
self
,
capacity
):
self
.
capacity
=
capacity
self
.
data
=
[]
def
add
(
self
,
data
):
self
.
data
.
append
(
data
)
if
len
(
self
.
data
)
>
self
.
capacity
:
self
.
data
.
pop
(
0
)
def
data
(
self
):
return
self
.
data
def
net_by_name
(
NETS
,
netname
):
for
net
in
NETS
:
if
net
.
name
==
netname
:
return
net
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment