ML Pipeline Handoff

Duration: 75 minutes · Difficulty: Advanced · Pattern: Sequential Chain + Governance Gate

This scenario demonstrates a complete ML pipeline from data engineering through production deployment: five personas hand off work across the pipeline stages, and the run closes by generating a model card.

Scenario Overview

Problem: A customer churn prediction model must be built and shipped: raw data needs to flow through feature engineering, training, and evaluation into a production deployment, with proper documentation at each step.

Goal: Execute a five-stage ML pipeline with handoffs, quality gates, and model card generation.

Persona Team

Persona                 ID    Pipeline Stage                            Category
Pipeline Orchestrator   POR   Data pipeline design and orchestration   data_engineering
Feature Architect       FAR   Feature engineering and selection        data_engineering
Experiment Scientist    ESC   Model training and experiment tracking   ml_lifecycle
Inference Orchestrator  IOR   Model serving and inference pipeline     ml_lifecycle
Model Ops Steward       MOS   Production deployment and monitoring     ml_lifecycle

Setup

from fcc.personas.registry import PersonaRegistry
from fcc.simulation.engine import SimulationEngine
from fcc.simulation.messages import SimulationMessage

# Load the persona definitions and run the engine in deterministic mode
# so repeated runs produce reproducible responses.
registry = PersonaRegistry.from_yaml_directory("src/fcc/data/personas")
engine = SimulationEngine(registry=registry, mode="deterministic")

project = {
    "name": "Customer Churn Prediction",
    "dataset": "customer_transactions_2025",
    "target": "churned",
    "deadline": "Sprint 44",
}

Stage 1: Data Pipeline Design (POR)

The Pipeline Orchestrator designs the data ingestion and transformation pipeline:

stage1_message = SimulationMessage(
    sender="orchestrator",
    receiver="POR",
    content=(
        f"Design a data pipeline for the '{project['name']}' project.\n"
        f"Dataset: {project['dataset']}\n"
        f"Target variable: {project['target']}\n\n"
        "Requirements:\n"
        "- Ingest from PostgreSQL and S3 parquet files\n"
        "- Handle missing values and outliers\n"
        "- Apply data quality checks\n"
        "- Output cleaned dataset for feature engineering\n"
        "Produce a pipeline specification with DAG definition."
    ),
    phase="create",
)

pipeline_spec = engine.step(stage1_message)
print(f"Stage 1 (POR): Pipeline spec - {len(pipeline_spec.content)} chars")

Stage 2: Feature Engineering (FAR)

The Feature Architect designs and selects features:

stage2_message = SimulationMessage(
    sender="POR",
    receiver="FAR",
    content=(
        f"Design features for '{project['name']}' based on this "
        f"pipeline spec:\n\n{pipeline_spec.content[:500]}\n\n"
        "Requirements:\n"
        "- Create temporal features (recency, frequency, monetary)\n"
        "- Encode categorical variables\n"
        "- Engineer interaction features\n"
        "- Perform feature selection (top 50 features)\n"
        "- Document feature definitions and data lineage\n"
        "Produce a feature store specification."
    ),
    phase="create",
)

feature_spec = engine.step(stage2_message)
print(f"Stage 2 (FAR): Feature spec - {len(feature_spec.content)} chars")

Stage 3: Model Training (ESC)

The Experiment Scientist trains and evaluates models:

stage3_message = SimulationMessage(
    sender="FAR",
    receiver="ESC",
    content=(
        f"Train models for '{project['name']}' using these features:\n\n"
        f"{feature_spec.content[:500]}\n\n"
        "Requirements:\n"
        "- Train baseline (logistic regression) and advanced models\n"
        "- Use stratified 5-fold cross-validation\n"
        "- Track experiments with MLflow\n"
        "- Evaluate: AUC-ROC, precision@k, recall, F1\n"
        "- Select best model with justification\n"
        "Produce an experiment report with metrics."
    ),
    phase="create",
)

experiment_report = engine.step(stage3_message)
print(f"Stage 3 (ESC): Experiment report - {len(experiment_report.content)} chars")

Quality Gate: Model Performance

Evaluate model readiness before deployment:

from fcc.collaboration.scoring import ScoringEngine

scorer = ScoringEngine()
model_quality = scorer.score_text(experiment_report.content)
print(f"\nModel quality score: {model_quality:.2f}")

performance_threshold = 0.65
if model_quality < performance_threshold:
    print("GATE FAILED: Model performance below threshold")
    print("Action: Return to ESC for hyperparameter tuning")
else:
    print("GATE PASSED: Proceeding to inference pipeline")

Stage 4: Inference Pipeline (IOR)

The Inference Orchestrator builds the serving infrastructure:

stage4_message = SimulationMessage(
    sender="ESC",
    receiver="IOR",
    content=(
        f"Build an inference pipeline for the selected model:\n\n"
        f"{experiment_report.content[:500]}\n\n"
        "Requirements:\n"
        "- REST API endpoint for real-time predictions\n"
        "- Batch inference job for daily scoring\n"
        "- Input validation and feature transformation\n"
        "- Prediction logging for monitoring\n"
        "- Latency target: <100ms p99\n"
        "Produce an inference pipeline specification."
    ),
    phase="create",
)

inference_spec = engine.step(stage4_message)
print(f"Stage 4 (IOR): Inference spec - {len(inference_spec.content)} chars")

Stage 5: Production Deployment (MOS)

The Model Ops Steward handles production deployment and monitoring:

stage5_message = SimulationMessage(
    sender="IOR",
    receiver="MOS",
    content=(
        f"Deploy the inference pipeline to production:\n\n"
        f"{inference_spec.content[:500]}\n\n"
        "Requirements:\n"
        "- Blue-green deployment with canary release\n"
        "- Model performance monitoring (drift detection)\n"
        "- Automated rollback on performance degradation\n"
        "- Alerting for prediction distribution shifts\n"
        "- A/B test framework for model comparison\n"
        "Produce a deployment plan and monitoring spec."
    ),
    phase="create",
)

deployment_plan = engine.step(stage5_message)
print(f"Stage 5 (MOS): Deployment plan - {len(deployment_plan.content)} chars")

Model Card Generation

Compile a model card documenting the entire pipeline:

model_card = {
    "model_name": project["name"],
    "version": "1.0.0",
    "pipeline_stages": [
        {"stage": "data_pipeline", "persona": "POR",
         "summary_length": len(pipeline_spec.content)},
        {"stage": "feature_engineering", "persona": "FAR",
         "summary_length": len(feature_spec.content)},
        {"stage": "training", "persona": "ESC",
         "summary_length": len(experiment_report.content)},
        {"stage": "inference", "persona": "IOR",
         "summary_length": len(inference_spec.content)},
        {"stage": "deployment", "persona": "MOS",
         "summary_length": len(deployment_plan.content)},
    ],
    "quality_score": model_quality,
    "deployment_status": "production",
}

import json
print("\nModel Card:")
print(json.dumps(model_card, indent=2))
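To keep the card with the rest of the run's artifacts, you can also write it to disk (the path is illustrative):

from pathlib import Path

card_path = Path("artifacts/churn_model_card.json")  # illustrative location
card_path.parent.mkdir(parents=True, exist_ok=True)
card_path.write_text(json.dumps(model_card, indent=2))
print(f"Model card written to {card_path}")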

Exercises

  1. Add governance: Insert a GCA (Governance Compliance Auditor) gate between training and deployment.
  2. Parallel models: Use the Parallel Fan-out pattern at Stage 3 to train multiple model architectures simultaneously.
  3. Federated features: Use the federation module to import features from a partner project's feature store.
  4. Monitoring loop: Create a feedback loop from MOS back to ESC when model drift is detected (a starting sketch follows this list).
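For exercise 4, a starting sketch: treat a drift alert from MOS as the trigger for a retraining request to ESC. Here drift_detected is a placeholder for whatever signal your monitoring stack actually emits:

# Placeholder for a real drift signal from production monitoring.
drift_detected = True

if drift_detected:
    drift_message = SimulationMessage(
        sender="MOS",
        receiver="ESC",
        content=(
            "Prediction drift detected in production. Retrain on the "
            "latest data window and report updated metrics."
        ),
        phase="create",
    )
    retrain_report = engine.step(drift_message)
    print(f"Retraining report - {len(retrain_report.content)} chars")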

Summary

In this scenario you executed a five-stage ML pipeline:

  • POR designed the data ingestion pipeline
  • FAR engineered and selected features
  • ESC trained, evaluated, and selected the best model
  • IOR built the inference serving infrastructure
  • MOS deployed to production with monitoring
  • A quality gate ensured model readiness before deployment
  • A model card documented the complete pipeline

Next Steps