Skip to content

Building Custom Pipelines

This tutorial shows how to compose FCC library components into multi-step pipelines that go beyond the standard CLI workflow. You will learn to filter personas, run targeted simulations, generate selective documentation, validate results, and integrate with external tools.

Pipeline 1: Category-Filtered Simulation

Run a simulation using only personas from specific categories.

from fcc._resources import get_personas_dir, get_workflows_dir
from fcc.personas.registry import PersonaRegistry
from fcc.workflow.graph import WorkflowGraph
from fcc.workflow.models import WorkflowMeta, WorkflowNode, WorkflowEdge
from fcc.simulation.ai_client import AIClient, AIProvider
from fcc.simulation.ai_engine import AISimulationEngine

# Load registry
registry = PersonaRegistry.from_yaml_directory(get_personas_dir())

# Filter to core + governance personas only
selected_categories = {"core", "governance"}
selected = [p for p in registry if p.category in selected_categories]
filtered_registry = PersonaRegistry(selected)
print(f"Selected {len(filtered_registry)} personas from {selected_categories}")

# Build a custom linear graph from the filtered personas
nodes = [WorkflowNode(id=p.id, name=p.name, type="persona") for p in filtered_registry]
edges = []
persona_list = list(filtered_registry)
for i in range(len(persona_list) - 1):
    edges.append(WorkflowEdge(
        from_id=persona_list[i].id,
        to_id=persona_list[i + 1].id,
        label=f"{persona_list[i].id}_to_{persona_list[i + 1].id}",
        type="handoff",
    ))

meta = WorkflowMeta(id="custom_filtered", title="Core + Governance Pipeline")
graph = WorkflowGraph(meta=meta, nodes=nodes, edges=edges)

# Simulate
client = AIClient(provider=AIProvider.MOCK)
engine = AISimulationEngine(
    graph=graph,
    ai_client=client,
    registry=filtered_registry,
    max_steps=30,
)
result = engine.run(
    start_node=persona_list[0].id,
    initial_payload="Security audit of API endpoints",
    scenario_id="FILTERED-001",
)
print(f"Steps: {result.total_steps}, AI calls: {result.total_ai_calls}")

Pipeline 2: Multi-Scenario Batch Runner

Run all scenarios from a directory and collect aggregated results.

from fcc._resources import get_personas_dir, get_workflows_dir, get_scenarios_dir
from fcc.personas.registry import PersonaRegistry
from fcc.workflow.graph import WorkflowGraph
from fcc.simulation.ai_client import AIClient, AIProvider
from fcc.simulation.ai_engine import AISimulationEngine, write_ai_traces

registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
graph = WorkflowGraph.from_json(get_workflows_dir() / "base_sequence.json")
client = AIClient(provider=AIProvider.MOCK)
engine = AISimulationEngine(graph=graph, ai_client=client, registry=registry)

from fcc.scenarios.loader import ScenarioLoader

loader = ScenarioLoader.from_directory(get_scenarios_dir())
all_results = []

for scenario in loader:
    start_node = scenario.get("setup", {}).get("start_node", "RC")
    result = engine.run(
        start_node=start_node,
        initial_payload=scenario["name"],
        scenario_id=scenario["id"],
        scenario_name=scenario["name"],
    )
    all_results.append(result)
    status = "OK" if result.success else "FAIL"
    print(f"  [{status}] {scenario['id']}: {result.total_steps} steps")

# Write all traces to one file
write_ai_traces(all_results, "batch_traces.json")

# Summary
passed = sum(1 for r in all_results if r.success)
print(f"\nBatch complete: {passed}/{len(all_results)} passed")
total_tokens = sum(r.total_tokens for r in all_results)
print(f"Total tokens used: {total_tokens}")

Pipeline 3: Simulate, Validate, then Generate Docs

Chain simulation into validation and conditionally generate documentation only when validation passes.

from pathlib import Path
from fcc._resources import get_data_dir, get_personas_dir, get_workflows_dir
from fcc.personas.registry import PersonaRegistry
from fcc.workflow.graph import WorkflowGraph
from fcc.simulation.ai_client import AIClient, AIProvider
from fcc.simulation.ai_engine import AISimulationEngine
from fcc.scenarios.validators import FCCValidator
from fcc.scaffold.doc_generator import DocGenerator

# Setup
registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
graph = WorkflowGraph.from_json(get_workflows_dir() / "base_sequence.json")
client = AIClient(provider=AIProvider.MOCK)
engine = AISimulationEngine(graph=graph, ai_client=client, registry=registry)

# Step 1: Simulate
result = engine.run(
    start_node="RC",
    initial_payload="Design API versioning strategy",
    scenario_id="PIPE-001",
)

# Step 2: Extract participating personas and validate
actors = [event["actor_id"] for event in result.events]
unique_actors = list(dict.fromkeys(actors))  # preserve order, deduplicate

validator = FCCValidator.from_registry(registry)

# Map actor IDs to full names for FCC cycle validation
actor_names = [registry.get(aid).name for aid in unique_actors if aid in registry]
cycle_result = validator.validate_fcc_cycle(actor_names)

category_result = validator.validate_cross_category_coverage(unique_actors, min_categories=2)
governance_result = validator.validate_governance_gate(unique_actors)
termination_result = validator.validate_feedback_loop_termination(result.total_steps, 50)

all_validations = [cycle_result, category_result, governance_result, termination_result]
all_passed = all(v.passed for v in all_validations)

print("Validation results:")
for v in all_validations:
    status = "PASS" if v.passed else "FAIL"
    print(f"  [{status}] {v.rule}: {v.message}")

# Step 3: Generate docs only if validation passes
if all_passed:
    gen = DocGenerator(registry, data_dir=get_data_dir())
    total = gen.generate_all_docs(Path("validated_docs"), persona_ids=unique_actors)
    print(f"\nGenerated {total} doc files for validated personas")
else:
    print("\nSkipping doc generation -- validation failures detected")

Pipeline 4: Cross-Reference Analysis

Analyze persona interaction patterns to find bottlenecks, isolated personas, or heavily-loaded coordination hubs.

from collections import Counter
from fcc._resources import get_personas_dir
from fcc.personas.registry import PersonaRegistry
from fcc.personas.cross_reference import CrossReferenceMatrix

registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
matrix = CrossReferenceMatrix.from_personas(registry)

# Count interactions per persona (in + out)
interaction_count = Counter()
for entry in matrix:
    interaction_count[entry.source_id] += 1
    interaction_count[entry.target_id] += 1

print("Persona interaction frequency:")
for pid, count in interaction_count.most_common():
    persona = registry.get(pid)
    print(f"  {persona.name} ({pid}): {count} interactions")

# Find isolated personas (no interactions)
all_ids = set(registry.ids)
connected_ids = matrix.all_persona_ids()
isolated = all_ids - connected_ids
if isolated:
    print(f"\nIsolated personas: {isolated}")
else:
    print(f"\nAll {len(all_ids)} personas are connected")

# Relationship type distribution
type_counts = Counter(entry.relationship_type for entry in matrix)
print(f"\nRelationship types: {dict(type_counts)}")

# Find the heaviest handoff chains
handoffs = matrix.by_type("handoff")
print(f"\nHandoff chains ({len(handoffs)} total):")
for h in handoffs[:10]:
    src = registry.get(h.source_id).name
    tgt = registry.get(h.target_id).name
    print(f"  {src} -> {tgt}: {h.interaction}")

Pipeline 5: Champion Orchestration Audit

Verify that each champion persona properly orchestrates its declared team.

from fcc._resources import get_personas_dir
from fcc.personas.registry import PersonaRegistry
from fcc.scenarios.validators import FCCValidator

registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
validator = FCCValidator.from_registry(registry)

for champion in registry.champions():
    base = registry.base_of(champion.id)
    print(f"\n{champion.name} ({champion.id}) -> champions {base.name}")
    print(f"  Orchestrates: {champion.orchestrates}")

    # Simulate that all orchestrated personas executed
    result = validator.validate_champion_orchestration(
        champion_id=champion.id,
        orchestrated_personas=champion.orchestrates,
        personas_executed=champion.orchestrates,  # full coverage
    )
    print(f"  Full coverage: {result.passed}")

    # Simulate partial coverage (missing last persona)
    if champion.orchestrates:
        partial = champion.orchestrates[:-1]
        result = validator.validate_champion_orchestration(
            champion_id=champion.id,
            orchestrated_personas=champion.orchestrates,
            personas_executed=partial,
        )
        print(f"  Partial coverage: {result.passed} ({result.message})")

Pipeline 6: Quality Gate Dashboard

Run all quality gates and produce a summary report.

from fcc._resources import get_governance_dir
from fcc.governance.quality_gates import QualityGateRunner

runner = QualityGateRunner.from_yaml(
    get_governance_dir() / "quality_gates.yaml"
)

# Simulate artifact dicts (in production, these come from real deliverables)
# Each key is a check name, value is bool pass/fail
sample_artifacts = {}
for gate in runner.gates:
    # Simulate 80% pass rate for demonstration
    checks_dict = {}
    for i, check in enumerate(gate.checks):
        checks_dict[check] = (i % 5 != 0)  # fail every 5th check
    sample_artifacts[gate.persona_id] = checks_dict

results = runner.run_all(sample_artifacts)

# Dashboard
total_gates = len(results)
passed_gates = sum(1 for r in results if r.passed)
print(f"Quality Gate Dashboard: {passed_gates}/{total_gates} passed")
print(f"Pass rate: {passed_gates / total_gates:.0%}\n")

for r in results:
    status = "PASS" if r.passed else "FAIL"
    print(f"  [{status}] {r.gate_id}: {r.checks_passed}/{r.checks_total}")

Pipeline 7: Selective Doc Generation with Tag Filtering

Use the tag registry to generate documentation only for personas that match specific capability tags.

from pathlib import Path
from fcc._resources import get_data_dir, get_governance_dir, get_personas_dir
from fcc.governance.tags import TagRegistry
from fcc.personas.registry import PersonaRegistry
from fcc.scaffold.doc_generator import DocGenerator

registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
tags = TagRegistry.from_yaml(get_governance_dir() / "tag_registry.yaml")

# Find all tags in the "research" supercategory
research_tags = tags.by_supercategory("research")
print(f"Research tags: {[t.capability for t in research_tags]}")

# Select personas in the "Find" phase (research-related)
find_personas = registry.by_phase("Find")
find_ids = [p.id for p in find_personas]
print(f"Find-phase personas: {find_ids}")

# Generate docs for just those personas
gen = DocGenerator(registry, data_dir=get_data_dir())
total = gen.generate_all_docs(Path("research_docs"), persona_ids=find_ids)
print(f"Generated {total} files for research personas")

Pipeline 8: Combining with External Tools

FCC simulation results are plain dicts and JSON files. You can pipe them into any external tool or framework.

Export to pandas DataFrame

from fcc.simulation.ai_engine import AISimulationEngine, AISimulationResult

# Assuming `result` is an AISimulationResult from a simulation run
# Convert events to a pandas DataFrame:

import pandas as pd  # requires: pip install pandas

df = pd.DataFrame(result.events)
print(df[["step", "actor", "actor_id", "edge_label", "history_len"]].to_string())

# Analyze step distribution by actor
print(df.groupby("actor_id").size().sort_values(ascending=False))

Export Cross-Reference Matrix as CSV

import csv
from fcc._resources import get_personas_dir
from fcc.personas.cross_reference import CrossReferenceMatrix

matrix = CrossReferenceMatrix.from_yaml(
    get_personas_dir() / "cross_reference.yaml"
)

with open("cross_references.csv", "w", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["source_id", "target_id", "type", "interaction", "strength"])
    for entry in matrix:
        writer.writerow([
            entry.source_id,
            entry.target_id,
            entry.relationship_type,
            entry.interaction,
            entry.strength,
        ])

print("Exported cross-references to cross_references.csv")

Generate a NetworkX Graph from WorkflowGraph

import networkx as nx  # requires: pip install networkx

from fcc._resources import get_workflows_dir
from fcc.workflow.graph import WorkflowGraph

fcc_graph = WorkflowGraph.from_json(get_workflows_dir() / "base_sequence.json")

G = nx.DiGraph()
for node in fcc_graph.nodes:
    G.add_node(node.id, name=node.name, type=node.type)
for edge in fcc_graph.edges:
    G.add_edge(edge.from_id, edge.to_id, label=edge.label, type=edge.type)

print(f"NetworkX graph: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges")
print(f"Is DAG (handoffs only): {nx.is_directed_acyclic_graph(G)}")

# Find shortest path
if nx.has_path(G, "RC", "DE"):
    path = nx.shortest_path(G, "RC", "DE")
    print(f"Shortest path RC -> DE: {path}")

Pipeline 9: End-to-End Automated Report

Combine all pipeline patterns into a single automated report.

#!/usr/bin/env python3
"""Automated FCC pipeline report."""

import json
from pathlib import Path
from fcc._resources import (
    get_data_dir,
    get_governance_dir,
    get_personas_dir,
    get_scenarios_dir,
    get_workflows_dir,
)
from fcc.governance.quality_gates import QualityGateRunner
from fcc.governance.tags import TagRegistry
from fcc.personas.cross_reference import CrossReferenceMatrix
from fcc.personas.registry import PersonaRegistry
from fcc.scaffold.doc_generator import DocGenerator
from fcc.scenarios.loader import ScenarioLoader
from fcc.scenarios.validators import FCCValidator
from fcc.simulation.ai_client import AIClient, AIProvider
from fcc.simulation.ai_engine import AISimulationEngine, write_ai_traces

# ---- Setup ----
registry = PersonaRegistry.from_yaml_directory(get_personas_dir())
graph = WorkflowGraph.from_json(get_workflows_dir() / "base_sequence.json")
client = AIClient(provider=AIProvider.MOCK)
engine = AISimulationEngine(graph=graph, ai_client=client, registry=registry)
validator = FCCValidator.from_registry(registry)
loader = ScenarioLoader.from_directory(get_scenarios_dir())

report = {
    "personas": len(registry),
    "scenarios_available": len(loader),
    "results": [],
}

# ---- Run all scenarios ----
for scenario in loader:
    start_node = scenario.get("setup", {}).get("start_node", "RC")
    sim_result = engine.run(
        start_node=start_node,
        initial_payload=scenario["name"],
        scenario_id=scenario["id"],
        scenario_name=scenario["name"],
    )

    # Validate
    actors = list({e["actor_id"] for e in sim_result.events})
    actor_names = [registry.get(a).name for a in actors if a in registry]
    cycle = validator.validate_fcc_cycle(actor_names)

    report["results"].append({
        "scenario_id": scenario["id"],
        "steps": sim_result.total_steps,
        "ai_calls": sim_result.total_ai_calls,
        "success": sim_result.success,
        "fcc_cycle_valid": cycle.passed,
        "phases_covered": cycle.details.get("phases_covered", []),
    })

# ---- Write report ----
with open("pipeline_report.json", "w") as f:
    json.dump(report, f, indent=2)

print(f"Report written: {len(report['results'])} scenarios evaluated")
passed = sum(1 for r in report["results"] if r["success"] and r["fcc_cycle_valid"])
print(f"Fully valid: {passed}/{len(report['results'])}")

Summary

The FCC library is designed for composition. Each component -- registry, graph, simulation engine, validator, doc generator, tag registry, quality gates -- operates independently and accepts standard Python data structures. This makes it straightforward to build custom pipelines that filter, transform, validate, and generate outputs tailored to your specific workflow needs.