Building Custom Pipelines¶
This tutorial shows how to compose FCC library components into multi-step pipelines that go beyond the standard CLI workflow. You will learn to filter personas, run targeted simulations, generate selective documentation, validate results, and integrate with external tools.
Pipeline 1: Category-Filtered Simulation¶
Run a simulation using only personas from specific categories.
from fcc._resources import get_personas_dir
from fcc.personas.registry import PersonaRegistry
from fcc.workflow.graph import WorkflowGraph
from fcc.workflow.models import WorkflowMeta, WorkflowNode, WorkflowEdge
from fcc.simulation.ai_client import AIClient, AIProvider
from fcc.simulation.ai_engine import AISimulationEngine

# Load the full persona registry from the packaged YAML definitions.
registry = PersonaRegistry.from_yaml_directory(get_personas_dir())

# Filter to core + governance personas only.
selected_categories = {"core", "governance"}
selected = [p for p in registry if p.category in selected_categories]
filtered_registry = PersonaRegistry(selected)
print(f"Selected {len(filtered_registry)} personas from {selected_categories}")

# Build a custom linear graph from the filtered personas: each persona
# hands off to the next one in registry order.
nodes = [WorkflowNode(id=p.id, name=p.name, type="persona") for p in filtered_registry]
edges = []
persona_list = list(filtered_registry)
for i in range(len(persona_list) - 1):
    edges.append(WorkflowEdge(
        from_id=persona_list[i].id,
        to_id=persona_list[i + 1].id,
        label=f"{persona_list[i].id}_to_{persona_list[i + 1].id}",
        type="handoff",
    ))

meta = WorkflowMeta(id="custom_filtered", title="Core + Governance Pipeline")
graph = WorkflowGraph(meta=meta, nodes=nodes, edges=edges)

# Simulate with the mock provider (no API key needed); cap runaway loops
# at 30 steps.  Assumes the filter matched at least one persona.
client = AIClient(provider=AIProvider.MOCK)
engine = AISimulationEngine(
    graph=graph,
    ai_client=client,
    registry=filtered_registry,
    max_steps=30,
)
result = engine.run(
    start_node=persona_list[0].id,
    initial_payload="Security audit of API endpoints",
    scenario_id="FILTERED-001",
)
print(f"Steps: {result.total_steps}, AI calls: {result.total_ai_calls}")
Pipeline 2: Multi-Scenario Batch Runner¶
Run all scenarios from a directory and collect aggregated results.
from fcc._resources import get_personas_dir, get_workflows_dir, get_scenarios_dir
from fcc.personas.registry import PersonaRegistry
from fcc.workflow.graph import WorkflowGraph
from fcc.scenarios.loader import ScenarioLoader
from fcc.simulation.ai_client import AIClient, AIProvider
from fcc.simulation.ai_engine import AISimulationEngine, write_ai_traces

# Shared setup: full registry, base workflow graph, mock AI client.
registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
graph = WorkflowGraph.from_json(get_workflows_dir() / "base_sequence.json")
client = AIClient(provider=AIProvider.MOCK)
engine = AISimulationEngine(graph=graph, ai_client=client, registry=registry)

loader = ScenarioLoader.from_directory(get_scenarios_dir())

all_results = []
for scenario in loader:
    # Fall back to "RC" when a scenario does not declare a start node.
    start_node = scenario.get("setup", {}).get("start_node", "RC")
    result = engine.run(
        start_node=start_node,
        initial_payload=scenario["name"],
        scenario_id=scenario["id"],
        scenario_name=scenario["name"],
    )
    all_results.append(result)
    status = "OK" if result.success else "FAIL"
    print(f" [{status}] {scenario['id']}: {result.total_steps} steps")

# Write all traces to one file
write_ai_traces(all_results, "batch_traces.json")

# Summary
passed = sum(1 for r in all_results if r.success)
print(f"\nBatch complete: {passed}/{len(all_results)} passed")
total_tokens = sum(r.total_tokens for r in all_results)
print(f"Total tokens used: {total_tokens}")
Pipeline 3: Simulate, Validate, then Generate Docs¶
Chain simulation into validation, then generate documentation only when all validations pass.
from pathlib import Path
from fcc._resources import get_data_dir, get_personas_dir, get_workflows_dir
from fcc.personas.registry import PersonaRegistry
from fcc.workflow.graph import WorkflowGraph
from fcc.simulation.ai_client import AIClient, AIProvider
from fcc.simulation.ai_engine import AISimulationEngine
from fcc.scenarios.validators import FCCValidator
from fcc.scaffold.doc_generator import DocGenerator

# Setup
registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
graph = WorkflowGraph.from_json(get_workflows_dir() / "base_sequence.json")
client = AIClient(provider=AIProvider.MOCK)
engine = AISimulationEngine(graph=graph, ai_client=client, registry=registry)

# Step 1: Simulate
result = engine.run(
    start_node="RC",
    initial_payload="Design API versioning strategy",
    scenario_id="PIPE-001",
)

# Step 2: Extract participating personas and validate
actors = [event["actor_id"] for event in result.events]
unique_actors = list(dict.fromkeys(actors))  # preserve order, deduplicate

validator = FCCValidator.from_registry(registry)
# Map actor IDs to full names for FCC cycle validation; skip IDs the
# registry does not know about.
actor_names = [registry.get(aid).name for aid in unique_actors if aid in registry]

cycle_result = validator.validate_fcc_cycle(actor_names)
category_result = validator.validate_cross_category_coverage(unique_actors, min_categories=2)
governance_result = validator.validate_governance_gate(unique_actors)
termination_result = validator.validate_feedback_loop_termination(result.total_steps, 50)

all_validations = [cycle_result, category_result, governance_result, termination_result]
all_passed = all(v.passed for v in all_validations)

print("Validation results:")
for v in all_validations:
    status = "PASS" if v.passed else "FAIL"
    print(f" [{status}] {v.rule}: {v.message}")

# Step 3: Generate docs only if validation passes
if all_passed:
    gen = DocGenerator(registry, data_dir=get_data_dir())
    total = gen.generate_all_docs(Path("validated_docs"), persona_ids=unique_actors)
    print(f"\nGenerated {total} doc files for validated personas")
else:
    print("\nSkipping doc generation -- validation failures detected")
Pipeline 4: Cross-Reference Analysis¶
Analyze persona interaction patterns to find bottlenecks, isolated personas, or heavily-loaded coordination hubs.
from collections import Counter
from fcc._resources import get_personas_dir
from fcc.personas.registry import PersonaRegistry
from fcc.personas.cross_reference import CrossReferenceMatrix

registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
matrix = CrossReferenceMatrix.from_personas(registry)

# Count interactions per persona (in + out): each matrix entry
# contributes to both its source and its target.
interaction_count = Counter()
for entry in matrix:
    interaction_count[entry.source_id] += 1
    interaction_count[entry.target_id] += 1

print("Persona interaction frequency:")
for pid, count in interaction_count.most_common():
    persona = registry.get(pid)
    print(f" {persona.name} ({pid}): {count} interactions")

# Find isolated personas (no interactions): registered IDs that never
# appear in the matrix on either end.
all_ids = set(registry.ids)
connected_ids = matrix.all_persona_ids()
isolated = all_ids - connected_ids
if isolated:
    print(f"\nIsolated personas: {isolated}")
else:
    print(f"\nAll {len(all_ids)} personas are connected")

# Relationship type distribution
type_counts = Counter(entry.relationship_type for entry in matrix)
print(f"\nRelationship types: {dict(type_counts)}")

# Find the heaviest handoff chains (show at most the first 10)
handoffs = matrix.by_type("handoff")
print(f"\nHandoff chains ({len(handoffs)} total):")
for h in handoffs[:10]:
    src = registry.get(h.source_id).name
    tgt = registry.get(h.target_id).name
    print(f" {src} -> {tgt}: {h.interaction}")
Pipeline 5: Champion Orchestration Audit¶
Verify that each champion persona properly orchestrates its declared team.
from fcc._resources import get_personas_dir
from fcc.personas.registry import PersonaRegistry
from fcc.scenarios.validators import FCCValidator

registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
validator = FCCValidator.from_registry(registry)

for champion in registry.champions():
    base = registry.base_of(champion.id)
    print(f"\n{champion.name} ({champion.id}) -> champions {base.name}")
    print(f" Orchestrates: {champion.orchestrates}")

    # Simulate that all orchestrated personas executed
    result = validator.validate_champion_orchestration(
        champion_id=champion.id,
        orchestrated_personas=champion.orchestrates,
        personas_executed=champion.orchestrates,  # full coverage
    )
    print(f" Full coverage: {result.passed}")

    # Simulate partial coverage (missing last persona)
    if champion.orchestrates:
        partial = champion.orchestrates[:-1]
        result = validator.validate_champion_orchestration(
            champion_id=champion.id,
            orchestrated_personas=champion.orchestrates,
            personas_executed=partial,
        )
        print(f" Partial coverage: {result.passed} ({result.message})")
Pipeline 6: Quality Gate Dashboard¶
Run all quality gates and produce a summary report.
from fcc._resources import get_governance_dir
from fcc.governance.quality_gates import QualityGateRunner

runner = QualityGateRunner.from_yaml(
    get_governance_dir() / "quality_gates.yaml"
)

# Simulate artifact dicts (in production, these come from real deliverables)
# Each key is a check name, value is bool pass/fail
sample_artifacts = {}
for gate in runner.gates:
    # Simulate 80% pass rate for demonstration
    checks_dict = {}
    for i, check in enumerate(gate.checks):
        checks_dict[check] = (i % 5 != 0)  # fail every 5th check
    sample_artifacts[gate.persona_id] = checks_dict

results = runner.run_all(sample_artifacts)

# Dashboard
total_gates = len(results)
passed_gates = sum(1 for r in results if r.passed)
print(f"Quality Gate Dashboard: {passed_gates}/{total_gates} passed")
# Guard against an empty gate list so the report never divides by zero.
pass_rate = passed_gates / total_gates if total_gates else 0.0
print(f"Pass rate: {pass_rate:.0%}\n")
for r in results:
    status = "PASS" if r.passed else "FAIL"
    print(f" [{status}] {r.gate_id}: {r.checks_passed}/{r.checks_total}")
Pipeline 7: Selective Doc Generation with Tag Filtering¶
Use the tag registry to generate documentation only for personas that match specific capability tags.
from pathlib import Path
from fcc._resources import get_data_dir, get_governance_dir, get_personas_dir
from fcc.governance.tags import TagRegistry
from fcc.personas.registry import PersonaRegistry
from fcc.scaffold.doc_generator import DocGenerator

registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
tags = TagRegistry.from_yaml(get_governance_dir() / "tag_registry.yaml")

# Find all tags in the "research" supercategory
research_tags = tags.by_supercategory("research")
print(f"Research tags: {[t.capability for t in research_tags]}")

# Select personas in the "Find" phase (research-related)
find_personas = registry.by_phase("Find")
find_ids = [p.id for p in find_personas]
print(f"Find-phase personas: {find_ids}")

# Generate docs for just those personas; output lands in ./research_docs
gen = DocGenerator(registry, data_dir=get_data_dir())
total = gen.generate_all_docs(Path("research_docs"), persona_ids=find_ids)
print(f"Generated {total} files for research personas")
Pipeline 8: Combining with External Tools¶
FCC simulation results are plain dicts and JSON files. You can pipe them into any external tool or framework.
Export to pandas DataFrame¶
from fcc.simulation.ai_engine import AISimulationEngine, AISimulationResult
# Assuming `result` is an AISimulationResult from a simulation run
# Convert events to a pandas DataFrame:
import pandas as pd # requires: pip install pandas
# Each event is a plain dict, so the list of events maps directly onto rows.
df = pd.DataFrame(result.events)
# NOTE(review): assumes every event dict carries these five keys — confirm
# against the engine's event schema.
print(df[["step", "actor", "actor_id", "edge_label", "history_len"]].to_string())
# Analyze step distribution by actor
print(df.groupby("actor_id").size().sort_values(ascending=False))
Export Cross-Reference Matrix as CSV¶
import csv
from fcc._resources import get_personas_dir
from fcc.personas.cross_reference import CrossReferenceMatrix

matrix = CrossReferenceMatrix.from_yaml(
    get_personas_dir() / "cross_reference.yaml"
)

# newline="" lets the csv module control line endings (required on Windows).
with open("cross_references.csv", "w", newline="") as f:
    writer = csv.writer(f)
    # Header row first, then one row per cross-reference entry.
    writer.writerow(["source_id", "target_id", "type", "interaction", "strength"])
    for entry in matrix:
        writer.writerow([
            entry.source_id,
            entry.target_id,
            entry.relationship_type,
            entry.interaction,
            entry.strength,
        ])
print("Exported cross-references to cross_references.csv")
Generate a NetworkX Graph from WorkflowGraph¶
import networkx as nx # requires: pip install networkx
from fcc._resources import get_workflows_dir
from fcc.workflow.graph import WorkflowGraph

fcc_graph = WorkflowGraph.from_json(get_workflows_dir() / "base_sequence.json")

# Mirror the FCC workflow as a directed NetworkX graph, carrying node and
# edge metadata through as attributes.
G = nx.DiGraph()
for node in fcc_graph.nodes:
    G.add_node(node.id, name=node.name, type=node.type)
for edge in fcc_graph.edges:
    G.add_edge(edge.from_id, edge.to_id, label=edge.label, type=edge.type)

print(f"NetworkX graph: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges")
print(f"Is DAG (handoffs only): {nx.is_directed_acyclic_graph(G)}")

# Find shortest path
if nx.has_path(G, "RC", "DE"):
    path = nx.shortest_path(G, "RC", "DE")
    print(f"Shortest path RC -> DE: {path}")
Pipeline 9: End-to-End Automated Report¶
Combine all pipeline patterns into a single automated report.
#!/usr/bin/env python3
"""Automated FCC pipeline report."""
import json
from pathlib import Path
from fcc._resources import (
    get_data_dir,
    get_governance_dir,
    get_personas_dir,
    get_scenarios_dir,
    get_workflows_dir,
)
from fcc.governance.quality_gates import QualityGateRunner
from fcc.governance.tags import TagRegistry
from fcc.personas.cross_reference import CrossReferenceMatrix
from fcc.personas.registry import PersonaRegistry
from fcc.scaffold.doc_generator import DocGenerator
from fcc.scenarios.loader import ScenarioLoader
from fcc.scenarios.validators import FCCValidator
from fcc.simulation.ai_client import AIClient, AIProvider
from fcc.simulation.ai_engine import AISimulationEngine, write_ai_traces
from fcc.workflow.graph import WorkflowGraph  # fix: was used below but never imported

# ---- Setup ----
registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
graph = WorkflowGraph.from_json(get_workflows_dir() / "base_sequence.json")
client = AIClient(provider=AIProvider.MOCK)
engine = AISimulationEngine(graph=graph, ai_client=client, registry=registry)
validator = FCCValidator.from_registry(registry)
loader = ScenarioLoader.from_directory(get_scenarios_dir())

report = {
    "personas": len(registry),
    "scenarios_available": len(loader),
    "results": [],
}

# ---- Run all scenarios ----
for scenario in loader:
    # Fall back to "RC" when a scenario does not declare a start node.
    start_node = scenario.get("setup", {}).get("start_node", "RC")
    sim_result = engine.run(
        start_node=start_node,
        initial_payload=scenario["name"],
        scenario_id=scenario["id"],
        scenario_name=scenario["name"],
    )
    # Validate: deduplicate actors while preserving first-seen order
    # (a set would lose the execution order the cycle check depends on).
    actors = list(dict.fromkeys(e["actor_id"] for e in sim_result.events))
    actor_names = [registry.get(a).name for a in actors if a in registry]
    cycle = validator.validate_fcc_cycle(actor_names)
    report["results"].append({
        "scenario_id": scenario["id"],
        "steps": sim_result.total_steps,
        "ai_calls": sim_result.total_ai_calls,
        "success": sim_result.success,
        "fcc_cycle_valid": cycle.passed,
        "phases_covered": cycle.details.get("phases_covered", []),
    })

# ---- Write report ----
with open("pipeline_report.json", "w") as f:
    json.dump(report, f, indent=2)

print(f"Report written: {len(report['results'])} scenarios evaluated")
passed = sum(1 for r in report["results"] if r["success"] and r["fcc_cycle_valid"])
print(f"Fully valid: {passed}/{len(report['results'])}")
Summary¶
The FCC library is designed for composition. Each component -- registry, graph, simulation engine, validator, doc generator, tag registry, quality gates -- operates independently and accepts standard Python data structures. This makes it straightforward to build custom pipelines that filter, transform, validate, and generate outputs tailored to your specific workflow needs.