Adrienne Vermorel
Building Custom MCP Servers for Data Engineering
When to Build Custom
Before writing code, check if someone else already solved your problem.
With over 5,800 community servers and official integrations for BigQuery, Snowflake, dbt, and dozens of other data tools, you can often skip the build entirely. Browse the MCP Registry, awesome-mcp-servers, and vendor documentation first.
You’ll need a custom server when your organization runs proprietary internal systems: homegrown data catalogs, custom orchestrators, or internal APIs that no public server will ever support. The same applies when you want to combine multiple data sources into a unified interface, or when security requirements mandate self-hosted solutions with specific credential handling.
In data engineering, the most common use cases are internal data catalogs (searching metadata, exposing lineage), pipeline monitoring (checking job status across Airflow, Dagster, or whatever scheduler you’re running), data quality platforms (running checks, retrieving scores from Great Expectations or Elementary), and cost management tools.
A basic MCP server takes about 50 lines of Python. The SDKs handle protocol complexity so you can focus on your actual business logic.
SDK Options
MCP has official SDKs for multiple languages. For data engineering, Python and TypeScript are the practical choices.
Python SDK
If you’re already working in Python (and most data engineers are), this is the obvious choice.
- Package: mcp on PyPI
- Repository: github.com/modelcontextprotocol/python-sdk
- Current version: v1.25.0 (v2 in development)
- Requirements: Python 3.10+
Installation with uv (recommended):
```bash
uv add "mcp[cli]"
```

Or with pip:

```bash
pip install "mcp[cli]"
```

The `[cli]` extra includes the command-line tools for testing and installation.
The SDK includes FastMCP, a high-level framework that uses decorators to define tools, resources, and prompts. It handles JSON-RPC serialization, transport management, and protocol negotiation automatically.
TypeScript SDK
For teams with Node.js infrastructure, or frontend developers branching into data tooling:
- Package: @modelcontextprotocol/sdk on npm
- Repository: github.com/modelcontextprotocol/typescript-sdk
- Requirements: Node.js 18+ (22.7.5+ recommended)
Installation:
```bash
npm install @modelcontextprotocol/sdk zod
```

The TypeScript SDK uses Zod for schema validation, which gives you solid type safety and IDE support.
Which SDK to Choose?
| Consideration | Python | TypeScript |
|---|---|---|
| Existing codebase | Data pipelines, notebooks, dbt | Web apps, Node services |
| Strengths | Data libraries (pandas, SQLAlchemy, boto3) | Async I/O-heavy workloads |
| Team expertise | Data engineers | Full-stack developers |
| Deployment | Works with uvx, easy to distribute | npm packages, containerized |
For most data engineering work, Python with FastMCP is the fastest path to a working server.
Basic Server Skeleton
Here’s a minimal MCP server in both Python and TypeScript. Each exposes a single tool you can test immediately.
Python FastMCP Example
Create a file named server.py:
```python
from mcp.server.fastmcp import FastMCP

# Initialize the server with a name
mcp = FastMCP("DataEngineering")


@mcp.tool()
def query_database(query: str, database: str = "production") -> str:
    """Execute a SQL query against the specified database.

    Args:
        query: The SQL query to execute
        database: Target database name (default: production)

    Returns:
        Query results as formatted text
    """
    # In a real implementation, you would connect to your database here
    return f"Query executed on {database}: {query}"


if __name__ == "__main__":
    mcp.run(transport="stdio")
```

The @mcp.tool() decorator registers the function as an MCP tool. FastMCP extracts the function signature to build the JSON Schema, uses the docstring as the tool description, and handles all the serialization and protocol communication.

Run the server:

```bash
uv run server.py
```

TypeScript McpServer Example
Create server.ts:
```typescript
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";

const server = new McpServer({
  name: "data-engineering-server",
  version: "1.0.0",
});

server.registerTool(
  "query_database",
  {
    description: "Execute a SQL query against the specified database",
    inputSchema: {
      query: z.string().describe("The SQL query to execute"),
      database: z.string().default("production").describe("Target database name"),
    },
  },
  async ({ query, database }) => {
    // In a real implementation, connect to your database here
    return {
      content: [{ type: "text", text: `Query executed on ${database}: ${query}` }],
    };
  }
);

async function main() {
  const transport = new StdioServerTransport();
  await server.connect(transport);
}

main();
```

Compile and run:

```bash
npx tsc server.ts
node server.js
```

Running with stdio Transport
Both examples use stdio transport, communicating over standard input/output streams. This is the default for local development. No network configuration is required: the client spawns the server as a subprocess, and communication happens through pipes rather than sockets. It’s ideal for desktop applications like Claude Desktop or Cursor.
Defining Tools
Tools are how MCP servers expose functionality. A good tool has a clear description, typed parameters, and predictable output.
Docstrings as Descriptions
FastMCP extracts tool descriptions directly from Python docstrings:
```python
@mcp.tool()
def get_table_schema(table_name: str) -> str:
    """Get the schema definition for a database table.

    Returns column names, data types, and constraints for the specified
    table. Useful for understanding table structure before writing queries.

    Args:
        table_name: Fully qualified table name (e.g., 'analytics.orders')

    Returns:
        Schema definition as formatted text
    """
    # Implementation here
    return f"Schema for {table_name}: id INT PRIMARY KEY, name VARCHAR(255)..."
```

The AI sees this description when deciding which tool to call. Write descriptions that help the model understand when and why to use the tool, not just what it does.
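To make that concrete, here’s a hypothetical before/after. The lineage tool is invented for illustration and assumes the mcp instance from above:

```python
# Too thin: restates the name, gives the model nothing to decide with
@mcp.tool()
def get_lineage(table: str) -> str:
    """Get lineage."""
    return ""

# Better: states what comes back and when the tool is worth calling
@mcp.tool()
def trace_lineage(table: str) -> str:
    """Trace which tables feed into and depend on the given table.

    Use before proposing schema changes or debugging stale data, to
    understand the blast radius around a table.
    """
    return "raw.shopify_orders -> sales.orders -> analytics.revenue_daily"
```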
Pydantic Models for Structured Output
For complex return values, use Pydantic models to ensure consistent structure:
```python
from pydantic import BaseModel, Field


class ValidationResult(BaseModel):
    """Result of a data quality validation check."""
    table_name: str
    row_count: int = Field(description="Total rows examined")
    null_count: int = Field(description="Number of null values found")
    duplicate_count: int = Field(description="Number of duplicate rows")
    is_valid: bool = Field(description="Whether the table passed validation")


@mcp.tool()
def run_data_quality_check(table_name: str) -> ValidationResult:
    """Run comprehensive data quality checks on a table.

    Validates completeness, uniqueness, and data integrity.

    Args:
        table_name: The table to validate

    Returns:
        Detailed validation results
    """
    # Run actual checks against your database
    return ValidationResult(
        table_name=table_name,
        row_count=10000,
        null_count=5,
        duplicate_count=0,
        is_valid=True,
    )
```

FastMCP serializes Pydantic models to JSON automatically, and the Field descriptions enrich the output schema.
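If you want to see exactly what the client receives, Pydantic can show you both the serialized payload and the derived schema. A quick sketch using the model above and standard Pydantic v2 methods:

```python
result = ValidationResult(
    table_name="sales.orders",
    row_count=10000,
    null_count=5,
    duplicate_count=0,
    is_valid=True,
)

# The JSON payload a client would see for this result
print(result.model_dump_json(indent=2))

# The output schema, with the Field descriptions included
print(ValidationResult.model_json_schema())
```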
Input Validation with Schemas
Type hints define the input schema. Use Python’s type system and Pydantic for validation:
```python
from enum import Enum
from typing import Optional

from pydantic import BaseModel, Field


class Environment(str, Enum):
    PRODUCTION = "production"
    STAGING = "staging"
    DEVELOPMENT = "development"


class QueryParams(BaseModel):
    """Parameters for database queries."""
    limit: int = Field(default=100, ge=1, le=10000, description="Maximum rows to return")
    timeout_seconds: int = Field(default=30, ge=1, le=300, description="Query timeout")


@mcp.tool()
def run_query(
    query: str,
    environment: Environment = Environment.PRODUCTION,
    params: Optional[QueryParams] = None,
) -> str:
    """Execute a read-only SQL query.

    Args:
        query: SQL SELECT statement to execute
        environment: Target environment
        params: Optional query parameters

    Returns:
        Query results as JSON
    """
    effective_params = params or QueryParams()
    # Execute with validated parameters
    return f"Results from {environment.value} (limit: {effective_params.limit})"
```

Enums create dropdown-style options in tool UIs. Pydantic’s Field constraints prevent invalid inputs before they reach your code.
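Those constraints are enforceable without going through MCP at all, which makes them easy to unit test. A small sketch exercising QueryParams directly:

```python
from pydantic import ValidationError

try:
    QueryParams(limit=50000)  # violates the le=10000 constraint
except ValidationError as exc:
    # The bad input never reaches your query logic
    print(exc.errors()[0]["msg"])
```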
Advanced Patterns
Beyond basic tools, MCP servers can expose resources, prompts, and progress reporting.
Progress Reporting with Context
Long-running operations should report progress so the AI can inform users:
```python
from mcp.server.fastmcp import FastMCP, Context

mcp = FastMCP("DataEngineering")


@mcp.tool()
async def process_large_dataset(dataset_id: str, ctx: Context) -> str:
    """Process a large dataset with progress updates.

    Args:
        dataset_id: Identifier of the dataset to process
        ctx: MCP context for progress reporting (injected automatically)

    Returns:
        Processing summary
    """
    await ctx.info(f"Starting processing of {dataset_id}")

    total_batches = 10
    for i in range(total_batches):
        # Simulate batch processing
        await ctx.report_progress(
            progress=(i + 1) / total_batches,
            total=1.0,
            message=f"Processing batch {i + 1}/{total_batches}",
        )

    await ctx.info("Processing complete")
    return f"Processed dataset {dataset_id}: 10 batches, 100,000 rows"
```

The Context object is injected automatically when you include it as a parameter. Use ctx.info(), ctx.warning(), and ctx.error() for logging, and ctx.report_progress() for progress bars.
Resources for Data Exposure
Resources expose data that the AI can read, similar to a GET endpoint. Unlike tools, resources are application-controlled and don’t have side effects:
```python
@mcp.resource("schema://{table_name}")
def get_table_schema(table_name: str) -> str:
    """Expose table schema as a readable resource.

    URI pattern: schema://analytics.orders
    """
    # Fetch from your metadata store
    return f"""
    Table: {table_name}
    Columns:
      - id: INT (Primary Key)
      - customer_id: INT (Foreign Key)
      - total_amount: DECIMAL(10,2)
      - created_at: TIMESTAMP
    """


@mcp.resource("config://pipelines/{pipeline_id}")
def get_pipeline_config(pipeline_id: str) -> str:
    """Expose pipeline configuration."""
    return f"Pipeline {pipeline_id} configuration: schedule=daily, owner=data-team"
```

Resources use URI templates with {variable} placeholders. The AI can browse available resources and read them without executing actions.
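Templates are optional; a resource can also live at a single fixed URI, which is handy for small reference lists. A sketch, using a made-up catalog:// scheme:

```python
@mcp.resource("catalog://tables")
def list_catalog_tables() -> str:
    """Fixed-URI resource enumerating the tables this server knows about."""
    return "sales.orders\nsales.customers\nanalytics.revenue_daily"
```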
Prompts for Reusable Templates
Prompts are user-controlled templates that guide AI interactions:
```python
@mcp.prompt(title="Data Quality Report")
def data_quality_prompt(table_name: str) -> str:
    """Generate a data quality analysis prompt.

    Args:
        table_name: Table to analyze
    """
    return f"""Analyze the data quality of the '{table_name}' table.

Please check:
1. Completeness: What percentage of required fields are populated?
2. Uniqueness: Are there duplicate records?
3. Freshness: When was the data last updated?
4. Validity: Do values conform to expected formats and ranges?

Provide a summary with specific recommendations for improvement."""


@mcp.prompt(title="Schema Review")
def schema_review_prompt(table_name: str, changes: str) -> str:
    """Generate a schema change review prompt."""
    return f"""Review the proposed schema changes for {table_name}:

{changes}

Evaluate:
- Backward compatibility with existing queries
- Impact on downstream dependencies
- Performance implications
- Data migration requirements"""
```

Prompts appear in the client UI and help users invoke the AI with structured requests they can reuse.
Transport Configuration
MCP supports two primary transports, and your choice depends on how you plan to deploy.
stdio for Local Development
Standard I/O transport runs the server as a subprocess:
```python
if __name__ == "__main__":
    mcp.run(transport="stdio")
```

Configure in Claude Desktop (claude_desktop_config.json):

```json
{
  "mcpServers": {
    "my-data-server": {
      "command": "uv",
      "args": ["run", "/path/to/server.py"],
      "env": {
        "DATABASE_URL": "postgresql://localhost/analytics"
      }
    }
  }
}
```

Or in Claude Code:

```bash
claude mcp add my-data-server -- uv run /path/to/server.py
```

stdio means zero network configuration, credentials passed via environment variables, process isolation (each client gets its own server instance), and simple debugging with logs going to stderr.
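On the server side, those env entries surface as ordinary environment variables. A sketch of the receiving end, assuming the DATABASE_URL from the config above and SQLAlchemy as the driver (the helper name is ours):

```python
import os

import sqlalchemy

# Injected by the client via the "env" block; fail fast if it's missing
DATABASE_URL = os.environ["DATABASE_URL"]

engine = sqlalchemy.create_engine(DATABASE_URL)


def run_readonly(sql: str) -> list[tuple]:
    """Hypothetical helper: execute a query with the injected credentials."""
    with engine.connect() as conn:
        return list(conn.execute(sqlalchemy.text(sql)))
```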
Streamable HTTP for Production Deployment
For remote servers or shared deployments, use HTTP transport:
```python
# host and port are FastMCP settings, so they belong on the constructor
mcp = FastMCP("RemoteDataServer", stateless_http=True, host="0.0.0.0", port=8000)

if __name__ == "__main__":
    mcp.run(transport="streamable-http")
```

Configure the client to connect:

```json
{
  "mcpServers": {
    "remote-data-server": {
      "type": "http",
      "url": "https://mcp.yourcompany.com",
      "headers": {
        "Authorization": "Bearer ${MCP_API_KEY}"
      }
    }
  }
}
```

HTTP requires authentication (OAuth 2.1 for production), but it can serve multiple clients from one instance and deploy behind load balancers. It also supports Server-Sent Events for streaming responses.
Start with stdio for development and internal tools. Move to HTTP when you need centralized deployment or cross-team access.
Testing and Debugging
Testing MCP servers requires tools that speak the protocol. The MCP Inspector is indispensable here.
MCP Inspector
The Inspector is an interactive testing UI that connects to your server:
```bash
# Test a Python server
npx @modelcontextprotocol/inspector uv run server.py

# Test a Node.js server
npx @modelcontextprotocol/inspector node build/index.js

# Connect to a remote HTTP server
npx @modelcontextprotocol/inspector --connect https://mcp.yourcompany.com
```

The Inspector launches a web UI where you can view all registered tools, resources, and prompts, call tools with custom arguments, inspect JSON-RPC messages, and debug response formatting.
Logging to stderr
The most common mistake when building MCP servers is printing to stdout. stdio transport uses stdout for JSON-RPC messages. Any other output on stdout corrupts the protocol and breaks communication.
```python
# BAD - breaks JSON-RPC communication
print("Debug: processing query")  # Goes to stdout!
print(f"Error: {e}")  # Also stdout!
```

```python
# GOOD - use logging to stderr
import logging
import sys

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    stream=sys.stderr,  # Explicitly send to stderr
)
logger = logging.getLogger(__name__)

logger.info("Debug: processing query")  # Goes to stderr
logger.error(f"Error: {e}")  # Also stderr
```

Or use the Context object inside tools:

```python
@mcp.tool()
async def my_tool(query: str, ctx: Context) -> str:
    await ctx.info("Processing query")  # Proper MCP logging
    # ...
```

If your server works in isolation but fails when connected to a client, check for stray print() statements first. This will bite you at least once.
Testing Workflow
- Unit test your business logic separately from MCP:

```python
def test_query_execution():
    result = execute_query("SELECT 1", "test_db")
    assert "1" in result
```

- Test the MCP server with Inspector:

```bash
npx @modelcontextprotocol/inspector uv run server.py
```

- Integration test with a real client (Claude Desktop or Claude Code):

```bash
# Add to Claude Code
claude mcp add test-server -- uv run server.py

# Test interactively
claude
> List the tools available from test-server
```

Data Engineering Examples
Three practical MCP servers that address common data engineering needs.
Data Catalog Server
This server exposes your internal data catalog for AI-assisted discovery:
```python
import json

from mcp.server.fastmcp import FastMCP

mcp = FastMCP("DataCatalogMCP")

# Simulated catalog data - replace with your actual catalog API
CATALOG = {
    "sales.orders": {
        "description": "Order transactions from all channels",
        "columns": ["order_id", "customer_id", "total_amount", "created_at"],
        "owner": "sales-team",
        "tags": ["pii", "financial"],
        "upstream": ["raw.shopify_orders", "raw.pos_transactions"],
        "downstream": ["analytics.revenue_daily", "ml.churn_features"],
    },
    "sales.customers": {
        "description": "Customer master data with demographics",
        "columns": ["customer_id", "email", "segment", "lifetime_value"],
        "owner": "marketing-team",
        "tags": ["pii"],
        "upstream": ["raw.crm_contacts"],
        "downstream": ["sales.orders", "analytics.cohorts"],
    },
}


@mcp.tool()
def search_tables(
    query: str,
    tags: list[str] | None = None,
    limit: int = 10,
) -> str:
    """Search for tables in the data catalog.

    Args:
        query: Search term to match against table names and descriptions
        tags: Optional list of tags to filter by (e.g., ['pii', 'financial'])
        limit: Maximum number of results to return

    Returns:
        Matching tables with descriptions
    """
    results = []
    for table_name, metadata in CATALOG.items():
        # Simple substring matching against name and description
        if query.lower() in table_name.lower() or query.lower() in metadata["description"].lower():
            if tags is None or any(t in metadata["tags"] for t in tags):
                results.append({
                    "name": table_name,
                    "description": metadata["description"],
                    "owner": metadata["owner"],
                    "tags": metadata["tags"],
                })
    return json.dumps(results[:limit], indent=2)


@mcp.tool()
def get_table_details(table_name: str) -> str:
    """Get detailed metadata for a specific table.

    Args:
        table_name: Fully qualified table name (e.g., 'sales.orders')

    Returns:
        Complete table metadata including columns, owner, and tags
    """
    if table_name not in CATALOG:
        return json.dumps({"error": f"Table '{table_name}' not found in catalog"})

    metadata = CATALOG[table_name]
    return json.dumps({"table": table_name, **metadata}, indent=2)


@mcp.tool()
def get_data_lineage(
    table_name: str,
    direction: str = "both",
    depth: int = 2,
) -> str:
    """Trace data lineage for a table.

    Args:
        table_name: Table to trace lineage for
        direction: 'upstream', 'downstream', or 'both'
        depth: How many levels to traverse (1-5)

    Returns:
        Lineage graph showing data flow
    """
    if table_name not in CATALOG:
        return json.dumps({"error": f"Table '{table_name}' not found"})

    metadata = CATALOG[table_name]
    lineage = {"table": table_name}

    if direction in ("upstream", "both"):
        lineage["upstream"] = metadata.get("upstream", [])
    if direction in ("downstream", "both"):
        lineage["downstream"] = metadata.get("downstream", [])

    return json.dumps(lineage, indent=2)


if __name__ == "__main__":
    mcp.run(transport="stdio")
```

With this server, you can ask things like “Find all tables related to customers”, “What tables contain PII data?”, or “Show me the lineage for the orders table.”
Pipeline Monitoring Server
Monitor pipelines, check status, and investigate failures:
```python
import json
from datetime import datetime
from enum import Enum

from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel

mcp = FastMCP("PipelineMonitorMCP")


class PipelineStatus(str, Enum):
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    PENDING = "pending"


class PipelineRun(BaseModel):
    """Details of a pipeline execution."""
    pipeline_id: str
    run_id: str
    status: PipelineStatus
    started_at: str
    finished_at: str | None = None
    duration_seconds: int | None = None
    error_message: str | None = None


# Simulated pipeline data - replace with Airflow/Dagster/Prefect API calls
PIPELINES = {
    "etl_daily_sales": {
        "last_run": PipelineRun(
            pipeline_id="etl_daily_sales",
            run_id="run_20260114_001",
            status=PipelineStatus.SUCCESS,
            started_at="2026-01-14T06:00:00Z",
            finished_at="2026-01-14T06:45:00Z",
            duration_seconds=2700,
        ),
        "schedule": "0 6 * * *",
        "owner": "data-platform",
    },
    "sync_inventory": {
        "last_run": PipelineRun(
            pipeline_id="sync_inventory",
            run_id="run_20260114_001",
            status=PipelineStatus.FAILED,
            started_at="2026-01-14T07:00:00Z",
            finished_at="2026-01-14T07:15:00Z",
            duration_seconds=900,
            error_message="Connection timeout to inventory service",
        ),
        "schedule": "0 */2 * * *",
        "owner": "supply-chain",
    },
}


@mcp.tool()
def get_pipeline_status(pipeline_id: str) -> PipelineRun:
    """Get the current status of a data pipeline.

    Args:
        pipeline_id: Unique identifier for the pipeline

    Returns:
        Latest run details including status, timing, and any errors
    """
    if pipeline_id not in PIPELINES:
        return PipelineRun(
            pipeline_id=pipeline_id,
            run_id="unknown",
            status=PipelineStatus.PENDING,
            started_at=datetime.now().isoformat(),
            error_message=f"Pipeline '{pipeline_id}' not found",
        )

    return PIPELINES[pipeline_id]["last_run"]


@mcp.tool()
def list_failed_pipelines(hours: int = 24) -> str:
    """List all pipelines that failed recently.

    Args:
        hours: Look back period in hours (default: 24)

    Returns:
        List of failed pipelines with error details
    """
    failures = []
    for pipeline_id, data in PIPELINES.items():
        if data["last_run"].status == PipelineStatus.FAILED:
            failures.append({
                "pipeline": pipeline_id,
                "run_id": data["last_run"].run_id,
                "error": data["last_run"].error_message,
                "failed_at": data["last_run"].finished_at,
                "owner": data["owner"],
            })

    return json.dumps(failures, indent=2)


@mcp.tool()
def list_all_pipelines() -> str:
    """List all registered pipelines with their current status.

    Returns:
        Summary of all pipelines
    """
    summary = []
    for pipeline_id, data in PIPELINES.items():
        summary.append({
            "pipeline": pipeline_id,
            "status": data["last_run"].status.value,
            "schedule": data["schedule"],
            "owner": data["owner"],
            "last_run": data["last_run"].started_at,
        })

    return json.dumps(summary, indent=2)


@mcp.tool()
async def trigger_pipeline(pipeline_id: str, ctx: Context) -> str:
    """Trigger a manual run of a pipeline.

    Args:
        pipeline_id: Pipeline to trigger
        ctx: Context for progress reporting

    Returns:
        Confirmation with new run ID
    """
    if pipeline_id not in PIPELINES:
        return json.dumps({"error": f"Pipeline '{pipeline_id}' not found"})

    await ctx.info(f"Triggering pipeline: {pipeline_id}")

    # In a real implementation, call your orchestrator's API
    new_run_id = f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    return json.dumps({
        "status": "triggered",
        "pipeline": pipeline_id,
        "run_id": new_run_id,
        "message": f"Pipeline {pipeline_id} triggered successfully",
    }, indent=2)


if __name__ == "__main__":
    mcp.run(transport="stdio")
```

Now you can ask “What pipelines failed in the last 24 hours?” or “Trigger a rerun of the sync_inventory pipeline.”
Data Quality Server
Run quality checks and retrieve scores from your data quality platform:
```python
import json
from enum import Enum

from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field

mcp = FastMCP("DataQualityMCP")


class CheckType(str, Enum):
    COMPLETENESS = "completeness"
    UNIQUENESS = "uniqueness"
    FRESHNESS = "freshness"
    VALIDITY = "validity"
    CONSISTENCY = "consistency"


class QualityCheckResult(BaseModel):
    """Result of a single quality check."""
    check_name: str
    check_type: CheckType
    table_name: str
    passed: bool
    score: float = Field(ge=0, le=1, description="Score between 0 and 1")
    rows_checked: int
    issues_found: int
    details: str | None = None


# Simulated quality scores - replace with Great Expectations/Elementary/custom API
QUALITY_SCORES = {
    "sales.orders": {
        "overall": 0.94,
        "completeness": 0.99,
        "uniqueness": 0.95,
        "freshness": 0.88,
        "validity": 0.96,
    },
    "sales.customers": {
        "overall": 0.87,
        "completeness": 0.92,
        "uniqueness": 0.98,
        "freshness": 0.75,
        "validity": 0.85,
    },
}


@mcp.tool()
async def run_quality_check(
    table_name: str,
    check_type: CheckType,
    ctx: Context,
) -> QualityCheckResult:
    """Run a specific data quality check on a table.

    Args:
        table_name: Table to validate
        check_type: Type of quality check to run
        ctx: Context for progress reporting

    Returns:
        Detailed check results
    """
    await ctx.info(f"Running {check_type.value} check on {table_name}")

    # Simulate check execution
    await ctx.report_progress(progress=0.5, total=1.0, message="Scanning table...")

    # In a real implementation, call your quality framework
    score = QUALITY_SCORES.get(table_name, {}).get(check_type.value, 0.5)
    passed = score >= 0.9

    await ctx.report_progress(progress=1.0, total=1.0, message="Check complete")

    return QualityCheckResult(
        check_name=f"{check_type.value}_{table_name.replace('.', '_')}",
        check_type=check_type,
        table_name=table_name,
        passed=passed,
        score=score,
        rows_checked=100000,
        issues_found=int((1 - score) * 100000),
        details=f"{'Passed' if passed else 'Failed'}: {score:.1%} of rows meet {check_type.value} criteria",
    )


@mcp.tool()
def get_quality_score(table_name: str) -> str:
    """Get the overall data quality score for a table.

    Args:
        table_name: Table to get scores for

    Returns:
        Quality scores across all dimensions
    """
    if table_name not in QUALITY_SCORES:
        return json.dumps({
            "error": f"No quality scores found for '{table_name}'",
            "available_tables": list(QUALITY_SCORES.keys()),
        }, indent=2)

    scores = QUALITY_SCORES[table_name]
    return json.dumps({
        "table": table_name,
        "overall_score": scores["overall"],
        "dimensions": {
            "completeness": scores["completeness"],
            "uniqueness": scores["uniqueness"],
            "freshness": scores["freshness"],
            "validity": scores.get("validity", "not measured"),
        },
        "status": "healthy" if scores["overall"] >= 0.9 else "needs attention",
    }, indent=2)


@mcp.tool()
def list_quality_issues(min_severity: str = "warning") -> str:
    """List tables with data quality issues.

    Args:
        min_severity: Minimum severity to include ('warning' or 'critical')

    Returns:
        Tables that need attention
    """
    issues = []
    threshold = 0.8 if min_severity == "critical" else 0.9

    for table_name, scores in QUALITY_SCORES.items():
        if scores["overall"] < threshold:
            # Find the worst dimension
            dimensions = {k: v for k, v in scores.items() if k != "overall"}
            worst_dim = min(dimensions, key=dimensions.get)

            issues.append({
                "table": table_name,
                "overall_score": scores["overall"],
                "severity": "critical" if scores["overall"] < 0.8 else "warning",
                "primary_issue": worst_dim,
                "issue_score": dimensions[worst_dim],
            })

    return json.dumps(sorted(issues, key=lambda x: x["overall_score"]), indent=2)


@mcp.tool()
async def run_full_validation(table_name: str, ctx: Context) -> str:
    """Run all quality checks on a table.

    Args:
        table_name: Table to validate
        ctx: Context for progress reporting

    Returns:
        Complete validation report
    """
    await ctx.info(f"Starting full validation of {table_name}")

    results = []
    checks = list(CheckType)

    for i, check_type in enumerate(checks):
        await ctx.report_progress(
            progress=(i + 1) / len(checks),
            total=1.0,
            message=f"Running {check_type.value} check...",
        )

        result = await run_quality_check(table_name, check_type, ctx)
        results.append({
            "check": result.check_type.value,
            "passed": result.passed,
            "score": result.score,
        })

    overall_passed = all(r["passed"] for r in results)

    return json.dumps({
        "table": table_name,
        "validation_passed": overall_passed,
        "checks": results,
        "summary": "All checks passed" if overall_passed else "Some checks failed",
    }, indent=2)


if __name__ == "__main__":
    mcp.run(transport="stdio")
```

This lets you ask “What’s the data quality score for the orders table?”, “Run a completeness check on sales.customers”, or “Which tables have quality issues I should investigate?”
Project Setup Checklist
Creating a new MCP server project from scratch:
1. Initialize the Project
```bash
# Create project with uv
uv init my-data-mcp-server
cd my-data-mcp-server

# Or with standard Python
mkdir my-data-mcp-server
cd my-data-mcp-server
python -m venv .venv
source .venv/bin/activate
```

2. Add Dependencies

```bash
# Core MCP with CLI tools
uv add "mcp[cli]"

# Common data engineering dependencies
uv add httpx        # HTTP client for API calls
uv add pydantic     # Data validation (usually included with mcp)
uv add sqlalchemy   # Database connections
uv add asyncpg      # Async PostgreSQL (if needed)
```

3. Create the Server File

```bash
touch server.py
```

Start with the basic skeleton from earlier, then add your tools.

4. Test with MCP Inspector

```bash
npx @modelcontextprotocol/inspector uv run server.py
```

Open the URL shown in your browser. Verify:
- All tools appear in the list
- Descriptions are clear
- Input schemas look correct
- Tools execute without errors
5. Install to Claude Desktop
```bash
# Quick install (creates config entry)
uv run mcp install server.py --name "My Data Server"
```

Or manually edit claude_desktop_config.json:

```json
{
  "mcpServers": {
    "my-data-server": {
      "command": "uv",
      "args": ["run", "/full/path/to/server.py"],
      "env": {
        "DATABASE_URL": "your-connection-string"
      }
    }
  }
}
```

6. Install to Claude Code

```bash
# Add to current project (creates .mcp.json)
claude mcp add my-data-server -s project -- uv run /full/path/to/server.py

# Or user-wide installation
claude mcp add my-data-server -- uv run /full/path/to/server.py
```

7. Test in Production Client

```bash
# With Claude Code
claude
> What tools does my-data-server provide?
> [Test each tool with realistic inputs]
```

Project Structure
A typical MCP server project looks like:
```text
my-data-mcp-server/
├── pyproject.toml      # Dependencies and metadata
├── server.py           # Main server file
├── tools/              # Tool implementations (optional, for large servers)
│   ├── __init__.py
│   ├── catalog.py
│   └── quality.py
├── tests/              # Unit tests for business logic
│   └── test_tools.py
└── README.md           # Setup and usage instructions
```
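For the multi-module layout, server.py can stay thin and delegate registration to each module. A sketch, assuming hypothetical register() helpers in tools/catalog.py and tools/quality.py:

```python
# server.py - thin entry point for a multi-module server
from mcp.server.fastmcp import FastMCP

from tools import catalog, quality  # hypothetical modules from the tree above

mcp = FastMCP("DataEngineering")

# Each module exposes a register(mcp) function that attaches its tools
catalog.register(mcp)
quality.register(mcp)

if __name__ == "__main__":
    mcp.run(transport="stdio")
```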
Existing Servers to Study

Before building your own, look at how these data engineering MCP servers are structured:
DataHub MCP
Repository: github.com/acryldata/mcp-server-datahub
The official MCP server for DataHub, the popular open-source data catalog. Features:
- Search across datasets, dashboards, and pipelines
- Lineage traversal
- Metadata retrieval
- SQL query execution
Study this for: catalog integration patterns, search implementation, lineage APIs.
dbt MCP
Repository: github.com/dbt-labs/dbt-mcp
The official dbt integration covered in depth in the dbt MCP tutorial article. Features:
- CLI command execution (run, test, build)
- Semantic layer queries
- Model discovery and lineage
- Cloud API integration
Study this for: CLI wrapping patterns, hybrid local/remote architecture, environment variable handling.
OpenMetadata MCP
Documentation: open-metadata.org/mcp
Enterprise metadata platform integration with:
- Data profiling results
- Lineage visualization
- Quality metrics
- Governance workflows
Study this for: enterprise patterns, authentication handling, complex metadata models.
Elementary MCP
Website: elementary-data.com
Data observability platform integration offering:
- Test results and history
- Anomaly detection
- Data freshness monitoring
- Alert management
Study this for: observability patterns, time-series data exposure, alert integration.
Building on Existing Work
When studying these servers, focus on how they structure tool inputs and outputs, how they handle credentials securely, how they report failures to the AI, and what configuration options they expose.
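On the failure-reporting point specifically, a pattern you’ll see across these servers is returning structured errors instead of raising, so the model gets something it can act on. A sketch of that idea (the helper name is ours, not taken from any of these servers):

```python
import json


def tool_error(message: str, **details) -> str:
    """Return a structured error payload instead of raising.

    Exceptions surface to the client as opaque protocol errors; a JSON
    body with recovery hints lets the AI adjust its next call.
    """
    return json.dumps({"error": message, **details}, indent=2)


# Inside a tool:
#     if table_name not in CATALOG:
#         return tool_error(
#             f"Table '{table_name}' not found",
#             suggestion="Call search_tables first to discover valid names",
#         )
```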
Many servers are open source under permissive licenses. Fork them as starting points or adapt their patterns for your own implementations.
Next Steps
Start small. Pick one internal system you query often, build a minimal server with one or two tools, test it with Inspector, then iterate as you discover needs.
For production deployments, you’ll want authentication (OAuth 2.1 for remote HTTP servers), rate limiting to protect your backend systems, logging and monitoring to track usage, and clear documentation so both the AI and your team know when to use each tool.
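Rate limiting in particular can start very simply. A minimal per-process sketch (a real deployment would back this with a shared store such as Redis):

```python
import time
from functools import wraps


def rate_limited(calls_per_minute: int):
    """Throttle a tool function to protect the backend behind it."""
    min_interval = 60.0 / calls_per_minute
    last_call = 0.0

    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            nonlocal last_call
            wait = min_interval - (time.monotonic() - last_call)
            if wait > 0:
                time.sleep(wait)  # Block until the backend can take another call
            last_call = time.monotonic()
            return fn(*args, **kwargs)
        return wrapper
    return decorator


# Apply beneath the tool decorator so registration sees the throttled function:
# @mcp.tool()
# @rate_limited(calls_per_minute=30)
# def run_query(query: str) -> str: ...
```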
The MCP ecosystem is growing fast, and the servers you build today for internal use might become valuable community contributions tomorrow.