Ethereum Indexer: Fetch Transaction Lists by ETH Address

·

# Indexer for Ethereum to get transaction list by ETH address  
# https://github.com/Adamant-im/ETH-transactions-storage  
# v2.0  

from os import environ  
from web3 import Web3  
from web3.middleware import geth_poa_middleware  
import psycopg2  
import time  
import sys  
import logging  

# Configuration
# Every setting is read from the environment and kept as a string; the numeric
# ones are converted with int() at the point where they are used.
dbname = environ.get("DB_NAME")  # handed to psycopg2.connect() unchanged
startBlock = environ.get("START_BLOCK") or "1"
confirmationBlocks = environ.get("CONFIRMATIONS_BLOCK") or "0"
nodeUrl = environ.get("ETH_URL")
pollingPeriod = environ.get("PERIOD") or "20"  # seconds slept between indexing passes

if not dbname or not nodeUrl:
    # Report on stderr and leave through sys.exit(): the bare exit() helper is
    # injected by the site module and is not guaranteed to exist in every runtime.
    print("Error: Database name (DB_NAME) and Ethereum node URL (ETH_URL) must be set.",
          file=sys.stderr)
    sys.exit(2)

# Connect to Ethereum node
# The URL prefix selects the transport: "http..." -> HTTP, "ws..." -> WebSocket,
# and anything else is handed to the IPC provider.
if nodeUrl.startswith("http"):
    provider = Web3.HTTPProvider(nodeUrl)
elif nodeUrl.startswith("ws"):
    provider = Web3.WebsocketProvider(nodeUrl)
else:
    provider = Web3.IPCProvider(nodeUrl)
web3 = Web3(provider)

# Register the PoA middleware at layer 0 of the middleware onion
web3.middleware_onion.inject(geth_poa_middleware, layer=0)

# Initialize logger
# One stream handler with a "timestamp - level - message" layout, attached to
# the dedicated "eth-sync" logger at INFO level.
handler = logging.StreamHandler()
handler.setFormatter(
    logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
)

logger = logging.getLogger("eth-sync")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Database connection
# Startup check: abort immediately when PostgreSQL cannot be reached with the
# DB_NAME connection string. The indexing loop further down opens its own
# connection on every pass.
try:
    conn = psycopg2.connect(dbname)
    conn.autocommit = True
    logger.info("Database connected successfully.")
except Exception as e:
    logger.error(f"Database connection failed: {e}")
    # sys.exit() rather than the site-provided exit(), which may be unavailable
    sys.exit(1)

# Sync with Ethereum node
# Re-check the node's syncing flag every 300 seconds and only continue once it
# evaluates as false.
logger.info("Waiting for Ethereum node sync...")
while True:
    if not web3.eth.syncing:
        break
    time.sleep(300)
logger.info("Node synced! Starting indexing...")

def process_transactions(blockid, tr):
    """Store the relevant transactions of one block in ``public.ethtxs``.

    A transaction is stored when it moves ETH (non-zero ``value``) or when its
    input data starts with the ``0xa9059cbb`` selector, which this indexer
    treats as a token transfer call. All other zero-value transactions are
    skipped.

    Uses the module-level ``web3`` client, ``logger``, and the global cursor
    ``cur`` that the main loop creates before calling this function.

    Args:
        blockid: Number of the block to index.
        tr: Number of transactions contained in that block.
    """
    transfer_selector = '0xa9059cbb'
    timestamp = web3.eth.get_block(blockid)['timestamp']

    for index in range(tr):
        tx = web3.eth.get_transaction_by_block(blockid, index)

        input_data = tx['input']
        if not isinstance(input_data, str):
            # Some web3 releases return bytes-like input; the prefix test and
            # the slicing below need the 0x-prefixed hex text.
            input_data = '0x' + bytes(input_data).hex()
        is_token_transfer = input_data.startswith(transfer_selector)

        # Skip zero-value transactions that are not token transfer calls.
        # Done before the receipt lookup so skipped transactions cost no
        # extra RPC round trip.
        if tx['value'] == 0 and not is_token_transfer:
            continue

        # One receipt request supplies both the status flag and the gas used.
        # NOTE(review): receipts of very old (pre-Byzantium) blocks may not
        # carry a status field - confirm how the node reports them.
        receipt = web3.eth.get_transaction_receipt(tx['hash'])
        status = bool(receipt.status)
        txhash = tx['hash'].hex()

        # Layout of a well-formed call: '0x' + 8 selector chars, then the
        # recipient word, then a trailing 64-char amount word.
        contract_to = contract_value = ''
        if is_token_transfer:
            contract_to = input_data[10:-64]
            contract_value = input_data[74:]

        # Oversized payloads are malformed; keep the row but drop the token
        # transfer fields.
        if len(contract_to) > 128:
            logger.info(f"Skipping {txhash} (invalid contract_to length)")
            contract_to = contract_value = ''

        # Insert into database
        cur.execute(
            '''
            INSERT INTO public.ethtxs
            (time, txfrom, txto, value, gas, gasprice, block, txhash, contract_to, contract_value, status)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            ''',
            (timestamp, tx['from'], tx['to'], tx['value'],
             receipt['gasUsed'],
             tx['gasPrice'], blockid, txhash, contract_to, contract_value, status),
        )

# Main indexing loop
# Each pass opens a fresh connection, resumes after the highest block already
# stored (or after START_BLOCK when the table is empty), indexes up to the
# confirmed chain head, then sleeps for PERIOD seconds.
while True:
    # Reset both handles so the cleanup below never touches a name that was
    # not assigned during this pass (e.g. when connect() itself fails).
    conn = None
    cur = None
    try:
        conn = psycopg2.connect(dbname)
        cur = conn.cursor()

        # psycopg2's execute() returns None, so the row is fetched separately.
        cur.execute('SELECT Max(block) from public.ethtxs')
        maxblockindb = cur.fetchone()[0] or int(startBlock)
        endblock = web3.eth.block_number - int(confirmationBlocks)

        logger.info(f"Indexing blocks {maxblockindb + 1} to {endblock}")
        # range() stops before endblock, so that block is indexed on a later
        # pass once the chain has advanced.
        for block in range(maxblockindb + 1, endblock):
            tx_count = web3.eth.get_block_transaction_count(block)
            if tx_count > 0:
                process_transactions(block, tx_count)
                # Commit per block: without it the inserts are rolled back
                # when the connection closes, and a failure mid-block leaves
                # no partial rows behind.
                conn.commit()

    except Exception as e:
        logger.error(f"Indexing error: {e}")
    finally:
        if cur is not None:
            cur.close()
        if conn is not None:
            conn.close()

    # Sleep outside the finally block so an interrupt is not delayed by it.
    time.sleep(int(pollingPeriod))

Key Features of This Ethereum Indexer

  1. Efficient Data Retrieval: Indexes the transactions of every block via Web3.py so they can later be looked up by ETH address.
  2. PostgreSQL Integration: Stores indexed transactions securely.
  3. Smart Contract Support: Handles ERC-20 token transfers via input data parsing.
  4. Resilient Sync: Waits until the node has finished syncing, and retries on the next polling cycle after an error.

👉 Explore advanced blockchain tools for developers.


FAQ

Q: How do I configure the starting block?
A: Set the START_BLOCK environment variable (default: block 1).

Q: Why are some transactions skipped?
A: Transactions with value=0 are excluded unless their input data is an ERC-20 transfer call (input starting with the selector 0xa9059cbb).

Q: How often does the indexer poll for new blocks?
A: Adjustable via the PERIOD variable (default: 20 seconds).


Core Keywords


Note: Original code credits to ADAMANT Foundation and contributors.