How I built a production blockchain intelligence engine solo, from nothing, using Apache Airflow, partitioned PostgreSQL, scikit-learn's Isolation Forest, and FastAPI — currently processing 390,000+ real USDC transfers on Base Mainnet.
The problem worth solving
Blockchain data is public. Every transaction ever recorded on Base Mainnet is visible to anyone. But "visible" and "usable" are not the same thing. Raw EVM event logs look like this:
topics: [
  "0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef",
  "0x000000000000000000000000b2cc224c1c9fee385f8ad6a55b4d94e92359dc59",
  "0x0000000000000000000000006c8c8c8c8c8c8c8c8c8c8c8c8c8c8c8c8c8c8c8c"
]
data: "0x000000000000000000000000000000000000000000000000000000001dcd6500"
That's a USDC transfer. The first topic is the Transfer event signature. The second and third topics are the sender and receiver addresses, each padded to 32 bytes. The data field is the raw amount in integer form, no decimal point, no dollar sign. A human can't read this at a glance. A compliance team can't screen it. A risk model can't score it without significant preprocessing.
OmniSight exists to close that gap, transforming raw EVM logs into structured, queryable, ML-scored intelligence, in real time, continuously, unattended.
The stack, chosen deliberately
Before explaining how it works, here's what it runs on. Each piece was chosen for a specific reason, not just familiarity.
• Apache Airflow for orchestration — because the pipeline needs to be idempotent, restartable, and observable. A cron job doesn't give you that.
• PostgreSQL with range partitioning for storage — because 390,000+ records growing continuously need a query strategy, not just a table.
• Web3.py for blockchain connectivity — the standard Python library for EVM interaction, with direct JSON-RPC support.
• scikit-learn Isolation Forest + RobustScaler for anomaly detection — because there are no labeled fraud examples to train against, making supervised methods impossible.
• FastAPI for the API gateway — async, fast, production-grade, with built-in OpenAPI documentation.
• OCI (Oracle Cloud Infrastructure) for hosting — a single instance running everything, Airflow, PostgreSQL, FastAPI, and Nginx.
Phase 1: Ingestion — decoding raw EVM logs
Every 120 seconds, an Airflow DAG fires. Its first job is finding the checkpoint, the last block already stored in PostgreSQL:
python
cursor.execute("SELECT MAX(block_number) FROM omnisight.usdc_transfers;")
result = cursor.fetchone()
last_block = result[0] if result and result[0] else GENESIS_BLOCK - 1
This single query makes the pipeline idempotent. It never reprocesses what it has already stored, and it never skips a block. If the pipeline crashes mid-run, the next run picks up exactly where it left off.
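To make the checkpoint logic concrete, here is a minimal sketch of how a run might turn the checkpoint into a block range. The function name `next_block_range`, the `chain_head` argument, and the default batch size of 5 (borrowed from the cadence figures near the end of this post) are illustrative assumptions, not the pipeline's actual code.

```python
from typing import Optional


def next_block_range(last_block: int, chain_head: int, batch_size: int = 5) -> Optional[range]:
    """Return the blocks to process in this run, or None if already caught up.

    Hypothetical helper: last_block is the checkpoint read from PostgreSQL,
    chain_head is the latest block number reported by the RPC node.
    """
    start = last_block + 1  # resume right after the checkpoint
    if start > chain_head:
        return None  # nothing new on chain yet
    end = min(start + batch_size - 1, chain_head)  # never read past the head
    return range(start, end + 1)


blocks = next_block_range(last_block=47_000_010, chain_head=47_000_100)
print(list(blocks))
```

Because the range always starts at the checkpoint plus one, rerunning after a crash produces the same starting block until that block has actually been committed.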
From the checkpoint, it fetches the next batch of blocks from Base Mainnet using eth_getLogs, filtered by three criteria: the block range, the USDC contract address, and the Transfer event topic signature:
python
raw_logs = w3.eth.get_logs({
    "fromBlock": block_num,
    "toBlock": block_num,
    "address": w3.to_checksum_address(USDC_CONTRACT_ADDRESS),
    "topics": [TRANSFER_EVENT_TOPIC],
})
This returns only USDC Transfer events, nothing else. Each event log then passes through two decoder functions.
The first decodes the 32-byte EVM address topic into a standard 42-character wallet address:
python
def decode_evm_address(topic_bytes) -> str:
    hex_str = topic_bytes.hex() if isinstance(topic_bytes, bytes) else str(topic_bytes)
    return "0x" + hex_str.replace("0x", "")[-40:].lower()
EVM addresses are 20 bytes but stored as 32-byte topics with 12 bytes of zero padding on the left. Taking the last 40 hex characters strips that padding cleanly.
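As a quick check, the decoder can be run against the sender topic from the sample log at the top of this post. The function is repeated here only so the snippet runs on its own; the `topic` and `address` variable names are just for illustration.

```python
def decode_evm_address(topic_bytes) -> str:
    """Same logic as the decoder above, repeated so this snippet is self-contained."""
    hex_str = topic_bytes.hex() if isinstance(topic_bytes, bytes) else str(topic_bytes)
    return "0x" + hex_str.replace("0x", "")[-40:].lower()


# 12 bytes of zero padding followed by the 20-byte sender address from the sample log.
topic = bytes.fromhex("00" * 12 + "b2cc224c1c9fee385f8ad6a55b4d94e92359dc59")

address = decode_evm_address(topic)
print(len(topic), address, len(address))
```

The 32-byte topic comes back as a 42-character string: the `0x` prefix plus 40 hex characters.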
The second decodes the raw integer amount into a human-readable USDC value:
python
def decode_usdc_amount(data_field) -> tuple:
    raw_hex = data_field.hex() if isinstance(data_field, bytes) else str(data_field)
    raw_int = int(raw_hex, 16) if raw_hex and raw_hex not in ("0x", "") else 0
    return raw_int, raw_int / USDC_DECIMALS  # USDC_DECIMALS = 10 ** 6
USDC stores amounts as integers without decimal points. One dollar is stored as 1,000,000. Dividing by 10 ** 6 gives the human-readable amount.
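Applied to the data field from the sample log, the arithmetic works out to exactly 500 USDC. The sketch below is a variant rather than the pipeline's own decoder: it swaps float division for `decimal.Decimal` so the result stays exact at any magnitude, and the names `USDC_SCALE` and `decode_usdc_amount_exact` are hypothetical.

```python
from decimal import Decimal

USDC_SCALE = Decimal(10) ** 6  # hypothetical name; same value as USDC_DECIMALS above


def decode_usdc_amount_exact(data_hex: str):
    """Decode a hex data field into (raw integer, exact Decimal USDC amount)."""
    stripped = data_hex[2:] if data_hex.startswith("0x") else data_hex
    raw_int = int(stripped, 16) if stripped else 0
    return raw_int, Decimal(raw_int) / USDC_SCALE


# Data field from the sample log: 0x1dcd6500 left-padded to 32 bytes.
sample_data = "0x" + "1dcd6500".rjust(64, "0")
raw, usdc = decode_usdc_amount_exact(sample_data)
print(raw, usdc)
```

Here `raw` is 500,000,000 and `usdc` equals 500, which matches the "one dollar is stored as 1,000,000" rule.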
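Decoded records are inserted with an ON CONFLICT clause that makes the operation safe to retry. PostgreSQL needs a unique constraint or index on (block_number, transaction_hash) to resolve the conflict target: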
python
cursor.execute("""
    INSERT INTO omnisight.usdc_transfers
        (block_number, transaction_hash, sender_address,
         receiver_address, raw_amount, adjusted_amount)
    VALUES (%s, %s, %s, %s, %s, %s)
    ON CONFLICT (block_number, transaction_hash) DO NOTHING;
""", (block_num, tx_hash, sender, receiver, raw_amount, usd_amount))
The pipeline commits after every block, not after every batch. This means partial progress is preserved on failure, and the next run wastes no time reprocessing completed blocks.
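The retry behavior is easy to demonstrate in miniature. The sketch below uses an in-memory SQLite database as a stand-in for PostgreSQL so it runs anywhere; the trimmed-down table, the `store_block` helper, and the transaction hashes are all made up for illustration.

```python
import sqlite3

# In-memory SQLite stands in for PostgreSQL so the sketch runs without a server.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE usdc_transfers (
        block_number     INTEGER NOT NULL,
        transaction_hash TEXT    NOT NULL,
        adjusted_amount  TEXT    NOT NULL,
        UNIQUE (block_number, transaction_hash)
    )
""")


def store_block(conn, block_number, transfers):
    """Insert one block's transfers and commit, skipping rows already stored."""
    conn.executemany(
        """
        INSERT INTO usdc_transfers (block_number, transaction_hash, adjusted_amount)
        VALUES (?, ?, ?)
        ON CONFLICT (block_number, transaction_hash) DO NOTHING
        """,
        [(block_number, tx_hash, amount) for tx_hash, amount in transfers],
    )
    conn.commit()  # per-block commit: a later crash cannot undo this block


# Placeholder data: two blocks with one transfer each.
sample = {47_000_011: [("0xaaa", "500.000000")], 47_000_012: [("0xbbb", "12.500000")]}
for _ in range(2):  # the second pass simulates a retry of the same run
    for block_number, transfers in sample.items():
        store_block(conn, block_number, transfers)

row_count = conn.execute("SELECT COUNT(*) FROM usdc_transfers").fetchone()[0]
print(row_count)
```

Running the same blocks twice leaves two rows, not four, which is the property that makes retries harmless.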
Phase 2: Storage — partitioned PostgreSQL
A single flat table with 390,000+ rows and growing is a performance problem waiting to happen. OmniSight uses PostgreSQL's native range partitioning on block_number:
sql
CREATE TABLE omnisight.usdc_transfers (
    transfer_id       SERIAL,
    block_number      BIGINT NOT NULL,
    transaction_hash  VARCHAR(66) NOT NULL,
    sender_address    VARCHAR(42) NOT NULL,
    receiver_address  VARCHAR(42) NOT NULL,
    raw_amount        NUMERIC(38,0) NOT NULL,
    adjusted_amount   NUMERIC(18,6) NOT NULL,
    ingested_at       TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (block_number, transfer_id)
) PARTITION BY RANGE (block_number);

CREATE TABLE omnisight.usdc_transfers_era_47m
    PARTITION OF omnisight.usdc_transfers
    FOR VALUES FROM (47000000) TO (48000000);
Each partition covers one million blocks, which is roughly 23 days of Base Mainnet activity at its 2-second block time. Queries that filter by block range hit only the relevant partitions, not the full table. The primary key is composite on (block_number, transfer_id) rather than just transfer_id because PostgreSQL requires the primary key of a partitioned table to include every partition key column. Leading with block_number also lets primary-key lookups benefit from partition pruning.
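New partitions have to exist before the chain reaches them. A minimal sketch of a helper that generates the DDL, following the `usdc_transfers_era_47m` naming shown above, could look like this. The function name and constants are assumptions for illustration, and the time estimate assumes Base's 2-second block time.

```python
BLOCKS_PER_PARTITION = 1_000_000
BASE_BLOCK_TIME_SECONDS = 2  # assumed Base block time


def partition_ddl(block_number: int) -> str:
    """Build the CREATE TABLE statement for the partition containing block_number."""
    era = block_number // BLOCKS_PER_PARTITION
    lower = era * BLOCKS_PER_PARTITION
    upper = lower + BLOCKS_PER_PARTITION  # the upper bound is exclusive in PostgreSQL
    return (
        f"CREATE TABLE IF NOT EXISTS omnisight.usdc_transfers_era_{era}m\n"
        f"    PARTITION OF omnisight.usdc_transfers\n"
        f"    FOR VALUES FROM ({lower}) TO ({upper});"
    )


# Wall-clock time covered by one partition, in days.
days_per_partition = BLOCKS_PER_PARTITION * BASE_BLOCK_TIME_SECONDS / 86_400

print(partition_ddl(48_123_456))
print(round(days_per_partition, 1))
```

Any block in the 48M range maps to the same statement, so the helper can safely be called at the start of every run.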
Two additional indexes cover the most common query patterns, looking up all transactions sent from or received by a specific wallet:
sql
CREATE INDEX idx_usdc_sender ON omnisight.usdc_transfers (sender_address);
CREATE INDEX idx_usdc_receiver ON omnisight.usdc_transfers (receiver_address);
Phase 3: Anomaly detection — Isolation Forest without labeled data
This is the hardest part of the system to design, and the most interesting.
The standard approach to fraud detection is supervised learning: label known fraudulent transactions as positive examples, train a classifier, deploy it. That approach requires labeled data. On a blockchain dataset, there are no labels. Nobody has handed me a list of confirmed bad actors on Base Mainnet.
Isolation Forest solves this without labels. The intuition: normal wallets, behaving like thousands of others, require many random splits to isolate from the population. Unusual wallets, with extreme transaction patterns, are isolated in far fewer splits because nothing else nearby behaves the same way. The anomaly score rises as the average path length needed to isolate a data point falls.
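The link between path length and score can be made concrete. The sketch below implements the scoring formula from the original Isolation Forest paper (Liu, Ting, and Zhou, 2008) in plain Python. The path lengths fed into it are made-up numbers, and 256 is used because it is scikit-learn's default cap on the per-tree subsample size. This is not how OmniSight computes its scores, only the underlying math.

```python
import math

EULER_GAMMA = 0.5772156649015329


def average_path_length(n: int) -> float:
    """c(n): expected path length of an unsuccessful binary search tree lookup over n points."""
    if n <= 1:
        return 0.0
    if n == 2:
        return 1.0
    harmonic = math.log(n - 1) + EULER_GAMMA  # approximation of H(n - 1)
    return 2.0 * harmonic - 2.0 * (n - 1) / n


def anomaly_score(mean_path_length: float, n: int) -> float:
    """Paper-style score for n >= 2: close to 1 is anomalous, around 0.5 or below is ordinary."""
    return 2.0 ** (-mean_path_length / average_path_length(n))


SUBSAMPLE_SIZE = 256
for label, path_length in [("isolated quickly", 4.0), ("typical", 12.0)]:
    print(label, round(anomaly_score(path_length, SUBSAMPLE_SIZE), 3))
```

A point whose average path length equals c(n) scores exactly 0.5, and shorter paths push the score toward 1. scikit-learn's `score_samples` reports the negative of this quantity, so there lower values mean more anomalous.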
Before fitting the model, features are scaled with RobustScaler rather than the more common StandardScaler:
python
from sklearn.preprocessing import RobustScaler
from sklearn.ensemble import IsolationForest
from sklearn.pipeline import Pipeline

pipeline = Pipeline([
    ('scaler', RobustScaler()),
    ('model', IsolationForest(
        contamination=0.05,
        random_state=42,
        n_estimators=100
    ))
])
The choice of RobustScaler over StandardScaler is deliberate and important. Wallet transaction volumes on Base Mainnet are extremely skewed. A small number of wallets move millions of dollars while most move very little. StandardScaler uses the mean and standard deviation, both of which get pulled heavily by extreme outliers. RobustScaler uses the median and interquartile range, making it resistant to exactly those outliers, which is critical since the outliers are precisely what the model is supposed to find.
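A small numeric example shows the difference. The sketch below reproduces the two scaling rules with Python's standard library on made-up volumes (seven ordinary wallets and one whale). It illustrates the idea rather than scikit-learn's exact implementation.

```python
import statistics

# Made-up per-wallet volumes in USD: seven ordinary wallets and one whale.
volumes = [10, 12, 15, 18, 20, 22, 25, 1_000_000]

mean = statistics.fmean(volumes)
std = statistics.pstdev(volumes)
median = statistics.median(volumes)
q1, _, q3 = statistics.quantiles(volumes, n=4, method="inclusive")
iqr = q3 - q1

standard_scaled = [(v - mean) / std for v in volumes]  # StandardScaler-style
robust_scaled = [(v - median) / iqr for v in volumes]  # RobustScaler-style

# Spread among the seven ordinary wallets after each scaling.
standard_spread = max(standard_scaled[:7]) - min(standard_scaled[:7])
robust_spread = max(robust_scaled[:7]) - min(robust_scaled[:7])
print(f"standard: {standard_spread:.6f}  robust: {robust_spread:.2f}")
```

With mean and standard deviation, the single whale squashes the seven ordinary wallets into almost the same value. With median and IQR they stay clearly separated, and the whale still lands far from all of them.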
The pipeline wraps both steps so the same transformation is applied consistently at training time and inference time. There is no risk of accidentally scaling inference data differently from training data.
Features per wallet are aggregated from the raw transfers table:
sql
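-- The transfers table has no wallet_address column. This version profiles each
-- wallet by its outgoing transfers, which is an assumption about the grouping.
SELECT
    sender_address AS wallet_address,
    COUNT(*) AS tx_count,
    SUM(adjusted_amount) AS total_volume,
    AVG(adjusted_amount) AS avg_tx_size,
    MAX(adjusted_amount) AS max_tx_size,
    STDDEV(adjusted_amount) AS volume_stddev,
    COUNT(DISTINCT DATE_TRUNC('hour', ingested_at)) AS active_hours
FROM omnisight.usdc_transfers
GROUP BY sender_address;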
The model was trained on 9,418 wallet profiles. A wallet scoring below the decision threshold is flagged with an anomaly label, for example SUSPICIOUS_HIGH_VELOCITY_ANOMALY, and a numeric risk score between 0 and 1. Inference runs in under 5 milliseconds per wallet.
Phase 4: Serving — FastAPI with rate limiting
The risk scores are exposed through a production FastAPI gateway with three public endpoints.
The wallet risk endpoint accepts a wallet address and returns its full risk profile:
python
@app.get("/api/v1/public/wallet-risk/{wallet_address}")
async def get_wallet_risk(wallet_address: str):
    # Validate address format
    # Query risk scores from PostgreSQL
    # Return structured risk profile
    ...
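The validation step is worth spelling out, since the address goes straight into a database lookup. Here is a minimal sketch of what it might look like, using only the standard library. The helper name `normalize_wallet_address` and the choice to lowercase the result (to match how the decoder stores addresses) are assumptions, not the gateway's actual code.

```python
import re

# "0x" prefix followed by exactly 40 hexadecimal characters (20 bytes).
_ADDRESS_PATTERN = re.compile(r"0x[0-9a-fA-F]{40}")


def normalize_wallet_address(raw: str) -> str:
    """Return the lowercase address, or raise ValueError if it is malformed.

    Hypothetical helper: lowercase matches how the decoder stores addresses.
    """
    candidate = raw.strip()
    if not _ADDRESS_PATTERN.fullmatch(candidate):
        raise ValueError(f"not a valid EVM address: {raw!r}")
    return candidate.lower()


print(normalize_wallet_address("0xB2CC224C1C9FEE385F8AD6A55B4D94E92359DC59"))
```

Inside the endpoint, a `ValueError` from a helper like this would typically be turned into a 4xx response before any query runs.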
A real response for a flagged wallet looks like this:
json
{
  "wallet_address": "0xb2cc224c1c9fee385f8ad6a55b4d94e92359dc59",
  "risk_score": 0.94,
  "anomaly_label": "SUSPICIOUS_HIGH_VELOCITY_ANOMALY",
  "tx_count": 9216,
  "total_volume_usd": 890432.50,
  "model_version": "v20260619_0019"
}
The API sits behind Nginx with TLS, and rate limiting is enforced at the Nginx layer. The database connection uses a read-only role with no write permissions, which applies the principle of least privilege at the database layer.
The numbers that matter
• 390,000+ records ingested from Base Mainnet blocks 47M to 49M
• 9,418 wallet profiles in the current risk model
• 47 seconds average latency from chain event to dashboard update
• 120-second Airflow cadence, 5 blocks per run
• Sub-5ms inference time per wallet risk score
• Zero downtime since deployment — the idempotent checkpoint design means the pipeline recovers from any failure automatically
What this is not
OmniSight flags statistically unusual wallets, not confirmed malicious ones. An anomaly score of 0.94 means this wallet's behavior pattern is far outside the norm for the 9,418 wallets the model was trained on. It does not mean the wallet has committed fraud. This distinction matters enormously in any real compliance or investigative context, and OmniSight's own documentation states it explicitly.
This is also not a replacement for purpose-built compliance tools like Chainalysis or Elliptic. It is a demonstration that the same category of intelligence, real-time blockchain transaction monitoring with ML-based risk scoring, can be built and operated by a single engineer on modest infrastructure, and that the gap between "this requires an enterprise contract" and "this can be built and run" is smaller than the pricing suggests.
The system is live
Everything described in this post is running in production right now.
• Live dashboard: omnisight.ericdiamason.tech
• Public API docs: omnisight.ericdiamason.tech/docs
• Source code: github.com/ericdiamason/omnisight
Score a wallet. Check the live transfers. Read the code. Everything in this post is verifiable.