Adrienne Vermorel

Créer des serveurs MCP personnalisés pour la data engineering

Quand construire son propre serveur

Avant d’écrire du code, vérifiez si quelqu’un n’a pas déjà résolu votre problème.

Avec plus de 5 800 serveurs communautaires et des intégrations officielles pour BigQuery, Snowflake, dbt et des dizaines d’autres outils data, vous pouvez souvent éviter de développer quoi que ce soit. Commencez par explorer le MCP Registry, awesome-mcp-servers et la documentation des éditeurs.

Un serveur personnalisé devient nécessaire quand votre organisation utilise des systèmes internes propriétaires : catalogues de données maison, orchestrateurs sur mesure ou APIs internes qu’aucun serveur public ne supportera jamais. C’est également le cas quand vous souhaitez combiner plusieurs sources de données dans une interface unifiée, ou quand des exigences de sécurité imposent des solutions auto-hébergées avec une gestion spécifique des credentials.

En data engineering, les cas d’usage les plus courants sont les catalogues de données internes (recherche de métadonnées, exposition de la lineage), le monitoring de pipelines (vérification du statut des jobs sur Airflow, Dagster ou tout autre scheduler), les plateformes de data quality (exécution de checks, récupération des scores depuis Great Expectations ou Elementary) et les outils de gestion des coûts.

Un serveur MCP basique représente environ 50 lignes de Python. Les SDK gèrent la complexité du protocole pour que vous puissiez vous concentrer sur votre logique métier.

Les différents SDK

MCP dispose de SDK officiels pour plusieurs langages. En data engineering, Python et TypeScript sont les choix pragmatiques.

SDK Python

Si vous travaillez déjà en Python (et c’est le cas de la plupart des data engineers), le choix s’impose de lui-même.

Installation avec uv (recommandé) :

Terminal window
uv add "mcp[cli]"

Ou avec pip :

Terminal window
pip install "mcp[cli]"

L’extra [cli] inclut les outils en ligne de commande pour les tests et l’installation.

Le SDK intègre FastMCP, un framework haut niveau qui utilise des décorateurs pour définir les tools, resources et prompts. Il gère automatiquement la sérialisation JSON-RPC, la gestion des transports et la négociation du protocole.

SDK TypeScript

Pour les équipes disposant d’une infrastructure Node.js, ou les développeurs frontend s’orientant vers le data tooling :

Installation :

Terminal window
npm install @modelcontextprotocol/server zod

Le SDK TypeScript utilise Zod pour la validation des schémas, ce qui vous offre un typage solide et un bon support IDE.

Quel SDK choisir ?

CritèrePythonTypeScript
Codebase existantePipelines data, notebooks, dbtApplications web, services Node
Bibliothèquespandas, SQLAlchemy, boto3Meilleur pour les workloads I/O asynchrones
Expertise de l’équipeData engineersDéveloppeurs full-stack
DéploiementFonctionne avec uvx, distribution facilePackages npm, conteneurisé

Pour la plupart des projets de data engineering, Python avec FastMCP est le chemin le plus rapide vers un serveur fonctionnel.

Structure de base d’un serveur

Voici un serveur MCP minimal en Python et TypeScript. Chacun expose un outil unique que vous pouvez tester immédiatement.

Exemple Python avec FastMCP

Créez un fichier nommé server.py :

from mcp.server.fastmcp import FastMCP
# Initialize the server with a name
mcp = FastMCP("DataEngineering")
@mcp.tool()
def query_database(query: str, database: str = "production") -> str:
"""Execute a SQL query against the specified database.
Args:
query: The SQL query to execute
database: Target database name (default: production)
Returns:
Query results as formatted text
"""
# In a real implementation, you would connect to your database here
return f"Query executed on {database}: {query}"
if __name__ == "__main__":
mcp.run(transport="stdio")

Le décorateur @mcp.tool() enregistre la fonction comme un outil MCP. FastMCP extrait la signature de la fonction pour construire le JSON Schema, utilise la docstring comme description de l’outil et gère toute la sérialisation et la communication du protocole.

Lancez le serveur :

Terminal window
uv run server.py

Exemple TypeScript avec McpServer

Créez server.ts :

import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import { z } from "zod";
const server = new McpServer({
name: "data-engineering-server",
version: "1.0.0",
});
server.registerTool(
"query_database",
{
description: "Execute a SQL query against the specified database",
inputSchema: {
query: z.string().describe("The SQL query to execute"),
database: z.string().default("production").describe("Target database name"),
},
},
async ({ query, database }) => {
// In a real implementation, connect to your database here
return {
content: [{ type: "text", text: `Query executed on ${database}: ${query}` }],
};
}
);
async function main() {
const transport = new StdioServerTransport();
await server.connect(transport);
}
main();

Compilez et exécutez :

Terminal window
npx tsc server.ts
node server.js

Exécution avec le transport stdio

Les deux exemples utilisent le transport stdio, qui communique via les flux d’entrée/sortie standard. C’est le mode par défaut pour le développement local. Aucune configuration réseau n’est nécessaire : le client lance le serveur comme un sous-processus et la communication s’effectue via des pipes plutôt que des sockets. C’est idéal pour les applications desktop comme Claude Desktop ou Cursor.

Définir des tools

Les tools sont le moyen par lequel les serveurs MCP exposent leurs fonctionnalités. Un bon tool possède une description claire, des paramètres typés et un output prévisible.

Les docstrings comme descriptions

FastMCP extrait les descriptions des tools directement des docstrings Python :

@mcp.tool()
def get_table_schema(table_name: str) -> str:
"""Get the schema definition for a database table.
Returns column names, data types, and constraints for the specified table.
Useful for understanding table structure before writing queries.
Args:
table_name: Fully qualified table name (e.g., 'analytics.orders')
Returns:
Schema definition as formatted text
"""
# Implementation here
return f"Schema for {table_name}: id INT PRIMARY KEY, name VARCHAR(255)..."

L’IA voit cette description quand elle décide quel tool appeler. Rédigez des descriptions qui aident le modèle à comprendre quand et pourquoi utiliser le tool, pas seulement ce qu’il fait.

Modèles Pydantic pour les outputs structurés

Pour les valeurs de retour complexes, utilisez des modèles Pydantic afin de garantir une structure cohérente :

from pydantic import BaseModel, Field
class ValidationResult(BaseModel):
"""Result of a data quality validation check."""
table_name: str
row_count: int = Field(description="Total rows examined")
null_count: int = Field(description="Number of null values found")
duplicate_count: int = Field(description="Number of duplicate rows")
is_valid: bool = Field(description="Whether the table passed validation")
@mcp.tool()
def run_data_quality_check(table_name: str) -> ValidationResult:
"""Run comprehensive data quality checks on a table.
Validates completeness, uniqueness, and data integrity.
Args:
table_name: The table to validate
Returns:
Detailed validation results
"""
# Run actual checks against your database
return ValidationResult(
table_name=table_name,
row_count=10000,
null_count=5,
duplicate_count=0,
is_valid=True
)

FastMCP sérialise automatiquement les modèles Pydantic en JSON, et les descriptions des champs Field enrichissent le schéma de sortie.

Validation des entrées avec les schémas

Les annotations de type définissent le schéma d’entrée. Utilisez le système de types Python et Pydantic pour la validation :

from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
class Environment(str, Enum):
PRODUCTION = "production"
STAGING = "staging"
DEVELOPMENT = "development"
class QueryParams(BaseModel):
"""Parameters for database queries."""
limit: int = Field(default=100, ge=1, le=10000, description="Maximum rows to return")
timeout_seconds: int = Field(default=30, ge=1, le=300, description="Query timeout")
@mcp.tool()
def run_query(
query: str,
environment: Environment = Environment.PRODUCTION,
params: Optional[QueryParams] = None
) -> str:
"""Execute a read-only SQL query.
Args:
query: SQL SELECT statement to execute
environment: Target environment
params: Optional query parameters
Returns:
Query results as JSON
"""
effective_params = params or QueryParams()
# Execute with validated parameters
return f"Results from {environment.value} (limit: {effective_params.limit})"

Les Enums créent des options de type dropdown dans les interfaces des tools. Les contraintes Field de Pydantic empêchent les entrées invalides d’atteindre votre code.

Patterns avancés

Au-delà des tools basiques, les serveurs MCP peuvent exposer des resources, des prompts et du reporting de progression.

Reporting de progression avec Context

Les opérations longues doivent reporter leur progression pour que l’IA puisse informer les utilisateurs :

from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("DataEngineering")
@mcp.tool()
async def process_large_dataset(dataset_id: str, ctx: Context) -> str:
"""Process a large dataset with progress updates.
Args:
dataset_id: Identifier of the dataset to process
ctx: MCP context for progress reporting (injected automatically)
Returns:
Processing summary
"""
await ctx.info(f"Starting processing of {dataset_id}")
total_batches = 10
for i in range(total_batches):
# Simulate batch processing
await ctx.report_progress(
progress=(i + 1) / total_batches,
total=1.0,
message=f"Processing batch {i + 1}/{total_batches}"
)
await ctx.info("Processing complete")
return f"Processed dataset {dataset_id}: 10 batches, 100,000 rows"

L’objet Context est injecté automatiquement quand vous l’incluez comme paramètre. Utilisez ctx.info(), ctx.warning() et ctx.error() pour le logging, et ctx.report_progress() pour les barres de progression.

Resources pour l’exposition des données

Les resources exposent des données que l’IA peut lire, similaires à un endpoint GET. Contrairement aux tools, les resources sont contrôlées par l’application et n’ont pas d’effets de bord :

@mcp.resource("schema://{table_name}")
def get_table_schema(table_name: str) -> str:
"""Expose table schema as a readable resource.
URI pattern: schema://analytics.orders
"""
# Fetch from your metadata store
return f"""
Table: {table_name}
Columns:
- id: INT (Primary Key)
- customer_id: INT (Foreign Key)
- total_amount: DECIMAL(10,2)
- created_at: TIMESTAMP
"""
@mcp.resource("config://pipelines/{pipeline_id}")
def get_pipeline_config(pipeline_id: str) -> str:
"""Expose pipeline configuration."""
return f"Pipeline {pipeline_id} configuration: schedule=daily, owner=data-team"

Les resources utilisent des templates d’URI avec des placeholders {variable}. L’IA peut parcourir les resources disponibles et les lire sans exécuter d’actions.

Prompts pour des templates réutilisables

Les prompts sont des templates contrôlés par l’utilisateur qui guident les interactions avec l’IA :

@mcp.prompt(title="Data Quality Report")
def data_quality_prompt(table_name: str) -> str:
"""Generate a data quality analysis prompt.
Args:
table_name: Table to analyze
"""
return f"""Analyze the data quality of the '{table_name}' table.
Please check:
1. Completeness: What percentage of required fields are populated?
2. Uniqueness: Are there duplicate records?
3. Freshness: When was the data last updated?
4. Validity: Do values conform to expected formats and ranges?
Provide a summary with specific recommendations for improvement."""
@mcp.prompt(title="Schema Review")
def schema_review_prompt(table_name: str, changes: str) -> str:
"""Generate a schema change review prompt."""
return f"""Review the proposed schema changes for {table_name}:
{changes}
Evaluate:
- Backward compatibility with existing queries
- Impact on downstream dependencies
- Performance implications
- Data migration requirements"""

Les prompts apparaissent dans l’interface du client et aident les utilisateurs à invoquer l’IA avec des requêtes structurées qu’ils peuvent réutiliser.

Configuration du transport

MCP supporte deux transports principaux, et votre choix dépend de la façon dont vous prévoyez de déployer.

stdio pour le développement local

Le transport Standard I/O exécute le serveur comme un sous-processus :

if __name__ == "__main__":
mcp.run(transport="stdio")

Configurez dans Claude Desktop (claude_desktop_config.json) :

{
"mcpServers": {
"my-data-server": {
"command": "uv",
"args": ["run", "/path/to/server.py"],
"env": {
"DATABASE_URL": "postgresql://localhost/analytics"
}
}
}
}

Ou dans Claude Code :

Terminal window
claude mcp add my-data-server -- uv run /path/to/server.py

stdio signifie zéro configuration réseau, credentials passés via des variables d’environnement, isolation des processus (chaque client obtient sa propre instance de serveur) et debugging simple avec les logs envoyés vers stderr.

HTTP Streamable pour le déploiement en production

Pour les serveurs distants ou les déploiements partagés, utilisez le transport HTTP :

mcp = FastMCP("RemoteDataServer", stateless_http=True)
if __name__ == "__main__":
mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)

Configurez le client pour se connecter :

{
"mcpServers": {
"remote-data-server": {
"type": "http",
"url": "https://mcp.yourcompany.com",
"headers": {
"Authorization": "Bearer ${MCP_API_KEY}"
}
}
}
}

HTTP nécessite une authentification (OAuth 2.1 pour la production), mais permet de servir plusieurs clients depuis une seule instance et de déployer derrière des load balancers. Il supporte également les Server-Sent Events pour les réponses en streaming.

Commencez avec stdio pour le développement et les outils internes. Passez à HTTP quand vous avez besoin d’un déploiement centralisé ou d’un accès inter-équipes.

Tests et debugging

Tester des serveurs MCP nécessite des outils qui parlent le protocole. Le MCP Inspector est indispensable ici.

MCP Inspector

L’Inspector est une interface de test interactive qui se connecte à votre serveur :

Terminal window
# Test a Python server
npx @modelcontextprotocol/inspector uv run server.py
# Test a Node.js server
npx @modelcontextprotocol/inspector node build/index.js
# Connect to a remote HTTP server
npx @modelcontextprotocol/inspector --connect https://mcp.yourcompany.com

L’Inspector lance une interface web où vous pouvez voir tous les tools, resources et prompts enregistrés, appeler des tools avec des arguments personnalisés, inspecter les messages JSON-RPC et débugger le formatage des réponses.

Logging vers stderr

L’erreur la plus courante lors de la création de serveurs MCP est d’écrire sur stdout. Le transport stdio utilise stdout pour les messages JSON-RPC. Toute autre sortie sur stdout corrompt le protocole et casse la communication.

# BAD - breaks JSON-RPC communication
print("Debug: processing query") # Goes to stdout!
print(f"Error: {e}") # Also stdout!
# GOOD - use logging to stderr
import logging
import sys
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
stream=sys.stderr # Explicitly send to stderr
)
logger = logging.getLogger(__name__)
logger.info("Debug: processing query") # Goes to stderr
logger.error(f"Error: {e}") # Also stderr

Ou utilisez l’objet Context à l’intérieur des tools :

@mcp.tool()
async def my_tool(query: str, ctx: Context) -> str:
await ctx.info("Processing query") # Proper MCP logging
# ...

Si votre serveur fonctionne en isolation mais échoue une fois connecté à un client, cherchez d’abord les instructions print() égarées. Ce problème vous mordra au moins une fois.

Workflow de test

  1. Testez unitairement votre logique métier séparément de MCP :
test_logic.py
def test_query_execution():
result = execute_query("SELECT 1", "test_db")
assert "1" in result
  1. Testez le serveur MCP avec l’Inspector :
Terminal window
npx @modelcontextprotocol/inspector uv run server.py
  1. Tests d’intégration avec un vrai client (Claude Desktop ou Claude Code) :
Terminal window
# Add to Claude Code
claude mcp add test-server -- uv run server.py
# Test interactively
claude
> List the tools available from test-server

Exemples pour la data engineering

Trois serveurs MCP pratiques qui répondent à des besoins courants en data engineering.

Serveur de catalogue de données

Ce serveur expose votre catalogue de données interne pour la découverte assistée par IA :

from mcp.server.fastmcp import FastMCP
import json
mcp = FastMCP("DataCatalogMCP")
# Simulated catalog data - replace with your actual catalog API
CATALOG = {
"sales.orders": {
"description": "Order transactions from all channels",
"columns": ["order_id", "customer_id", "total_amount", "created_at"],
"owner": "sales-team",
"tags": ["pii", "financial"],
"upstream": ["raw.shopify_orders", "raw.pos_transactions"],
"downstream": ["analytics.revenue_daily", "ml.churn_features"]
},
"sales.customers": {
"description": "Customer master data with demographics",
"columns": ["customer_id", "email", "segment", "lifetime_value"],
"owner": "marketing-team",
"tags": ["pii"],
"upstream": ["raw.crm_contacts"],
"downstream": ["sales.orders", "analytics.cohorts"]
}
}
@mcp.tool()
def search_tables(
query: str,
tags: list[str] | None = None,
limit: int = 10
) -> str:
"""Search for tables in the data catalog.
Args:
query: Search term to match against table names and descriptions
tags: Optional list of tags to filter by (e.g., ['pii', 'financial'])
limit: Maximum number of results to return
Returns:
Matching tables with descriptions
"""
results = []
for table_name, metadata in CATALOG.items():
# Simple search matching
if query.lower() in table_name.lower() or query.lower() in metadata["description"].lower():
if tags is None or any(t in metadata["tags"] for t in tags):
results.append({
"name": table_name,
"description": metadata["description"],
"owner": metadata["owner"],
"tags": metadata["tags"]
})
return json.dumps(results[:limit], indent=2)
@mcp.tool()
def get_table_details(table_name: str) -> str:
"""Get detailed metadata for a specific table.
Args:
table_name: Fully qualified table name (e.g., 'sales.orders')
Returns:
Complete table metadata including columns, owner, and tags
"""
if table_name not in CATALOG:
return json.dumps({"error": f"Table '{table_name}' not found in catalog"})
metadata = CATALOG[table_name]
return json.dumps({
"table": table_name,
**metadata
}, indent=2)
@mcp.tool()
def get_data_lineage(
table_name: str,
direction: str = "both",
depth: int = 2
) -> str:
"""Trace data lineage for a table.
Args:
table_name: Table to trace lineage for
direction: 'upstream', 'downstream', or 'both'
depth: How many levels to traverse (1-5)
Returns:
Lineage graph showing data flow
"""
if table_name not in CATALOG:
return json.dumps({"error": f"Table '{table_name}' not found"})
metadata = CATALOG[table_name]
lineage = {"table": table_name}
if direction in ("upstream", "both"):
lineage["upstream"] = metadata.get("upstream", [])
if direction in ("downstream", "both"):
lineage["downstream"] = metadata.get("downstream", [])
return json.dumps(lineage, indent=2)
if __name__ == "__main__":
mcp.run(transport="stdio")

Avec ce serveur, vous pouvez demander des choses comme “Trouve toutes les tables liées aux clients” ou “Quelles tables contiennent des données PII ?” ou “Montre-moi la lineage de la table orders.”

Serveur de monitoring de pipelines

Surveillez les pipelines, vérifiez les statuts et investiguez les échecs :

from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
import json
mcp = FastMCP("PipelineMonitorMCP")
class PipelineStatus(str, Enum):
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
PENDING = "pending"
class PipelineRun(BaseModel):
"""Details of a pipeline execution."""
pipeline_id: str
run_id: str
status: PipelineStatus
started_at: str
finished_at: str | None = None
duration_seconds: int | None = None
error_message: str | None = None
# Simulated pipeline data - replace with Airflow/Dagster/Prefect API calls
PIPELINES = {
"etl_daily_sales": {
"last_run": PipelineRun(
pipeline_id="etl_daily_sales",
run_id="run_20260114_001",
status=PipelineStatus.SUCCESS,
started_at="2026-01-14T06:00:00Z",
finished_at="2026-01-14T06:45:00Z",
duration_seconds=2700
),
"schedule": "0 6 * * *",
"owner": "data-platform"
},
"sync_inventory": {
"last_run": PipelineRun(
pipeline_id="sync_inventory",
run_id="run_20260114_001",
status=PipelineStatus.FAILED,
started_at="2026-01-14T07:00:00Z",
finished_at="2026-01-14T07:15:00Z",
duration_seconds=900,
error_message="Connection timeout to inventory service"
),
"schedule": "0 */2 * * *",
"owner": "supply-chain"
}
}
@mcp.tool()
def get_pipeline_status(pipeline_id: str) -> PipelineRun:
"""Get the current status of a data pipeline.
Args:
pipeline_id: Unique identifier for the pipeline
Returns:
Latest run details including status, timing, and any errors
"""
if pipeline_id not in PIPELINES:
return PipelineRun(
pipeline_id=pipeline_id,
run_id="unknown",
status=PipelineStatus.PENDING,
started_at=datetime.now().isoformat(),
error_message=f"Pipeline '{pipeline_id}' not found"
)
return PIPELINES[pipeline_id]["last_run"]
@mcp.tool()
def list_failed_pipelines(hours: int = 24) -> str:
"""List all pipelines that failed recently.
Args:
hours: Look back period in hours (default: 24)
Returns:
List of failed pipelines with error details
"""
failures = []
for pipeline_id, data in PIPELINES.items():
if data["last_run"].status == PipelineStatus.FAILED:
failures.append({
"pipeline": pipeline_id,
"run_id": data["last_run"].run_id,
"error": data["last_run"].error_message,
"failed_at": data["last_run"].finished_at,
"owner": data["owner"]
})
return json.dumps(failures, indent=2)
@mcp.tool()
def list_all_pipelines() -> str:
"""List all registered pipelines with their current status.
Returns:
Summary of all pipelines
"""
summary = []
for pipeline_id, data in PIPELINES.items():
summary.append({
"pipeline": pipeline_id,
"status": data["last_run"].status.value,
"schedule": data["schedule"],
"owner": data["owner"],
"last_run": data["last_run"].started_at
})
return json.dumps(summary, indent=2)
@mcp.tool()
async def trigger_pipeline(pipeline_id: str, ctx: Context) -> str:
"""Trigger a manual run of a pipeline.
Args:
pipeline_id: Pipeline to trigger
ctx: Context for progress reporting
Returns:
Confirmation with new run ID
"""
if pipeline_id not in PIPELINES:
return json.dumps({"error": f"Pipeline '{pipeline_id}' not found"})
await ctx.info(f"Triggering pipeline: {pipeline_id}")
# In real implementation, call your orchestrator's API
new_run_id = f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
return json.dumps({
"status": "triggered",
"pipeline": pipeline_id,
"run_id": new_run_id,
"message": f"Pipeline {pipeline_id} triggered successfully"
}, indent=2)
if __name__ == "__main__":
mcp.run(transport="stdio")

Maintenant vous pouvez demander “Quels pipelines ont échoué dans les dernières 24 heures ?” ou “Déclenche une ré-exécution du pipeline sync_inventory.”

Serveur de data quality

Exécutez des checks de qualité et récupérez les scores depuis votre plateforme de data quality :

from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field
from enum import Enum
import json
mcp = FastMCP("DataQualityMCP")
class CheckType(str, Enum):
COMPLETENESS = "completeness"
UNIQUENESS = "uniqueness"
FRESHNESS = "freshness"
VALIDITY = "validity"
CONSISTENCY = "consistency"
class QualityCheckResult(BaseModel):
"""Result of a single quality check."""
check_name: str
check_type: CheckType
table_name: str
passed: bool
score: float = Field(ge=0, le=1, description="Score between 0 and 1")
rows_checked: int
issues_found: int
details: str | None = None
# Simulated quality scores - replace with Great Expectations/Elementary/custom API
QUALITY_SCORES = {
"sales.orders": {
"overall": 0.94,
"completeness": 0.99,
"uniqueness": 0.95,
"freshness": 0.88,
"validity": 0.96
},
"sales.customers": {
"overall": 0.87,
"completeness": 0.92,
"uniqueness": 0.98,
"freshness": 0.75,
"validity": 0.85
}
}
@mcp.tool()
async def run_quality_check(
table_name: str,
check_type: CheckType,
ctx: Context
) -> QualityCheckResult:
"""Run a specific data quality check on a table.
Args:
table_name: Table to validate
check_type: Type of quality check to run
ctx: Context for progress reporting
Returns:
Detailed check results
"""
await ctx.info(f"Running {check_type.value} check on {table_name}")
# Simulate check execution
await ctx.report_progress(progress=0.5, total=1.0, message="Scanning table...")
# In real implementation, call your quality framework
score = QUALITY_SCORES.get(table_name, {}).get(check_type.value, 0.5)
passed = score >= 0.9
await ctx.report_progress(progress=1.0, total=1.0, message="Check complete")
return QualityCheckResult(
check_name=f"{check_type.value}_{table_name.replace('.', '_')}",
check_type=check_type,
table_name=table_name,
passed=passed,
score=score,
rows_checked=100000,
issues_found=int((1 - score) * 100000),
details=f"{'Passed' if passed else 'Failed'}: {score:.1%} of rows meet {check_type.value} criteria"
)
@mcp.tool()
def get_quality_score(table_name: str) -> str:
"""Get the overall data quality score for a table.
Args:
table_name: Table to get scores for
Returns:
Quality scores across all dimensions
"""
if table_name not in QUALITY_SCORES:
return json.dumps({
"error": f"No quality scores found for '{table_name}'",
"available_tables": list(QUALITY_SCORES.keys())
}, indent=2)
scores = QUALITY_SCORES[table_name]
return json.dumps({
"table": table_name,
"overall_score": scores["overall"],
"dimensions": {
"completeness": scores["completeness"],
"uniqueness": scores["uniqueness"],
"freshness": scores["freshness"],
"validity": scores.get("validity", "not measured")
},
"status": "healthy" if scores["overall"] >= 0.9 else "needs attention"
}, indent=2)
@mcp.tool()
def list_quality_issues(min_severity: str = "warning") -> str:
"""List tables with data quality issues.
Args:
min_severity: Minimum severity to include ('warning' or 'critical')
Returns:
Tables that need attention
"""
issues = []
threshold = 0.8 if min_severity == "critical" else 0.9
for table_name, scores in QUALITY_SCORES.items():
if scores["overall"] < threshold:
# Find the worst dimension
dimensions = {k: v for k, v in scores.items() if k != "overall"}
worst_dim = min(dimensions, key=dimensions.get)
issues.append({
"table": table_name,
"overall_score": scores["overall"],
"severity": "critical" if scores["overall"] < 0.8 else "warning",
"primary_issue": worst_dim,
"issue_score": dimensions[worst_dim]
})
return json.dumps(sorted(issues, key=lambda x: x["overall_score"]), indent=2)
@mcp.tool()
async def run_full_validation(table_name: str, ctx: Context) -> str:
"""Run all quality checks on a table.
Args:
table_name: Table to validate
ctx: Context for progress reporting
Returns:
Complete validation report
"""
await ctx.info(f"Starting full validation of {table_name}")
results = []
checks = list(CheckType)
for i, check_type in enumerate(checks):
await ctx.report_progress(
progress=(i + 1) / len(checks),
total=1.0,
message=f"Running {check_type.value} check..."
)
result = await run_quality_check(table_name, check_type, ctx)
results.append({
"check": result.check_type.value,
"passed": result.passed,
"score": result.score
})
overall_passed = all(r["passed"] for r in results)
return json.dumps({
"table": table_name,
"validation_passed": overall_passed,
"checks": results,
"summary": f"{'All checks passed' if overall_passed else 'Some checks failed'}"
}, indent=2)
if __name__ == "__main__":
mcp.run(transport="stdio")

Cela vous permet de demander “Quel est le score de data quality pour la table orders ?” ou “Lance un check de complétude sur sales.customers” ou “Quelles tables ont des problèmes de qualité que je devrais investiguer ?”

Checklist de configuration du projet

Créer un nouveau projet de serveur MCP de zéro :

1. Initialiser le projet

Terminal window
# Create project with uv
uv init my-data-mcp-server
cd my-data-mcp-server
# Or with standard Python
mkdir my-data-mcp-server
cd my-data-mcp-server
python -m venv .venv
source .venv/bin/activate

2. Ajouter les dépendances

Terminal window
# Core MCP with CLI tools
uv add "mcp[cli]"
# Common data engineering dependencies
uv add httpx # HTTP client for API calls
uv add pydantic # Data validation (usually included with mcp)
uv add sqlalchemy # Database connections
uv add asyncpg # Async PostgreSQL (if needed)

3. Créer le fichier serveur

Terminal window
touch server.py

Commencez avec la structure de base présentée plus tôt, puis ajoutez vos tools.

4. Tester avec MCP Inspector

Terminal window
npx @modelcontextprotocol/inspector uv run server.py

Ouvrez l’URL affichée dans votre navigateur. Vérifiez que :

  • Tous les tools apparaissent dans la liste
  • Les descriptions sont claires
  • Les schémas d’entrée semblent corrects
  • Les tools s’exécutent sans erreurs

5. Installer dans Claude Desktop

Terminal window
# Quick install (creates config entry)
uv run mcp install server.py --name "My Data Server"

Ou éditez manuellement claude_desktop_config.json :

{
"mcpServers": {
"my-data-server": {
"command": "uv",
"args": ["run", "/full/path/to/server.py"],
"env": {
"DATABASE_URL": "your-connection-string"
}
}
}
}

6. Installer dans Claude Code

Terminal window
# Add to current project (creates .mcp.json)
claude mcp add my-data-server -s project -- uv run /full/path/to/server.py
# Or user-wide installation
claude mcp add my-data-server -- uv run /full/path/to/server.py

7. Tester dans le client de production

Terminal window
# With Claude Code
claude
> What tools does my-data-server provide?
> [Test each tool with realistic inputs]

Structure du projet

Un projet de serveur MCP typique ressemble à :

my-data-mcp-server/
├── pyproject.toml # Dependencies and metadata
├── server.py # Main server file
├── tools/ # Tool implementations (optional, for large servers)
│ ├── __init__.py
│ ├── catalog.py
│ └── quality.py
├── tests/ # Unit tests for business logic
│ └── test_tools.py
└── README.md # Setup and usage instructions

Serveurs existants à étudier

Avant de construire le vôtre, regardez comment ces serveurs MCP de data engineering sont structurés :

DataHub MCP

Dépôt : github.com/acryldata/mcp-server-datahub

Le serveur MCP officiel pour DataHub, le célèbre catalogue de données open source. Fonctionnalités :

  • Recherche parmi les datasets, dashboards et pipelines
  • Parcours de la lineage
  • Récupération des métadonnées
  • Exécution de requêtes SQL

À étudier pour : les patterns d’intégration de catalogue, l’implémentation de la recherche, les APIs de lineage.

dbt MCP

Dépôt : github.com/dbt-labs/dbt-mcp

L’intégration officielle dbt couverte en profondeur dans l’article tutoriel sur dbt MCP. Fonctionnalités :

  • Exécution de commandes CLI (run, test, build)
  • Requêtes sur la semantic layer
  • Découverte et lineage des modèles
  • Intégration avec l’API Cloud

À étudier pour : les patterns de wrapping CLI, l’architecture hybride local/remote, la gestion des variables d’environnement.

OpenMetadata MCP

Documentation : open-metadata.org/mcp

Intégration avec la plateforme de métadonnées entreprise avec :

  • Résultats de data profiling
  • Visualisation de la lineage
  • Métriques de qualité
  • Workflows de gouvernance

À étudier pour : les patterns entreprise, la gestion de l’authentification, les modèles de métadonnées complexes.

Elementary MCP

Site web : elementary-data.com

Intégration avec la plateforme d’observabilité des données offrant :

  • Résultats et historique des tests
  • Détection d’anomalies
  • Monitoring de la fraîcheur des données
  • Gestion des alertes

À étudier pour : les patterns d’observabilité, l’exposition de données time-series, l’intégration des alertes.

Construire sur l’existant

Quand vous étudiez ces serveurs, concentrez-vous sur la façon dont ils structurent les entrées et sorties des tools, comment ils gèrent les credentials de manière sécurisée, comment ils remontent les échecs à l’IA, et quelles options de configuration ils exposent.

Beaucoup de serveurs sont open source sous licences permissives. Forkez-les comme points de départ ou adaptez leurs patterns pour vos propres implémentations.

Prochaines étapes

Commencez petit. Choisissez un système interne que vous interrogez souvent, construisez un serveur minimal avec un ou deux tools, testez-le avec l’Inspector, puis itérez au fur et à mesure que vous découvrez vos besoins.

Pour les déploiements en production, vous aurez besoin d’authentification (OAuth 2.1 pour les serveurs HTTP distants), de rate limiting pour protéger vos systèmes backend, de logging et monitoring pour suivre l’usage, et d’une documentation claire pour que l’IA et votre équipe sachent quand utiliser chaque tool.

L’écosystème MCP grandit rapidement, et les serveurs que vous construisez aujourd’hui pour un usage interne pourraient devenir de précieuses contributions à la communauté demain.