A pipeline monitoring MCP server exposes pipeline status, failure details, and trigger capabilities as conversation tools — replacing the multi-step investigation through an orchestrator UI (navigate to the DAG, find the failed task, read the logs) with direct queries.
This pattern is also a natural fit for the Cascading Agent Pattern, where an always-on monitoring agent detects failures and triggers investigation or remediation through the MCP server.
The example below uses simulated data. In production, replace the dictionary with calls to your orchestrator’s API — Airflow’s REST API, Dagster’s GraphQL endpoint, Prefect’s client library, or whatever scheduler you’re running.
The Server
```python
from mcp.server.fastmcp import FastMCP, Context
from pydantic import BaseModel, Field
from datetime import datetime, timedelta
from enum import Enum
import json

mcp = FastMCP("PipelineMonitorMCP")

class PipelineStatus(str, Enum):
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    PENDING = "pending"

class PipelineRun(BaseModel):
    """Details of a pipeline execution."""
    pipeline_id: str
    run_id: str
    status: PipelineStatus
    started_at: str
    finished_at: str | None = None
    duration_seconds: int | None = None
    error_message: str | None = None

# Simulated pipeline data — in production, replace this dictionary
# with calls to your orchestrator's API
PIPELINES = {
    "daily_sales_etl": {
        "owner": "data-platform",
        "schedule": "0 6 * * *",
        "last_run": PipelineRun(
            pipeline_id="daily_sales_etl",
            run_id="run_20240103_060000",
            status=PipelineStatus.SUCCESS,
            started_at="2024-01-03T06:00:00",
            finished_at="2024-01-03T06:12:30",
            duration_seconds=750,
        ),
    },
    "sync_inventory": {
        "owner": "data-platform",
        "schedule": "*/30 * * * *",
        "last_run": PipelineRun(
            pipeline_id="sync_inventory",
            run_id="run_20240103_083000",
            status=PipelineStatus.FAILED,
            started_at="2024-01-03T08:30:00",
            finished_at="2024-01-03T08:31:12",
            duration_seconds=72,
            error_message="Connection timeout to inventory service",
        ),
    },
}
```

The PipelineRun Pydantic model ensures every response has the same shape. The AI knows exactly what fields to expect, which makes it reliable at extracting and comparing values across multiple tool calls. See MCP Tool Design Patterns for more on structured output.
Core Tools
Pipeline Status
The basic “how’s this pipeline doing?” query:
```python
@mcp.tool()
def get_pipeline_status(pipeline_id: str) -> PipelineRun:
    """Get the current status of a data pipeline.

    Args:
        pipeline_id: Unique identifier for the pipeline

    Returns:
        Latest run details including status, timing, and any errors
    """
    if pipeline_id not in PIPELINES:
        return PipelineRun(
            pipeline_id=pipeline_id,
            run_id="unknown",
            status=PipelineStatus.PENDING,
            started_at=datetime.now().isoformat(),
            error_message=f"Pipeline '{pipeline_id}' not found"
        )

    return PIPELINES[pipeline_id]["last_run"]
```

Returning a PipelineRun even for not-found cases (rather than raising an exception) keeps the interface consistent. The AI sees a status of “pending” with an error message, which it can report gracefully rather than crashing the tool call.
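In production, this tool would query the orchestrator instead of the in-memory dictionary. As a standalone sketch, here is one way an Airflow dag-run payload (from the stable REST API's `GET /api/v1/dags/{dag_id}/dagRuns` endpoint) might be mapped onto the PipelineRun fields — the payload field names assume Airflow's stable REST API, so verify them against your version:

```python
# Standalone sketch: translate one Airflow dag_run object into the
# PipelineRun shape. Field names ("dag_run_id", "state", "start_date",
# "end_date") follow Airflow's stable REST API.
from datetime import datetime

_STATE_MAP = {"queued": "pending", "running": "running",
              "success": "success", "failed": "failed"}

def airflow_run_to_pipeline_run(dag_id: str, dag_run: dict) -> dict:
    """Map an Airflow dag_run payload onto the PipelineRun field names."""
    start = dag_run.get("start_date")
    end = dag_run.get("end_date")
    duration = None
    if start and end:
        # Airflow returns ISO 8601 timestamps, which fromisoformat parses
        duration = int((datetime.fromisoformat(end)
                        - datetime.fromisoformat(start)).total_seconds())
    return {
        "pipeline_id": dag_id,
        "run_id": dag_run["dag_run_id"],
        "status": _STATE_MAP.get(dag_run["state"], "pending"),
        "started_at": start,
        "finished_at": end,
        "duration_seconds": duration,
    }
```

The dict returned here would feed directly into the PipelineRun constructor.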
Failed Pipelines
The morning triage tool — “what broke overnight?”
```python
@mcp.tool()
def list_failed_pipelines(hours: int = 24) -> str:
    """List all pipelines that failed recently.

    Args:
        hours: Look back period in hours (default: 24)

    Returns:
        List of failed pipelines with error details
    """
    cutoff = datetime.now() - timedelta(hours=hours)
    failures = []
    for pipeline_id, data in PIPELINES.items():
        run = data["last_run"]
        if run.status != PipelineStatus.FAILED:
            continue
        # Honor the look-back window when the run has a finish time
        if run.finished_at and datetime.fromisoformat(run.finished_at) < cutoff:
            continue
        failures.append({
            "pipeline": pipeline_id,
            "run_id": run.run_id,
            "error": run.error_message,
            "failed_at": run.finished_at,
            "owner": data["owner"]
        })

    return json.dumps(failures, indent=2)
```

Including the owner field in failure reports means the AI can answer “whose pipeline failed?” — useful for routing incidents to the right team.
Pipeline Overview
A summary of everything that’s registered:
```python
@mcp.tool()
def list_all_pipelines() -> str:
    """List all registered pipelines with their current status.

    Returns:
        Summary of all pipelines
    """
    summary = []
    for pipeline_id, data in PIPELINES.items():
        summary.append({
            "pipeline": pipeline_id,
            "status": data["last_run"].status.value,
            "schedule": data["schedule"],
            "owner": data["owner"],
            "last_run": data["last_run"].started_at
        })

    return json.dumps(summary, indent=2)
```

Pipeline Trigger
The action tool — rerun a failed pipeline:
```python
@mcp.tool()
async def trigger_pipeline(pipeline_id: str, ctx: Context) -> str:
    """Trigger a manual run of a pipeline.

    Args:
        pipeline_id: Pipeline to trigger
        ctx: Context for progress reporting

    Returns:
        Confirmation with new run ID
    """
    if pipeline_id not in PIPELINES:
        return json.dumps({"error": f"Pipeline '{pipeline_id}' not found"})

    await ctx.info(f"Triggering pipeline: {pipeline_id}")

    # In real implementation, call your orchestrator's API
    new_run_id = f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    return json.dumps({
        "status": "triggered",
        "pipeline": pipeline_id,
        "run_id": new_run_id,
        "message": f"Pipeline {pipeline_id} triggered successfully"
    }, indent=2)

if __name__ == "__main__":
    mcp.run(transport="stdio")
```

Note that trigger_pipeline has side effects — it starts a real pipeline run. MCP clients will prompt the user for confirmation before executing this tool. The Security Posture for AI Agents trust gradient is relevant here: triggering a pipeline is “Operator” level trust, which requires deliberate authorization.
What You Can Ask
With this server connected:
- “What pipelines failed in the last 24 hours?”
- “What’s the status of the daily sales ETL?”
- “Trigger a rerun of the sync_inventory pipeline”
- “List all pipelines owned by the data-platform team”
- “How long did the daily sales ETL take to run?”
Production Considerations
Error message quality. The most valuable thing a monitoring server can expose is the why behind failures. “Connection timeout to inventory service” is actionable. “Task failed” is not. When wrapping your orchestrator’s API, extract the meaningful error message — often it’s buried in task logs, not the top-level status.
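One way to surface the why is a small helper that scans raw task logs for the last line that looks like a genuine error. This is a sketch — the marker list and fallback message are illustrative, and you would tune them to your orchestrator's log format:

```python
# Sketch: pull the most meaningful line out of raw task log text.
# The marker list is an assumption — extend it for your stack.
ERROR_MARKERS = ("error", "exception", "traceback", "timeout", "refused")

def extract_error_message(log_text: str) -> str:
    """Return the last log line that looks like a real error, else a fallback."""
    candidates = [
        line.strip() for line in log_text.splitlines()
        if any(marker in line.lower() for marker in ERROR_MARKERS)
    ]
    return candidates[-1] if candidates else "Task failed (no error details in logs)"
```

The AI then sees “ERROR Connection timeout to inventory service” instead of a bare failure status.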
Historical data. The example only exposes the last run. In production, add a get_pipeline_history tool that shows the last N runs for a pipeline. Patterns like “this pipeline fails every Monday” are only visible with history.
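A history tool might look like the following standalone sketch. The in-memory PIPELINE_HISTORY store and its run records are illustrative — in the real server this data would come from your orchestrator, and the function would be decorated with @mcp.tool():

```python
import json

# Illustrative in-memory history store, newest run first; a real server
# would fetch past runs from the orchestrator's API instead.
PIPELINE_HISTORY = {
    "daily_sales_etl": [
        {"run_id": "run_0103", "status": "failed", "started_at": "2024-01-03T06:00:00"},
        {"run_id": "run_0102", "status": "success", "started_at": "2024-01-02T06:00:00"},
        {"run_id": "run_0101", "status": "success", "started_at": "2024-01-01T06:00:00"},
    ],
}

def get_pipeline_history(pipeline_id: str, limit: int = 10) -> str:
    """Return the last `limit` runs so the AI can spot recurring failures."""
    runs = PIPELINE_HISTORY.get(pipeline_id)
    if runs is None:
        return json.dumps({"error": f"Pipeline '{pipeline_id}' not found"})
    return json.dumps(runs[:limit], indent=2)
```

With this in place, “does this pipeline always fail on Mondays?” becomes answerable from the run timestamps.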
Log access. Consider a tool that retrieves task-level logs for a specific run. When the AI sees “Connection timeout to inventory service,” it might want to dig deeper — what was the full stack trace? Which task specifically failed?
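Such a tool could be sketched as follows, assuming the server caches recent task logs keyed by run and task ID. The TASK_LOGS store and its keys are hypothetical — a real implementation would fetch from the orchestrator's log endpoint:

```python
import json

# Hypothetical log cache keyed by (run_id, task_id); a real server would
# call the orchestrator's log API instead of reading from memory.
TASK_LOGS = {
    ("run_0103", "load_inventory"): (
        "INFO starting load\n"
        "ERROR Connection timeout to inventory service\n"
        "Traceback (most recent call last): ..."
    ),
}

def get_task_logs(run_id: str, task_id: str, tail_lines: int = 50) -> str:
    """Return the last `tail_lines` lines of a task's log for a given run."""
    log = TASK_LOGS.get((run_id, task_id))
    if log is None:
        return json.dumps({"error": f"No logs for task '{task_id}' in run '{run_id}'"})
    return "\n".join(log.splitlines()[-tail_lines:])
```

Capping output with `tail_lines` keeps huge logs from flooding the model's context window.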
Rate limiting triggers. A trigger_pipeline tool without rate limiting means the AI could theoretically trigger the same pipeline dozens of times if it misinterprets a failure as “try again.” Add a check: if the pipeline is already running, refuse the trigger with a clear message.
Reference implementations. Study the dbt MCP server for patterns on wrapping CLI commands and handling async operations. The Elementary MCP integration demonstrates observability patterns including anomaly detection and alert management.