How I Automated Data Contract Violation Triage Using Multi-Agent Orchestration
TL;DR
I built SchemaFlow-AI as a personal proof-of-concept to explore a problem I kept running into while experimenting with lakehouse pipelines: schema drift alerts are easy to generate, but the hard part is deciding what each alert means, which data contract clause applies, and how urgently someone should act before downstream marts break. This experimental project routes schema drift events through a LangGraph-orchestrated multi-agent pipeline backed by FastAPI and a lightweight data contract knowledge index. Each event passes through classifier, severity, contract retrieval, and remediation agents before landing in a structured triage report with SLA hours and agent traces. The full source code lives at https://github.com/aniket-work/SchemaFlow-AI. In my opinion, the most interesting takeaway is not any single agent prompt, but how graph-based orchestration keeps specialist steps composable without turning the codebase into spaghetti. This write-up documents my design choices, the code I wrote along the way, and what I would change if I extended the PoC further.
Introduction
A few months ago I started paying closer attention to what happens after a bronze-to-silver job fails at eight in the morning. From my experience reading incident threads and anonymized pipeline logs, the error message itself is rarely the whole story. A dbt model might complain about a schema mismatch because discount_code disappeared upstream, but the on-call engineer still has to answer harder questions. Is this a column removal or a type change? Does it block revenue dashboards or only a staging table? Which data contract governs the fix, and how many hours do I have before SLA breach?
I wanted to test whether multi-agent orchestration could compress that downstream triage work without pretending to replace human judgment. SchemaFlow-AI is the result of that experiment. It is not production software, not something I deployed at any employer, and not a claim about how data platforms should operate. It is a solo PoC I built to learn LangGraph patterns in a domain that felt concrete and underserved in agent tutorials.
The architecture rhymes with other automation experiments I have tried: specialist agents, a coordinator, a knowledge layer for retrieval, and a thin API for integration. I deliberately chose schema drift and data contract triage rather than chatbots or document summarization because those demos are overrepresented, and because pipeline incidents have crisp inputs and outputs that make evaluation honest.
What pulled me toward this domain specifically was the asymmetry between detection and response. Modern stacks already emit Great Expectations failures, dbt test errors, and Airflow alerts. The gap I noticed was structured triage: turning noisy drift signals into category labels, severity scores, contract citations, and remediation steps that an on-call engineer could accept or override quickly. I wondered whether explicit agent roles could mirror that mental sorting without hiding reasoning inside a black-box model.
I also wanted a use case where mistakes are obvious. If an agent labels a missing daily partition as advisory, tests should scream. That clarity helped me iterate faster than I would have in a vague business insights demo where quality is subjective.
What's This Article About?
This article walks through how I designed and implemented SchemaFlow-AI, a multi-agent system that ingests batches of schema drift events and emits triage reports containing drift category labels, impact severity scores, data contract references, recommended remediation steps, and SLA hour estimates. You will see the LangGraph state machine I used, the specialist agents I wrote, the contract knowledge module that acts as a stand-in for retrieval-augmented generation, and the FastAPI surface that exposes the workflow to a simple dashboard.
I also cover setup and execution steps so you can reproduce the PoC locally, and I share edge cases I discovered while testing, including ambiguous event descriptions that fooled the classifier and severity heuristics that needed tuning. Throughout, I frame conclusions as personal observations from an experimental build rather than authoritative guidance for regulated pipeline environments.
Tech Stack
I kept the stack intentionally small. Complexity in agent systems often hides in dependencies long before it hides in prompts, and I wanted to feel every layer.
- Python 3.11 as the runtime. Strong typing with Pydantic models made drift event batches easy to validate at API boundaries.
- LangGraph for orchestration. I considered a hand-rolled coordinator loop, but LangGraph gave me explicit state transitions and a graph I could diagram without lying.
- FastAPI for HTTP endpoints and to serve a minimal HTML dashboard from the same process.
- Pydantic v2 for schemas such as DriftEvent, TriageResult, and BatchTriageReport.
- Rich for CLI tables. Terminal output matters in data engineering workflows because many pipeline tools still live partly in terminals and JSON exports.
- Matplotlib for severity and category distribution charts. I wanted at least one visual summary of learned triage statistics, not because the charts are fancy, but because they make batch behavior obvious at a glance.
I skipped a full vector database in this PoC. A lexical contract index was enough to prove the RAG insertion point, and it kept the repository approachable for readers who want to clone and run in ten minutes.
In my experience shipping small services, clone-to-green time is what matters most in a PoC. Every extra infrastructure component is a reader who bounces at step four of the README. LangGraph plus FastAPI plus a JSON fixture gets people to a colored terminal table quickly. That table is the emotional payoff that convinces someone to read the rest of the design sections.
| Component | Role in SchemaFlow-AI | Why I chose it |
|---|---|---|
| LangGraph | Per-event drift loop | Makes orchestration visible and testable |
| FastAPI | HTTP + static dashboard | Single process dev experience |
| Pydantic | Schema guardrails | Prevents malformed event batches early |
| Rich | CLI tables | Matches on-call friendly output |
| Matplotlib | Drift category charts | Quant summary for README and article |
Why Read It?
If you are experimenting with LangGraph and tired of toy chatbots, schema drift triage offers a disciplined sandbox. Inputs are structured pipeline incident descriptions. Outputs are enums, contract codes, and remediation lists. That makes it easy to tell when an agent misbehaved without hand-waving about subjective quality.
From my perspective, three ideas in this PoC transfer to other domains:
- Specialist agents beat monolithic prompts when each step has different failure modes. Severity scoring and contract lookup should not share one prompt context window.
- Graph orchestration documents operational process better than nested if-statements. Data engineers already think in workflows; LangGraph mirrors that honestly.
- A thin API unlocks multiple clients with one workflow. I used both a Rich CLI and a browser dashboard against the same triage function.
The GitHub repository at https://github.com/aniket-work/SchemaFlow-AI includes tests, sample data, diagrams, and a title animation GIF generated from terminal output plus a dashboard preview. Clone it, run python main.py, and you will see the same ASCII table aesthetic I embedded in the article cover animation.
Let's Design
Before writing code I sketched the lifecycle of a single drift event. An alert fires because bronze landed with unexpected shape, a partition is missing, or freshness slipped. A triage system must answer four questions: what kind of drift is this, how severely does it impact consumers, which data contract clauses apply, and what remediation path unblocks the pipeline fastest. Those questions map cleanly to four agents plus a coordinator.
Coordinator and state
The coordinator owns batch progress. LangGraph state carries the event list, accumulated results, pipeline name, and a numeric index. I chose explicit index-based iteration rather than recursive graph tricks because it made debugging easier when a single event produced surprising output.
Classifier agent
The classifier maps free text into a DriftCategory enum: column added, column removed, type mismatch, null spike, partition gap, freshness breach, row count anomaly, or constraint violation. I used keyword scoring rather than an LLM because I wanted deterministic tests. In a later iteration I would hybridize: rules for high-confidence patterns, LLM fallback for ambiguous notes.
Severity agent
Severity is where pipeline impact actually lives. A missing optional column is not the same as a removed merge key or a freshness SLA breach on a revenue-facing mart. I encoded heuristics such as detecting column removal, eighteen percent null spikes on customer_id, or twenty-six hour freshness gaps against a six hour SLA. The agent emits critical, major, minor, or advisory. This is the step I would be most cautious about deploying without human review, which is why the PoC prints confidence scores and agent traces.
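A minimal sketch of what such severity heuristics could look like. The function name score_severity, the KEY_COLUMNS tuple, and the ten percent threshold are assumptions made for illustration, not values taken from the repository; only the column removal, column addition, and customer_id null spike outcomes follow the text.

```python
import re
from enum import Enum


class ImpactSeverity(str, Enum):
    CRITICAL = "critical"
    MAJOR = "major"
    MINOR = "minor"
    ADVISORY = "advisory"


# Hypothetical tuning values; the repository may use different ones.
NULL_SPIKE_CRITICAL_PCT = 10.0
KEY_COLUMNS = ("customer_id", "order_id")


def score_severity(category: str, text: str) -> ImpactSeverity:
    """Map a drift category plus free text to a severity label."""
    lowered = text.lower()
    if category == "column_removed":
        # A removed column breaks downstream merges, so treat it as blocking.
        return ImpactSeverity.CRITICAL
    if category == "column_added":
        # Unapproved additions violate the contract but rarely block consumers.
        return ImpactSeverity.MINOR
    if category == "null_spike":
        match = re.search(r"(\d+(?:\.\d+)?)\s*(?:%|percent)", lowered)
        pct = float(match.group(1)) if match else 0.0
        on_key = any(col in lowered for col in KEY_COLUMNS)
        if on_key and pct >= NULL_SPIKE_CRITICAL_PCT:
            return ImpactSeverity.CRITICAL
        return ImpactSeverity.MAJOR
    # Default to major in this sketch so nothing is silently downgraded.
    return ImpactSeverity.MAJOR
```

Keeping each rule as an early return makes it easy to write one test per rule and to see which branch produced a surprising label.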
Contract agent
This agent queries a small data contract chunk index. Each chunk includes a code like DC-ORD-001, title, body, and category tag. Search is lexical overlap with a category boost. It is a deliberate stand-in for embedding retrieval, but the interface is stable: pass description and category, receive citation strings.
Remediation agent
Finally, the remediation agent converts category and severity into recommended corrective steps and an SLA hour count. Column removal suggests pausing the silver merge and notifying the producer owner within four hours. Freshness breaches trigger paging the pipeline owner within two hours. The remediation agent is where I encoded operational knowledge that on-call engineers already apply intuitively.
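One way to picture that mapping is a small playbook table. This is a simplified sketch keyed on category only, whereas the agent described above also considers severity. RemediationPlan, PLAYBOOK, FALLBACK, and remediation_plan are hypothetical names; the hour values for column removal, freshness breach, and column addition follow the article, while the column_added step wording and the fallback entry are assumptions.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RemediationPlan:
    steps: tuple[str, ...]
    sla_hours: int  # integer hours so schedulers can sort mechanically


# Playbook keyed by drift category value.
PLAYBOOK: dict[str, RemediationPlan] = {
    "column_removed": RemediationPlan(
        steps=(
            "Pause the silver merge",
            "Notify the producer owner",
            "Backfill after schema alignment",
        ),
        sla_hours=4,
    ),
    "freshness_breach": RemediationPlan(
        steps=("Page the pipeline owner",),
        sla_hours=2,
    ),
    "column_added": RemediationPlan(
        steps=("Review the new column against the contract",),  # assumed wording
        sla_hours=24,
    ),
}

# Assumed fallback for categories without a dedicated entry.
FALLBACK = RemediationPlan(steps=("Open a ticket for manual review",), sla_hours=24)


def remediation_plan(category: str) -> RemediationPlan:
    """Look up the plan for a category, falling back to manual review."""
    return PLAYBOOK.get(category, FALLBACK)
```

Because sla_hours is an integer, sorting a batch by urgency is a one-liner such as sorted(plans, key=lambda p: p.sla_hours).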
API and clients
FastAPI exposes /api/triage/sample for the bundled dataset and /api/triage for custom payloads. The root route serves a dark-themed dashboard that renders severity cards and a category bar chart after a POST call. I put the frontend in a single HTML file to avoid Node build tooling in a PoC README.
Let's Get Cooking
Here is where I translate the design into code. I split the implementation into schema models, specialist agents, the LangGraph workflow, and the API layer. I wrote each block to be readable in isolation because, in my experience, agent repos rot quickly when everything lives in one thousand-line module.
Data models
I started with Pydantic models because drift events arrive from JSON exports and alerting webhooks in real life, and I wanted validation before any agent touched the data. Strong typing also made FastAPI response models trivial to declare.
```python
from enum import Enum

from pydantic import BaseModel, Field


class ImpactSeverity(str, Enum):
    """How badly a drift event affects downstream consumers."""

    CRITICAL = "critical"
    MAJOR = "major"
    MINOR = "minor"
    ADVISORY = "advisory"


class DriftCategory(str, Enum):
    """The kind of schema or data drift that was detected."""

    COLUMN_ADDED = "column_added"
    COLUMN_REMOVED = "column_removed"
    TYPE_MISMATCH = "type_mismatch"
    NULL_SPIKE = "null_spike"
    PARTITION_GAP = "partition_gap"
    FRESHNESS_BREACH = "freshness_breach"
    ROW_COUNT_ANOMALY = "row_count_anomaly"
    CONSTRAINT_VIOLATION = "constraint_violation"


class DriftEvent(BaseModel):
    """One incoming drift alert, as received from a JSON export or webhook."""

    event_id: str
    pipeline: str
    table: str
    description: str
    detected_at: str
    operator_notes: str = ""


class TriageResult(BaseModel):
    """Structured output of the four specialist agents for one event."""

    event_id: str
    category: DriftCategory
    severity: ImpactSeverity
    contract_refs: list[str] = Field(default_factory=list)
    remediation_steps: list[str] = Field(default_factory=list)
    sla_hours: int
    confidence: float = Field(ge=0.0, le=1.0)  # validated to stay in [0, 1]
    agent_trace: list[str] = Field(default_factory=list)
```
These models became the contract between CLI, API, and agents. Keeping enums strict prevented silent string drift in severity labels when I generated charts later.
Specialist agents
Each agent appends to an agent_trace list so I could explain decisions in the CLI and in future audit logs. The classifier scores keyword hits per category:
```python
def classify_agent(event: DriftEvent, trace: list[str]) -> DriftCategory:
    # CATEGORY_KEYWORDS (defined elsewhere) maps each DriftCategory to keyword strings.
    text = f"{event.description} {event.operator_notes}".lower()
    # Fall back to TYPE_MISMATCH when no keyword matches at all.
    best_category = DriftCategory.TYPE_MISMATCH
    best_score = 0
    for category, keywords in CATEGORY_KEYWORDS.items():
        score = sum(1 for kw in keywords if kw in text)
        if score > best_score:
            best_score = score
            best_category = category
    trace.append(f"ClassifierAgent: mapped to {best_category.value} (score={best_score})")
    return best_category
```
Severity logic encodes the pipeline intuition I mentioned earlier. For example, upstream column removal becomes critical, while an unapproved new column stays minor. I gave this step the most attention because severity mistakes are costlier than category mistakes: they directly influence paging urgency and SLA clocks.
The contract agent wraps the knowledge search:
```python
def contract_agent(category: DriftCategory, event: DriftEvent, trace: list[str]) -> list[str]:
    # Retrieve the two best-matching contract chunks for this event.
    hits = search_contracts(event.description, category=category.value, top_k=2)
    refs = [f"{h.code}: {h.title}" for h in hits]
    trace.append(f"ContractAgent: retrieved {len(refs)} contract references")
    return refs
```
The remediation agent returns both steps and SLA hours. That pairing is what makes the output feel operational instead of academic.
LangGraph workflow
The graph is small but worth showing because it is the spine of the PoC:
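```python
from typing import Any

from langgraph.graph import END, StateGraph


def build_triage_graph() -> Any:
    # TriageState and the node functions are defined elsewhere in the module.
    graph = StateGraph(TriageState)
    graph.add_node("load", _load_batch)
    graph.add_node("process", _process_next)
    graph.add_node("summarize", _summarize)
    graph.set_entry_point("load")
    graph.add_edge("load", "process")
    # Loop on "process" until _should_continue routes to "summarize".
    graph.add_conditional_edges(
        "process",
        _should_continue,
        {"process": "process", "summarize": "summarize"},
    )
    graph.add_edge("summarize", END)
    return graph.compile()
```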
Each pass through process triages one drift event and increments the index. When the index reaches the batch length, the graph routes to summarize, which is a hook I left open for aggregate analytics such as average confidence or category histograms.
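The index-based loop can be exercised without LangGraph at all. The following sketch uses hypothetical stand-ins (process_next, should_continue, run_without_graph, and a placeholder triage_event) rather than the repository's private node functions, and it represents events as plain dicts for brevity.

```python
from typing import Any, TypedDict


class TriageState(TypedDict):
    events: list[dict[str, Any]]
    results: list[dict[str, Any]]
    pipeline: str
    index: int


def triage_event(event: dict[str, Any]) -> dict[str, Any]:
    # Placeholder for the classifier -> severity -> contract -> remediation chain.
    return {"event_id": event["event_id"], "category": "type_mismatch"}


def process_next(state: TriageState) -> dict[str, Any]:
    """Triage the event at the current index and return a partial state update."""
    event = state["events"][state["index"]]
    return {
        "results": state["results"] + [triage_event(event)],
        "index": state["index"] + 1,
    }


def should_continue(state: TriageState) -> str:
    """Route back to 'process' until every event has been handled."""
    return "process" if state["index"] < len(state["events"]) else "summarize"


def run_without_graph(state: TriageState) -> TriageState:
    # Plain-Python stand-in for the conditional edge, handy when debugging.
    # Like the graph wiring above, it assumes the batch has at least one event.
    while True:
        state = {**state, **process_next(state)}
        if should_continue(state) == "summarize":
            return state
```

Returning only the changed keys from process_next mirrors how a graph node can return a partial update that is merged into state, and it keeps each step easy to assert on in isolation.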
run_batch_triage computes severity counts and category distributions after the graph finishes. That summary feeds both the Rich CLI table and the Matplotlib chart in src/analytics.py.
FastAPI surface
The API is intentionally thin. It validates payloads, calls run_batch_triage, and returns a BatchTriageReport. The sample endpoint loads data/sample_drift_events.json, which describes eight findings on the orders-bronze-to-silver pipeline: a removed discount_code column, an unapproved promo_tier column, an eighteen point four percent null spike on customer_id, a missing daily partition, a DOUBLE versus DECIMAL type change on order_total, a twenty-six hour freshness breach, a sixty-two percent row count drop, and a new order_status enum value violating constraints.
```python
@app.post("/api/triage/sample", response_model=BatchTriageReport)
def triage_sample() -> BatchTriageReport:
    events = sample_events()
    return run_batch_triage(events)
```
I wired CORS permissively because this is a local PoC dashboard, not a public deployment. If I hardened the experiment, authentication and pipeline tenancy would come first.
Batch aggregation logic
After the graph finishes, run_batch_triage aggregates severity counts and category histograms. This function is plain Python on purpose; not every step needs to be a graph node. I kept summarization outside the loop so unit tests can call triage_event directly without invoking LangGraph when I want faster feedback.
```python
def run_batch_triage(events: list[DriftEvent]) -> BatchTriageReport:
    # Guard against an empty batch: events[0] and the averages below would fail.
    if not events:
        raise ValueError("run_batch_triage requires at least one drift event")

    graph = build_triage_graph()
    final = graph.invoke({
        "events": events,
        "results": [],
        "pipeline": events[0].pipeline,
        "index": 0,
    })
    results: list[TriageResult] = final["results"]

    # Tally severities and categories for the summary cards and charts.
    counts = {s: 0 for s in ImpactSeverity}
    category_counts: dict[str, int] = {}
    for r in results:
        counts[r.severity] += 1
        category_counts[r.category.value] = category_counts.get(r.category.value, 0) + 1

    return BatchTriageReport(
        pipeline=events[0].pipeline,
        processed=len(results),
        critical_count=counts[ImpactSeverity.CRITICAL],
        major_count=counts[ImpactSeverity.MAJOR],
        minor_count=counts[ImpactSeverity.MINOR],
        advisory_count=counts[ImpactSeverity.ADVISORY],
        results=results,
        summary_stats={
            "avg_confidence": round(sum(r.confidence for r in results) / len(results), 2),
            "avg_sla_hours": round(sum(r.sla_hours for r in results) / len(results), 1),
            "category_distribution": category_counts,
        },
    )
```
The summary statistics feed the dashboard bar chart and the README analytics image. When I first omitted category_distribution, the UI felt empty even though per-row triage was correct. Aggregates matter for human scanability.
CLI presentation layer
main.py uses Rich tables because on-call engineers often work from tabular summaries. I print pipeline name, severity counts, and per-event rows with color-coded severity. The CLI became the source of truth for the GIF animation frames, which keeps marketing assets honest.
What surprised me during implementation
When I first ran the batch, partition-related keywords in a row count note caused a false partition classification. I tightened category keyword lists and prioritized null rate patterns. Another surprise: confidence scoring felt too flat when every result returned the same value. I added small boosts when contract hits exist and when severity is critical or major, which better reflected my own certainty when testing.
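A sketch of how such confidence boosts might be combined. The function name confidence_score and every weight below are illustrative assumptions, not the repository's actual numbers; only the idea of boosting on contract hits and on critical or major severity comes from the text.

```python
def confidence_score(keyword_hits: int, contract_hits: int, severity: str) -> float:
    """Combine simple signals into a confidence value in [0.0, 1.0]."""
    # Base value plus capped keyword evidence (weights are assumptions).
    score = 0.6 + 0.05 * min(keyword_hits, 3)
    if contract_hits > 0:
        # A matching contract clause supports the chosen label.
        score += 0.1
    if severity in ("critical", "major"):
        # High-impact rules tend to be the most specific ones.
        score += 0.05
    return round(min(score, 1.0), 2)
```

Capping the keyword contribution keeps one long, keyword-heavy note from saturating the score, which is one way to avoid the flat values described above.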
Let's Setup
Step-by-step details can be found at the repository README: https://github.com/aniket-work/SchemaFlow-AI
Locally, the setup path I used looks like this:
- Clone the repository and create a virtual environment inside the project root.
- Install dependencies from requirements.txt, which pins LangGraph, FastAPI, Rich, and Matplotlib among others.
- Run python main.py to triage the sample JSON batch and print the Rich summary table.
- Optional: start uvicorn schemaflow.api.server:app --app-dir src --reload and open the dashboard at port 8000.
- Optional: execute pytest tests/ -v to confirm critical column removal events and remediation list presence.
I recommend keeping the virtual environment inside the repository folder for PoCs like this so paths to data/sample_drift_events.json remain predictable when you run scripts from different working directories.
Environment variables are not required for the default demo. If you extend the project with embedding-based retrieval or an external LLM provider, add keys through a .env file but do not commit secrets.
Let's Run
In my latest run, python main.py against the bundled dataset reported eight processed events with three critical and four major findings. Critical items included the removed discount_code column blocking dbt merge, the customer_id null spike, and the freshness SLA breach. Average confidence landed around eighty-seven percent with an average SLA window near eleven hours.
The CLI output ends with an ASCII-friendly table that mirrors what I animated in the repository GIF. That was intentional: I wanted the marketing asset and the actual tool output to tell the same story.
For API mode, POST /api/triage/sample returns JSON containing per-event remediation_steps and contract_refs. The dashboard renders severity cards and a horizontal bar chart of category distribution without a separate frontend build step.
To generate analytics assets, run python src/analytics.py. It writes images/triage-stats-chart.png, which I included in the README so GitHub visitors see quantitative behavior immediately.
When I demo this PoC to myself after a break, I use a three-step smoke path: CLI batch, pytest, then dashboard POST. That sequence exercises the graph, validation rules, and HTTP layer without needing external services. If all three pass, I trust the repository state enough to publish diagrams or write about it.
For readers who prefer API exploration with curl, curl -X POST http://localhost:8000/api/triage/sample returns the full BatchTriageReport JSON. I often pipe that output through jq to inspect a single event's remediation_steps array and verify contract references attached correctly.
Edge Cases and Testing Philosophy
I wrote four pytest cases covering batch length, minimum critical detections, non-empty remediation steps and contract references, and a dedicated column removal severity check. Tests encode what I care about in this PoC: removed merge keys must never be downgraded quietly, and every triage result must be actionable.
Ambiguous notes remain a weakness. A type mismatch description that mentions partitions sometimes scores against partition keywords even when the root issue is numeric precision. In production I would route low-confidence classifications to a human queue. Here, I expose agent_trace lists to make that queue possible later.
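A human review queue could start as a simple split on confidence. This is a sketch under assumptions: TriageOutcome is a trimmed, hypothetical stand-in for the full result model, and the 0.7 threshold is an arbitrary placeholder that would need tuning against labeled events.

```python
from dataclasses import dataclass, field


@dataclass
class TriageOutcome:
    event_id: str
    category: str
    confidence: float
    agent_trace: list[str] = field(default_factory=list)


REVIEW_THRESHOLD = 0.7  # hypothetical cutoff


def split_for_review(
    outcomes: list[TriageOutcome], threshold: float = REVIEW_THRESHOLD
) -> tuple[list[TriageOutcome], list[TriageOutcome]]:
    """Return (auto_accepted, needs_human_review) based on confidence."""
    accepted = [o for o in outcomes if o.confidence >= threshold]
    review = [o for o in outcomes if o.confidence < threshold]
    return accepted, review
```

Each item in the review list still carries its agent_trace, so a reviewer can see which keywords or rules led to the uncertain label.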
Freshness parsing only searches for hour counts in operator notes. Timezone-aware SLA math would need a dedicated library. That limitation is acceptable in a demo but worth documenting honestly.
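To illustrate the limitation, here is a minimal sketch of hour-count parsing. The regular expression and the hours_late helper are assumptions about how such parsing could work, not the repository's code. It only understands digits followed by an hour unit, takes the first number it finds, and knows nothing about timezones.

```python
import re
from typing import Optional

# Matches "26 hours", "26-hour", "26 hrs", or "26h".
HOURS_PATTERN = re.compile(r"(\d+(?:\.\d+)?)\s*-?\s*(?:hours?|hrs?|h)\b")


def hours_late(operator_notes: str, sla_hours: float) -> Optional[float]:
    """Return hours beyond the SLA, or None when no hour count is found."""
    match = HOURS_PATTERN.search(operator_notes.lower())
    if match is None:
        return None
    observed = float(match.group(1))
    # Clamp at zero so data that is within the SLA reports no lateness.
    return max(observed - sla_hours, 0.0)
```

A note that mentions the SLA before the observed delay would be misread by this sketch, and spelled-out numbers such as "twenty-six" are not recognized at all, which is exactly the kind of fragility a dedicated parser would remove.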
Walking Through a Sample Event
To make the pipeline concrete, consider event E-2026-0621-001 from the sample dataset: upstream producer removed discount_code and the silver merge job failed with schema mismatch. When I feed this batch through the graph, the coordinator hands the item to the classifier, which scores column removal keywords highest and maps the finding to column_removed. The severity agent recognizes removal as blocking merge and labels the item critical. The contract agent retrieves DC-ORD-001: Orders Fact Column Stability because the description mentions removal and the category boost aligns. The remediation agent responds with pausing the silver merge, notifying the producer owner, and backfilling after schema alignment with a four-hour SLA.
That end-to-end path takes milliseconds in my local runs, but the value is not speed. The value is repeatability. Every event in the batch gets the same structured treatment, which means I can diff reports across code versions when I tune heuristics. When I adjusted keyword weights after a false positive on a row count note, I re-ran pytest and the CLI table immediately showed whether critical counts changed. That feedback loop is what I look for in agent PoCs.
Data Contract Knowledge Base Design
I modeled the knowledge layer as a list of ContractChunk dataclass rows rather than jumping straight to ChromaDB. Each chunk stores a code, title, body, and category tag. The search function tokenizes query and body text, counts overlap, and applies a category boost when the classifier label matches. This is intentionally primitive. From my experience building RAG demos, people sometimes spend days on embedding pipelines before validating whether retrieval inputs are structured correctly. Here, the interface is stable: search_contracts(query, category, top_k).
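A minimal sketch of that lexical search, keeping the search_contracts(query, category, top_k) shape. The code and title DC-ORD-001 come from the article, but the body texts, the DC-ORD-999 row, the boost weight, and the tokenizer are all assumptions made for illustration.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class ContractChunk:
    code: str
    title: str
    body: str
    category: str


# Two illustrative rows; body text and the second row are made up.
INDEX = [
    ContractChunk(
        "DC-ORD-001",
        "Orders Fact Column Stability",
        "A producer must not remove or rename a column without notice.",
        "column_removed",
    ),
    ContractChunk(
        "DC-ORD-999",  # hypothetical code
        "Example Freshness Clause",
        "Silver tables must refresh within the agreed window.",
        "freshness_breach",
    ),
]

CATEGORY_BOOST = 2  # hypothetical weight


def _tokens(text: str) -> set[str]:
    return set(re.findall(r"[a-z0-9_]+", text.lower()))


def search_contracts(query: str, category: str, top_k: int = 2) -> list[ContractChunk]:
    """Rank chunks by token overlap with the query, boosted on category match."""
    query_tokens = _tokens(query)
    scored = []
    for chunk in INDEX:
        score = len(query_tokens & _tokens(chunk.body))
        if chunk.category == category:
            score += CATEGORY_BOOST
        if score > 0:
            scored.append((score, chunk))
    # Python's sort is stable, so ties keep their index order.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [chunk for _, chunk in scored[:top_k]]
```

Exact-token overlap misses simple variants such as "removed" versus "remove", which is one reason the category boost carries so much weight in a lexical index and why embeddings are the natural upgrade.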
If I swap in embeddings later, agents above the knowledge layer stay unchanged. That separation mattered to me because it mirrors how I would approach a production migration: nail schemas and agent IO first, upgrade retrieval second. The current index includes eight DC-ORD contract excerpts spanning column stability, type immutability, null thresholds, partition completeness, freshness SLA, row volume bounds, and enum constraints. It is not exhaustive, but it is enough to prove citation attachment in reports.
Monolithic Prompt Versus Specialist Agents
Early in the experiment I tried a single prompt that asked for category, severity, citations, and remediation in one JSON blob. It failed in boring ways: severity would be conservative when citations were verbose, and constraint violations were mislabeled as type mismatches when enum notes appeared in the same sentence. Splitting responsibilities into agents eliminated most of that cross-talk. Each agent receives only the fields it needs, writes to agent_trace, and returns a typed value the coordinator merges.
This aligns with how I think about LangGraph more generally. Nodes are functions with narrow contracts. Edges express control flow an on-call engineer could whiteboard. When I revisit the monolithic prompt idea, it will be as a final summarization layer, not as the core reasoning engine.
Dashboard and Visualization Choices
The HTML dashboard uses a dark theme with severity-colored badges because pipeline incident tools I have seen in demos often default to sterile white tables that hide urgency. I wanted critical items to visually pop. The category bar chart is simple horizontal bars rather than a heavy charting library. For this domain, showing that multiple violations clustered in column and freshness categories is more useful than animating trends over time.
Matplotlib generates a static triage-stats-chart.png with severity and category subplots. I used dark backgrounds to match the dashboard and GIF aesthetic. Static charts export cleanly to README, which helps GitHub visitors who never run the code still grasp batch behavior.
Performance and Operational Notes
The sample batch of eight events completes in under a second on my laptop, including graph orchestration overhead. I did not optimize for throughput because drift batches in this PoC are tiny. If batches grew to thousands of historical alerts, I would parallelize per-event triage outside LangGraph or shard by pipeline while keeping the same agent functions.
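Per-event parallelism outside the graph could be sketched with the standard library. triage_in_parallel is a hypothetical helper, and it assumes the per-event triage function is safe to call from several threads at once.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, TypeVar

E = TypeVar("E")
R = TypeVar("R")


def triage_in_parallel(
    events: Iterable[E],
    triage_one: Callable[[E], R],
    max_workers: int = 8,
) -> list[R]:
    """Apply triage_one to each event concurrently, preserving input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map yields results in the same order as the inputs.
        return list(pool.map(triage_one, events))
```

Threads mainly help when each triage call waits on I/O, such as an external LLM or a retrieval service. For purely CPU-bound keyword rules, a process pool or sharding by pipeline is likely a better fit.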
Memory footprint stays small because events are pydantic objects in a list, not streamed from a warehouse. Logging is print-based in the CLI and trace lists in results. A next step would be structured JSON logs with event IDs for observability, but I skipped that to keep the repository approachable.
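Structured logs with event IDs could be added with the standard logging module alone. The logger name, the JsonFormatter class, and the field names below are assumptions chosen for this sketch.

```python
import json
import logging
import sys

logger = logging.getLogger("schemaflow.triage")  # hypothetical logger name


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "message": record.getMessage(),
            # Values passed through `extra=` become attributes on the record.
            "event_id": getattr(record, "event_id", None),
            "agent": getattr(record, "agent", None),
        }
        return json.dumps(payload)


def configure_logging(stream=sys.stderr) -> None:
    handler = logging.StreamHandler(stream)
    handler.setFormatter(JsonFormatter())
    logger.handlers = [handler]
    logger.setLevel(logging.INFO)
    logger.propagate = False  # avoid duplicate lines via the root logger


def log_decision(event_id: str, agent: str, message: str) -> None:
    logger.info(message, extra={"event_id": event_id, "agent": agent})
```

After calling configure_logging(), a call like log_decision("E-2026-0621-001", "ClassifierAgent", "mapped to column_removed") emits a single JSON line that log tooling can filter by event_id.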
Project Layout Rationale
I nested application code under src/schemaflow so imports read naturally in tests and uvicorn targets. main.py lives at the repository root because that is the entry newcomers type first. Sample data sits in data/ rather than inside tests/ so both the API and CLI can load the same fixture without pytest coupling. Images are checked into git because Dev.to and GitHub render raw URLs; binary assets must survive git push without .gitignore accidents, a lesson I learned from prior publishing attempts.
Theory Behind SLA Hours
SLA hours encode how quickly I believe a pipeline owner must show correction before downstream consumers break trust. In my heuristics, critical column removal maps to four hours and freshness breaches map to two hours because merge jobs and revenue dashboards cannot wait a business week. Unapproved column additions map to twenty-four hours because they are serious but not always immediate blockers. These numbers are not organizational standards; they are PoC placeholders where I tried to mirror operational urgency I observed in incident retrospectives.
When I tune SLA hours, I treat them like incident response targets. The remediation agent returns integers, not prose intervals, so downstream schedulers can sort and filter mechanically.
Comparing Outputs Across Code Versions
One habit I kept from prior agent experiments is saving JSON reports when iterating heuristics. SchemaFlow-AI supports --json-out on the CLI. Diffing two JSON files after a severity tweak shows exactly which event IDs changed severity or SLA hours. That practice sounds tedious, but it prevented regressions when I fixed the row count false positive. Without structured output, I would rely on eyeballing terminal tables, which does not scale past a handful of events.
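A small helper can make that diff mechanical. This sketch assumes each saved report is a JSON object with a results list whose items carry event_id, category, severity, and sla_hours, mirroring the models shown earlier; the exact file layout written by --json-out is an assumption, and diff_reports is a hypothetical name.

```python
import json
from pathlib import Path
from typing import Any

TRACKED_FIELDS = ("category", "severity", "sla_hours")


def diff_reports(old: dict[str, Any], new: dict[str, Any]) -> dict[str, dict[str, tuple]]:
    """Return {event_id: {field: (old_value, new_value)}} for changed fields."""
    old_by_id = {r["event_id"]: r for r in old["results"]}
    changes: dict[str, dict[str, tuple]] = {}
    for result in new["results"]:
        previous = old_by_id.get(result["event_id"])
        if previous is None:
            continue  # event only exists in the newer report
        changed = {
            name: (previous.get(name), result.get(name))
            for name in TRACKED_FIELDS
            if previous.get(name) != result.get(name)
        }
        if changed:
            changes[result["event_id"]] = changed
    return changes


def diff_report_files(old_path: str, new_path: str) -> dict[str, dict[str, tuple]]:
    """Load two saved reports from disk and diff them."""
    old = json.loads(Path(old_path).read_text(encoding="utf-8"))
    new = json.loads(Path(new_path).read_text(encoding="utf-8"))
    return diff_reports(old, new)
```

An empty result means the heuristic change left every tracked field untouched, which is a quick regression signal before reading the full terminal table.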
Integration Points I Deliberately Left Open
The FastAPI layer returns pydantic-serialized JSON ready for webhooks, Slack workers, or incident ticket renderers I did not build. agent_trace arrays are the extension point for human review queues: a UI could highlight low-confidence rows or events where classifier and severity agents disagreed with a prior human label. The contract agent returns string citations today; tomorrow it could return structured objects with URLs to schema registry entries.
I also left authentication out entirely. If this ever faced real pipeline credentials or tenant data, isolation and role-based access would precede any model upgrade.
Ethics and Boundaries
Automating schema drift triage touches production data reliability. I want to be explicit: this PoC does not pause jobs, does not modify warehouse schemas, and does not connect to live orchestration systems. It sorts text you give it. If someone deployed a derivative system, human review on critical severities would be non-negotiable in my view, and model or rule changes would require traceable versioning.
I also avoided implying employer affiliation. This build lives entirely in my personal GitHub namespace as an experiment.
Future Roadmap I Would Explore
If I continue this PoC, my next steps would be:
- Replace lexical contract search with embeddings stored in Chroma or pgvector, then measure citation precision against a held-out drift event set.
- Add an LLM summarization step that rewrites remediation steps into on-call runbook language while keeping structured JSON underneath.
- Introduce pipeline history so repeat drift patterns escalate severity automatically.
- Export incident ticket drafts with contract footnotes for Jira or ServiceNow.
- Build a feedback endpoint where a human reviewer marks agent mistakes, creating training data for later fine-tuning.
None of those are implemented here. They are the natural extension points I noted while writing the graph skeleton.
What I would not change
If I rebuilt the PoC from scratch, I would keep three decisions exactly as they are: strict pydantic schemas at the boundary, separate specialist modules instead of one prompt file, and agent_trace on every result. Those choices cost almost nothing in lines of code but paid off every time I debugged a surprising severity label.
Reader exercises
If you fork the repository, here are three exercises I found useful while learning:
- Add a new event to sample_drift_events.json with ambiguous wording and write a test that asserts your expected severity.
- Introduce a fifth agent that drafts a plain-language summary paragraph per event without altering structured fields.
- Swap lexical contract search for an embedding model and compare citation precision on ten hand-labeled examples.
Each exercise touches a different production concern: evaluation, human-readable output, and retrieval quality.
Closing Thoughts
SchemaFlow-AI started as a question about downstream pipeline incident labor, not as a pitch for autonomous data platforms. What I ended up with is a compact LangGraph workflow that feels honest about its limits: deterministic agents, explicit traces, readable tests, and an API that could swap underneath without rewriting business logic.
From my experience, the hardest part of multi-agent tutorials is not drawing boxes on a whiteboard. It is choosing a domain where evaluation is grounded. Schema drift events gave me that grounding. When the classifier mistyped a row count issue, tests failed or traces looked wrong immediately. When severity heuristics worked, the CLI table matched my own gut ranking.
If you are building your own orchestration experiments, steal the structure more than the rules. Keep schemas strict, keep agents narrow, keep orchestration visible, and keep humans in the loop for critical outcomes. The repository at https://github.com/aniket-work/SchemaFlow-AI is there if you want to run the same path I did and remix it for a completely different observability domain.
One last reflection: agent hype often focuses on autonomy. This project convinced me that disciplined routing is the more interesting problem. Drift alerts do not need a chatty assistant; they need reliable sorting, traceable reasoning, and fast handoff to humans when stakes are high. LangGraph helped me express that routing story in code without pretending the PoC is more than an experiment.
I will keep iterating on personal agent projects like this because each one teaches a reusable pattern. SchemaFlow-AI's pattern is narrow agents, explicit graph state, honest tests, and dashboards that mirror terminal truth. That combination is what I would carry into the next domain, whatever it ends up being.
[Cover animation: the title GIF generated from terminal output plus a dashboard preview, available in the repository.]
Disclaimer
The views and opinions expressed here are solely my own and do not represent the views, positions, or opinions of my employer or any organization I am affiliated with. The content is based on my personal experience and experimentation and may be incomplete or incorrect. Any errors or misinterpretations are unintentional, and I apologize in advance if any statements are misunderstood or misrepresented.