
Chapter 7: Event Bus and Observability

Overview

The FCC framework uses two complementary systems for runtime visibility: an event bus for decoupled communication between subsystems and an observability layer for structured tracing and metrics collection. Together they answer the questions "what happened?" (events), "how long did it take?" (tracing), and "how much?" (metrics).

The sequence diagram below shows how an ActionEngine publishes lifecycle events to the bus, how multiple subscribers receive each event, and how the serializer captures the full trace for later replay.

sequenceDiagram
    participant AE as ActionEngine
    participant EB as EventBus
    participant S1 as Subscriber A<br/>(Logger)
    participant S2 as Subscriber B<br/>(Metrics)
    participant SR as EventSerializer

    AE->>EB: publish(action.started)
    EB->>S1: deliver(event)
    EB->>S2: deliver(event)
    AE->>EB: publish(action.completed)
    EB->>S1: deliver(event)
    EB->>S2: deliver(event)
    EB->>SR: serialize(events)
    SR-->>EB: JSON persisted
    Note over EB,SR: Events are replayable<br/>via EventReplay

Because subscribers are decoupled from publishers, new consumers — metrics, compliance, UI updates — can be added without changing any emitting code.

The 81 Event Types

Events are categorised by the subsystem that emits them. The EventType enum in src/fcc/messaging/events.py defines all 81 types across 23 categories:

| Category | Count | Event Types |
|---|---|---|
| Action | 3 | action.started, action.completed, action.failed |
| Benchmark | 3 | benchmark.started, benchmark.completed, benchmark.regression |
| Collaboration | 5 | collaboration.session.created, collaboration.session.started, collaboration.session.completed, collaboration.turn.taken, collaboration.maturity.assessed |
| Compliance | 4 | compliance.audit.started, compliance.audit.completed, compliance.finding.raised, compliance.remediation.required |
| Deliverable | 2 | deliverable.created, deliverable.reviewed |
| Demo | 3 | demo.started, demo.step.completed, demo.completed |
| Ecosystem | 2 | ecosystem.health.check, ecosystem.project.status |
| Federation | 3 | federation.entity.resolved, federation.namespace.registered, federation.change.detected |
| Governance | 3 | governance.gate.evaluated, governance.gate.passed, governance.gate.failed |
| Knowledge | 10 | knowledge.graph.built, knowledge.graph.exported, knowledge.graph.entity.created, knowledge.graph.entity.updated, knowledge.graph.edge.created, knowledge.graph.query.executed, knowledge.graph.federation.namespace.registered, knowledge.graph.federation.merged, knowledge.graph.federation.cross_edge.added, knowledge.graph.federation.entity.resolved |
| Messaging | 4 | messaging.dlq.enqueued, messaging.circuit.open, messaging.circuit.closed, messaging.priority.escalated |
| Node | 2 | node.entered, node.exited |
| Object Model | 2 | objectmodel.facade.registered, objectmodel.query.executed |
| Persona | 2 | persona.activated, persona.deactivated |
| Plugin | 2 | plugin.loaded, plugin.error |
| Protocol | 10 | protocol.a2a.task.received, protocol.a2a.task.completed, protocol.a2a.task.failed, protocol.a2a.card.requested, protocol.mcp.tool.called, protocol.mcp.tool.completed, protocol.mcp.tool.failed, protocol.mcp.resource.read, protocol.mcp.prompt.rendered, protocol.bridge.error |
| RAG | 3 | rag.query.started, rag.query.completed, rag.index.built |
| Search | 2 | search.index.built, search.query.executed |
| Simulation | 6 | simulation.started, simulation.step, simulation.completed, simulation.failed, simulation.budget.warning, simulation.budget.exceeded |
| Tutorial | 3 | tutorial.progress.updated, tutorial.checkpoint.reached, tutorial.completed |
| Vertical | 1 | vertical.loaded |
| Viz | 3 | viz.data.updated, viz.filter.changed, viz.selection.changed |
| Workflow | 3 | workflow.started, workflow.completed, workflow.failed |

Each event is a frozen dataclass with a UUID, ISO 8601 timestamp, source identifier, payload dict, and optional correlation ID:

from fcc.messaging.events import Event, EventType

event = Event(
    event_type=EventType.ACTION_COMPLETED,
    source="ActionEngine",
    payload={"persona_id": "SQC", "action_type": "test"},
    correlation_id="run-42",
)

print(event.event_id)        # UUID string
print(event.timestamp)       # ISO 8601
print(event.event_type)      # EventType.ACTION_COMPLETED
print(event.correlation_id)  # "run-42"
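
To make the "frozen dataclass with generated ID and timestamp" idea concrete, here is a minimal standalone sketch. SketchEvent is a hypothetical stand-in that only mirrors the field names used above; it is not the actual fcc.messaging.events.Event, whose defaults and validation may differ.

```python
import uuid
from dataclasses import dataclass, field, FrozenInstanceError
from datetime import datetime, timezone
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for Event; not the framework's class."""
    event_type: str
    source: str
    payload: Dict[str, Any] = field(default_factory=dict)
    correlation_id: Optional[str] = None
    # Generated per instance so callers never supply them by hand.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


evt = SketchEvent("action.completed", "ActionEngine", {"persona_id": "SQC"}, "run-42")

try:
    evt.source = "Other"  # frozen: attribute assignment is rejected
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Note that frozen=True only blocks rebinding attributes; a dict stored in payload can still be mutated in place, so subscribers should treat payloads as read-only by convention.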

EventBus Pub/Sub Architecture

The EventBus in src/fcc/messaging/bus.py is a thread-safe, in-process pub/sub system. It does not require external infrastructure -- no message broker, no network calls.

Publishing Events

from fcc.messaging.bus import EventBus

bus = EventBus()
delivered = bus.publish(event)
print(f"Delivered to {delivered} subscribers")

publish() is synchronous: it calls each matching subscriber in the publishing thread and returns the count of successful deliveries. Subscriber exceptions are caught and swallowed to prevent one broken subscriber from blocking others.
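
The delivery semantics described above (synchronous calls, swallowed exceptions, a count of successful deliveries) can be sketched in a few lines. SketchBus below is a hypothetical illustration of that contract, assuming a simple lock around the subscriber list; it is not the real EventBus implementation.

```python
import threading
from typing import Callable, List


class SketchBus:
    """Hypothetical minimal bus illustrating publish() semantics."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._handlers: List[Callable[[object], None]] = []

    def subscribe(self, handler: Callable[[object], None]) -> None:
        with self._lock:
            self._handlers.append(handler)

    def publish(self, event: object) -> int:
        with self._lock:
            handlers = list(self._handlers)  # snapshot, then release the lock
        delivered = 0
        for handler in handlers:
            try:
                handler(event)  # runs in the publishing thread
                delivered += 1
            except Exception:
                # Swallow so one broken subscriber cannot block the rest.
                continue
        return delivered


received = []


def good(event):
    received.append(event)


def broken(event):
    raise RuntimeError("boom")


bus_sketch = SketchBus()
bus_sketch.subscribe(broken)
bus_sketch.subscribe(good)
count = bus_sketch.publish("action.started")
```

In this sketch the failing handler is skipped and only the successful delivery is counted. Because failures are silent, a subscriber that must not lose events should do its own logging or error handling inside the handler.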

Subscribing

Three subscription modes are available:

Global subscriber -- receives every event:

def my_handler(event: Event) -> None:
    print(f"Received: {event.event_type.value}")

bus.subscribe(my_handler)

Type-filtered subscriber -- receives only events of specific types:

from fcc.messaging.bus import EventFilter

bus.subscribe(
    my_handler,
    EventFilter(event_types=(EventType.ACTION_COMPLETED, EventType.ACTION_FAILED)),
)

Full-filter subscriber -- filters on event type, source, and/or correlation ID:

bus.subscribe(
    my_handler,
    EventFilter(
        event_types=(EventType.SIMULATION_STEP,),
        sources=("SimulationEngine",),
        correlation_ids=("run-42",),
    ),
)

EventFilter

The EventFilter frozen dataclass supports three filter dimensions:

@dataclass(frozen=True)
class EventFilter:
    event_types: tuple[EventType, ...] = ()     # empty = match all
    sources: tuple[str, ...] = ()                # empty = match all
    correlation_ids: tuple[str, ...] = ()        # empty = match all

All specified dimensions must match (logical AND). Empty tuples act as wildcards.
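
The AND-with-wildcards rule can be expressed as one boolean expression. The sketch below uses a hypothetical SketchFilter with a matches() method and plain strings for event types; the real EventFilter may expose matching differently.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class SketchFilter:
    """Hypothetical filter; field names mirror EventFilter above."""
    event_types: Tuple[str, ...] = ()
    sources: Tuple[str, ...] = ()
    correlation_ids: Tuple[str, ...] = ()

    def matches(self, event_type: str, source: str,
                correlation_id: Optional[str]) -> bool:
        # Each dimension passes if it is empty (wildcard) or contains the value.
        return (
            (not self.event_types or event_type in self.event_types)
            and (not self.sources or source in self.sources)
            and (not self.correlation_ids or correlation_id in self.correlation_ids)
        )


f = SketchFilter(event_types=("simulation.step",), sources=("SimulationEngine",))
hit = f.matches("simulation.step", "SimulationEngine", "run-42")   # both dimensions match
miss = f.matches("simulation.step", "ActionEngine", "run-42")      # source differs
wildcard = SketchFilter().matches("plugin.error", "PluginLoader", None)
```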

Unsubscribing

removed = bus.unsubscribe(my_handler)  # True if found and removed

Recording and History

The bus can record published events for later inspection:

bus.start_recording()
bus.publish(event_1)
bus.publish(event_2)
bus.stop_recording()

history = bus.get_history()  # [event_1, event_2]
bus.clear_history()

Serialization and Replay

The EventSerializer and EventReplay classes in src/fcc/messaging/serialization.py enable persistence and replay of event streams.

Serialization

from fcc.messaging.serialization import EventSerializer

# Single event
json_str = EventSerializer.to_json(event)
restored = EventSerializer.from_json(json_str)

# Batch
json_batch = EventSerializer.to_json_batch([event_1, event_2])
restored_batch = EventSerializer.from_json_batch(json_batch)

# File persistence
EventSerializer.save([event_1, event_2], "events.json")
loaded = EventSerializer.load("events.json")

The file format wraps events in a JSON object with an event_count header:

{
  "event_count": 2,
  "events": [
    {"event_id": "...", "event_type": "action.completed", ...},
    {"event_id": "...", "event_type": "action.failed", ...}
  ]
}
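
As an illustration of the envelope format, the following sketch writes and reads the same structure with the standard json module. The save_events and load_events helpers are hypothetical, and the count check on load is an assumption about how such a header could be used, not documented EventSerializer behaviour.

```python
import json
import os
import tempfile

events = [
    {"event_id": "e-1", "event_type": "action.completed"},
    {"event_id": "e-2", "event_type": "action.failed"},
]


def save_events(events, path):
    # Wrap the list so readers can see the expected size up front.
    envelope = {"event_count": len(events), "events": events}
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(envelope, fh, indent=2)


def load_events(path):
    with open(path, encoding="utf-8") as fh:
        envelope = json.load(fh)
    # Cheap integrity check: the header must agree with the body.
    if envelope["event_count"] != len(envelope["events"]):
        raise ValueError("event_count does not match number of events")
    return envelope["events"]


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "events.json")
    save_events(events, path)
    loaded_events = load_events(path)
```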

Replay

EventReplay re-publishes saved events through an EventBus:

from fcc.messaging.serialization import EventReplay

replay = EventReplay(bus)

# Replay from a list
total_delivered = replay.replay(events)

# Replay from a file
total_delivered = replay.replay_from_file("events.json")

# Replay with filtering
total_delivered = replay.replay_filtered(
    events,
    correlation_id="run-42",
    source="ActionEngine",
)

Replay is useful for testing (reproduce a specific event sequence), debugging (replay a failed workflow to inspect subscriber behaviour), and auditing (verify that governance gates were evaluated for every deliverable).
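
Filtered replay amounts to selecting events and re-publishing them in their original order. The sketch below shows that idea with plain dicts and a hypothetical replay_filtered function that takes a publish callable; the actual EventReplay works on Event objects and an EventBus.

```python
def replay_filtered(events, publish, correlation_id=None, source=None):
    """Re-publish matching events in order; return total deliveries."""
    total = 0
    for ev in events:
        if correlation_id is not None and ev.get("correlation_id") != correlation_id:
            continue
        if source is not None and ev.get("source") != source:
            continue
        total += publish(ev)  # publish returns the number of deliveries
    return total


saved = [
    {"event_type": "action.started", "source": "ActionEngine", "correlation_id": "run-42"},
    {"event_type": "simulation.step", "source": "SimulationEngine", "correlation_id": "run-42"},
    {"event_type": "action.started", "source": "ActionEngine", "correlation_id": "run-43"},
]

seen = []


def publish_to_one_subscriber(ev):
    # Stand-in for bus.publish with a single subscriber attached.
    seen.append(ev["event_type"])
    return 1


total_sketch = replay_filtered(
    saved, publish_to_one_subscriber, correlation_id="run-42", source="ActionEngine"
)
```

Only the first saved event satisfies both criteria, so it is the only one re-published.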

Tracing

The observability layer in src/fcc/observability/ provides structured tracing built on a SpanData / SpanContext / FccTracer stack.

SpanData

A SpanData frozen dataclass represents a completed span:

from fcc.observability.tracing import SpanData

span = SpanData(
    span_id="abc-123",
    trace_id="trace-001",
    name="simulation.run",
    start_time="2026-03-27T10:00:00+00:00",
    end_time="2026-03-27T10:00:05+00:00",
    status="ok",
    attributes={"scenario_id": "SC-001"},
)

print(span.duration_ms)  # 5000.0
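
The 5000.0 figure follows directly from the two ISO 8601 timestamps. A minimal sketch of that arithmetic, using a hypothetical standalone duration_ms function rather than the SpanData property:

```python
from datetime import datetime


def duration_ms(start_time: str, end_time: str) -> float:
    # fromisoformat parses the "+00:00" offset, so both values are timezone-aware.
    start = datetime.fromisoformat(start_time)
    end = datetime.fromisoformat(end_time)
    return (end - start).total_seconds() * 1000.0


elapsed = duration_ms("2026-03-27T10:00:00+00:00", "2026-03-27T10:00:05+00:00")
```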

FccTracer

The tracer creates spans as context managers with automatic nesting:

from fcc.observability.tracing import FccTracer

tracer = FccTracer(service_name="my-app")

with tracer.span("simulation.run") as ctx:
    ctx.set_attribute("scenario_id", "SC-001")

    with tracer.span("simulation.step", {"node_id": "N1"}) as step_ctx:
        # child span — automatically linked to parent
        step_ctx.set_attribute("persona_id", "SQC")

# All completed spans are collected
for span in tracer.completed_spans:
    print(span.name, span.duration_ms, span.parent_span_id)

Child spans automatically receive the parent's trace ID and record the parent's span ID, forming a tree.
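
One common way to get this automatic parent linking is a stack of open spans inside a context manager. The sketch below is a hypothetical SketchTracer built on that assumption; FccTracer's internals are not shown in this chapter and may differ (for example, a single list like this is not safe across threads).

```python
import uuid
from contextlib import contextmanager


class SketchTracer:
    """Hypothetical tracer showing stack-based span nesting."""

    def __init__(self):
        self._stack = []     # currently open spans, innermost last
        self.completed = []  # finished spans as plain dicts

    @contextmanager
    def span(self, name):
        parent = self._stack[-1] if self._stack else None
        record = {
            "name": name,
            "span_id": uuid.uuid4().hex,
            # Children inherit the trace ID; a root span starts a new trace.
            "trace_id": parent["trace_id"] if parent else uuid.uuid4().hex,
            "parent_span_id": parent["span_id"] if parent else None,
        }
        self._stack.append(record)
        try:
            yield record
        finally:
            # Runs even if the body raises, so the stack stays balanced.
            self._stack.pop()
            self.completed.append(record)


t = SketchTracer()
with t.span("simulation.run") as root:
    with t.span("simulation.step") as child:
        pass
```

Spans complete innermost-first, so in this sketch the child appears in completed before its parent.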

The @traced Decorator

The @traced decorator wraps functions that accept a tracer keyword argument:

from fcc.observability.tracing import traced

@traced("my_operation")
def do_work(data, tracer=None):
    # automatically wrapped in a span if tracer is provided
    return process(data)

# With tracing
result = do_work(data, tracer=tracer)

# Without tracing (no overhead)
result = do_work(data)

If tracer is None, the decorator is a no-op.
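
A decorator with this behaviour can be written as a thin wrapper that inspects the tracer keyword. The names traced_sketch and RecordingTracer below are hypothetical, and the sketch only assumes the tracer exposes a span(name) context manager as shown earlier.

```python
import functools
from contextlib import contextmanager


def traced_sketch(name):
    """Hypothetical decorator: open a span only when a tracer is passed."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, tracer=None, **kwargs):
            if tracer is None:
                # No tracer: call straight through.
                return func(*args, tracer=None, **kwargs)
            with tracer.span(name):
                return func(*args, tracer=tracer, **kwargs)
        return wrapper
    return decorator


class RecordingTracer:
    """Minimal tracer double that remembers which spans were opened."""

    def __init__(self):
        self.names = []

    @contextmanager
    def span(self, name):
        self.names.append(name)
        yield


@traced_sketch("my_operation")
def double(x, tracer=None):
    return x * 2


rt = RecordingTracer()
with_tracing = double(21, tracer=rt)
without_tracing = double(21)
```

Both calls return the same result; only the call that received a tracer opens a span.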

Metrics

The FccMetrics class in src/fcc/observability/metrics.py collects MetricPoint data points.

MetricPoint

from fcc.observability.metrics import MetricPoint

point = MetricPoint(
    name="fcc.simulation.step",
    value=1.0,
    metric_type="counter",
    labels={"node_id": "N1", "step": "3"},
)

Three metric types are supported: counter (monotonically increasing), gauge (point-in-time value), and histogram (distribution).

FccMetrics

Seven pre-defined metrics cover common operations:

from fcc.observability.metrics import FccMetrics

metrics = FccMetrics()

# Pre-defined convenience methods
metrics.record_simulation_step(step=3, node_id="N1")
metrics.record_persona_activation(persona_id="SQC")
metrics.record_gate_result(gate_id="G-SEC-01", passed=True)
metrics.record_deliverable(persona_id="SQC", action_type="test")
metrics.record_ai_call(provider="anthropic", model="claude-3", latency_ms=1200.0, tokens=850)
metrics.record_action_execution(persona_id="SQC", action_type="test", success=True, duration_ms=450.0)

# Generic operations
metrics.increment("my.custom.counter", labels={"env": "prod"})
metrics.observe("my.custom.gauge", 42.0, labels={"env": "prod"})

# Query
all_points = metrics.points
sim_points = metrics.points_by_name("fcc.simulation.step")
total_steps = metrics.total("fcc.simulation.step")
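
To show how the generic and query operations could fit together, here is a small standalone sketch. SketchMetrics is hypothetical: it assumes increment() appends a counter point, observe() appends a gauge point, and total() sums the values recorded under a name. The real FccMetrics may aggregate differently.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass(frozen=True)
class SketchPoint:
    name: str
    value: float
    metric_type: str
    labels: Dict[str, str] = field(default_factory=dict)


class SketchMetrics:
    """Hypothetical collector; method names mirror the example above."""

    def __init__(self) -> None:
        self.points: List[SketchPoint] = []

    def increment(self, name: str, amount: float = 1.0,
                  labels: Optional[Dict[str, str]] = None) -> None:
        self.points.append(SketchPoint(name, amount, "counter", labels or {}))

    def observe(self, name: str, value: float,
                labels: Optional[Dict[str, str]] = None) -> None:
        self.points.append(SketchPoint(name, value, "gauge", labels or {}))

    def points_by_name(self, name: str) -> List[SketchPoint]:
        return [p for p in self.points if p.name == name]

    def total(self, name: str) -> float:
        # Assumed semantics: sum of all values recorded under this name.
        return sum(p.value for p in self.points_by_name(name))


m = SketchMetrics()
m.increment("fcc.simulation.step", labels={"node_id": "N1"})
m.increment("fcc.simulation.step", labels={"node_id": "N2"})
m.observe("my.custom.gauge", 42.0)
step_total = m.total("fcc.simulation.step")
```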

Console and JSON Exporters

Exporters write collected spans and metrics to external destinations.

Span Exporters

from fcc.observability.exporters import ConsoleSpanExporter, JsonFileSpanExporter

# Console (writes to stderr by default)
console = ConsoleSpanExporter()
console.export(tracer.completed_spans)
# Output: [SPAN] simulation.run trace=abc12345... 5000.0ms status=ok scenario_id=SC-001

# JSON file
json_exporter = JsonFileSpanExporter("traces/run-42.json")
json_exporter.export(tracer.completed_spans)

Metric Exporters

from fcc.observability.exporters import ConsoleMetricExporter, JsonFileMetricExporter

# Console
console = ConsoleMetricExporter()
console.export(metrics.points)
# Output: [METRIC] fcc.simulation.step=1.0 type=counter node_id=N1 step=3

# JSON file
json_exporter = JsonFileMetricExporter("metrics/run-42.json")
json_exporter.export(metrics.points)

Both exporter types implement an ABC (SpanExporter / MetricExporter) with a single export() method, making it straightforward to write custom exporters (e.g., for Prometheus, Datadog, or a database).
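
A custom exporter following this pattern might look like the sketch below. SketchSpanExporter is a hypothetical stand-in for the SpanExporter ABC, and spans are plain dicts here for self-containment; a real exporter would subclass the framework's ABC and receive SpanData objects.

```python
from abc import ABC, abstractmethod
from typing import Iterable, List


class SketchSpanExporter(ABC):
    """Hypothetical stand-in for the SpanExporter ABC."""

    @abstractmethod
    def export(self, spans: Iterable[dict]) -> None:
        ...


class InMemorySpanExporter(SketchSpanExporter):
    """Collects formatted lines instead of writing to stderr or a file."""

    def __init__(self) -> None:
        self.lines: List[str] = []

    def export(self, spans: Iterable[dict]) -> None:
        for span in spans:
            self.lines.append(
                f"[SPAN] {span['name']} {span['duration_ms']:.1f}ms "
                f"status={span['status']}"
            )


exporter = InMemorySpanExporter()
exporter.export([{"name": "simulation.run", "duration_ms": 5000.0, "status": "ok"}])

try:
    SketchSpanExporter()  # abstract classes cannot be instantiated
    abstract_ok = False
except TypeError:
    abstract_ok = True
```

An in-memory exporter like this is mainly useful in tests, where assertions can be made on the collected lines.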

Optional OpenTelemetry Integration

When opentelemetry-api and opentelemetry-sdk are installed, FccTracer detects their presence automatically:

tracer = FccTracer()
print(tracer.otel_available)  # True if OTel is installed

The instrument_simulation_engine() and instrument_action_engine() functions in src/fcc/observability/integration.py wire up automatic tracing for the simulation and action engines without modifying their source code. This bridges FCC's internal tracing with OTel-based APM tools.

If OTel is not installed, the framework falls back to its internal SpanData-based implementation with zero overhead.
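
Optional-dependency detection of this kind is often done by probing for the package without importing it. The sketch below shows one way to do that with importlib; module_available is a hypothetical helper, and how FccTracer actually sets otel_available is not documented here.

```python
import importlib.util


def module_available(name: str) -> bool:
    """Return True if `name` can be located, without importing the module itself."""
    try:
        return importlib.util.find_spec(name) is not None
    except (ModuleNotFoundError, ValueError):
        # find_spec raises if a parent package of a dotted name is missing.
        return False


# Evaluated once at import time; the result depends on the environment.
OTEL_AVAILABLE = module_available("opentelemetry.trace")
```

Checking once at import time keeps the decision out of the hot path, so each span creation only reads a boolean.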

Key Takeaways

  • 81 event types span 23 subsystem categories, including workflow, node, persona, simulation, governance, deliverable, action, collaboration, and plugin.
  • EventBus provides thread-safe pub/sub with global, type-filtered, and multi-dimension-filtered subscriptions.
  • EventSerializer persists events to JSON; EventReplay replays them through the bus for testing and debugging.
  • FccTracer creates nested spans via context managers; the @traced decorator instruments functions non-invasively.
  • FccMetrics collects metric points (counter, gauge, or histogram) through seven pre-defined metrics and the generic increment/observe methods.
  • Console and JSON file exporters ship built-in; custom exporters implement the SpanExporter / MetricExporter ABCs.
  • OpenTelemetry integration is automatic when the SDK is installed, with zero-overhead fallback otherwise.


Try this in Notebook 07