Un serveur MCP de supervision de pipelines expose le statut des pipelines, les détails des échecs et les capacités de déclenchement sous forme d’outils conversationnels — remplaçant l’investigation en plusieurs étapes dans une interface d’orchestrateur (naviguer vers le DAG, trouver la tâche en échec, lire les logs) par des requêtes directes.
Ce pattern est aussi parfaitement adapté au Cascading Agent Pattern, où un agent de supervision permanent détecte les échecs et déclenche des investigations ou des remédiations via le serveur MCP.
L’exemple ci-dessous utilise des données simulées. En production, remplacez le dictionnaire par des appels à l’API de votre orchestrateur — l’API REST d’Airflow, le point de terminaison GraphQL de Dagster, la bibliothèque client de Prefect, ou le planificateur que vous utilisez.
Le serveur
from mcp.server.fastmcp import FastMCP, Contextfrom pydantic import BaseModel, Fieldfrom datetime import datetimefrom enum import Enumimport json
mcp = FastMCP("PipelineMonitorMCP")
class PipelineStatus(str, Enum): RUNNING = "running" SUCCESS = "success" FAILED = "failed" PENDING = "pending"
class PipelineRun(BaseModel): """Détails d'une exécution de pipeline.""" pipeline_id: str run_id: str status: PipelineStatus started_at: str finished_at: str | None = None duration_seconds: int | None = None error_message: str | None = NoneLe modèle Pydantic PipelineRun garantit que chaque réponse a la même structure. L’IA sait exactement quels champs attendre, ce qui la rend fiable pour extraire et comparer des valeurs entre plusieurs appels d’outils. Voir MCP Tool Design Patterns pour plus d’informations sur les sorties structurées.
Outils principaux
Statut de pipeline
La requête de base « comment se porte ce pipeline ? » :
@mcp.tool()def get_pipeline_status(pipeline_id: str) -> PipelineRun: """Get the current status of a data pipeline.
Args: pipeline_id: Unique identifier for the pipeline
Returns: Latest run details including status, timing, and any errors """ if pipeline_id not in PIPELINES: return PipelineRun( pipeline_id=pipeline_id, run_id="unknown", status=PipelineStatus.PENDING, started_at=datetime.now().isoformat(), error_message=f"Pipeline '{pipeline_id}' not found" )
return PIPELINES[pipeline_id]["last_run"]Renvoyer un PipelineRun même pour les cas « non trouvé » (plutôt que de lever une exception) maintient une interface cohérente. L’IA voit un statut « pending » avec un message d’erreur, qu’elle peut signaler de façon claire plutôt que de faire échouer l’appel d’outil.
Pipelines en échec
L’outil de triage matinal — « qu’est-ce qui a planté pendant la nuit ? » :
@mcp.tool()def list_failed_pipelines(hours: int = 24) -> str: """List all pipelines that failed recently.
Args: hours: Look back period in hours (default: 24)
Returns: List of failed pipelines with error details """ failures = [] for pipeline_id, data in PIPELINES.items(): if data["last_run"].status == PipelineStatus.FAILED: failures.append({ "pipeline": pipeline_id, "run_id": data["last_run"].run_id, "error": data["last_run"].error_message, "failed_at": data["last_run"].finished_at, "owner": data["owner"] })
return json.dumps(failures, indent=2)Inclure le champ owner dans les rapports d’échec permet à l’IA de répondre à « quel pipeline de quelle équipe a échoué ? » — utile pour router les incidents vers la bonne équipe.
Vue d’ensemble des pipelines
Un résumé de tout ce qui est enregistré :
@mcp.tool()def list_all_pipelines() -> str: """List all registered pipelines with their current status.
Returns: Summary of all pipelines """ summary = [] for pipeline_id, data in PIPELINES.items(): summary.append({ "pipeline": pipeline_id, "status": data["last_run"].status.value, "schedule": data["schedule"], "owner": data["owner"], "last_run": data["last_run"].started_at })
return json.dumps(summary, indent=2)Déclenchement de pipeline
L’outil d’action — relancer un pipeline en échec :
@mcp.tool()async def trigger_pipeline(pipeline_id: str, ctx: Context) -> str: """Trigger a manual run of a pipeline.
Args: pipeline_id: Pipeline to trigger ctx: Context for progress reporting
Returns: Confirmation with new run ID """ if pipeline_id not in PIPELINES: return json.dumps({"error": f"Pipeline '{pipeline_id}' not found"})
await ctx.info(f"Triggering pipeline: {pipeline_id}")
# In real implementation, call your orchestrator's API new_run_id = f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
return json.dumps({ "status": "triggered", "pipeline": pipeline_id, "run_id": new_run_id, "message": f"Pipeline {pipeline_id} triggered successfully" }, indent=2)
if __name__ == "__main__": mcp.run(transport="stdio")Notez que trigger_pipeline a des effets de bord — il démarre une vraie exécution de pipeline. Les clients MCP demanderont une confirmation à l’utilisateur avant d’exécuter cet outil. Le gradient de confiance décrit dans Security Posture for AI Agents est pertinent ici : déclencher un pipeline correspond au niveau de confiance « Operator », ce qui exige une autorisation délibérée.
Ce que vous pouvez demander
Avec ce serveur connecté :
- « Quels pipelines ont échoué dans les dernières 24 heures ? »
- « Quel est le statut de l’ETL de ventes quotidien ? »
- « Déclenche une relance du pipeline sync_inventory »
- « Liste tous les pipelines appartenant à l’équipe data-platform »
- « Combien de temps a duré l’ETL de ventes quotidien ? »
Considérations pour la production
Qualité des messages d’erreur. Ce qu’un serveur de supervision peut exposer de plus précieux, c’est le pourquoi des échecs. « Connection timeout to inventory service » est actionnable. « Task failed » ne l’est pas. Lorsque vous encapsulez l’API de votre orchestrateur, extrayez le message d’erreur significatif — il est souvent enfoui dans les logs de tâche, pas dans le statut de niveau supérieur.
Données historiques. L’exemple n’expose que la dernière exécution. En production, ajoutez un outil get_pipeline_history qui affiche les N dernières exécutions d’un pipeline. Des patterns comme « ce pipeline échoue tous les lundis » ne sont visibles qu’avec l’historique.
Accès aux logs. Envisagez un outil qui récupère les logs au niveau de la tâche pour une exécution spécifique. Quand l’IA voit « Connection timeout to inventory service », elle peut vouloir creuser davantage — quelle était la trace de pile complète ? Quelle tâche spécifiquement a échoué ?
Limitation du déclenchement. Un outil trigger_pipeline sans limitation de taux signifie que l’IA pourrait théoriquement déclencher le même pipeline des dizaines de fois si elle interprète mal un échec comme « réessaye ». Ajoutez une vérification : si le pipeline est déjà en cours, refusez le déclenchement avec un message clair.
Implémentations de référence. Étudiez le serveur MCP dbt pour les patterns d’encapsulation de commandes CLI et la gestion des opérations asynchrones. L’intégration MCP Elementary illustre des patterns d’observabilité incluant la détection d’anomalies et la gestion des alertes.