A data quality MCP server exposes quality scores and validation results as AI-accessible tools. This allows querying quality state in conversation — “what’s the quality score for sales.orders?” — rather than navigating a dashboard.
The pattern applies to teams using Great Expectations, Elementary, Soda, or internal quality frameworks. The MCP server wraps the quality platform’s API, exposing checks, scores, and issues as tools.
The Server
```python
from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field
from enum import Enum
import json

mcp = FastMCP("DataQualityMCP")


class CheckType(str, Enum):
    COMPLETENESS = "completeness"
    UNIQUENESS = "uniqueness"
    FRESHNESS = "freshness"
    VALIDITY = "validity"
    CONSISTENCY = "consistency"


class QualityCheckResult(BaseModel):
    """Result of a single quality check."""
    check_name: str
    check_type: CheckType
    table_name: str
    passed: bool
    score: float = Field(ge=0, le=1, description="Score between 0 and 1")
    rows_checked: int
    issues_found: int
    details: str | None = None
```

The CheckType enum constrains which quality dimensions the AI can check. The QualityCheckResult model ensures consistent output across all checks — the AI always gets the same fields, making comparison and aggregation reliable. See MCP Tool Design Patterns for the reasoning behind structured output.
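The tools below read from QUALITY_SCORES, which the example never defines; it stands in for whatever your quality platform actually stores. A minimal in-memory stand-in with made-up numbers might look like this:

```python
# Placeholder scores keyed by table name. In a real server these would come
# from your quality platform's API or metadata store, not a module-level dict.
QUALITY_SCORES = {
    "sales.orders": {
        "overall": 0.96,
        "completeness": 0.99,
        "uniqueness": 1.0,
        "freshness": 0.92,
        "validity": 0.94,
        "consistency": 0.95,
    },
    "sales.customers": {
        "overall": 0.87,
        "completeness": 0.95,
        "uniqueness": 0.98,
        "freshness": 0.75,  # stale loads drag the overall score down
    },
}
```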
Core Tools
Run a Quality Check
The targeted tool — run a specific type of check on a specific table:
```python
@mcp.tool()
async def run_quality_check(
    table_name: str,
    check_type: CheckType,
    ctx: Context
) -> QualityCheckResult:
    """Run a specific data quality check on a table.

    Args:
        table_name: Table to validate
        check_type: Type of quality check to run
        ctx: Context for progress reporting

    Returns:
        Detailed check results
    """
    await ctx.info(f"Running {check_type.value} check on {table_name}")
    await ctx.report_progress(progress=0.5, total=1.0, message="Scanning table...")

    # In a real implementation, call your quality framework here
    score = QUALITY_SCORES.get(table_name, {}).get(check_type.value, 0.5)
    passed = score >= 0.9

    await ctx.report_progress(progress=1.0, total=1.0, message="Check complete")

    return QualityCheckResult(
        check_name=f"{check_type.value}_{table_name.replace('.', '_')}",
        check_type=check_type,
        table_name=table_name,
        passed=passed,
        score=score,
        rows_checked=100000,
        issues_found=int((1 - score) * 100000),
        details=f"{'Passed' if passed else 'Failed'}: {score:.1%} of rows meet {check_type.value} criteria"
    )
```

Progress reporting via the Context object matters here. Quality checks on large tables can take minutes. Without progress reporting, the user has no idea if the tool is working or stuck.
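For a check that genuinely scans a large table, progress is more useful when reported per batch rather than at the fixed 0.5 and 1.0 points above. A sketch, where scan_batches() and count_issues() are hypothetical stand-ins for your quality framework's row iterator and per-batch validation:

```python
# Sketch: report progress as each batch of rows is scanned.
# scan_batches() and count_issues() are hypothetical placeholders for
# whatever your quality framework exposes; they are not defined above.
async def scan_with_progress(table_name: str, total_batches: int, ctx: Context) -> int:
    issues = 0
    for i, batch in enumerate(scan_batches(table_name, total_batches)):
        issues += count_issues(batch)
        await ctx.report_progress(
            progress=(i + 1) / total_batches,
            total=1.0,
            message=f"Scanned batch {i + 1} of {total_batches}"
        )
    return issues
```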
Quality Score Overview
The quick-look tool — overall health of a table:
```python
@mcp.tool()
def get_quality_score(table_name: str) -> str:
    """Get the overall data quality score for a table.

    Args:
        table_name: Table to get scores for

    Returns:
        Quality scores across all dimensions
    """
    if table_name not in QUALITY_SCORES:
        return json.dumps({
            "error": f"No quality scores found for '{table_name}'",
            "available_tables": list(QUALITY_SCORES.keys())
        }, indent=2)

    scores = QUALITY_SCORES[table_name]
    return json.dumps({
        "table": table_name,
        "overall_score": scores["overall"],
        "dimensions": {
            "completeness": scores["completeness"],
            "uniqueness": scores["uniqueness"],
            "freshness": scores["freshness"],
            "validity": scores.get("validity", "not measured")
        },
        "status": "healthy" if scores["overall"] >= 0.9 else "needs attention"
    }, indent=2)
```

Returning available_tables in the error case is a pattern worth highlighting. When the AI asks for a table that doesn’t exist in the quality system, it gets a list of what does exist — and can either suggest alternatives or ask the user to clarify. Much better than a bare “not found” error.
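The pattern generalizes to any lookup tool. A small helper, not part of the original example, keeps the error shape consistent and can even suggest near-matches using the standard-library difflib:

```python
# Sketch of a reusable "unknown table" error payload. The close-match
# suggestion is an extra nicety beyond what get_quality_score does above.
import difflib

def unknown_table_error(table_name: str) -> str:
    known = list(QUALITY_SCORES.keys())
    return json.dumps({
        "error": f"No quality scores found for '{table_name}'",
        "did_you_mean": difflib.get_close_matches(table_name, known, n=3),
        "available_tables": known,
    }, indent=2)
```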
Quality Issues Summary
The triage tool — “what needs my attention?”
```python
@mcp.tool()
def list_quality_issues(min_severity: str = "warning") -> str:
    """List tables with data quality issues.

    Args:
        min_severity: Minimum severity to include ('warning' or 'critical')

    Returns:
        Tables that need attention
    """
    issues = []
    threshold = 0.8 if min_severity == "critical" else 0.9

    for table_name, scores in QUALITY_SCORES.items():
        if scores["overall"] < threshold:
            dimensions = {k: v for k, v in scores.items() if k != "overall"}
            worst_dim = min(dimensions, key=dimensions.get)

            issues.append({
                "table": table_name,
                "overall_score": scores["overall"],
                "severity": "critical" if scores["overall"] < 0.8 else "warning",
                "primary_issue": worst_dim,
                "issue_score": dimensions[worst_dim]
            })

    return json.dumps(sorted(issues, key=lambda x: x["overall_score"]), indent=2)
```

Identifying the primary_issue — the worst-scoring dimension — gives the AI something actionable to report. “The customers table has a freshness problem (0.75)” is more useful than “the customers table scored 0.87.”
Full Validation
The comprehensive tool — run everything:
```python
@mcp.tool()
async def run_full_validation(table_name: str, ctx: Context) -> str:
    """Run all quality checks on a table.

    Args:
        table_name: Table to validate
        ctx: Context for progress reporting

    Returns:
        Complete validation report
    """
    await ctx.info(f"Starting full validation of {table_name}")

    results = []
    checks = list(CheckType)

    for i, check_type in enumerate(checks):
        await ctx.report_progress(
            progress=(i + 1) / len(checks),
            total=1.0,
            message=f"Running {check_type.value} check..."
        )
        result = await run_quality_check(table_name, check_type, ctx)
        results.append({
            "check": result.check_type.value,
            "passed": result.passed,
            "score": result.score
        })

    overall_passed = all(r["passed"] for r in results)

    return json.dumps({
        "table": table_name,
        "validation_passed": overall_passed,
        "checks": results,
        "summary": "All checks passed" if overall_passed else "Some checks failed"
    }, indent=2)


if __name__ == "__main__":
    mcp.run(transport="stdio")
```

What You Can Ask
With this server connected:
- “What’s the data quality score for the orders table?”
- “Run a completeness check on sales.customers”
- “Which tables have quality issues I should investigate?”
- “Run a full validation on the revenue_daily table”
- “What’s the freshness score for our customer data?”
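These prompts assume the server is already registered with an MCP client. For Claude Desktop, that registration is a stdio entry in claude_desktop_config.json; the server name and script path below are placeholders for your own:

```json
{
  "mcpServers": {
    "data-quality": {
      "command": "python",
      "args": ["/path/to/data_quality_server.py"]
    }
  }
}
```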
Production Considerations
Check execution vs. cached scores. The example conflates running checks with retrieving stored scores. In production, separate these — get_quality_score reads from your quality platform’s database (fast, cheap), while run_quality_check triggers an actual scan (slow, potentially expensive). Make the distinction clear in tool descriptions so the AI defaults to cached scores and only runs fresh checks when asked.
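One way to make that split visible to the model is through the docstrings it sees as tool descriptions. A sketch with purely illustrative wording, shown as revised docstrings for the two tools above:

```python
# Sketch: steer the AI toward cached reads by making cost explicit in each
# tool's description (the docstring is what the model sees). Bodies elided.
@mcp.tool()
def get_quality_score(table_name: str) -> str:
    """Read the most recent stored quality scores for a table.

    Fast and cheap: returns results already recorded by the quality platform,
    never scans data. Prefer this for "what's the score?" questions.
    """
    ...


@mcp.tool()
async def run_quality_check(table_name: str, check_type: CheckType, ctx: Context) -> QualityCheckResult:
    """Run a fresh quality check by scanning the table now.

    Slow and potentially expensive. Use only when the user explicitly asks
    for a new check or the cached score looks stale or suspect.
    """
    ...
```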
Threshold configuration. The 0.9/0.8 thresholds for warning/critical are hardcoded in the example. In production, make these configurable per table or per check type. Some tables can tolerate lower freshness scores; others need 100% completeness.
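A minimal sketch of what per-table configuration could look like, assuming a simple dict-based config rather than anything a particular platform provides (the table names and numbers are made up):

```python
# Sketch: per-table, per-dimension thresholds with a global fallback.
DEFAULT_THRESHOLDS = {"warning": 0.9, "critical": 0.8}

TABLE_THRESHOLDS = {
    "sales.orders": {"completeness": {"warning": 1.0, "critical": 0.99}},      # no missing orders tolerated
    "analytics.page_views": {"freshness": {"warning": 0.7, "critical": 0.5}},  # batch-loaded, staleness is fine
}

def threshold_for(table_name: str, dimension: str, severity: str) -> float:
    """Resolve the threshold for one table and dimension, falling back to the default."""
    return (
        TABLE_THRESHOLDS.get(table_name, {})
        .get(dimension, DEFAULT_THRESHOLDS)
        .get(severity, DEFAULT_THRESHOLDS[severity])
    )
```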
Historical trends. A get_quality_trend tool that shows score history over the last N days reveals degradation patterns. “Freshness has been declining for a week” is the kind of insight that only appears with historical context.
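A sketch of such a tool, assuming a hypothetical score_history() lookup against wherever your quality platform stores past runs:

```python
# Sketch: trend tool. score_history() is a hypothetical function returning
# (date, overall_score) pairs for the last N days; it is not defined above.
@mcp.tool()
def get_quality_trend(table_name: str, days: int = 30) -> str:
    """Show how a table's overall quality score has changed over the last N days."""
    history = score_history(table_name, days)  # hypothetical lookup
    if not history:
        return json.dumps({"error": f"No score history for '{table_name}'"}, indent=2)

    scores = [score for _, score in history]
    return json.dumps({
        "table": table_name,
        "days": days,
        "latest": scores[-1],
        "change": round(scores[-1] - scores[0], 3),
        "trend": "declining" if scores[-1] < scores[0] else "stable or improving",
        "history": [{"date": d, "overall_score": s} for d, s in history],
    }, indent=2)
```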
Reference implementations. The Elementary MCP integration demonstrates production patterns for data observability, including test result history, anomaly detection, and alert management. Study it before building your own quality server.