ML Pipeline Handoff¶
Duration: 75 minutes | Difficulty: Advanced | Pattern: Sequential Chain + Governance Gate
This scenario demonstrates a complete ML pipeline from data engineering through production deployment, with five personas handing off across pipeline stages and generating a model card at the end.
Scenario Overview¶
Problem: A customer churn prediction model needs to move from raw data through feature engineering, training, evaluation, and production deployment with proper documentation.
Goal: Execute a five-stage ML pipeline with handoffs, quality gates, and model card generation.
Persona Team¶
| Persona | ID | Pipeline Stage | Category |
|---|---|---|---|
| Pipeline Orchestrator | POR | Data pipeline design and orchestration | data_engineering |
| Feature Architect | FAR | Feature engineering and selection | data_engineering |
| Experiment Scientist | ESC | Model training and experiment tracking | ml_lifecycle |
| Inference Orchestrator | IOR | Model serving and inference pipeline | ml_lifecycle |
| Model Ops Steward | MOS | Production deployment and monitoring | ml_lifecycle |
Setup¶
from fcc.personas.registry import PersonaRegistry
from fcc.simulation.engine import SimulationEngine
from fcc.simulation.messages import SimulationMessage
from fcc.messaging.bus import EventBus
registry = PersonaRegistry.from_yaml_directory("src/fcc/data/personas")
bus = EventBus()  # available for cross-persona events; not used in the base scenario
engine = SimulationEngine(registry=registry, mode="deterministic")
project = {
"name": "Customer Churn Prediction",
"dataset": "customer_transactions_2025",
"target": "churned",
"deadline": "Sprint 44",
}
Stage 1: Data Pipeline Design (POR)¶
The Pipeline Orchestrator designs the data ingestion and transformation pipeline:
stage1_message = SimulationMessage(
sender="orchestrator",
receiver="POR",
content=(
f"Design a data pipeline for the '{project['name']}' project.\n"
f"Dataset: {project['dataset']}\n"
f"Target variable: {project['target']}\n\n"
"Requirements:\n"
"- Ingest from PostgreSQL and S3 parquet files\n"
"- Handle missing values and outliers\n"
"- Apply data quality checks\n"
"- Output cleaned dataset for feature engineering\n"
"Produce a pipeline specification with DAG definition."
),
phase="create",
)
pipeline_spec = engine.step(stage1_message)
print(f"Stage 1 (POR): Pipeline spec - {len(pipeline_spec.content)} chars")
Stage 2: Feature Engineering (FAR)¶
The Feature Architect designs and selects features:
stage2_message = SimulationMessage(
sender="POR",
receiver="FAR",
content=(
f"Design features for '{project['name']}' based on this "
f"pipeline spec:\n\n{pipeline_spec.content[:500]}\n\n"
"Requirements:\n"
"- Create temporal features (recency, frequency, monetary)\n"
"- Encode categorical variables\n"
"- Engineer interaction features\n"
"- Perform feature selection (top 50 features)\n"
"- Document feature definitions and data lineage\n"
"Produce a feature store specification."
),
phase="create",
)
feature_spec = engine.step(stage2_message)
print(f"Stage 2 (FAR): Feature spec - {len(feature_spec.content)} chars")
Stage 3: Model Training (ESC)¶
The Experiment Scientist trains and evaluates models:
stage3_message = SimulationMessage(
sender="FAR",
receiver="ESC",
content=(
f"Train models for '{project['name']}' using these features:\n\n"
f"{feature_spec.content[:500]}\n\n"
"Requirements:\n"
"- Train baseline (logistic regression) and advanced models\n"
"- Use stratified 5-fold cross-validation\n"
"- Track experiments with MLflow\n"
"- Evaluate: AUC-ROC, precision@k, recall, F1\n"
"- Select best model with justification\n"
"Produce an experiment report with metrics."
),
phase="create",
)
experiment_report = engine.step(stage3_message)
print(f"Stage 3 (ESC): Experiment report - {len(experiment_report.content)} chars")
Quality Gate: Model Performance¶
Evaluate model readiness before deployment:
from fcc.collaboration.scoring import ScoringEngine
scorer = ScoringEngine()
model_quality = scorer.score_text(experiment_report.content)
print(f"\nModel quality score: {model_quality:.2f}")
performance_threshold = 0.65
if model_quality < performance_threshold:
print("GATE FAILED: Model performance below threshold")
print("Action: Return to ESC for hyperparameter tuning")
else:
print("GATE PASSED: Proceeding to inference pipeline")
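When the gate fails, the handoff can loop back to ESC a bounded number of times before escalating. A minimal, framework-free sketch of that control flow (the function name, threshold, and retry budget are illustrative, not part of the fcc API):

```python
PERFORMANCE_THRESHOLD = 0.65
MAX_RETRIES = 2  # bounded retries before escalating to a human

def run_gate(scores):
    """Consume quality scores per attempt; return (passed, attempts)."""
    attempt = 0
    for attempt, score in enumerate(scores, start=1):
        if score >= PERFORMANCE_THRESHOLD:
            return True, attempt
        if attempt > MAX_RETRIES:
            break  # retry budget exhausted
    return False, attempt

# Simulated scores: the first training attempt fails the gate, the retry passes.
passed, attempts = run_gate(iter([0.58, 0.71]))
print(f"gate passed={passed} after {attempts} attempt(s)")
```

In the scenario itself, each retry would re-dispatch the Stage 3 message to ESC with tuning feedback appended to the content.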
Stage 4: Inference Pipeline (IOR)¶
The Inference Orchestrator builds the serving infrastructure:
stage4_message = SimulationMessage(
sender="ESC",
receiver="IOR",
content=(
f"Build an inference pipeline for the selected model:\n\n"
f"{experiment_report.content[:500]}\n\n"
"Requirements:\n"
"- REST API endpoint for real-time predictions\n"
"- Batch inference job for daily scoring\n"
"- Input validation and feature transformation\n"
"- Prediction logging for monitoring\n"
"- Latency target: <100ms p99\n"
"Produce an inference pipeline specification."
),
phase="create",
)
inference_spec = engine.step(stage4_message)
print(f"Stage 4 (IOR): Inference spec - {len(inference_spec.content)} chars")
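The <100ms p99 latency target can be checked once the endpoint logs request latencies. A standalone sketch using a nearest-rank percentile (the sample latencies are invented for illustration):

```python
import math

def percentile(values, pct):
    """Nearest-rank percentile of a list of samples."""
    ordered = sorted(values)
    rank = max(1, math.ceil(pct / 100 * len(ordered)))
    return ordered[rank - 1]

# Hypothetical latencies (ms) logged by the prediction endpoint.
latencies_ms = [12, 18, 25, 31, 44, 52, 67, 80, 95, 140]
p99 = percentile(latencies_ms, 99)
print(f"p99 latency: {p99} ms -", "OK" if p99 < 100 else "SLO violated")
```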
Stage 5: Production Deployment (MOS)¶
The Model Ops Steward handles production deployment and monitoring:
stage5_message = SimulationMessage(
sender="IOR",
receiver="MOS",
content=(
f"Deploy the inference pipeline to production:\n\n"
f"{inference_spec.content[:500]}\n\n"
"Requirements:\n"
"- Blue-green deployment with canary release\n"
"- Model performance monitoring (drift detection)\n"
"- Automated rollback on performance degradation\n"
"- Alerting for prediction distribution shifts\n"
"- A/B test framework for model comparison\n"
"Produce a deployment plan and monitoring spec."
),
phase="create",
)
deployment_plan = engine.step(stage5_message)
print(f"Stage 5 (MOS): Deployment plan - {len(deployment_plan.content)} chars")
Model Card Generation¶
Compile a model card documenting the entire pipeline:
model_card = {
"model_name": project["name"],
"version": "1.0.0",
"pipeline_stages": [
{"stage": "data_pipeline", "persona": "POR",
"summary_length": len(pipeline_spec.content)},
{"stage": "feature_engineering", "persona": "FAR",
"summary_length": len(feature_spec.content)},
{"stage": "training", "persona": "ESC",
"summary_length": len(experiment_report.content)},
{"stage": "inference", "persona": "IOR",
"summary_length": len(inference_spec.content)},
{"stage": "deployment", "persona": "MOS",
"summary_length": len(deployment_plan.content)},
],
"quality_score": model_quality,
"deployment_status": "production" if model_quality >= performance_threshold else "blocked",
}
import json
print("\nModel Card:")
print(json.dumps(model_card, indent=2))
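In practice the model card would be persisted as a versioned artifact rather than only printed. A minimal sketch, assuming a local `artifacts/` directory (the path and filename convention are assumptions, not part of fcc):

```python
import json
from pathlib import Path

def save_model_card(card: dict, directory: str = "artifacts") -> Path:
    """Write the model card to a versioned JSON file and return its path."""
    out_dir = Path(directory)
    out_dir.mkdir(parents=True, exist_ok=True)
    path = out_dir / f"model_card_v{card['version']}.json"
    path.write_text(json.dumps(card, indent=2))
    return path

# Example with a trimmed-down card dict.
path = save_model_card({"model_name": "Customer Churn Prediction", "version": "1.0.0"})
print(f"Model card written to {path}")
```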
Exercises¶
- Add governance: Insert a GCA (Governance Compliance Auditor) gate between training and deployment.
- Parallel models: Use the Parallel Fan-out pattern at Stage 3 to train multiple model architectures simultaneously.
- Federated features: Use the federation module to import features from a partner project's feature store.
- Monitoring loop: Create a feedback loop from MOS back to ESC when model drift is detected.
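For the monitoring-loop exercise, a common starting point is the population stability index (PSI) over binned prediction scores; a PSI above roughly 0.2 is a widely used rule of thumb for drift. A self-contained sketch (the bin values and threshold are illustrative):

```python
import math

def psi(expected, actual):
    """Population stability index over matched probability bins."""
    return sum(
        (a - e) * math.log(a / e)
        for e, a in zip(expected, actual)
        if e > 0 and a > 0  # skip empty bins to avoid log(0)
    )

# Binned prediction distributions: training baseline vs. live traffic.
baseline = [0.25, 0.25, 0.25, 0.25]
live = [0.10, 0.20, 0.30, 0.40]

DRIFT_THRESHOLD = 0.2
if psi(baseline, live) > DRIFT_THRESHOLD:
    print("Drift detected: MOS hands back to ESC for retraining")
```

Wired into the scenario, the drift check would trigger a new SimulationMessage from MOS to ESC carrying the drifted feature summary.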
Summary¶
In this scenario you executed a five-stage ML pipeline:
- POR designed the data ingestion pipeline
- FAR engineered and selected features
- ESC trained, evaluated, and selected the best model
- IOR built the inference serving infrastructure
- MOS deployed to production with monitoring
- A quality gate ensured model readiness before deployment
- A model card documented the complete pipeline
Next Steps¶
- Model Selection Competition -- Parallel model comparison
- DevOps Deployment Chain -- CI/CD automation