Adrienne Vermorel

Building Custom MCP Servers for Data Engineering

When to Build Custom

Before writing code, check if someone else already solved your problem.

With over 5,800 community servers and official integrations for BigQuery, Snowflake, dbt, and dozens of other data tools, you can often skip the build entirely. Browse the MCP Registry, awesome-mcp-servers, and vendor documentation first.

You’ll need a custom server when your organization relies on proprietary internal systems: homegrown data catalogs, custom orchestrators, or internal APIs that no public server will ever support. The same applies when you want to combine multiple data sources into a unified interface, or when security requirements mandate self-hosted solutions with specific credential handling.

In data engineering, the most common use cases are internal data catalogs (searching metadata, exposing lineage), pipeline monitoring (checking job status across Airflow, Dagster, or whatever scheduler you’re running), data quality platforms (running checks, retrieving scores from Great Expectations or Elementary), and cost management tools.

A basic MCP server takes about 50 lines of Python. The SDKs handle protocol complexity so you can focus on your actual business logic.

SDK Options

MCP has official SDKs for multiple languages. For data engineering, Python and TypeScript are the practical choices.

Python SDK

If you’re already working in Python (and most data engineers are), this is the obvious choice.

Installation with uv (recommended):

uv add "mcp[cli]"

Or with pip:

pip install "mcp[cli]"

The [cli] extra includes the command-line tools for testing and installation.
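For example, once installed you can drive a server through the bundled mcp command; the document uses mcp install later on, and mcp dev launches a server under the Inspector (exact subcommands may vary slightly across SDK versions):

# Launch a server under the MCP Inspector for interactive testing
uv run mcp dev server.py

# Register a server with Claude Desktop
uv run mcp install server.py --name "My Data Server"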

The SDK includes FastMCP, a high-level framework that uses decorators to define tools, resources, and prompts. It handles JSON-RPC serialization, transport management, and protocol negotiation automatically.

TypeScript SDK

For teams with Node.js infrastructure, or frontend developers branching into data tooling:

Installation:

npm install @modelcontextprotocol/sdk zod

The TypeScript SDK uses Zod for schema validation, which gives you solid type safety and IDE support.

Which SDK to Choose?

Consideration | Python | TypeScript
Existing codebase | Data pipelines, notebooks, dbt | Web apps, Node services
Libraries | pandas, SQLAlchemy, boto3 | Better for async I/O-heavy workloads
Team expertise | Data engineers | Full-stack developers
Deployment | Works with uvx, easy to distribute | npm packages, containerized

For most data engineering work, Python with FastMCP is the fastest path to a working server.

Basic Server Skeleton

Here’s a minimal MCP server in both Python and TypeScript. Each exposes a single tool you can test immediately.

Python FastMCP Example

Create a file named server.py:

from mcp.server.fastmcp import FastMCP

# Initialize the server with a name
mcp = FastMCP("DataEngineering")

@mcp.tool()
def query_database(query: str, database: str = "production") -> str:
    """Execute a SQL query against the specified database.

    Args:
        query: The SQL query to execute
        database: Target database name (default: production)

    Returns:
        Query results as formatted text
    """
    # In a real implementation, you would connect to your database here
    return f"Query executed on {database}: {query}"

if __name__ == "__main__":
    mcp.run(transport="stdio")

The @mcp.tool() decorator registers the function as an MCP tool. FastMCP extracts the function signature to build the JSON Schema, uses the docstring as the tool description, and handles all the serialization and protocol communication.

Run the server:

uv run server.py

TypeScript McpServer Example

Create server.ts:

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

const server = new McpServer({
  name: "data-engineering-server",
  version: "1.0.0",
});

server.registerTool(
  "query_database",
  {
    description: "Execute a SQL query against the specified database",
    inputSchema: {
      query: z.string().describe("The SQL query to execute"),
      database: z.string().default("production").describe("Target database name"),
    },
  },
  async ({ query, database }) => {
    // In a real implementation, connect to your database here
    return {
      content: [{ type: "text", text: `Query executed on ${database}: ${query}` }],
    };
  }
);

async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
}

main();

Compile and run:

npx tsc server.ts
node server.js

Running with stdio Transport

Both examples use stdio transport, communicating over standard input/output streams. This is the default for local development: no network configuration is required, the client spawns the server as a subprocess, and communication happens through pipes rather than sockets. It’s ideal for desktop applications like Claude Desktop or Cursor.

Defining Tools

Tools are how MCP servers expose functionality. A good tool has a clear description, typed parameters, and predictable output.

Docstrings as Descriptions

FastMCP extracts tool descriptions directly from Python docstrings:

@mcp.tool()
def get_table_schema(table_name: str) -> str:
    """Get the schema definition for a database table.

    Returns column names, data types, and constraints for the specified table.
    Useful for understanding table structure before writing queries.

    Args:
        table_name: Fully qualified table name (e.g., 'analytics.orders')

    Returns:
        Schema definition as formatted text
    """
    # Implementation here
    return f"Schema for {table_name}: id INT PRIMARY KEY, name VARCHAR(255)..."

The AI sees this description when deciding which tool to call. Write descriptions that help the model understand when and why to use the tool, not just what it does.
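For instance, a docstring that only restates the name ("Check freshness.") gives the model nothing to reason with. A hypothetical check_freshness tool with a usage-oriented description might read like this (the placeholder return value is illustrative only):

@mcp.tool()
def check_freshness(table_name: str) -> str:
    """Report how recently a table was last loaded.

    Use this before relying on a table for time-sensitive analysis,
    or when a user asks whether data is up to date. Prefer
    get_table_schema when the question is about structure rather than recency.

    Args:
        table_name: Fully qualified table name (e.g., 'analytics.orders')

    Returns:
        Last load timestamp and lag against the expected schedule
    """
    # Real implementation would query your metadata store
    return f"Freshness report for {table_name}"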

Pydantic Models for Structured Output

For complex return values, use Pydantic models to ensure consistent structure:

from pydantic import BaseModel, Field

class ValidationResult(BaseModel):
    """Result of a data quality validation check."""
    table_name: str
    row_count: int = Field(description="Total rows examined")
    null_count: int = Field(description="Number of null values found")
    duplicate_count: int = Field(description="Number of duplicate rows")
    is_valid: bool = Field(description="Whether the table passed validation")

@mcp.tool()
def run_data_quality_check(table_name: str) -> ValidationResult:
    """Run comprehensive data quality checks on a table.

    Validates completeness, uniqueness, and data integrity.

    Args:
        table_name: The table to validate

    Returns:
        Detailed validation results
    """
    # Run actual checks against your database
    return ValidationResult(
        table_name=table_name,
        row_count=10000,
        null_count=5,
        duplicate_count=0,
        is_valid=True,
    )

FastMCP serializes Pydantic models to JSON automatically, and the Field descriptions enrich the output schema.

Input Validation with Schemas

Type hints define the input schema. Use Python’s type system and Pydantic for validation:

from enum import Enum
from typing import Optional

from pydantic import BaseModel, Field

class Environment(str, Enum):
    PRODUCTION = "production"
    STAGING = "staging"
    DEVELOPMENT = "development"

class QueryParams(BaseModel):
    """Parameters for database queries."""
    limit: int = Field(default=100, ge=1, le=10000, description="Maximum rows to return")
    timeout_seconds: int = Field(default=30, ge=1, le=300, description="Query timeout")

@mcp.tool()
def run_query(
    query: str,
    environment: Environment = Environment.PRODUCTION,
    params: Optional[QueryParams] = None,
) -> str:
    """Execute a read-only SQL query.

    Args:
        query: SQL SELECT statement to execute
        environment: Target environment
        params: Optional query parameters

    Returns:
        Query results as JSON
    """
    effective_params = params or QueryParams()
    # Execute with validated parameters
    return f"Results from {environment.value} (limit: {effective_params.limit})"

Enums create dropdown-style options in tool UIs. Pydantic’s Field constraints prevent invalid inputs before they reach your code.
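As a quick illustration, the Field constraints above reject out-of-range values before your tool body ever runs:

from pydantic import ValidationError

try:
    QueryParams(limit=50000)  # violates the le=10000 constraint
except ValidationError as exc:
    print(exc)  # reports which constraint failed and why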

Advanced Patterns

Beyond basic tools, MCP servers can expose resources, prompts, and progress reporting.

Progress Reporting with Context

Long-running operations should report progress so the AI can inform users:

from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("DataEngineering")

@mcp.tool()
async def process_large_dataset(dataset_id: str, ctx: Context) -> str:
    """Process a large dataset with progress updates.

    Args:
        dataset_id: Identifier of the dataset to process
        ctx: MCP context for progress reporting (injected automatically)

    Returns:
        Processing summary
    """
    await ctx.info(f"Starting processing of {dataset_id}")
    total_batches = 10
    for i in range(total_batches):
        # Simulate batch processing
        await ctx.report_progress(
            progress=(i + 1) / total_batches,
            total=1.0,
            message=f"Processing batch {i + 1}/{total_batches}",
        )
    await ctx.info("Processing complete")
    return f"Processed dataset {dataset_id}: 10 batches, 100,000 rows"

The Context object is injected automatically when you include it as a parameter. Use ctx.info(), ctx.warning(), and ctx.error() for logging, and ctx.report_progress() for progress bars.

Resources for Data Exposure

Resources expose data that the AI can read, similar to a GET endpoint. Unlike tools, resources are application-controlled and don’t have side effects:

@mcp.resource("schema://{table_name}")
def get_table_schema(table_name: str) -> str:
    """Expose table schema as a readable resource.

    URI pattern: schema://analytics.orders
    """
    # Fetch from your metadata store
    return f"""
Table: {table_name}
Columns:
- id: INT (Primary Key)
- customer_id: INT (Foreign Key)
- total_amount: DECIMAL(10,2)
- created_at: TIMESTAMP
"""

@mcp.resource("config://pipelines/{pipeline_id}")
def get_pipeline_config(pipeline_id: str) -> str:
    """Expose pipeline configuration."""
    return f"Pipeline {pipeline_id} configuration: schedule=daily, owner=data-team"

Resources use URI templates with {variable} placeholders. The AI can browse available resources and read them without executing actions.

Prompts for Reusable Templates

Prompts are user-controlled templates that guide AI interactions:

@mcp.prompt(title="Data Quality Report")
def data_quality_prompt(table_name: str) -> str:
    """Generate a data quality analysis prompt.

    Args:
        table_name: Table to analyze
    """
    return f"""Analyze the data quality of the '{table_name}' table.

Please check:
1. Completeness: What percentage of required fields are populated?
2. Uniqueness: Are there duplicate records?
3. Freshness: When was the data last updated?
4. Validity: Do values conform to expected formats and ranges?

Provide a summary with specific recommendations for improvement."""

@mcp.prompt(title="Schema Review")
def schema_review_prompt(table_name: str, changes: str) -> str:
    """Generate a schema change review prompt."""
    return f"""Review the proposed schema changes for {table_name}:

{changes}

Evaluate:
- Backward compatibility with existing queries
- Impact on downstream dependencies
- Performance implications
- Data migration requirements"""

Prompts appear in the client UI and help users invoke the AI with structured requests they can reuse.

Transport Configuration

MCP supports two primary transports, and your choice depends on how you plan to deploy.

stdio for Local Development

Standard I/O transport runs the server as a subprocess:

if __name__ == "__main__":
    mcp.run(transport="stdio")

Configure in Claude Desktop (claude_desktop_config.json):

{
  "mcpServers": {
    "my-data-server": {
      "command": "uv",
      "args": ["run", "/path/to/server.py"],
      "env": {
        "DATABASE_URL": "postgresql://localhost/analytics"
      }
    }
  }
}

Or in Claude Code:

claude mcp add my-data-server -- uv run /path/to/server.py

stdio means zero network configuration, credentials passed via environment variables, process isolation (each client gets its own server instance), and simple debugging with logs going to stderr.
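Inside the server, those credentials arrive as ordinary environment variables. A minimal sketch that reads the DATABASE_URL value from the config above (check_connection is an illustrative tool name, not part of the SDK):

import os

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("DataEngineering")

# Injected by the client configuration's "env" block
DATABASE_URL = os.environ.get("DATABASE_URL")

@mcp.tool()
def check_connection() -> str:
    """Report whether database credentials are configured."""
    if not DATABASE_URL:
        return "DATABASE_URL is not set; check the client's env configuration"
    return "Database credentials loaded"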

Streamable HTTP for Production Deployment

For remote servers or shared deployments, use HTTP transport:

mcp = FastMCP("RemoteDataServer", stateless_http=True, host="0.0.0.0", port=8000)

if __name__ == "__main__":
    # host and port are server settings; run() only selects the transport
    mcp.run(transport="streamable-http")

Configure client to connect:

{
  "mcpServers": {
    "remote-data-server": {
      "type": "http",
      "url": "https://mcp.yourcompany.com",
      "headers": {
        "Authorization": "Bearer ${MCP_API_KEY}"
      }
    }
  }
}

HTTP requires authentication (OAuth 2.1 for production), but it can serve multiple clients from one instance and be deployed behind load balancers. It also supports Server-Sent Events for streaming responses.

Start with stdio for development and internal tools. Move to HTTP when you need centralized deployment or cross-team access.
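If you want one codebase to cover both stages, you can select the transport at startup. A sketch, assuming an MCP_TRANSPORT environment variable you define yourself:

import os

from mcp.server.fastmcp import FastMCP

# stateless_http only matters when the HTTP transport is selected
mcp = FastMCP("DataEngineering", stateless_http=True)

if __name__ == "__main__":
    if os.environ.get("MCP_TRANSPORT") == "streamable-http":
        mcp.run(transport="streamable-http")
    else:
        mcp.run(transport="stdio")  # default for local development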

Testing and Debugging

Testing MCP servers requires tools that speak the protocol. The MCP Inspector is indispensable here.

MCP Inspector

The Inspector is an interactive testing UI that connects to your server:

# Test a Python server
npx @modelcontextprotocol/inspector uv run server.py
# Test a Node.js server
npx @modelcontextprotocol/inspector node build/index.js
# Connect to a remote HTTP server
npx @modelcontextprotocol/inspector --connect https://mcp.yourcompany.com

The Inspector launches a web UI where you can view all registered tools, resources, and prompts, call tools with custom arguments, inspect JSON-RPC messages, and debug response formatting.

Logging to stderr

The most common mistake when building MCP servers is printing to stdout. stdio transport uses stdout for JSON-RPC messages. Any other output on stdout corrupts the protocol and breaks communication.

# BAD - breaks JSON-RPC communication
print("Debug: processing query")  # Goes to stdout!
print(f"Error: {e}")              # Also stdout!

# GOOD - use logging to stderr
import logging
import sys

logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    stream=sys.stderr  # Explicitly send to stderr
)
logger = logging.getLogger(__name__)

logger.info("Debug: processing query")  # Goes to stderr
logger.error(f"Error: {e}")             # Also stderr

Or use the Context object inside tools:

@mcp.tool()
async def my_tool(query: str, ctx: Context) -> str:
    await ctx.info("Processing query")  # Proper MCP logging
    # ...

If your server works in isolation but fails when connected to a client, check for stray print() statements first. This will bite you at least once.

Testing Workflow

1. Unit test your business logic separately from MCP (a tool-level variant is sketched after this list):

# test_logic.py
def test_query_execution():
    result = execute_query("SELECT 1", "test_db")
    assert "1" in result

2. Test the MCP server with Inspector:

npx @modelcontextprotocol/inspector uv run server.py

3. Integration test with a real client (Claude Desktop or Claude Code):

# Add to Claude Code
claude mcp add test-server -- uv run server.py

# Test interactively
claude
> List the tools available from test-server
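Because FastMCP’s decorator leaves the underlying function callable (at least in the current official Python SDK), you can also unit test tools directly without starting a server. A sketch against the earlier query_database skeleton:

# test_server.py
from server import query_database

def test_defaults_to_production():
    assert "production" in query_database("SELECT 1")

def test_targets_named_database():
    assert "staging" in query_database("SELECT 1", database="staging")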

Data Engineering Examples

Three practical MCP servers that address common data engineering needs.

Data Catalog Server

This server exposes your internal data catalog for AI-assisted discovery:

from mcp.server.fastmcp import FastMCP
import json

mcp = FastMCP("DataCatalogMCP")

# Simulated catalog data - replace with your actual catalog API
CATALOG = {
    "sales.orders": {
        "description": "Order transactions from all channels",
        "columns": ["order_id", "customer_id", "total_amount", "created_at"],
        "owner": "sales-team",
        "tags": ["pii", "financial"],
        "upstream": ["raw.shopify_orders", "raw.pos_transactions"],
        "downstream": ["analytics.revenue_daily", "ml.churn_features"]
    },
    "sales.customers": {
        "description": "Customer master data with demographics",
        "columns": ["customer_id", "email", "segment", "lifetime_value"],
        "owner": "marketing-team",
        "tags": ["pii"],
        "upstream": ["raw.crm_contacts"],
        "downstream": ["sales.orders", "analytics.cohorts"]
    }
}

@mcp.tool()
def search_tables(
    query: str,
    tags: list[str] | None = None,
    limit: int = 10
) -> str:
    """Search for tables in the data catalog.

    Args:
        query: Search term to match against table names and descriptions
        tags: Optional list of tags to filter by (e.g., ['pii', 'financial'])
        limit: Maximum number of results to return

    Returns:
        Matching tables with descriptions
    """
    results = []
    for table_name, metadata in CATALOG.items():
        # Simple search matching
        if query.lower() in table_name.lower() or query.lower() in metadata["description"].lower():
            if tags is None or any(t in metadata["tags"] for t in tags):
                results.append({
                    "name": table_name,
                    "description": metadata["description"],
                    "owner": metadata["owner"],
                    "tags": metadata["tags"]
                })
    return json.dumps(results[:limit], indent=2)

@mcp.tool()
def get_table_details(table_name: str) -> str:
    """Get detailed metadata for a specific table.

    Args:
        table_name: Fully qualified table name (e.g., 'sales.orders')

    Returns:
        Complete table metadata including columns, owner, and tags
    """
    if table_name not in CATALOG:
        return json.dumps({"error": f"Table '{table_name}' not found in catalog"})
    metadata = CATALOG[table_name]
    return json.dumps({
        "table": table_name,
        **metadata
    }, indent=2)

@mcp.tool()
def get_data_lineage(
    table_name: str,
    direction: str = "both",
    depth: int = 2
) -> str:
    """Trace data lineage for a table.

    Args:
        table_name: Table to trace lineage for
        direction: 'upstream', 'downstream', or 'both'
        depth: How many levels to traverse (1-5)

    Returns:
        Lineage graph showing data flow
    """
    if table_name not in CATALOG:
        return json.dumps({"error": f"Table '{table_name}' not found"})
    metadata = CATALOG[table_name]
    lineage = {"table": table_name}
    if direction in ("upstream", "both"):
        lineage["upstream"] = metadata.get("upstream", [])
    if direction in ("downstream", "both"):
        lineage["downstream"] = metadata.get("downstream", [])
    return json.dumps(lineage, indent=2)

if __name__ == "__main__":
    mcp.run(transport="stdio")

With this server, you can ask things like “Find all tables related to customers” or “What tables contain PII data?” or “Show me the lineage for the orders table.”

Pipeline Monitoring Server

Monitor pipelines, check status, and investigate failures:

from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
import json

mcp = FastMCP("PipelineMonitorMCP")

class PipelineStatus(str, Enum):
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    PENDING = "pending"

class PipelineRun(BaseModel):
    """Details of a pipeline execution."""
    pipeline_id: str
    run_id: str
    status: PipelineStatus
    started_at: str
    finished_at: str | None = None
    duration_seconds: int | None = None
    error_message: str | None = None

# Simulated pipeline data - replace with Airflow/Dagster/Prefect API calls
PIPELINES = {
    "etl_daily_sales": {
        "last_run": PipelineRun(
            pipeline_id="etl_daily_sales",
            run_id="run_20260114_001",
            status=PipelineStatus.SUCCESS,
            started_at="2026-01-14T06:00:00Z",
            finished_at="2026-01-14T06:45:00Z",
            duration_seconds=2700
        ),
        "schedule": "0 6 * * *",
        "owner": "data-platform"
    },
    "sync_inventory": {
        "last_run": PipelineRun(
            pipeline_id="sync_inventory",
            run_id="run_20260114_001",
            status=PipelineStatus.FAILED,
            started_at="2026-01-14T07:00:00Z",
            finished_at="2026-01-14T07:15:00Z",
            duration_seconds=900,
            error_message="Connection timeout to inventory service"
        ),
        "schedule": "0 */2 * * *",
        "owner": "supply-chain"
    }
}

@mcp.tool()
def get_pipeline_status(pipeline_id: str) -> PipelineRun:
    """Get the current status of a data pipeline.

    Args:
        pipeline_id: Unique identifier for the pipeline

    Returns:
        Latest run details including status, timing, and any errors
    """
    if pipeline_id not in PIPELINES:
        return PipelineRun(
            pipeline_id=pipeline_id,
            run_id="unknown",
            status=PipelineStatus.PENDING,
            started_at=datetime.now().isoformat(),
            error_message=f"Pipeline '{pipeline_id}' not found"
        )
    return PIPELINES[pipeline_id]["last_run"]

@mcp.tool()
def list_failed_pipelines(hours: int = 24) -> str:
    """List all pipelines that failed recently.

    Args:
        hours: Look back period in hours (default: 24)

    Returns:
        List of failed pipelines with error details
    """
    failures = []
    for pipeline_id, data in PIPELINES.items():
        if data["last_run"].status == PipelineStatus.FAILED:
            failures.append({
                "pipeline": pipeline_id,
                "run_id": data["last_run"].run_id,
                "error": data["last_run"].error_message,
                "failed_at": data["last_run"].finished_at,
                "owner": data["owner"]
            })
    return json.dumps(failures, indent=2)

@mcp.tool()
def list_all_pipelines() -> str:
    """List all registered pipelines with their current status.

    Returns:
        Summary of all pipelines
    """
    summary = []
    for pipeline_id, data in PIPELINES.items():
        summary.append({
            "pipeline": pipeline_id,
            "status": data["last_run"].status.value,
            "schedule": data["schedule"],
            "owner": data["owner"],
            "last_run": data["last_run"].started_at
        })
    return json.dumps(summary, indent=2)

@mcp.tool()
async def trigger_pipeline(pipeline_id: str, ctx: Context) -> str:
    """Trigger a manual run of a pipeline.

    Args:
        pipeline_id: Pipeline to trigger
        ctx: Context for progress reporting

    Returns:
        Confirmation with new run ID
    """
    if pipeline_id not in PIPELINES:
        return json.dumps({"error": f"Pipeline '{pipeline_id}' not found"})
    await ctx.info(f"Triggering pipeline: {pipeline_id}")
    # In real implementation, call your orchestrator's API
    new_run_id = f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    return json.dumps({
        "status": "triggered",
        "pipeline": pipeline_id,
        "run_id": new_run_id,
        "message": f"Pipeline {pipeline_id} triggered successfully"
    }, indent=2)

if __name__ == "__main__":
    mcp.run(transport="stdio")

Now you can ask “What pipelines failed in the last 24 hours?” or “Trigger a rerun of the sync_inventory pipeline.”

Data Quality Server

Run quality checks and retrieve scores from your data quality platform:

from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field
from enum import Enum
import json

mcp = FastMCP("DataQualityMCP")

class CheckType(str, Enum):
    COMPLETENESS = "completeness"
    UNIQUENESS = "uniqueness"
    FRESHNESS = "freshness"
    VALIDITY = "validity"
    CONSISTENCY = "consistency"

class QualityCheckResult(BaseModel):
    """Result of a single quality check."""
    check_name: str
    check_type: CheckType
    table_name: str
    passed: bool
    score: float = Field(ge=0, le=1, description="Score between 0 and 1")
    rows_checked: int
    issues_found: int
    details: str | None = None

# Simulated quality scores - replace with Great Expectations/Elementary/custom API
QUALITY_SCORES = {
    "sales.orders": {
        "overall": 0.94,
        "completeness": 0.99,
        "uniqueness": 0.95,
        "freshness": 0.88,
        "validity": 0.96
    },
    "sales.customers": {
        "overall": 0.87,
        "completeness": 0.92,
        "uniqueness": 0.98,
        "freshness": 0.75,
        "validity": 0.85
    }
}

@mcp.tool()
async def run_quality_check(
    table_name: str,
    check_type: CheckType,
    ctx: Context
) -> QualityCheckResult:
    """Run a specific data quality check on a table.

    Args:
        table_name: Table to validate
        check_type: Type of quality check to run
        ctx: Context for progress reporting

    Returns:
        Detailed check results
    """
    await ctx.info(f"Running {check_type.value} check on {table_name}")
    # Simulate check execution
    await ctx.report_progress(progress=0.5, total=1.0, message="Scanning table...")
    # In real implementation, call your quality framework
    score = QUALITY_SCORES.get(table_name, {}).get(check_type.value, 0.5)
    passed = score >= 0.9
    await ctx.report_progress(progress=1.0, total=1.0, message="Check complete")
    return QualityCheckResult(
        check_name=f"{check_type.value}_{table_name.replace('.', '_')}",
        check_type=check_type,
        table_name=table_name,
        passed=passed,
        score=score,
        rows_checked=100000,
        issues_found=int((1 - score) * 100000),
        details=f"{'Passed' if passed else 'Failed'}: {score:.1%} of rows meet {check_type.value} criteria"
    )

@mcp.tool()
def get_quality_score(table_name: str) -> str:
    """Get the overall data quality score for a table.

    Args:
        table_name: Table to get scores for

    Returns:
        Quality scores across all dimensions
    """
    if table_name not in QUALITY_SCORES:
        return json.dumps({
            "error": f"No quality scores found for '{table_name}'",
            "available_tables": list(QUALITY_SCORES.keys())
        }, indent=2)
    scores = QUALITY_SCORES[table_name]
    return json.dumps({
        "table": table_name,
        "overall_score": scores["overall"],
        "dimensions": {
            "completeness": scores["completeness"],
            "uniqueness": scores["uniqueness"],
            "freshness": scores["freshness"],
            "validity": scores.get("validity", "not measured")
        },
        "status": "healthy" if scores["overall"] >= 0.9 else "needs attention"
    }, indent=2)

@mcp.tool()
def list_quality_issues(min_severity: str = "warning") -> str:
    """List tables with data quality issues.

    Args:
        min_severity: Minimum severity to include ('warning' or 'critical')

    Returns:
        Tables that need attention
    """
    issues = []
    threshold = 0.8 if min_severity == "critical" else 0.9
    for table_name, scores in QUALITY_SCORES.items():
        if scores["overall"] < threshold:
            # Find the worst dimension
            dimensions = {k: v for k, v in scores.items() if k != "overall"}
            worst_dim = min(dimensions, key=dimensions.get)
            issues.append({
                "table": table_name,
                "overall_score": scores["overall"],
                "severity": "critical" if scores["overall"] < 0.8 else "warning",
                "primary_issue": worst_dim,
                "issue_score": dimensions[worst_dim]
            })
    return json.dumps(sorted(issues, key=lambda x: x["overall_score"]), indent=2)

@mcp.tool()
async def run_full_validation(table_name: str, ctx: Context) -> str:
    """Run all quality checks on a table.

    Args:
        table_name: Table to validate
        ctx: Context for progress reporting

    Returns:
        Complete validation report
    """
    await ctx.info(f"Starting full validation of {table_name}")
    results = []
    checks = list(CheckType)
    for i, check_type in enumerate(checks):
        await ctx.report_progress(
            progress=(i + 1) / len(checks),
            total=1.0,
            message=f"Running {check_type.value} check..."
        )
        result = await run_quality_check(table_name, check_type, ctx)
        results.append({
            "check": result.check_type.value,
            "passed": result.passed,
            "score": result.score
        })
    overall_passed = all(r["passed"] for r in results)
    return json.dumps({
        "table": table_name,
        "validation_passed": overall_passed,
        "checks": results,
        "summary": f"{'All checks passed' if overall_passed else 'Some checks failed'}"
    }, indent=2)

if __name__ == "__main__":
    mcp.run(transport="stdio")

This lets you ask “What’s the data quality score for the orders table?” or “Run a completeness check on sales.customers” or “Which tables have quality issues I should investigate?”

Project Setup Checklist

Creating a new MCP server project from scratch:

1. Initialize the Project

# Create project with uv
uv init my-data-mcp-server
cd my-data-mcp-server
# Or with standard Python
mkdir my-data-mcp-server
cd my-data-mcp-server
python -m venv .venv
source .venv/bin/activate

2. Add Dependencies

# Core MCP with CLI tools
uv add "mcp[cli]"
# Common data engineering dependencies
uv add httpx # HTTP client for API calls
uv add pydantic # Data validation (usually included with mcp)
uv add sqlalchemy # Database connections
uv add asyncpg # Async PostgreSQL (if needed)

3. Create the Server File

touch server.py

Start with the basic skeleton from earlier, then add your tools.

4. Test with MCP Inspector

npx @modelcontextprotocol/inspector uv run server.py

Open the URL shown in your browser. Verify:

  • All tools appear in the list
  • Descriptions are clear
  • Input schemas look correct
  • Tools execute without errors

5. Install to Claude Desktop

# Quick install (creates config entry)
uv run mcp install server.py --name "My Data Server"

Or manually edit claude_desktop_config.json:

{
  "mcpServers": {
    "my-data-server": {
      "command": "uv",
      "args": ["run", "/full/path/to/server.py"],
      "env": {
        "DATABASE_URL": "your-connection-string"
      }
    }
  }
}

6. Install to Claude Code

# Add to current project (creates .mcp.json)
claude mcp add my-data-server -s project -- uv run /full/path/to/server.py
# Or user-wide installation
claude mcp add my-data-server -- uv run /full/path/to/server.py

7. Test in Production Client

# With Claude Code
claude
> What tools does my-data-server provide?
> [Test each tool with realistic inputs]

Project Structure

A typical MCP server project looks like:

my-data-mcp-server/
├── pyproject.toml        # Dependencies and metadata
├── server.py             # Main server file
├── tools/                # Tool implementations (optional, for large servers)
│   ├── __init__.py
│   ├── catalog.py
│   └── quality.py
├── tests/                # Unit tests for business logic
│   └── test_tools.py
└── README.md             # Setup and usage instructions
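If you outgrow a single server.py, one pattern for the optional tools/ layout is to give each module a registration function and call it from the main file. A sketch, where register_catalog_tools is a name you would define yourself:

# tools/catalog.py
from mcp.server.fastmcp import FastMCP

def register_catalog_tools(mcp: FastMCP) -> None:
    """Attach catalog-related tools to an existing server."""

    @mcp.tool()
    def search_tables(query: str) -> str:
        """Search the data catalog for matching tables."""
        return f"Results for: {query}"

# server.py
from mcp.server.fastmcp import FastMCP
from tools.catalog import register_catalog_tools

mcp = FastMCP("DataEngineering")
register_catalog_tools(mcp)

if __name__ == "__main__":
    mcp.run(transport="stdio")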

Existing Servers to Study

Before building your own, look at how these data engineering MCP servers are structured:

DataHub MCP

Repository: github.com/acryldata/mcp-server-datahub

The official MCP server for DataHub, the popular open-source data catalog. Features:

  • Search across datasets, dashboards, and pipelines
  • Lineage traversal
  • Metadata retrieval
  • SQL query execution

Study this for: catalog integration patterns, search implementation, lineage APIs.

dbt MCP

Repository: github.com/dbt-labs/dbt-mcp

The official dbt integration covered in depth in the dbt MCP tutorial article. Features:

  • CLI command execution (run, test, build)
  • Semantic layer queries
  • Model discovery and lineage
  • Cloud API integration

Study this for: CLI wrapping patterns, hybrid local/remote architecture, environment variable handling.

OpenMetadata MCP

Documentation: open-metadata.org/mcp

Enterprise metadata platform integration with:

  • Data profiling results
  • Lineage visualization
  • Quality metrics
  • Governance workflows

Study this for: enterprise patterns, authentication handling, complex metadata models.

Elementary MCP

Website: elementary-data.com

Data observability platform integration offering:

  • Test results and history
  • Anomaly detection
  • Data freshness monitoring
  • Alert management

Study this for: observability patterns, time-series data exposure, alert integration.

Building on Existing Work

When studying these servers, focus on how they structure tool inputs and outputs, how they handle credentials securely, how they report failures to the AI, and what configuration options they expose.

Many servers are open source under permissive licenses. Fork them as starting points or adapt their patterns for your own implementations.

Next Steps

Start small. Pick one internal system you query often, build a minimal server with one or two tools, test it with Inspector, then iterate as you discover needs.

For production deployments, you’ll want authentication (OAuth 2.1 for remote HTTP servers), rate limiting to protect your backend systems, logging and monitoring to track usage, and clear documentation so both the AI and your team know when to use each tool.
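Rate limiting, for example, doesn’t need to be elaborate to be useful. A minimal in-process sketch (the rate_limited decorator is hypothetical, not part of the SDK) that caps how often a backend-heavy tool can run:

import time
from functools import wraps

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("DataEngineering")

def rate_limited(calls_per_minute: int):
    """Reject calls that exceed a simple per-minute budget."""
    window: list[float] = []

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            now = time.monotonic()
            # Keep only timestamps from the last 60 seconds
            window[:] = [t for t in window if now - t < 60]
            if len(window) >= calls_per_minute:
                return "Rate limit exceeded; try again in a minute"
            window.append(now)
            return fn(*args, **kwargs)
        return wrapper
    return decorator

@mcp.tool()
@rate_limited(calls_per_minute=10)
def run_expensive_scan(table_name: str) -> str:
    """Scan a table for anomalies (backend-intensive)."""
    return f"Scan queued for {table_name}"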

The MCP ecosystem is growing fast, and the servers you build today for internal use might become valuable community contributions tomorrow.