Adrienne Vermorel
Créer des serveurs MCP personnalisés pour la data engineering
Quand construire son propre serveur
Avant d’écrire du code, vérifiez si quelqu’un n’a pas déjà résolu votre problème.
Avec plus de 5 800 serveurs communautaires et des intégrations officielles pour BigQuery, Snowflake, dbt et des dizaines d’autres outils data, vous pouvez souvent éviter de développer quoi que ce soit. Commencez par explorer le MCP Registry, awesome-mcp-servers et la documentation des éditeurs.
Un serveur personnalisé devient nécessaire quand votre organisation utilise des systèmes internes propriétaires : catalogues de données maison, orchestrateurs sur mesure ou APIs internes qu’aucun serveur public ne supportera jamais. C’est également le cas quand vous souhaitez combiner plusieurs sources de données dans une interface unifiée, ou quand des exigences de sécurité imposent des solutions auto-hébergées avec une gestion spécifique des credentials.
En data engineering, les cas d’usage les plus courants sont les catalogues de données internes (recherche de métadonnées, exposition de la lineage), le monitoring de pipelines (vérification du statut des jobs sur Airflow, Dagster ou tout autre scheduler), les plateformes de data quality (exécution de checks, récupération des scores depuis Great Expectations ou Elementary) et les outils de gestion des coûts.
Un serveur MCP basique représente environ 50 lignes de Python. Les SDK gèrent la complexité du protocole pour que vous puissiez vous concentrer sur votre logique métier.
Les différents SDK
MCP dispose de SDK officiels pour plusieurs langages. En data engineering, Python et TypeScript sont les choix pragmatiques.
SDK Python
Si vous travaillez déjà en Python (et c’est le cas de la plupart des data engineers), le choix s’impose de lui-même.
- Package :
mcpsur PyPI - Dépôt : github.com/modelcontextprotocol/python-sdk
- Version actuelle : v1.25.0 (v2 en cours de développement)
- Prérequis : Python 3.10+
Installation avec uv (recommandé) :
uv add "mcp[cli]"Ou avec pip :
pip install "mcp[cli]"L’extra [cli] inclut les outils en ligne de commande pour les tests et l’installation.
Le SDK intègre FastMCP, un framework haut niveau qui utilise des décorateurs pour définir les tools, resources et prompts. Il gère automatiquement la sérialisation JSON-RPC, la gestion des transports et la négociation du protocole.
SDK TypeScript
Pour les équipes disposant d’une infrastructure Node.js, ou les développeurs frontend s’orientant vers le data tooling :
- Packages :
@modelcontextprotocol/server,@modelcontextprotocol/client - Dépôt : github.com/modelcontextprotocol/typescript-sdk
- Prérequis : Node.js 18+ (22.7.5+ recommandé)
Installation :
npm install @modelcontextprotocol/server zodLe SDK TypeScript utilise Zod pour la validation des schémas, ce qui vous offre un typage solide et un bon support IDE.
Quel SDK choisir ?
| Critère | Python | TypeScript |
|---|---|---|
| Codebase existante | Pipelines data, notebooks, dbt | Applications web, services Node |
| Bibliothèques | pandas, SQLAlchemy, boto3 | Meilleur pour les workloads I/O asynchrones |
| Expertise de l’équipe | Data engineers | Développeurs full-stack |
| Déploiement | Fonctionne avec uvx, distribution facile | Packages npm, conteneurisé |
Pour la plupart des projets de data engineering, Python avec FastMCP est le chemin le plus rapide vers un serveur fonctionnel.
Structure de base d’un serveur
Voici un serveur MCP minimal en Python et TypeScript. Chacun expose un outil unique que vous pouvez tester immédiatement.
Exemple Python avec FastMCP
Créez un fichier nommé server.py :
from mcp.server.fastmcp import FastMCP
# Initialize the server with a namemcp = FastMCP("DataEngineering")
@mcp.tool()def query_database(query: str, database: str = "production") -> str: """Execute a SQL query against the specified database.
Args: query: The SQL query to execute database: Target database name (default: production)
Returns: Query results as formatted text """ # In a real implementation, you would connect to your database here return f"Query executed on {database}: {query}"
if __name__ == "__main__": mcp.run(transport="stdio")Le décorateur @mcp.tool() enregistre la fonction comme un outil MCP. FastMCP extrait la signature de la fonction pour construire le JSON Schema, utilise la docstring comme description de l’outil et gère toute la sérialisation et la communication du protocole.
Lancez le serveur :
uv run server.pyExemple TypeScript avec McpServer
Créez server.ts :
import { McpServer } from "@modelcontextprotocol/sdk/server/mcp.js";import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";import { z } from "zod";
const server = new McpServer({ name: "data-engineering-server", version: "1.0.0",});
server.registerTool( "query_database", { description: "Execute a SQL query against the specified database", inputSchema: { query: z.string().describe("The SQL query to execute"), database: z.string().default("production").describe("Target database name"), }, }, async ({ query, database }) => { // In a real implementation, connect to your database here return { content: [{ type: "text", text: `Query executed on ${database}: ${query}` }], }; });
async function main() { const transport = new StdioServerTransport(); await server.connect(transport);}
main();Compilez et exécutez :
npx tsc server.tsnode server.jsExécution avec le transport stdio
Les deux exemples utilisent le transport stdio, qui communique via les flux d’entrée/sortie standard. C’est le mode par défaut pour le développement local. Aucune configuration réseau n’est nécessaire : le client lance le serveur comme un sous-processus et la communication s’effectue via des pipes plutôt que des sockets. C’est idéal pour les applications desktop comme Claude Desktop ou Cursor.
Définir des tools
Les tools sont le moyen par lequel les serveurs MCP exposent leurs fonctionnalités. Un bon tool possède une description claire, des paramètres typés et un output prévisible.
Les docstrings comme descriptions
FastMCP extrait les descriptions des tools directement des docstrings Python :
@mcp.tool()def get_table_schema(table_name: str) -> str: """Get the schema definition for a database table.
Returns column names, data types, and constraints for the specified table. Useful for understanding table structure before writing queries.
Args: table_name: Fully qualified table name (e.g., 'analytics.orders')
Returns: Schema definition as formatted text """ # Implementation here return f"Schema for {table_name}: id INT PRIMARY KEY, name VARCHAR(255)..."L’IA voit cette description quand elle décide quel tool appeler. Rédigez des descriptions qui aident le modèle à comprendre quand et pourquoi utiliser le tool, pas seulement ce qu’il fait.
Modèles Pydantic pour les outputs structurés
Pour les valeurs de retour complexes, utilisez des modèles Pydantic afin de garantir une structure cohérente :
from pydantic import BaseModel, Field
class ValidationResult(BaseModel): """Result of a data quality validation check.""" table_name: str row_count: int = Field(description="Total rows examined") null_count: int = Field(description="Number of null values found") duplicate_count: int = Field(description="Number of duplicate rows") is_valid: bool = Field(description="Whether the table passed validation")
@mcp.tool()def run_data_quality_check(table_name: str) -> ValidationResult: """Run comprehensive data quality checks on a table.
Validates completeness, uniqueness, and data integrity.
Args: table_name: The table to validate
Returns: Detailed validation results """ # Run actual checks against your database return ValidationResult( table_name=table_name, row_count=10000, null_count=5, duplicate_count=0, is_valid=True )FastMCP sérialise automatiquement les modèles Pydantic en JSON, et les descriptions des champs Field enrichissent le schéma de sortie.
Validation des entrées avec les schémas
Les annotations de type définissent le schéma d’entrée. Utilisez le système de types Python et Pydantic pour la validation :
from enum import Enumfrom typing import Optionalfrom pydantic import BaseModel, Field
class Environment(str, Enum): PRODUCTION = "production" STAGING = "staging" DEVELOPMENT = "development"
class QueryParams(BaseModel): """Parameters for database queries.""" limit: int = Field(default=100, ge=1, le=10000, description="Maximum rows to return") timeout_seconds: int = Field(default=30, ge=1, le=300, description="Query timeout")
@mcp.tool()def run_query( query: str, environment: Environment = Environment.PRODUCTION, params: Optional[QueryParams] = None) -> str: """Execute a read-only SQL query.
Args: query: SQL SELECT statement to execute environment: Target environment params: Optional query parameters
Returns: Query results as JSON """ effective_params = params or QueryParams() # Execute with validated parameters return f"Results from {environment.value} (limit: {effective_params.limit})"Les Enums créent des options de type dropdown dans les interfaces des tools. Les contraintes Field de Pydantic empêchent les entrées invalides d’atteindre votre code.
Patterns avancés
Au-delà des tools basiques, les serveurs MCP peuvent exposer des resources, des prompts et du reporting de progression.
Reporting de progression avec Context
Les opérations longues doivent reporter leur progression pour que l’IA puisse informer les utilisateurs :
from mcp.server.fastmcp import FastMCP, Context
mcp = FastMCP("DataEngineering")
@mcp.tool()async def process_large_dataset(dataset_id: str, ctx: Context) -> str: """Process a large dataset with progress updates.
Args: dataset_id: Identifier of the dataset to process ctx: MCP context for progress reporting (injected automatically)
Returns: Processing summary """ await ctx.info(f"Starting processing of {dataset_id}")
total_batches = 10 for i in range(total_batches): # Simulate batch processing await ctx.report_progress( progress=(i + 1) / total_batches, total=1.0, message=f"Processing batch {i + 1}/{total_batches}" )
await ctx.info("Processing complete") return f"Processed dataset {dataset_id}: 10 batches, 100,000 rows"L’objet Context est injecté automatiquement quand vous l’incluez comme paramètre. Utilisez ctx.info(), ctx.warning() et ctx.error() pour le logging, et ctx.report_progress() pour les barres de progression.
Resources pour l’exposition des données
Les resources exposent des données que l’IA peut lire, similaires à un endpoint GET. Contrairement aux tools, les resources sont contrôlées par l’application et n’ont pas d’effets de bord :
@mcp.resource("schema://{table_name}")def get_table_schema(table_name: str) -> str: """Expose table schema as a readable resource.
URI pattern: schema://analytics.orders """ # Fetch from your metadata store return f""" Table: {table_name} Columns: - id: INT (Primary Key) - customer_id: INT (Foreign Key) - total_amount: DECIMAL(10,2) - created_at: TIMESTAMP """
@mcp.resource("config://pipelines/{pipeline_id}")def get_pipeline_config(pipeline_id: str) -> str: """Expose pipeline configuration.""" return f"Pipeline {pipeline_id} configuration: schedule=daily, owner=data-team"Les resources utilisent des templates d’URI avec des placeholders {variable}. L’IA peut parcourir les resources disponibles et les lire sans exécuter d’actions.
Prompts pour des templates réutilisables
Les prompts sont des templates contrôlés par l’utilisateur qui guident les interactions avec l’IA :
@mcp.prompt(title="Data Quality Report")def data_quality_prompt(table_name: str) -> str: """Generate a data quality analysis prompt.
Args: table_name: Table to analyze """ return f"""Analyze the data quality of the '{table_name}' table.
Please check:1. Completeness: What percentage of required fields are populated?2. Uniqueness: Are there duplicate records?3. Freshness: When was the data last updated?4. Validity: Do values conform to expected formats and ranges?
Provide a summary with specific recommendations for improvement."""
@mcp.prompt(title="Schema Review")def schema_review_prompt(table_name: str, changes: str) -> str: """Generate a schema change review prompt.""" return f"""Review the proposed schema changes for {table_name}:
{changes}
Evaluate:- Backward compatibility with existing queries- Impact on downstream dependencies- Performance implications- Data migration requirements"""Les prompts apparaissent dans l’interface du client et aident les utilisateurs à invoquer l’IA avec des requêtes structurées qu’ils peuvent réutiliser.
Configuration du transport
MCP supporte deux transports principaux, et votre choix dépend de la façon dont vous prévoyez de déployer.
stdio pour le développement local
Le transport Standard I/O exécute le serveur comme un sous-processus :
if __name__ == "__main__": mcp.run(transport="stdio")Configurez dans Claude Desktop (claude_desktop_config.json) :
{ "mcpServers": { "my-data-server": { "command": "uv", "args": ["run", "/path/to/server.py"], "env": { "DATABASE_URL": "postgresql://localhost/analytics" } } }}Ou dans Claude Code :
claude mcp add my-data-server -- uv run /path/to/server.pystdio signifie zéro configuration réseau, credentials passés via des variables d’environnement, isolation des processus (chaque client obtient sa propre instance de serveur) et debugging simple avec les logs envoyés vers stderr.
HTTP Streamable pour le déploiement en production
Pour les serveurs distants ou les déploiements partagés, utilisez le transport HTTP :
mcp = FastMCP("RemoteDataServer", stateless_http=True)
if __name__ == "__main__": mcp.run(transport="streamable-http", host="0.0.0.0", port=8000)Configurez le client pour se connecter :
{ "mcpServers": { "remote-data-server": { "type": "http", "url": "https://mcp.yourcompany.com", "headers": { "Authorization": "Bearer ${MCP_API_KEY}" } } }}HTTP nécessite une authentification (OAuth 2.1 pour la production), mais permet de servir plusieurs clients depuis une seule instance et de déployer derrière des load balancers. Il supporte également les Server-Sent Events pour les réponses en streaming.
Commencez avec stdio pour le développement et les outils internes. Passez à HTTP quand vous avez besoin d’un déploiement centralisé ou d’un accès inter-équipes.
Tests et debugging
Tester des serveurs MCP nécessite des outils qui parlent le protocole. Le MCP Inspector est indispensable ici.
MCP Inspector
L’Inspector est une interface de test interactive qui se connecte à votre serveur :
# Test a Python servernpx @modelcontextprotocol/inspector uv run server.py
# Test a Node.js servernpx @modelcontextprotocol/inspector node build/index.js
# Connect to a remote HTTP servernpx @modelcontextprotocol/inspector --connect https://mcp.yourcompany.comL’Inspector lance une interface web où vous pouvez voir tous les tools, resources et prompts enregistrés, appeler des tools avec des arguments personnalisés, inspecter les messages JSON-RPC et débugger le formatage des réponses.
Logging vers stderr
L’erreur la plus courante lors de la création de serveurs MCP est d’écrire sur stdout. Le transport stdio utilise stdout pour les messages JSON-RPC. Toute autre sortie sur stdout corrompt le protocole et casse la communication.
# BAD - breaks JSON-RPC communicationprint("Debug: processing query") # Goes to stdout!print(f"Error: {e}") # Also stdout!
# GOOD - use logging to stderrimport loggingimport sys
logging.basicConfig( level=logging.DEBUG, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', stream=sys.stderr # Explicitly send to stderr)logger = logging.getLogger(__name__)
logger.info("Debug: processing query") # Goes to stderrlogger.error(f"Error: {e}") # Also stderrOu utilisez l’objet Context à l’intérieur des tools :
@mcp.tool()async def my_tool(query: str, ctx: Context) -> str: await ctx.info("Processing query") # Proper MCP logging # ...Si votre serveur fonctionne en isolation mais échoue une fois connecté à un client, cherchez d’abord les instructions print() égarées. Ce problème vous mordra au moins une fois.
Workflow de test
- Testez unitairement votre logique métier séparément de MCP :
def test_query_execution(): result = execute_query("SELECT 1", "test_db") assert "1" in result- Testez le serveur MCP avec l’Inspector :
npx @modelcontextprotocol/inspector uv run server.py- Tests d’intégration avec un vrai client (Claude Desktop ou Claude Code) :
# Add to Claude Codeclaude mcp add test-server -- uv run server.py
# Test interactivelyclaude> List the tools available from test-serverExemples pour la data engineering
Trois serveurs MCP pratiques qui répondent à des besoins courants en data engineering.
Serveur de catalogue de données
Ce serveur expose votre catalogue de données interne pour la découverte assistée par IA :
from mcp.server.fastmcp import FastMCPimport json
mcp = FastMCP("DataCatalogMCP")
# Simulated catalog data - replace with your actual catalog APICATALOG = { "sales.orders": { "description": "Order transactions from all channels", "columns": ["order_id", "customer_id", "total_amount", "created_at"], "owner": "sales-team", "tags": ["pii", "financial"], "upstream": ["raw.shopify_orders", "raw.pos_transactions"], "downstream": ["analytics.revenue_daily", "ml.churn_features"] }, "sales.customers": { "description": "Customer master data with demographics", "columns": ["customer_id", "email", "segment", "lifetime_value"], "owner": "marketing-team", "tags": ["pii"], "upstream": ["raw.crm_contacts"], "downstream": ["sales.orders", "analytics.cohorts"] }}
@mcp.tool()def search_tables( query: str, tags: list[str] | None = None, limit: int = 10) -> str: """Search for tables in the data catalog.
Args: query: Search term to match against table names and descriptions tags: Optional list of tags to filter by (e.g., ['pii', 'financial']) limit: Maximum number of results to return
Returns: Matching tables with descriptions """ results = [] for table_name, metadata in CATALOG.items(): # Simple search matching if query.lower() in table_name.lower() or query.lower() in metadata["description"].lower(): if tags is None or any(t in metadata["tags"] for t in tags): results.append({ "name": table_name, "description": metadata["description"], "owner": metadata["owner"], "tags": metadata["tags"] }) return json.dumps(results[:limit], indent=2)
@mcp.tool()def get_table_details(table_name: str) -> str: """Get detailed metadata for a specific table.
Args: table_name: Fully qualified table name (e.g., 'sales.orders')
Returns: Complete table metadata including columns, owner, and tags """ if table_name not in CATALOG: return json.dumps({"error": f"Table '{table_name}' not found in catalog"})
metadata = CATALOG[table_name] return json.dumps({ "table": table_name, **metadata }, indent=2)
@mcp.tool()def get_data_lineage( table_name: str, direction: str = "both", depth: int = 2) -> str: """Trace data lineage for a table.
Args: table_name: Table to trace lineage for direction: 'upstream', 'downstream', or 'both' depth: How many levels to traverse (1-5)
Returns: Lineage graph showing data flow """ if table_name not in CATALOG: return json.dumps({"error": f"Table '{table_name}' not found"})
metadata = CATALOG[table_name] lineage = {"table": table_name}
if direction in ("upstream", "both"): lineage["upstream"] = metadata.get("upstream", []) if direction in ("downstream", "both"): lineage["downstream"] = metadata.get("downstream", [])
return json.dumps(lineage, indent=2)
if __name__ == "__main__": mcp.run(transport="stdio")Avec ce serveur, vous pouvez demander des choses comme “Trouve toutes les tables liées aux clients” ou “Quelles tables contiennent des données PII ?” ou “Montre-moi la lineage de la table orders.”
Serveur de monitoring de pipelines
Surveillez les pipelines, vérifiez les statuts et investiguez les échecs :
from mcp.server.fastmcp import FastMCP, Contextfrom pydantic import BaseModel, Fieldfrom datetime import datetimefrom enum import Enumimport json
mcp = FastMCP("PipelineMonitorMCP")
class PipelineStatus(str, Enum): RUNNING = "running" SUCCESS = "success" FAILED = "failed" PENDING = "pending"
class PipelineRun(BaseModel): """Details of a pipeline execution.""" pipeline_id: str run_id: str status: PipelineStatus started_at: str finished_at: str | None = None duration_seconds: int | None = None error_message: str | None = None
# Simulated pipeline data - replace with Airflow/Dagster/Prefect API callsPIPELINES = { "etl_daily_sales": { "last_run": PipelineRun( pipeline_id="etl_daily_sales", run_id="run_20260114_001", status=PipelineStatus.SUCCESS, started_at="2026-01-14T06:00:00Z", finished_at="2026-01-14T06:45:00Z", duration_seconds=2700 ), "schedule": "0 6 * * *", "owner": "data-platform" }, "sync_inventory": { "last_run": PipelineRun( pipeline_id="sync_inventory", run_id="run_20260114_001", status=PipelineStatus.FAILED, started_at="2026-01-14T07:00:00Z", finished_at="2026-01-14T07:15:00Z", duration_seconds=900, error_message="Connection timeout to inventory service" ), "schedule": "0 */2 * * *", "owner": "supply-chain" }}
@mcp.tool()def get_pipeline_status(pipeline_id: str) -> PipelineRun: """Get the current status of a data pipeline.
Args: pipeline_id: Unique identifier for the pipeline
Returns: Latest run details including status, timing, and any errors """ if pipeline_id not in PIPELINES: return PipelineRun( pipeline_id=pipeline_id, run_id="unknown", status=PipelineStatus.PENDING, started_at=datetime.now().isoformat(), error_message=f"Pipeline '{pipeline_id}' not found" )
return PIPELINES[pipeline_id]["last_run"]
@mcp.tool()def list_failed_pipelines(hours: int = 24) -> str: """List all pipelines that failed recently.
Args: hours: Look back period in hours (default: 24)
Returns: List of failed pipelines with error details """ failures = [] for pipeline_id, data in PIPELINES.items(): if data["last_run"].status == PipelineStatus.FAILED: failures.append({ "pipeline": pipeline_id, "run_id": data["last_run"].run_id, "error": data["last_run"].error_message, "failed_at": data["last_run"].finished_at, "owner": data["owner"] })
return json.dumps(failures, indent=2)
@mcp.tool()def list_all_pipelines() -> str: """List all registered pipelines with their current status.
Returns: Summary of all pipelines """ summary = [] for pipeline_id, data in PIPELINES.items(): summary.append({ "pipeline": pipeline_id, "status": data["last_run"].status.value, "schedule": data["schedule"], "owner": data["owner"], "last_run": data["last_run"].started_at })
return json.dumps(summary, indent=2)
@mcp.tool()async def trigger_pipeline(pipeline_id: str, ctx: Context) -> str: """Trigger a manual run of a pipeline.
Args: pipeline_id: Pipeline to trigger ctx: Context for progress reporting
Returns: Confirmation with new run ID """ if pipeline_id not in PIPELINES: return json.dumps({"error": f"Pipeline '{pipeline_id}' not found"})
await ctx.info(f"Triggering pipeline: {pipeline_id}")
# In real implementation, call your orchestrator's API new_run_id = f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
return json.dumps({ "status": "triggered", "pipeline": pipeline_id, "run_id": new_run_id, "message": f"Pipeline {pipeline_id} triggered successfully" }, indent=2)
if __name__ == "__main__": mcp.run(transport="stdio")Maintenant vous pouvez demander “Quels pipelines ont échoué dans les dernières 24 heures ?” ou “Déclenche une ré-exécution du pipeline sync_inventory.”
Serveur de data quality
Exécutez des checks de qualité et récupérez les scores depuis votre plateforme de data quality :
from mcp.server.fastmcp import FastMCP, Contextfrom pydantic import BaseModel, Fieldfrom enum import Enumimport json
mcp = FastMCP("DataQualityMCP")
class CheckType(str, Enum): COMPLETENESS = "completeness" UNIQUENESS = "uniqueness" FRESHNESS = "freshness" VALIDITY = "validity" CONSISTENCY = "consistency"
class QualityCheckResult(BaseModel): """Result of a single quality check.""" check_name: str check_type: CheckType table_name: str passed: bool score: float = Field(ge=0, le=1, description="Score between 0 and 1") rows_checked: int issues_found: int details: str | None = None
# Simulated quality scores - replace with Great Expectations/Elementary/custom APIQUALITY_SCORES = { "sales.orders": { "overall": 0.94, "completeness": 0.99, "uniqueness": 0.95, "freshness": 0.88, "validity": 0.96 }, "sales.customers": { "overall": 0.87, "completeness": 0.92, "uniqueness": 0.98, "freshness": 0.75, "validity": 0.85 }}
@mcp.tool()async def run_quality_check( table_name: str, check_type: CheckType, ctx: Context) -> QualityCheckResult: """Run a specific data quality check on a table.
Args: table_name: Table to validate check_type: Type of quality check to run ctx: Context for progress reporting
Returns: Detailed check results """ await ctx.info(f"Running {check_type.value} check on {table_name}")
# Simulate check execution await ctx.report_progress(progress=0.5, total=1.0, message="Scanning table...")
# In real implementation, call your quality framework score = QUALITY_SCORES.get(table_name, {}).get(check_type.value, 0.5) passed = score >= 0.9
await ctx.report_progress(progress=1.0, total=1.0, message="Check complete")
return QualityCheckResult( check_name=f"{check_type.value}_{table_name.replace('.', '_')}", check_type=check_type, table_name=table_name, passed=passed, score=score, rows_checked=100000, issues_found=int((1 - score) * 100000), details=f"{'Passed' if passed else 'Failed'}: {score:.1%} of rows meet {check_type.value} criteria" )
@mcp.tool()def get_quality_score(table_name: str) -> str: """Get the overall data quality score for a table.
Args: table_name: Table to get scores for
Returns: Quality scores across all dimensions """ if table_name not in QUALITY_SCORES: return json.dumps({ "error": f"No quality scores found for '{table_name}'", "available_tables": list(QUALITY_SCORES.keys()) }, indent=2)
scores = QUALITY_SCORES[table_name] return json.dumps({ "table": table_name, "overall_score": scores["overall"], "dimensions": { "completeness": scores["completeness"], "uniqueness": scores["uniqueness"], "freshness": scores["freshness"], "validity": scores.get("validity", "not measured") }, "status": "healthy" if scores["overall"] >= 0.9 else "needs attention" }, indent=2)
@mcp.tool()def list_quality_issues(min_severity: str = "warning") -> str: """List tables with data quality issues.
Args: min_severity: Minimum severity to include ('warning' or 'critical')
Returns: Tables that need attention """ issues = [] threshold = 0.8 if min_severity == "critical" else 0.9
for table_name, scores in QUALITY_SCORES.items(): if scores["overall"] < threshold: # Find the worst dimension dimensions = {k: v for k, v in scores.items() if k != "overall"} worst_dim = min(dimensions, key=dimensions.get)
issues.append({ "table": table_name, "overall_score": scores["overall"], "severity": "critical" if scores["overall"] < 0.8 else "warning", "primary_issue": worst_dim, "issue_score": dimensions[worst_dim] })
return json.dumps(sorted(issues, key=lambda x: x["overall_score"]), indent=2)
@mcp.tool()async def run_full_validation(table_name: str, ctx: Context) -> str: """Run all quality checks on a table.
Args: table_name: Table to validate ctx: Context for progress reporting
Returns: Complete validation report """ await ctx.info(f"Starting full validation of {table_name}")
results = [] checks = list(CheckType)
for i, check_type in enumerate(checks): await ctx.report_progress( progress=(i + 1) / len(checks), total=1.0, message=f"Running {check_type.value} check..." )
result = await run_quality_check(table_name, check_type, ctx) results.append({ "check": result.check_type.value, "passed": result.passed, "score": result.score })
overall_passed = all(r["passed"] for r in results)
return json.dumps({ "table": table_name, "validation_passed": overall_passed, "checks": results, "summary": f"{'All checks passed' if overall_passed else 'Some checks failed'}" }, indent=2)
if __name__ == "__main__": mcp.run(transport="stdio")Cela vous permet de demander “Quel est le score de data quality pour la table orders ?” ou “Lance un check de complétude sur sales.customers” ou “Quelles tables ont des problèmes de qualité que je devrais investiguer ?”
Checklist de configuration du projet
Créer un nouveau projet de serveur MCP de zéro :
1. Initialiser le projet
# Create project with uvuv init my-data-mcp-servercd my-data-mcp-server
# Or with standard Pythonmkdir my-data-mcp-servercd my-data-mcp-serverpython -m venv .venvsource .venv/bin/activate2. Ajouter les dépendances
# Core MCP with CLI toolsuv add "mcp[cli]"
# Common data engineering dependenciesuv add httpx # HTTP client for API callsuv add pydantic # Data validation (usually included with mcp)uv add sqlalchemy # Database connectionsuv add asyncpg # Async PostgreSQL (if needed)3. Créer le fichier serveur
touch server.pyCommencez avec la structure de base présentée plus tôt, puis ajoutez vos tools.
4. Tester avec MCP Inspector
npx @modelcontextprotocol/inspector uv run server.pyOuvrez l’URL affichée dans votre navigateur. Vérifiez que :
- Tous les tools apparaissent dans la liste
- Les descriptions sont claires
- Les schémas d’entrée semblent corrects
- Les tools s’exécutent sans erreurs
5. Installer dans Claude Desktop
# Quick install (creates config entry)uv run mcp install server.py --name "My Data Server"Ou éditez manuellement claude_desktop_config.json :
{ "mcpServers": { "my-data-server": { "command": "uv", "args": ["run", "/full/path/to/server.py"], "env": { "DATABASE_URL": "your-connection-string" } } }}6. Installer dans Claude Code
# Add to current project (creates .mcp.json)claude mcp add my-data-server -s project -- uv run /full/path/to/server.py
# Or user-wide installationclaude mcp add my-data-server -- uv run /full/path/to/server.py7. Tester dans le client de production
# With Claude Codeclaude> What tools does my-data-server provide?> [Test each tool with realistic inputs]Structure du projet
Un projet de serveur MCP typique ressemble à :
my-data-mcp-server/├── pyproject.toml # Dependencies and metadata├── server.py # Main server file├── tools/ # Tool implementations (optional, for large servers)│ ├── __init__.py│ ├── catalog.py│ └── quality.py├── tests/ # Unit tests for business logic│ └── test_tools.py└── README.md # Setup and usage instructionsServeurs existants à étudier
Avant de construire le vôtre, regardez comment ces serveurs MCP de data engineering sont structurés :
DataHub MCP
Dépôt : github.com/acryldata/mcp-server-datahub
Le serveur MCP officiel pour DataHub, le célèbre catalogue de données open source. Fonctionnalités :
- Recherche parmi les datasets, dashboards et pipelines
- Parcours de la lineage
- Récupération des métadonnées
- Exécution de requêtes SQL
À étudier pour : les patterns d’intégration de catalogue, l’implémentation de la recherche, les APIs de lineage.
dbt MCP
Dépôt : github.com/dbt-labs/dbt-mcp
L’intégration officielle dbt couverte en profondeur dans l’article tutoriel sur dbt MCP. Fonctionnalités :
- Exécution de commandes CLI (run, test, build)
- Requêtes sur la semantic layer
- Découverte et lineage des modèles
- Intégration avec l’API Cloud
À étudier pour : les patterns de wrapping CLI, l’architecture hybride local/remote, la gestion des variables d’environnement.
OpenMetadata MCP
Documentation : open-metadata.org/mcp
Intégration avec la plateforme de métadonnées entreprise avec :
- Résultats de data profiling
- Visualisation de la lineage
- Métriques de qualité
- Workflows de gouvernance
À étudier pour : les patterns entreprise, la gestion de l’authentification, les modèles de métadonnées complexes.
Elementary MCP
Site web : elementary-data.com
Intégration avec la plateforme d’observabilité des données offrant :
- Résultats et historique des tests
- Détection d’anomalies
- Monitoring de la fraîcheur des données
- Gestion des alertes
À étudier pour : les patterns d’observabilité, l’exposition de données time-series, l’intégration des alertes.
Construire sur l’existant
Quand vous étudiez ces serveurs, concentrez-vous sur la façon dont ils structurent les entrées et sorties des tools, comment ils gèrent les credentials de manière sécurisée, comment ils remontent les échecs à l’IA, et quelles options de configuration ils exposent.
Beaucoup de serveurs sont open source sous licences permissives. Forkez-les comme points de départ ou adaptez leurs patterns pour vos propres implémentations.
Prochaines étapes
Commencez petit. Choisissez un système interne que vous interrogez souvent, construisez un serveur minimal avec un ou deux tools, testez-le avec l’Inspector, puis itérez au fur et à mesure que vous découvrez vos besoins.
Pour les déploiements en production, vous aurez besoin d’authentification (OAuth 2.1 pour les serveurs HTTP distants), de rate limiting pour protéger vos systèmes backend, de logging et monitoring pour suivre l’usage, et d’une documentation claire pour que l’IA et votre équipe sachent quand utiliser chaque tool.
L’écosystème MCP grandit rapidement, et les serveurs que vous construisez aujourd’hui pour un usage interne pourraient devenir de précieuses contributions à la communauté demain.