Chapter 7: Event Bus and Observability
Overview
The FCC framework uses two complementary systems for runtime visibility: an event bus for decoupled communication between subsystems and an observability layer for structured tracing and metrics collection. Together they answer the questions "what happened?" (events), "how long did it take?" (tracing), and "how much?" (metrics).
The sequence diagram below shows how an ActionEngine publishes lifecycle events to the bus, how multiple subscribers receive each event, and how the serializer captures the full trace for later replay.
sequenceDiagram
participant AE as ActionEngine
participant EB as EventBus
participant S1 as Subscriber A<br/>(Logger)
participant S2 as Subscriber B<br/>(Metrics)
participant SR as EventSerializer
AE->>EB: publish(action.started)
EB->>S1: deliver(event)
EB->>S2: deliver(event)
AE->>EB: publish(action.completed)
EB->>S1: deliver(event)
EB->>S2: deliver(event)
EB->>SR: serialize(events)
SR-->>EB: JSON persisted
Note over EB,SR: Events are replayable<br/>via EventReplay
Because subscribers are decoupled from publishers, new consumers — metrics, compliance, UI updates — can be added without changing any emitting code.
The 81 Event Types
Events are categorised by the subsystem that emits them. The EventType enum in src/fcc/messaging/events.py defines all 81 types across 23 categories:
| Category | Count | Event Types |
|---|---|---|
| Action (3) | 3 | action.started, action.completed, action.failed |
| Benchmark (3) | 3 | benchmark.started, benchmark.completed, benchmark.regression |
| Collaboration (5) | 5 | collaboration.session.created, collaboration.session.started, collaboration.session.completed, collaboration.turn.taken, collaboration.maturity.assessed |
| Compliance (4) | 4 | compliance.audit.started, compliance.audit.completed, compliance.finding.raised, compliance.remediation.required |
| Deliverable (2) | 2 | deliverable.created, deliverable.reviewed |
| Demo (3) | 3 | demo.started, demo.step.completed, demo.completed |
| Ecosystem (2) | 2 | ecosystem.health.check, ecosystem.project.status |
| Federation (3) | 3 | federation.entity.resolved, federation.namespace.registered, federation.change.detected |
| Governance (3) | 3 | governance.gate.evaluated, governance.gate.passed, governance.gate.failed |
| Knowledge (10) | 10 | knowledge.graph.built, knowledge.graph.exported, knowledge.graph.entity.created, knowledge.graph.entity.updated, knowledge.graph.edge.created, knowledge.graph.query.executed, knowledge.graph.federation.namespace.registered, knowledge.graph.federation.merged, knowledge.graph.federation.cross_edge.added, knowledge.graph.federation.entity.resolved |
| Messaging (4) | 4 | messaging.dlq.enqueued, messaging.circuit.open, messaging.circuit.closed, messaging.priority.escalated |
| Node (2) | 2 | node.entered, node.exited |
| Object Model (2) | 2 | objectmodel.facade.registered, objectmodel.query.executed |
| Persona (2) | 2 | persona.activated, persona.deactivated |
| Plugin (2) | 2 | plugin.loaded, plugin.error |
| Protocol (10) | 10 | protocol.a2a.task.received, protocol.a2a.task.completed, protocol.a2a.task.failed, protocol.a2a.card.requested, protocol.mcp.tool.called, protocol.mcp.tool.completed, protocol.mcp.tool.failed, protocol.mcp.resource.read, protocol.mcp.prompt.rendered, protocol.bridge.error |
| RAG (3) | 3 | rag.query.started, rag.query.completed, rag.index.built |
| Search (2) | 2 | search.index.built, search.query.executed |
| Simulation (6) | 6 | simulation.started, simulation.step, simulation.completed, simulation.failed, simulation.budget.warning, simulation.budget.exceeded |
| Tutorial (3) | 3 | tutorial.progress.updated, tutorial.checkpoint.reached, tutorial.completed |
| Vertical (1) | 1 | vertical.loaded |
| Viz (3) | 3 | viz.data.updated, viz.filter.changed, viz.selection.changed |
| Workflow (3) | 3 | workflow.started, workflow.completed, workflow.failed |
Each event is a frozen dataclass with a UUID, ISO 8601 timestamp, source identifier, payload dict, and optional correlation ID:
from fcc.messaging.events import Event, EventType
event = Event(
    event_type=EventType.ACTION_COMPLETED,
    source="ActionEngine",
    payload={"persona_id": "SQC", "action_type": "test"},
    correlation_id="run-42",
)
print(event.event_id) # UUID string
print(event.timestamp) # ISO 8601
print(event.event_type) # EventType.ACTION_COMPLETED
print(event.correlation_id) # "run-42"
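The field list above maps naturally onto a frozen dataclass with default factories. The following is a minimal sketch of that pattern, not the fcc implementation: SketchEvent is a hypothetical stand-in, and it uses a plain string for the event type instead of the EventType enum.

```python
import uuid
from dataclasses import FrozenInstanceError, dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for an immutable event record."""

    event_type: str
    source: str
    payload: dict[str, Any] = field(default_factory=dict)
    correlation_id: Optional[str] = None
    # Generated per instance so each event gets its own ID and timestamp.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


sketch = SketchEvent("action.completed", "ActionEngine", {"persona_id": "SQC"})

try:
    sketch.source = "Other"  # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Freezing prevents fields from being rebound, but a dict stored in payload can still be mutated in place, so subscribers should treat payloads as read-only by convention.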
EventBus Pub/Sub Architecture
The EventBus in src/fcc/messaging/bus.py is a thread-safe, in-process pub/sub system. It does not require external infrastructure -- no message broker, no network calls.
Publishing Events
from fcc.messaging.bus import EventBus
bus = EventBus()
delivered = bus.publish(event)
print(f"Delivered to {delivered} subscribers")
publish() is synchronous: it calls each matching subscriber in the publishing thread and returns the count of successful deliveries. Subscriber exceptions are caught and swallowed to prevent one broken subscriber from blocking others.
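The delivery semantics described above (synchronous calls, swallowed subscriber exceptions, a count of successful deliveries) can be sketched as follows. MiniBus is a hypothetical stand-in written for illustration; the real EventBus internals are not shown in this chapter and may differ.

```python
import threading
from typing import Any, Callable

Handler = Callable[[Any], None]


class MiniBus:
    """Hypothetical stand-in showing synchronous, fault-isolated delivery."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._handlers: list[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        with self._lock:
            self._handlers.append(handler)

    def publish(self, event: Any) -> int:
        # Copy under the lock, then call handlers outside it so a handler
        # that subscribes or publishes does not deadlock.
        with self._lock:
            handlers = list(self._handlers)
        delivered = 0
        for handler in handlers:
            try:
                handler(event)
                delivered += 1
            except Exception:
                # Swallow the failure; remaining handlers still run.
                continue
        return delivered


received: list[Any] = []


def broken(event: Any) -> None:
    raise RuntimeError("subscriber bug")


mini_bus = MiniBus()
mini_bus.subscribe(broken)
mini_bus.subscribe(received.append)
count = mini_bus.publish("action.started")  # one of two handlers succeeds
```

Because failures are swallowed, comparing the returned count with the number of subscribers you expect is one way to notice a handler that is silently failing.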
Subscribing
Three subscription modes are available:
Global subscriber -- receives every event:
def my_handler(event: Event) -> None:
    print(f"Received: {event.event_type.value}")
bus.subscribe(my_handler)
Type-filtered subscriber -- receives only events of specific types:
from fcc.messaging.bus import EventFilter
bus.subscribe(
    my_handler,
    EventFilter(event_types=(EventType.ACTION_COMPLETED, EventType.ACTION_FAILED)),
)
Full-filter subscriber -- filters on event type, source, and/or correlation ID:
bus.subscribe(
    my_handler,
    EventFilter(
        event_types=(EventType.SIMULATION_STEP,),
        sources=("SimulationEngine",),
        correlation_ids=("run-42",),
    ),
)
EventFilter
The EventFilter frozen dataclass supports three filter dimensions:
@dataclass(frozen=True)
class EventFilter:
    event_types: tuple[EventType, ...] = ()  # empty = match all
    sources: tuple[str, ...] = ()  # empty = match all
    correlation_ids: tuple[str, ...] = ()  # empty = match all
All specified dimensions must match (logical AND). Empty tuples act as wildcards.
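The AND-with-wildcards rule can be expressed in a few lines. This is a sketch under assumptions: SketchFilter and its matches() method are hypothetical names used for illustration, and event types are plain strings here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchFilter:
    """Hypothetical stand-in mirroring the three filter dimensions."""

    event_types: tuple[str, ...] = ()
    sources: tuple[str, ...] = ()
    correlation_ids: tuple[str, ...] = ()

    def matches(
        self, event_type: str, source: str, correlation_id: Optional[str]
    ) -> bool:
        # An empty tuple is a wildcard; otherwise the value must be listed.
        return (
            (not self.event_types or event_type in self.event_types)
            and (not self.sources or source in self.sources)
            and (not self.correlation_ids or correlation_id in self.correlation_ids)
        )


match_all = SketchFilter()
narrow = SketchFilter(event_types=("simulation.step",), sources=("SimulationEngine",))

hit = narrow.matches("simulation.step", "SimulationEngine", "run-42")
miss = narrow.matches("simulation.step", "ActionEngine", "run-42")
```

In this sketch an event with no correlation ID never matches a filter that lists specific correlation IDs, which is the conservative reading of the rule.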
Unsubscribing
Recording and History
The bus can record published events for later inspection:
bus.start_recording()
bus.publish(event_1)
bus.publish(event_2)
bus.stop_recording()
history = bus.get_history() # [event_1, event_2]
bus.clear_history()
Serialization and Replay
The EventSerializer and EventReplay classes in src/fcc/messaging/serialization.py enable persistence and replay of event streams.
Serialization
from fcc.messaging.serialization import EventSerializer
# Single event
json_str = EventSerializer.to_json(event)
restored = EventSerializer.from_json(json_str)
# Batch
json_batch = EventSerializer.to_json_batch([event_1, event_2])
restored_batch = EventSerializer.from_json_batch(json_batch)
# File persistence
EventSerializer.save([event_1, event_2], "events.json")
loaded = EventSerializer.load("events.json")
The file format wraps events in a JSON object with an event_count header:
{
  "event_count": 2,
  "events": [
    {"event_id": "...", "event_type": "action.completed", ...},
    {"event_id": "...", "event_type": "action.failed", ...}
  ]
}
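To make the wrapper format concrete, here is a minimal sketch that writes and reads it using only the standard library. The function names save_events and load_events are hypothetical, events are plain dicts, and the count check on load is an assumption; the chapter does not say whether EventSerializer.load validates the header.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def save_events(events: list[dict[str, Any]], path: Path) -> None:
    """Write events wrapped in an object with an event_count header."""
    document = {"event_count": len(events), "events": events}
    path.write_text(json.dumps(document, indent=2), encoding="utf-8")


def load_events(path: Path) -> list[dict[str, Any]]:
    """Read the wrapper back and check the header against the payload."""
    document = json.loads(path.read_text(encoding="utf-8"))
    events = document["events"]
    if document["event_count"] != len(events):
        raise ValueError("event_count does not match the number of events")
    return events


sample = [
    {"event_id": "e-1", "event_type": "action.completed"},
    {"event_id": "e-2", "event_type": "action.failed"},
]
with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "events.json"
    save_events(sample, target)
    round_tripped = load_events(target)
```

A header like event_count gives a cheap integrity check: a truncated or hand-edited file can be caught before any event is replayed.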
Replay
EventReplay re-publishes saved events through an EventBus:
from fcc.messaging.serialization import EventReplay
replay = EventReplay(bus)
# Replay from a list
total_delivered = replay.replay(events)
# Replay from a file
total_delivered = replay.replay_from_file("events.json")
# Replay with filtering
total_delivered = replay.replay_filtered(
    events,
    correlation_id="run-42",
    source="ActionEngine",
)
Replay is useful for testing (reproduce a specific event sequence), debugging (replay a failed workflow to inspect subscriber behaviour), and auditing (verify that governance gates were evaluated for every deliverable).
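Filtered replay amounts to selecting events and re-publishing them in their original order. The sketch below illustrates that idea with plain dicts and a fake publish function; replay_filtered_sketch and fake_publish are hypothetical names, and the real EventReplay may filter differently.

```python
from typing import Any, Callable, Iterable, Optional


def replay_filtered_sketch(
    events: Iterable[dict[str, Any]],
    publish: Callable[[dict[str, Any]], int],
    correlation_id: Optional[str] = None,
    source: Optional[str] = None,
) -> int:
    """Re-publish matching events in order; return total deliveries."""
    total = 0
    for event in events:
        if correlation_id is not None and event.get("correlation_id") != correlation_id:
            continue
        if source is not None and event.get("source") != source:
            continue
        total += publish(event)
    return total


seen: list[str] = []


def fake_publish(event: dict[str, Any]) -> int:
    seen.append(event["event_type"])
    return 1  # pretend exactly one subscriber received the event


log = [
    {"event_type": "action.started", "source": "ActionEngine", "correlation_id": "run-42"},
    {"event_type": "simulation.step", "source": "SimulationEngine", "correlation_id": "run-42"},
    {"event_type": "action.completed", "source": "ActionEngine", "correlation_id": "run-7"},
]
total = replay_filtered_sketch(
    log, fake_publish, correlation_id="run-42", source="ActionEngine"
)
```

Only the first entry satisfies both criteria, so a single event is re-published.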
Tracing
The observability layer in src/fcc/observability/ provides structured tracing built on a SpanData / SpanContext / FccTracer stack.
SpanData
A SpanData frozen dataclass represents a completed span:
from fcc.observability.tracing import SpanData
span = SpanData(
    span_id="abc-123",
    trace_id="trace-001",
    name="simulation.run",
    start_time="2026-03-27T10:00:00+00:00",
    end_time="2026-03-27T10:00:05+00:00",
    status="ok",
    attributes={"scenario_id": "SC-001"},
)
print(span.duration_ms) # 5000.0
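The 5000.0 figure follows directly from the two timestamps. As a worked check using only the standard library (the helper name duration_ms is an illustrative assumption, not necessarily how SpanData computes its property):

```python
from datetime import datetime


def duration_ms(start_time: str, end_time: str) -> float:
    """Elapsed milliseconds between two ISO 8601 timestamps."""
    start = datetime.fromisoformat(start_time)
    end = datetime.fromisoformat(end_time)
    return (end - start).total_seconds() * 1000.0


elapsed = duration_ms("2026-03-27T10:00:00+00:00", "2026-03-27T10:00:05+00:00")
print(elapsed)  # 5000.0
```

Keeping the UTC offset in both timestamps matters: subtracting an offset-aware datetime from a naive one raises a TypeError.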
FccTracer
The tracer creates spans as context managers with automatic nesting:
from fcc.observability.tracing import FccTracer
tracer = FccTracer(service_name="my-app")
with tracer.span("simulation.run") as ctx:
    ctx.set_attribute("scenario_id", "SC-001")
    with tracer.span("simulation.step", {"node_id": "N1"}) as step_ctx:
        # child span, automatically linked to parent
        step_ctx.set_attribute("persona_id", "SQC")
# All completed spans are collected
for span in tracer.completed_spans:
    print(span.name, span.duration_ms, span.parent_span_id)
Child spans automatically receive the parent's trace ID and record the parent's span ID, forming a tree.
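One common way to get this parent/child linkage is a stack of open spans. The sketch below shows that technique with hypothetical SketchSpan and SketchTracer classes; it is not the FccTracer source, which may use thread-local or context-variable state instead of a single list.

```python
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from typing import Iterator, Optional


@dataclass
class SketchSpan:
    name: str
    span_id: str
    trace_id: str
    parent_span_id: Optional[str]


class SketchTracer:
    """Hypothetical stand-in: a stack of open spans drives the nesting."""

    def __init__(self) -> None:
        self._stack: list[SketchSpan] = []
        self.completed_spans: list[SketchSpan] = []

    @contextmanager
    def span(self, name: str) -> Iterator[SketchSpan]:
        parent = self._stack[-1] if self._stack else None
        current = SketchSpan(
            name=name,
            span_id=uuid.uuid4().hex,
            # A root span starts a new trace; a child inherits its parent's.
            trace_id=parent.trace_id if parent else uuid.uuid4().hex,
            parent_span_id=parent.span_id if parent else None,
        )
        self._stack.append(current)
        try:
            yield current
        finally:
            # Runs even if the body raises, so the stack stays balanced.
            self._stack.pop()
            self.completed_spans.append(current)


sketch_tracer = SketchTracer()
with sketch_tracer.span("simulation.run") as root:
    with sketch_tracer.span("simulation.step") as child:
        pass
```

In this sketch spans complete innermost first, so the child appears before its parent in completed_spans.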
The @traced Decorator
For functions that accept a tracer keyword argument:
from fcc.observability.tracing import traced
@traced("my_operation")
def do_work(data, tracer=None):
    # automatically wrapped in a span if tracer is provided
    return process(data)
# With tracing
result = do_work(data, tracer=tracer)
# Without tracing (no overhead)
result = do_work(data)
If tracer is None, the decorator is a no-op.
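A decorator with this behaviour can be sketched as follows. traced_sketch and RecordingTracer are hypothetical names for illustration; the sketch assumes only that a tracer exposes span(name) as a context manager, as in the FccTracer example earlier.

```python
import functools
from contextlib import contextmanager
from typing import Any, Callable, Iterator


class RecordingTracer:
    """Hypothetical tracer that only records the names of opened spans."""

    def __init__(self) -> None:
        self.names: list[str] = []

    @contextmanager
    def span(self, name: str) -> Iterator[None]:
        self.names.append(name)
        yield


def traced_sketch(name: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @functools.wraps(func)
        def wrapper(*args: Any, tracer: Any = None, **kwargs: Any) -> Any:
            if tracer is None:
                # No tracer supplied: call straight through.
                return func(*args, tracer=None, **kwargs)
            with tracer.span(name):
                return func(*args, tracer=tracer, **kwargs)

        return wrapper

    return decorator


@traced_sketch("double")
def double(value: int, tracer: Any = None) -> int:
    return value * 2


recorder = RecordingTracer()
plain = double(21)
instrumented = double(21, tracer=recorder)
```

In this sketch the untraced path still passes through the wrapper, so it costs one extra function call but performs no span bookkeeping.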
Metrics
The FccMetrics class in src/fcc/observability/metrics.py collects MetricPoint data points.
MetricPoint
from fcc.observability.metrics import MetricPoint
point = MetricPoint(
    name="fcc.simulation.step",
    value=1.0,
    metric_type="counter",
    labels={"node_id": "N1", "step": "3"},
)
Three metric types are supported: counter (monotonically increasing), gauge (point-in-time value), and histogram (distribution).
FccMetrics
Seven pre-defined metrics cover common operations:
from fcc.observability.metrics import FccMetrics
metrics = FccMetrics()
# Pre-defined convenience methods
metrics.record_simulation_step(step=3, node_id="N1")
metrics.record_persona_activation(persona_id="SQC")
metrics.record_gate_result(gate_id="G-SEC-01", passed=True)
metrics.record_deliverable(persona_id="SQC", action_type="test")
metrics.record_ai_call(provider="anthropic", model="claude-3", latency_ms=1200.0, tokens=850)
metrics.record_action_execution(persona_id="SQC", action_type="test", success=True, duration_ms=450.0)
# Generic operations
metrics.increment("my.custom.counter", labels={"env": "prod"})
metrics.observe("my.custom.gauge", 42.0, labels={"env": "prod"})
# Query
all_points = metrics.points
sim_points = metrics.points_by_name("fcc.simulation.step")
total_steps = metrics.total("fcc.simulation.step")
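The generic operations and queries above suggest an append-only list of points. The following sketch illustrates that design with hypothetical SketchPoint and SketchMetrics classes; the amount parameter and the summing behaviour of total() are assumptions, since the chapter does not show the FccMetrics source.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass(frozen=True)
class SketchPoint:
    name: str
    value: float
    metric_type: str
    labels: dict[str, str] = field(default_factory=dict)


class SketchMetrics:
    """Hypothetical collector: every call appends one immutable point."""

    def __init__(self) -> None:
        self.points: list[SketchPoint] = []

    def increment(self, name: str, amount: float = 1.0,
                  labels: Optional[dict[str, str]] = None) -> None:
        self.points.append(SketchPoint(name, amount, "counter", labels or {}))

    def observe(self, name: str, value: float,
                labels: Optional[dict[str, str]] = None) -> None:
        self.points.append(SketchPoint(name, value, "gauge", labels or {}))

    def points_by_name(self, name: str) -> list[SketchPoint]:
        return [p for p in self.points if p.name == name]

    def total(self, name: str) -> float:
        # Summing is meaningful for counters; for gauges the latest
        # value is usually the one of interest.
        return sum(p.value for p in self.points_by_name(name))


sketch_metrics = SketchMetrics()
sketch_metrics.increment("fcc.simulation.step", labels={"node_id": "N1"})
sketch_metrics.increment("fcc.simulation.step", labels={"node_id": "N2"})
sketch_metrics.observe("my.custom.gauge", 42.0)
step_total = sketch_metrics.total("fcc.simulation.step")
```

Storing raw points rather than running totals keeps the labels of every observation available to exporters, at the cost of memory that grows with the number of recordings.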
Console and JSON Exporters
Exporters write collected spans and metrics to external destinations.
Span Exporters
from fcc.observability.exporters import ConsoleSpanExporter, JsonFileSpanExporter
# Console (writes to stderr by default)
console = ConsoleSpanExporter()
console.export(tracer.completed_spans)
# Output: [SPAN] simulation.run trace=abc12345... 5000.0ms status=ok scenario_id=SC-001
# JSON file
json_exporter = JsonFileSpanExporter("traces/run-42.json")
json_exporter.export(tracer.completed_spans)
Metric Exporters
from fcc.observability.exporters import ConsoleMetricExporter, JsonFileMetricExporter
# Console
console = ConsoleMetricExporter()
console.export(metrics.points)
# Output: [METRIC] fcc.simulation.step=1.0 type=counter node_id=N1 step=3
# JSON file
json_exporter = JsonFileMetricExporter("metrics/run-42.json")
json_exporter.export(metrics.points)
Both exporter types implement an ABC (SpanExporter / MetricExporter) with a single export() method, making it straightforward to write custom exporters (e.g., for Prometheus, Datadog, or a database).
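A custom exporter then only needs to subclass the ABC and implement export(). The sketch below uses a locally defined SketchMetricExporter as a hypothetical stand-in for the real base class, whose exact signature is not shown in this chapter, and represents points as plain dicts.

```python
from abc import ABC, abstractmethod
from typing import Any, Sequence


class SketchMetricExporter(ABC):
    """Hypothetical stand-in for an exporter ABC with one export() method."""

    @abstractmethod
    def export(self, points: Sequence[Any]) -> None:
        ...


class InMemoryExporter(SketchMetricExporter):
    """Collects formatted lines in a list, e.g. for assertions in tests."""

    def __init__(self) -> None:
        self.lines: list[str] = []

    def export(self, points: Sequence[Any]) -> None:
        for point in points:
            self.lines.append(f"{point['name']}={point['value']}")


exporter = InMemoryExporter()
exporter.export([{"name": "fcc.simulation.step", "value": 1.0}])

try:
    SketchMetricExporter()  # abstract classes cannot be instantiated
    abstract_ok = True
except TypeError:
    abstract_ok = False
```

An exporter for an external system would follow the same shape, replacing the list append with a call to that system's client.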
Optional OpenTelemetry Integration
When opentelemetry-api and opentelemetry-sdk are installed, FccTracer detects their presence automatically.
The instrument_simulation_engine() and instrument_action_engine() functions in src/fcc/observability/integration.py wire up automatic tracing for the simulation and action engines without modifying their source code. This bridges FCC's internal tracing with OTel-based APM tools.
If OTel is not installed, the framework falls back to its internal SpanData-based implementation and adds no OTel-related overhead.
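Optional-dependency detection of this kind is often done by probing for the package without importing it. The sketch below shows one such approach with the standard library; otel_available is a hypothetical helper, and how FccTracer actually performs its detection is not shown in this chapter.

```python
import importlib.util


def otel_available() -> bool:
    """Return True if the top-level opentelemetry package can be found."""
    return importlib.util.find_spec("opentelemetry") is not None


# Choose a backend label once at startup, based on what is installed.
backend = "opentelemetry" if otel_available() else "internal"
```

Probing with find_spec avoids paying the import cost of the SDK unless it is going to be used.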
Key Takeaways
- 81 event types span 23 subsystem categories, including workflow, node, persona, simulation, governance, deliverable, action, collaboration, and plugin.
- EventBus provides thread-safe pub/sub with global, type-filtered, and multi-dimension-filtered subscriptions.
- EventSerializer persists events to JSON; EventReplay replays them through the bus for testing and debugging.
- FccTracer creates nested spans via context managers; the @traced decorator instruments functions non-invasively.
- FccMetrics collects counters and gauges with 7 pre-defined metrics and generic increment/observe.
- Console and JSON file exporters ship built-in; custom exporters implement the SpanExporter / MetricExporter ABCs.
- OpenTelemetry integration is automatic when the SDK is installed, with a fallback to the internal tracer otherwise.
Try this in Notebook 07