# contract.py
from web3 import Web3, HTTPProvider
from web3.middleware import geth_poa_middleware
from web3.auto import w3
from eth_account.messages import encode_defunct

from web3.exceptions import TransactionNotFound
from requests.exceptions import ConnectionError
from web3.datastructures import AttributeDict


class ContractProvider:
    """Helper around a single deployed contract reachable through several RPC URLs.

    The class stores the contract address/ABI together with a list of HTTP
    provider URLs.  Every access to :attr:`w3` walks that list and returns a
    fresh connection to the first URL that answers, so individual calls fail
    over between endpoints.

    Two kinds of contract are handled:

    * "pair" contracts exposing ``getReserves``/``totalSupply``/``token0``/
      ``token1`` (NOTE(review): looks like a Uniswap-V2-style LP pair — confirm);
    * NFT contracts (``nft_contract=True``), for which reserves and supply are
      reported as ``0`` and the pair ticker is the literal ``"NFT"``.
    """

    def __init__(self, name,
                 provider_urls,
                 contract_address,
                 abi,
                 commision_wallet=None,
                 cfnet=None,
                 chain_name=None,
                 event_keys_map=None,
                 native_token_index=None,
                 pair_token_index=None,
                 nft_contract=False,
                 nft_token=None, network_id=None):
        """Store the configuration; no network access happens here.

        Args:
            name: Label for this provider (only stored).
            provider_urls: Iterable of HTTP RPC endpoint URLs, tried in order.
            contract_address: Address passed to ``w3.eth.contract``.
            abi: Contract ABI passed to ``w3.eth.contract``.
            commision_wallet: Stored as-is; not used inside this class.
            cfnet: Stored as ``self.net``; not used inside this class.
            chain_name: Stored as-is; not used inside this class.
            event_keys_map: Mapping ``output key -> event argument name`` used
                by :meth:`normalized_event_data`.
            native_token_index: Index (into the two-element symbol list) of
                the native token.
            pair_token_index: Index (into the two-element symbol list) of the
                paired token.
            nft_contract: ``True`` if the contract is an NFT contract.
            nft_token: Stored as-is; not used inside this class.
            network_id: Stored as-is; not used inside this class.
        """
        self.name = name

        self.net = cfnet
        self.commision_wallet = commision_wallet
        self.contract_address = contract_address
        # First block of the next window for all_events_from_last_try();
        # the string "latest" means "no window has been fetched yet".
        self.__last_attempt_block = "latest"
        self.abi = abi
        self.urls = provider_urls

        self.chain_name = chain_name
        self.event_keys_map = event_keys_map

        self.native_token_index = native_token_index
        self.pair_token_index = pair_token_index

        # Never read inside this class; kept so the instance layout is unchanged.
        self.__reserves = None
        self.__supply = None
        # Lazily filled caches: [token0 address, token1 address] and the
        # matching [symbol0, symbol1] (for NFTs: [symbol, "Unknown"]).
        self.__tokens = None
        self.__token_symbols = None

        self.is_nft = nft_contract
        self.nft_token = nft_token
        self.network_id = network_id

    @property
    def w3(self):
        """Return a new ``Web3`` connected to the first reachable URL.

        A new connection is created on every access.  The
        ``geth_poa_middleware`` is injected at layer 0 before the instance is
        returned.

        Raises:
            ConnectionError: if no URL could be connected.  The last
                per-URL exception (if any) is attached as ``__cause__``.
        """
        last_error = None
        for url in self.urls:
            try:
                web3 = Web3(HTTPProvider(url))
                if web3.isConnected():
                    web3.middleware_onion.inject(geth_poa_middleware, layer=0)
                    return web3
            except Exception as err:
                # Best-effort failover: remember why this endpoint failed and
                # move on to the next one instead of aborting.
                last_error = err
        raise ConnectionError("No valid links provided - failed to connect") from last_error

    def _contract_on(self, w3):
        """Build the contract object for this address/ABI on the given connection."""
        return w3.eth.contract(address=self.contract_address, abi=self.abi)

    @property
    def contract(self):
        """Contract object bound to a fresh connection (see :attr:`w3`)."""
        return self._contract_on(self.w3)

    @property
    def functions(self):
        """Shortcut for ``self.contract.functions``."""
        return self.contract.functions

    @property
    def reserves(self):
        """Result of the contract's ``getReserves()`` call, or ``0`` for NFT contracts."""
        if self.is_nft:
            return 0
        return self.functions.getReserves().call()

    @property
    def supply(self):
        """Result of the contract's ``totalSupply()`` call, or ``0`` for NFT contracts."""
        if self.is_nft:
            return 0
        return self.functions.totalSupply().call()

    def _symbol_of(self, index, address):
        """Query ``symbol()`` of the token at *address*, reusing this ABI and URL list."""
        token = ContractProvider(name=f"Token{index}-lp",
                                 provider_urls=self.urls,
                                 contract_address=address,
                                 abi=self.abi)
        return token.functions.symbol().call()

    def _pair_symbols(self):
        """Return ``[symbol0, symbol1]`` of the pair's tokens, fetching them once."""
        if not self.__tokens:
            functions = self.functions
            self.__tokens = [functions.token0().call(), functions.token1().call()]

        if not self.__token_symbols:
            self.__token_symbols = [
                self._symbol_of(index, address)
                for index, address in enumerate(self.__tokens)
            ]

        return self.__token_symbols

    @property
    def native_token_ticker(self):
        """Symbol of the native token, selected by ``native_token_index``.

        For NFT contracts the symbol list is ``[contract symbol, "Unknown"]``;
        otherwise it is the symbols of ``token0``/``token1``.  Either list is
        fetched once and cached on the instance.
        """
        if self.is_nft:
            # Just an NFT: the native ticker comes from the contract's own symbol().
            if not self.__token_symbols:
                self.__token_symbols = [self.contract.functions.symbol().call(), "Unknown"]
            return self.__token_symbols[self.native_token_index]

        return self._pair_symbols()[self.native_token_index]

    @property
    def pair_token_ticker(self):
        """Symbol of the paired token (``pair_token_index``), or ``"NFT"`` for NFT contracts."""
        if self.is_nft:
            return "NFT"

        return self._pair_symbols()[self.pair_token_index]

    def all_events(self, event_name, **kwargs):
        """Return all entries of a filter on *event_name*.

        ``kwargs`` are forwarded unchanged to ``createFilter``.
        """
        return self.contract.events[event_name].createFilter(**kwargs).get_all_entries()

    def all_events_from_last_try(self, event_name):
        """Return *event_name* entries for the blocks not covered by the previous call.

        The first call covers only the current head block.  Each later call
        covers the blocks from just after the previously covered head up to
        the current head.  The window is closed explicitly with ``toBlock`` and
        the next window starts one block later, so an event is not handed out
        twice.  If no new block exists since the previous call an empty list is
        returned and the stored position is left unchanged.
        """
        w3 = self.w3  # one connection for both the head lookup and the filter
        head = w3.eth.blockNumber

        start = self.__last_attempt_block
        if start == "latest":
            start = head
        if start > head:
            return []

        event = self._contract_on(w3).events[event_name]
        events = event.createFilter(fromBlock=start, toBlock=head).get_all_entries()
        #logIt.notice("{} events from {} to {} block: {}".format(self.name, start, head, len(events)))
        self.__last_attempt_block = head + 1
        return events

    def event_from_tx_logs(self, tx_hash, event_name):
        """Decode the *event_name* entries from the receipt of transaction *tx_hash*."""
        w3 = self.w3
        receipt = w3.eth.get_transaction_receipt(tx_hash)
        return self._contract_on(w3).events[event_name]().processReceipt(receipt)

    def get_transaction(self, txhash):
        """Fetch a transaction and decode its input against this contract's ABI.

        Returns:
            A ``(tx, contract_data)`` tuple, where ``contract_data`` is the
            second element of ``decode_function_input(tx.input)``.
        """
        w3 = self.w3
        tx = w3.eth.get_transaction(txhash)
        contract_data = self._contract_on(w3).decode_function_input(tx.input)[1]
        return tx, contract_data

    def normalized_event_data(self, ev):
        """Rename the arguments of event *ev* according to ``event_keys_map``.

        Each ``key -> argument name`` pair of the map yields
        ``result[key] = ev["args"][argument name]``.  ``result["_block"]`` is
        taken from the event's ``_block`` argument when present, otherwise
        from the current block number of the chain.
        NOTE(review): the fallback is the chain head at call time, not the
        block the event was emitted in — confirm this is intended.

        Raises:
            RuntimeError: if no ``event_keys_map`` was configured.
            KeyError: if a mapped argument is missing from ``ev["args"]``.
        """
        if not self.event_keys_map:
            raise RuntimeError("Can't normilize event data, no mapping")

        args = ev["args"]
        result = {key: args[source] for key, source in self.event_keys_map.items()}

        if "_block" in args:
            result["_block"] = args["_block"]
        else:
            result["_block"] = self.w3.eth.blockNumber
        return result