ServicesAboutNotesContact Get in touch →
EN FR
Note

MCP Pipeline Monitoring Server Pattern

A practical MCP server pattern for pipeline monitoring — checking job status, listing failures, and triggering reruns across orchestrators like Airflow and Dagster.

Planted
mcpdata engineering

A pipeline monitoring MCP server exposes pipeline status, failure details, and trigger capabilities as conversation tools — replacing the multi-step investigation through an orchestrator UI (navigate to the DAG, find the failed task, read the logs) with direct queries.

This pattern is also a natural fit for the Cascading Agent Pattern, where an always-on monitoring agent detects failures and triggers investigation or remediation through the MCP server.

The example below uses simulated data. In production, replace the dictionary with calls to your orchestrator’s API — Airflow’s REST API, Dagster’s GraphQL endpoint, Prefect’s client library, or whatever scheduler you’re running.

The Server

from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
import json
mcp = FastMCP("PipelineMonitorMCP")
class PipelineStatus(str, Enum):
RUNNING = "running"
SUCCESS = "success"
FAILED = "failed"
PENDING = "pending"
class PipelineRun(BaseModel):
"""Details of a pipeline execution."""
pipeline_id: str
run_id: str
status: PipelineStatus
started_at: str
finished_at: str | None = None
duration_seconds: int | None = None
error_message: str | None = None

The PipelineRun Pydantic model ensures every response has the same shape. The AI knows exactly what fields to expect, which makes it reliable at extracting and comparing values across multiple tool calls. See MCP Tool Design Patterns for more on structured output.

Core Tools

Pipeline Status

The basic “how’s this pipeline doing?” query:

@mcp.tool()
def get_pipeline_status(pipeline_id: str) -> PipelineRun:
"""Get the current status of a data pipeline.
Args:
pipeline_id: Unique identifier for the pipeline
Returns:
Latest run details including status, timing, and any errors
"""
if pipeline_id not in PIPELINES:
return PipelineRun(
pipeline_id=pipeline_id,
run_id="unknown",
status=PipelineStatus.PENDING,
started_at=datetime.now().isoformat(),
error_message=f"Pipeline '{pipeline_id}' not found"
)
return PIPELINES[pipeline_id]["last_run"]

Returning a PipelineRun even for not-found cases (rather than raising an exception) keeps the interface consistent. The AI sees a status of “pending” with an error message, which it can report gracefully rather than crashing the tool call.

Failed Pipelines

The morning triage tool — “what broke overnight?”

@mcp.tool()
def list_failed_pipelines(hours: int = 24) -> str:
"""List all pipelines that failed recently.
Args:
hours: Look back period in hours (default: 24)
Returns:
List of failed pipelines with error details
"""
failures = []
for pipeline_id, data in PIPELINES.items():
if data["last_run"].status == PipelineStatus.FAILED:
failures.append({
"pipeline": pipeline_id,
"run_id": data["last_run"].run_id,
"error": data["last_run"].error_message,
"failed_at": data["last_run"].finished_at,
"owner": data["owner"]
})
return json.dumps(failures, indent=2)

Including the owner field in failure reports means the AI can answer “whose pipeline failed?” — useful for routing incidents to the right team.

Pipeline Overview

A summary of everything that’s registered:

@mcp.tool()
def list_all_pipelines() -> str:
"""List all registered pipelines with their current status.
Returns:
Summary of all pipelines
"""
summary = []
for pipeline_id, data in PIPELINES.items():
summary.append({
"pipeline": pipeline_id,
"status": data["last_run"].status.value,
"schedule": data["schedule"],
"owner": data["owner"],
"last_run": data["last_run"].started_at
})
return json.dumps(summary, indent=2)

Pipeline Trigger

The action tool — rerun a failed pipeline:

@mcp.tool()
async def trigger_pipeline(pipeline_id: str, ctx: Context) -> str:
"""Trigger a manual run of a pipeline.
Args:
pipeline_id: Pipeline to trigger
ctx: Context for progress reporting
Returns:
Confirmation with new run ID
"""
if pipeline_id not in PIPELINES:
return json.dumps({"error": f"Pipeline '{pipeline_id}' not found"})
await ctx.info(f"Triggering pipeline: {pipeline_id}")
# In real implementation, call your orchestrator's API
new_run_id = f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
return json.dumps({
"status": "triggered",
"pipeline": pipeline_id,
"run_id": new_run_id,
"message": f"Pipeline {pipeline_id} triggered successfully"
}, indent=2)
if __name__ == "__main__":
mcp.run(transport="stdio")

Note that trigger_pipeline has side effects — it starts a real pipeline run. MCP clients will prompt the user for confirmation before executing this tool. The Security Posture for AI Agents trust gradient is relevant here: triggering a pipeline is “Operator” level trust, which requires deliberate authorization.

What You Can Ask

With this server connected:

  • “What pipelines failed in the last 24 hours?”
  • “What’s the status of the daily sales ETL?”
  • “Trigger a rerun of the sync_inventory pipeline”
  • “List all pipelines owned by the data-platform team”
  • “How long did the daily sales ETL take to run?”

Production Considerations

Error message quality. The most valuable thing a monitoring server can expose is the why behind failures. “Connection timeout to inventory service” is actionable. “Task failed” is not. When wrapping your orchestrator’s API, extract the meaningful error message — often it’s buried in task logs, not the top-level status.

Historical data. The example only exposes the last run. In production, add a get_pipeline_history tool that shows the last N runs for a pipeline. Patterns like “this pipeline fails every Monday” are only visible with history.

Log access. Consider a tool that retrieves task-level logs for a specific run. When the AI sees “Connection timeout to inventory service,” it might want to dig deeper — what was the full stack trace? Which task specifically failed?

Rate limiting triggers. A trigger_pipeline tool without rate limiting means the AI could theoretically trigger the same pipeline dozens of times if it misinterprets a failure as “try again.” Add a check: if the pipeline is already running, refuse the trigger with a clear message.

Reference implementations. Study the dbt MCP server for patterns on wrapping CLI commands and handling async operations. The Elementary MCP integration demonstrates observability patterns including anomaly detection and alert management.