ServicesAboutNotesContact Get in touch →
EN FR
Note

MCP Data Quality Server Pattern

A practical MCP server pattern for data quality — running validation checks, retrieving quality scores, and surfacing tables that need attention.

Planted
mcpdata engineeringdata quality

A data quality MCP server exposes quality scores and validation results as AI-accessible tools. This allows querying quality state in conversation — “what’s the quality score for sales.orders?” — rather than navigating a dashboard.

The pattern applies to teams using Great Expectations, Elementary, Soda, or internal quality frameworks. The MCP server wraps the quality platform’s API, exposing checks, scores, and issues as tools.

The Server

from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field
from enum import Enum
import json
mcp = FastMCP("DataQualityMCP")
class CheckType(str, Enum):
COMPLETENESS = "completeness"
UNIQUENESS = "uniqueness"
FRESHNESS = "freshness"
VALIDITY = "validity"
CONSISTENCY = "consistency"
class QualityCheckResult(BaseModel):
"""Result of a single quality check."""
check_name: str
check_type: CheckType
table_name: str
passed: bool
score: float = Field(ge=0, le=1, description="Score between 0 and 1")
rows_checked: int
issues_found: int
details: str | None = None

The CheckType enum constrains which quality dimensions the AI can check. The QualityCheckResult model ensures consistent output across all checks — the AI always gets the same fields, making comparison and aggregation reliable. See MCP Tool Design Patterns for the reasoning behind structured output.

Core Tools

Run a Quality Check

The targeted tool — run a specific type of check on a specific table:

@mcp.tool()
async def run_quality_check(
table_name: str,
check_type: CheckType,
ctx: Context
) -> QualityCheckResult:
"""Run a specific data quality check on a table.
Args:
table_name: Table to validate
check_type: Type of quality check to run
ctx: Context for progress reporting
Returns:
Detailed check results
"""
await ctx.info(f"Running {check_type.value} check on {table_name}")
await ctx.report_progress(progress=0.5, total=1.0, message="Scanning table...")
# In real implementation, call your quality framework
score = QUALITY_SCORES.get(table_name, {}).get(check_type.value, 0.5)
passed = score >= 0.9
await ctx.report_progress(progress=1.0, total=1.0, message="Check complete")
return QualityCheckResult(
check_name=f"{check_type.value}_{table_name.replace('.', '_')}",
check_type=check_type,
table_name=table_name,
passed=passed,
score=score,
rows_checked=100000,
issues_found=int((1 - score) * 100000),
details=f"{'Passed' if passed else 'Failed'}: {score:.1%} of rows meet {check_type.value} criteria"
)

Progress reporting via the Context object matters here. Quality checks on large tables can take minutes. Without progress reporting, the user has no idea if the tool is working or stuck.

Quality Score Overview

The quick-look tool — overall health of a table:

@mcp.tool()
def get_quality_score(table_name: str) -> str:
"""Get the overall data quality score for a table.
Args:
table_name: Table to get scores for
Returns:
Quality scores across all dimensions
"""
if table_name not in QUALITY_SCORES:
return json.dumps({
"error": f"No quality scores found for '{table_name}'",
"available_tables": list(QUALITY_SCORES.keys())
}, indent=2)
scores = QUALITY_SCORES[table_name]
return json.dumps({
"table": table_name,
"overall_score": scores["overall"],
"dimensions": {
"completeness": scores["completeness"],
"uniqueness": scores["uniqueness"],
"freshness": scores["freshness"],
"validity": scores.get("validity", "not measured")
},
"status": "healthy" if scores["overall"] >= 0.9 else "needs attention"
}, indent=2)

Returning available_tables in the error case is a pattern worth highlighting. When the AI asks for a table that doesn’t exist in the quality system, it gets a list of what does exist — and can either suggest alternatives or ask the user to clarify. Much better than a bare “not found” error.

Quality Issues Summary

The triage tool — “what needs my attention?”

@mcp.tool()
def list_quality_issues(min_severity: str = "warning") -> str:
"""List tables with data quality issues.
Args:
min_severity: Minimum severity to include ('warning' or 'critical')
Returns:
Tables that need attention
"""
issues = []
threshold = 0.8 if min_severity == "critical" else 0.9
for table_name, scores in QUALITY_SCORES.items():
if scores["overall"] < threshold:
dimensions = {k: v for k, v in scores.items() if k != "overall"}
worst_dim = min(dimensions, key=dimensions.get)
issues.append({
"table": table_name,
"overall_score": scores["overall"],
"severity": "critical" if scores["overall"] < 0.8 else "warning",
"primary_issue": worst_dim,
"issue_score": dimensions[worst_dim]
})
return json.dumps(sorted(issues, key=lambda x: x["overall_score"]), indent=2)

Identifying the primary_issue — the worst-scoring dimension — gives the AI something actionable to report. “The customers table has a freshness problem (0.75)” is more useful than “the customers table scored 0.87.”

Full Validation

The comprehensive tool — run everything:

@mcp.tool()
async def run_full_validation(table_name: str, ctx: Context) -> str:
"""Run all quality checks on a table.
Args:
table_name: Table to validate
ctx: Context for progress reporting
Returns:
Complete validation report
"""
await ctx.info(f"Starting full validation of {table_name}")
results = []
checks = list(CheckType)
for i, check_type in enumerate(checks):
await ctx.report_progress(
progress=(i + 1) / len(checks),
total=1.0,
message=f"Running {check_type.value} check..."
)
result = await run_quality_check(table_name, check_type, ctx)
results.append({
"check": result.check_type.value,
"passed": result.passed,
"score": result.score
})
overall_passed = all(r["passed"] for r in results)
return json.dumps({
"table": table_name,
"validation_passed": overall_passed,
"checks": results,
"summary": f"{'All checks passed' if overall_passed else 'Some checks failed'}"
}, indent=2)
if __name__ == "__main__":
mcp.run(transport="stdio")

What You Can Ask

With this server connected:

  • “What’s the data quality score for the orders table?”
  • “Run a completeness check on sales.customers”
  • “Which tables have quality issues I should investigate?”
  • “Run a full validation on the revenue_daily table”
  • “What’s the freshness score for our customer data?”

Production Considerations

Check execution vs. cached scores. The example conflates running checks with retrieving stored scores. In production, separate these — get_quality_score reads from your quality platform’s database (fast, cheap), while run_quality_check triggers an actual scan (slow, potentially expensive). Make the distinction clear in tool descriptions so the AI defaults to cached scores and only runs fresh checks when asked.

Threshold configuration. The 0.9/0.8 thresholds for warning/critical are hardcoded in the example. In production, make these configurable per table or per check type. Some tables can tolerate lower freshness scores; others need 100% completeness.

Historical trends. A get_quality_trend tool that shows score history over the last N days reveals degradation patterns. “Freshness has been declining for a week” is the kind of insight that only appears with historical context.

Reference implementations. The Elementary MCP integration demonstrates production patterns for data observability, including test result history, anomaly detection, and alert management. Study it before building your own quality server.