
Chapter 6: Event Bus and Observability

Learning Objectives

By the end of this chapter you will be able to:

  1. Publish and subscribe to events on the FCC event bus.
  2. Use event filters to receive only the events you care about.
  3. Instrument simulation and action engines with the observability layer.
  4. Collect and export traces and metrics using console and JSON exporters.
  5. Integrate with OpenTelemetry for production observability.

The sequence diagram below shows how the SimulationEngine publishes events that pass through an EventFilter to a Subscriber, then out to a JSON exporter and optionally to OpenTelemetry.

sequenceDiagram
    participant SIM as SimulationEngine
    participant EB as EventBus
    participant F as EventFilter
    participant SUB as Subscriber
    participant EXP as JsonFileExporter
    participant OTEL as OpenTelemetry<br/>(optional)

    SIM->>EB: publish(SIMULATION_STARTED)
    EB->>F: match(event)
    F-->>EB: matches subscriber
    EB->>SUB: deliver(event)
    SIM->>EB: publish(NODE_COMPLETED)
    EB->>SUB: deliver(event)
    SIM->>EB: publish(SIMULATION_COMPLETED)
    EB->>EXP: export(events)
    EB->>OTEL: bridge spans (if available)

In practice, most teams start with only the JSON exporter for offline analysis and enable the OpenTelemetry bridge once a broader observability stack is in place.

The Event Bus

The FCC event bus (src/fcc/messaging/bus.py) is a thread-safe, in-process pub/sub system that enables decoupled communication between framework subsystems. When a simulation node completes, a quality gate is evaluated, or a collaboration session changes state, an event is published to the bus. Any number of subscribers can listen for and react to these events.
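
The FCC bus API is shown throughout this chapter; to illustrate the underlying pattern, here is a minimal thread-safe pub/sub sketch. The names (MiniBus) are hypothetical and this is not the FCC implementation:

```python
import threading
from collections import defaultdict
from typing import Callable

class MiniBus:
    """Minimal in-process pub/sub: illustrates the pattern, not the FCC EventBus."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._handlers: dict[str, list[Callable]] = defaultdict(list)

    def subscribe(self, event_type: str, handler: Callable) -> None:
        with self._lock:
            self._handlers[event_type].append(handler)

    def publish(self, event_type: str, data: dict) -> None:
        with self._lock:
            handlers = list(self._handlers[event_type])  # snapshot under the lock
        for handler in handlers:  # deliver outside the lock to avoid blocking publishers
            handler(data)

bus = MiniBus()
seen = []
bus.subscribe("NODE_COMPLETED", seen.append)
bus.publish("NODE_COMPLETED", {"node_id": "create_design"})
# seen == [{"node_id": "create_design"}]
```

Copying the handler list under the lock and delivering outside it keeps publishers from blocking on slow subscribers, which is the usual trade-off in an in-process bus.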

Event Types

The framework defines 81 event types in src/fcc/messaging/events.py:

Category       Event Types
-------------  ------------------------------------------------------------------------
Simulation     SIMULATION_STARTED, SIMULATION_COMPLETED, NODE_STARTED, NODE_COMPLETED
Workflow       WORKFLOW_STARTED, WORKFLOW_COMPLETED, FEEDBACK_TRIGGERED
Governance     GATE_EVALUATED, GATE_PASSED, GATE_FAILED, CONSTITUTION_CHECKED
Collaboration  SESSION_CREATED, SESSION_COMPLETED, TURN_COMPLETED, APPROVAL_REQUESTED
Plugin         PLUGIN_LOADED, PLUGIN_ERROR
System         ERROR, WARNING, INFO, DEBUG
Custom         CUSTOM_1 through CUSTOM_5 (for user-defined events)

Publishing Events

from fcc.messaging.bus import EventBus
from fcc.messaging.events import Event, EventType

bus = EventBus()

event = Event(
    type=EventType.NODE_COMPLETED,
    source="simulation_engine",
    data={
        "node_id": "create_design",
        "persona_id": "software_architect",
        "tokens_used": 1500,
        "duration_ms": 2300,
    },
)

bus.publish(event)

Subscribing to Events

from fcc.messaging.bus import EventBus, Subscriber
from fcc.messaging.events import Event, EventType

bus = EventBus()

def my_handler(event: Event) -> None:
    print(f"Received {event.type.name} from {event.source}")
    print(f"Data: {event.data}")

subscriber = Subscriber(
    name="my_logger",
    handler=my_handler,
    event_types=[EventType.NODE_COMPLETED, EventType.GATE_EVALUATED],
)

bus.subscribe(subscriber)

The subscriber only receives events whose types are in the event_types list. If you omit event_types, the subscriber receives all events.

Event Filters

For more fine-grained control, use EventFilter:

from fcc.messaging.bus import EventFilter

# Named to avoid shadowing Python's built-in filter()
architect_filter = EventFilter(
    event_types=[EventType.NODE_COMPLETED],
    source_pattern="simulation_*",
    data_conditions={"persona_id": "software_architect"},
)

bus.subscribe(Subscriber(
    name="architect_watcher",
    handler=my_handler,
    event_filter=architect_filter,
))

This subscriber only receives NODE_COMPLETED events from sources matching simulation_* where the persona_id in the event data is software_architect.
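
The exact matching rules live in the FCC source; conceptually, a filter like this reduces to a type-membership check, a glob match on the source, and a subset test on the data. The sketch below is hypothetical, not the FCC code:

```python
from fnmatch import fnmatch

def matches(event: dict, event_types: list, source_pattern: str,
            data_conditions: dict) -> bool:
    """Sketch of filter semantics: type membership, glob source match, data subset."""
    if event["type"] not in event_types:
        return False
    if not fnmatch(event["source"], source_pattern):
        return False
    # Every condition key must be present in the event data with an equal value.
    return all(event["data"].get(k) == v for k, v in data_conditions.items())

event = {
    "type": "NODE_COMPLETED",
    "source": "simulation_engine",
    "data": {"persona_id": "software_architect"},
}
matches(event, ["NODE_COMPLETED"], "simulation_*",
        {"persona_id": "software_architect"})  # True
```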

Event Serialization and Replay

Events can be serialized to JSON for persistence and replay:

from fcc.messaging.serialization import EventSerializer, EventReplay

serializer = EventSerializer()

# Serialize
json_str = serializer.serialize(event)

# Deserialize
restored_event = serializer.deserialize(json_str)

# Replay a sequence of events
replay = EventReplay(bus)
replay.load("events/simulation_trace.json")
replay.play()  # Re-emits all events in order

Replay is valuable for debugging (step through events from a failed simulation), testing (verify subscriber behavior against known event sequences), and demonstration (show stakeholders how a simulation progressed).
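
The replay idea can be sketched without the FCC classes: persist an ordered event sequence as JSON, reload it, and re-emit each event to a handler. These helpers are hypothetical stand-ins for EventSerializer and EventReplay:

```python
import json

def serialize_events(events: list) -> str:
    """Persist an ordered event sequence as JSON (stand-in for EventSerializer)."""
    return json.dumps(events)

def replay(json_str: str, handler) -> None:
    """Re-emit every stored event, in order, to a handler (stand-in for EventReplay)."""
    for event in json.loads(json_str):
        handler(event)

trace = serialize_events([
    {"type": "SIMULATION_STARTED"},
    {"type": "NODE_COMPLETED", "data": {"node_id": "create_design"}},
    {"type": "SIMULATION_COMPLETED"},
])

received = []
replay(trace, received.append)
# received preserves the original publication order
```

In a test, you would assert that a subscriber fed this known sequence ends up in the expected state, which is exactly the testing use case described above.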

The Observability Layer

The observability layer (src/fcc/observability/) provides structured tracing and metrics collection. It is designed to work standalone (console and JSON export) or with OpenTelemetry for production environments.

Tracing

The tracing system records spans -- named, timed operations that form a tree representing the execution structure of a simulation:

from fcc.observability.tracing import FccTracer, traced

tracer = FccTracer(service_name="my_fcc_project")

# Manual span creation
with tracer.start_span("simulate_scenario") as span:
    span.set_attribute("scenario_id", "competitive_analysis")
    span.set_attribute("mode", "mock")
    # ... run simulation ...
    span.set_attribute("steps", 5)

The @traced decorator provides a convenient way to instrument functions:

@traced(tracer, name="process_node")
def process_node(node_id: str, persona_id: str) -> str:
    # The span is automatically created and closed
    return "output"
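
A decorator in the spirit of @traced can be approximated in a few lines: wrap the call, time it, and append a span record on the way out. This is a sketch of the mechanism, not the FCC implementation, and the span store is a plain list:

```python
import functools
import time

spans = []  # stand-in for a tracer's span store

def traced_sketch(name: str):
    """Create a decorator that records a timed span record around each call."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                # Record the span even if the wrapped function raises.
                duration_ms = (time.perf_counter() - start) * 1000
                spans.append({"name": name, "duration_ms": duration_ms})
        return wrapper
    return decorator

@traced_sketch("process_node")
def process_node(node_id: str) -> str:
    return "output"

process_node("create_design")
# spans now holds one record with name "process_node"
```

The `try/finally` is the important detail: a real tracer must close the span on both the success and the error path.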

Span Hierarchy

Spans form a parent-child tree:

simulate_scenario (root)
├── process_node: find_requirements
│   ├── load_persona: domain_expert
│   └── generate_prompt
├── process_node: create_design
│   ├── load_persona: software_architect
│   └── generate_prompt
└── process_node: critique_design
    ├── load_persona: code_reviewer
    ├── generate_prompt
    └── evaluate_gate: test_coverage_minimum

This hierarchy makes it easy to see where time is spent and where errors occur.
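
One way to answer "where is time spent" from exported spans is to compute each span's self time: its own duration minus the durations of its direct children. A sketch, assuming each span record carries name, parent, and duration_ms, and that names are unique (real traces key by span id instead):

```python
from collections import defaultdict

def self_time(spans: list) -> dict:
    """Per-span self time: own duration minus direct children's durations.
    Assumes unique span names for brevity; real tracers use span ids."""
    child_total = defaultdict(float)
    for s in spans:
        if s["parent"] is not None:
            child_total[s["parent"]] += s["duration_ms"]
    return {s["name"]: s["duration_ms"] - child_total[s["name"]] for s in spans}

spans = [
    {"name": "simulate_scenario", "parent": None, "duration_ms": 100.0},
    {"name": "process_node", "parent": "simulate_scenario", "duration_ms": 40.0},
    {"name": "load_persona", "parent": "process_node", "duration_ms": 5.0},
]
self_time(spans)
# {"simulate_scenario": 60.0, "process_node": 35.0, "load_persona": 5.0}
```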

Metrics

The metrics system collects 7 pre-defined metrics plus custom metrics:

Metric                   Type       Description
-----------------------  ---------  ----------------------------------
simulation.duration_ms   Histogram  Total simulation time
simulation.steps         Counter    Number of steps per simulation
simulation.tokens_total  Counter    Total tokens consumed
node.duration_ms         Histogram  Per-node execution time
gate.evaluations         Counter    Number of gate evaluations
gate.failures            Counter    Number of gate failures
feedback.cycles          Counter    Number of feedback loop iterations

from fcc.observability.metrics import FccMetrics

metrics = FccMetrics()

# Record a metric
metrics.record("simulation.tokens_total", 15000, labels={"scenario": "competitive_analysis"})

# Read aggregated metrics
summary = metrics.summary()
for metric_name, values in summary.items():
    print(f"{metric_name}: count={values.count}, avg={values.avg:.1f}")
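
Under the hood, a summary of this shape amounts to simple aggregation over recorded values. As a sketch (a hypothetical MiniMetrics class, not FccMetrics, and without label support):

```python
from collections import defaultdict

class MiniMetrics:
    """Records values per metric name; summary yields count and average."""

    def __init__(self) -> None:
        self._values = defaultdict(list)

    def record(self, name: str, value: float) -> None:
        self._values[name].append(value)

    def summary(self) -> dict:
        return {
            name: {"count": len(vals), "avg": sum(vals) / len(vals)}
            for name, vals in self._values.items()
        }

m = MiniMetrics()
m.record("node.duration_ms", 2300)
m.record("node.duration_ms", 1700)
m.summary()["node.duration_ms"]  # count=2, avg=2000
```

A real histogram would also keep bucketed counts for percentile queries; a plain list of values is the simplest faithful stand-in.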

Exporters

The framework ships with two exporter types:

Console Exporter

Prints spans and metrics to stdout in a human-readable format. Ideal for development:

from fcc.observability.exporters import ConsoleSpanExporter, ConsoleMetricExporter

tracer.add_exporter(ConsoleSpanExporter())
metrics.add_exporter(ConsoleMetricExporter())

JSON File Exporter

Writes spans and metrics to JSON files for offline analysis:

from fcc.observability.exporters import JsonFileSpanExporter, JsonFileMetricExporter

tracer.add_exporter(JsonFileSpanExporter(path="traces/spans.json"))
metrics.add_exporter(JsonFileMetricExporter(path="traces/metrics.json"))

OpenTelemetry Integration

For production environments, the observability layer can bridge to OpenTelemetry:

from opentelemetry import trace as otel_trace  # importing verifies OTel is installed
from fcc.observability.integration import instrument_simulation_engine

# Instrument the simulation engine with OTel
# (assumes SimulationEngine is already imported)
engine = SimulationEngine(mode="ai")
instrument_simulation_engine(engine)

# Spans and metrics are now exported via your OTel pipeline

This integration is optional -- the opentelemetry-api and opentelemetry-sdk packages must be installed separately.
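
Because the packages are optional, a guarded import keeps the standalone exporters working when OpenTelemetry is absent. This is a common Python pattern, sketched here with a hypothetical helper:

```python
import importlib

def optional_import(module_name: str):
    """Return the module if it is installed, otherwise None."""
    try:
        return importlib.import_module(module_name)
    except ImportError:
        return None

otel = optional_import("opentelemetry")
if otel is None:
    # Fall back to the console/JSON exporters only.
    pass
```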

Instrumenting the Pipeline

The integration module provides convenience functions for instrumenting the full pipeline:

from fcc.observability.integration import (
    instrument_simulation_engine,
    instrument_action_engine,
)

# Instrument both engines
instrument_simulation_engine(simulation_engine)
instrument_action_engine(action_engine)

# Now every simulation run and action execution emits traces and metrics

After instrumentation, every simulation run automatically:

  1. Creates a root span for the simulation.
  2. Creates child spans for each node execution.
  3. Records metrics for duration, tokens, and gate evaluations.
  4. Exports everything via the configured exporters.

Connecting Events and Traces

The event bus and observability layer are complementary:

  • Events are for pub/sub communication between subsystems. They trigger actions (logging, dashboards, plugins).
  • Traces are for performance analysis and debugging. They record timing, hierarchy, and attributes.
  • Metrics are for aggregate monitoring. They track counters and distributions over time.

You can connect them: a subscriber that listens for events and records metrics, or a span exporter that publishes events for each span.

def metrics_subscriber(event: Event) -> None:
    if event.type == EventType.NODE_COMPLETED:
        metrics.record("node.duration_ms", event.data["duration_ms"])
        metrics.record("simulation.tokens_total", event.data["tokens_used"])

bus.subscribe(Subscriber(
    name="metrics_bridge",
    handler=metrics_subscriber,
    event_types=[EventType.NODE_COMPLETED],
))

Key Takeaways

  • The event bus provides thread-safe pub/sub with 81 event types and configurable filters.
  • Events can be serialized, persisted, and replayed for debugging and testing.
  • The observability layer provides structured tracing (spans) and 7 pre-defined metrics.
  • Console and JSON exporters work standalone; OpenTelemetry integration is optional.
  • Events, traces, and metrics are complementary -- connect them for full pipeline visibility.

Cross-References


← Chapter 5: Plugin Development | Next: Chapter 7 -- Collaboration Sessions →