Event Bus Integration Guide¶
The FCC Event Bus provides thread-safe, in-process publish/subscribe messaging for workflow events. It enables observability, collaboration recording, plugin integration, and debugging through event replay.
Overview¶
Publisher ──publish()──▶ EventBus ──deliver()──▶ Subscriber(s)
│
├── Global subscribers (receive all events)
├── Type subscribers (fast path, single type)
└── Filtered subscribers (complex criteria)
Key classes:
| Class | Module | Purpose |
|---|---|---|
Event |
fcc.messaging.events |
Immutable event record |
EventType |
fcc.messaging.events |
Enum of 81 event types |
EventBus |
fcc.messaging.bus |
Thread-safe pub/sub hub |
EventFilter |
fcc.messaging.bus |
Subscriber delivery criteria |
EventSerializer |
fcc.messaging.serialization |
JSON serialization |
EventReplay |
fcc.messaging.serialization |
Replay recorded events |
Event Types¶
The framework defines 81 event types organized into 10 categories:
Workflow events¶
| EventType | Value | Emitted when |
|---|---|---|
WORKFLOW_STARTED |
workflow.started |
A workflow graph begins execution |
WORKFLOW_COMPLETED |
workflow.completed |
A workflow graph finishes successfully |
WORKFLOW_FAILED |
workflow.failed |
A workflow graph terminates with an error |
Node events¶
| EventType | Value | Emitted when |
|---|---|---|
NODE_ENTERED |
node.entered |
Execution enters a workflow graph node |
NODE_EXITED |
node.exited |
Execution exits a workflow graph node |
Persona events¶
| EventType | Value | Emitted when |
|---|---|---|
PERSONA_ACTIVATED |
persona.activated |
A persona begins processing |
PERSONA_DEACTIVATED |
persona.deactivated |
A persona finishes processing |
Simulation events¶
| EventType | Value | Emitted when |
|---|---|---|
SIMULATION_STARTED |
simulation.started |
A simulation run begins |
SIMULATION_STEP |
simulation.step |
A single simulation step completes |
SIMULATION_COMPLETED |
simulation.completed |
A simulation run finishes |
SIMULATION_FAILED |
simulation.failed |
A simulation run fails |
Governance events¶
| EventType | Value | Emitted when |
|---|---|---|
GOVERNANCE_GATE_EVALUATED |
governance.gate.evaluated |
A quality gate is evaluated |
GOVERNANCE_GATE_PASSED |
governance.gate.passed |
A quality gate passes |
GOVERNANCE_GATE_FAILED |
governance.gate.failed |
A quality gate fails |
Deliverable events¶
| EventType | Value | Emitted when |
|---|---|---|
DELIVERABLE_CREATED |
deliverable.created |
A new deliverable artifact is produced |
DELIVERABLE_REVIEWED |
deliverable.reviewed |
A deliverable is reviewed |
Action events¶
| EventType | Value | Emitted when |
|---|---|---|
ACTION_STARTED |
action.started |
A workflow action begins |
ACTION_COMPLETED |
action.completed |
A workflow action finishes |
ACTION_FAILED |
action.failed |
A workflow action fails |
Collaboration events¶
| EventType | Value | Emitted when |
|---|---|---|
COLLABORATION_SESSION_CREATED |
collaboration.session.created |
A human-AI session is created |
COLLABORATION_SESSION_STARTED |
collaboration.session.started |
A collaboration session begins |
COLLABORATION_SESSION_COMPLETED |
collaboration.session.completed |
A collaboration session ends |
COLLABORATION_TURN_TAKEN |
collaboration.turn.taken |
A turn is taken in a session |
Plugin events¶
| EventType | Value | Emitted when |
|---|---|---|
PLUGIN_LOADED |
plugin.loaded |
A plugin is successfully loaded |
PLUGIN_ERROR |
plugin.error |
A plugin encounters an error |
Creating and Publishing Events¶
from fcc.messaging.events import Event, EventType
from fcc.messaging.bus import EventBus
bus = EventBus()
# Create and publish an event
event = Event(
event_type=EventType.WORKFLOW_STARTED,
source="my_engine",
payload={"workflow_id": "fcc_base_sequence", "input": "..."},
correlation_id="run-001",
)
delivered = bus.publish(event)
print(f"Delivered to {delivered} subscribers")
The Event dataclass is frozen (immutable) and auto-generates:
- event_id -- a UUID string
- timestamp -- ISO 8601 UTC timestamp
Subscribing to Events¶
Global subscriber (receives everything)¶
def my_handler(event: Event) -> None:
print(f"[{event.event_type.value}] {event.source}: {event.payload}")
bus.subscribe(my_handler)
Type-filtered subscriber (fast path)¶
When you only care about one event type, use the fast path:
from fcc.messaging.bus import EventFilter
bus.subscribe(
my_handler,
EventFilter(event_types=(EventType.WORKFLOW_COMPLETED,)),
)
Multi-criteria filter¶
filter = EventFilter(
event_types=(EventType.GOVERNANCE_GATE_PASSED, EventType.GOVERNANCE_GATE_FAILED),
sources=("QualityGateRunner",),
)
bus.subscribe(my_handler, filter)
Correlation-based filter¶
Track related events across a workflow run:
Unsubscribing¶
EventFilter Reference¶
EventFilter is a frozen dataclass with three optional fields. All criteria
are AND-combined; empty tuples mean "match all":
| Field | Type | Behavior |
|---|---|---|
event_types |
tuple[EventType, ...] |
Match events of these types |
sources |
tuple[str, ...] |
Match events from these source components |
correlation_ids |
tuple[str, ...] |
Match events with these correlation IDs |
# All governance events from the quality gate runner
EventFilter(
event_types=(
EventType.GOVERNANCE_GATE_EVALUATED,
EventType.GOVERNANCE_GATE_PASSED,
EventType.GOVERNANCE_GATE_FAILED,
),
sources=("QualityGateRunner",),
)
Recording and Replay¶
The EventBus supports recording published events for later replay, which is useful for debugging, testing, and audit trails.
Recording events¶
bus.start_recording()
# ... run your workflow, events are captured ...
bus.stop_recording()
history = bus.get_history() # list[Event]
print(f"Captured {len(history)} events")
Serialization¶
Save and load events to/from JSON files:
from fcc.messaging.serialization import EventSerializer
# Save to file
EventSerializer.save(history, "debug/events.json")
# Load from file
events = EventSerializer.load("debug/events.json")
# Single event JSON
json_str = EventSerializer.to_json(event)
restored = EventSerializer.from_json(json_str)
# Batch JSON
json_batch = EventSerializer.to_json_batch(events)
restored_batch = EventSerializer.from_json_batch(json_batch)
Replay¶
Replay recorded events through a new bus (e.g., with different subscribers):
from fcc.messaging.serialization import EventReplay
replay_bus = EventBus()
replay_bus.subscribe(my_analysis_handler)
replayer = EventReplay(replay_bus)
# Replay all events
total_deliveries = replayer.replay(events)
# Replay from file
total_deliveries = replayer.replay_from_file("debug/events.json")
# Replay only events matching criteria
total_deliveries = replayer.replay_filtered(
events,
correlation_id="run-001",
source="SimulationEngine",
)
Integration with FCC Components¶
Simulation engine¶
The SimulationEngine accepts an optional event_bus parameter:
from fcc.simulation.engine import SimulationEngine
bus = EventBus()
engine = SimulationEngine(event_bus=bus)
# Engine emits SIMULATION_STARTED, SIMULATION_STEP, SIMULATION_COMPLETED
Quality gate runner¶
The QualityGateRunner.run_gate() method accepts an optional event_bus:
from fcc.governance.quality_gates import QualityGateRunner
runner = QualityGateRunner.from_yaml(gates_path)
result = runner.run_gate(gate, artifact, event_bus=bus)
# Emits GOVERNANCE_GATE_PASSED or GOVERNANCE_GATE_FAILED
Observability integration¶
The observability layer can instrument engines to emit tracing spans alongside events:
from fcc.observability.integration import (
instrument_simulation_engine,
instrument_action_engine,
)
Collaboration recording¶
The SessionRecorder uses the event bus to record collaboration sessions:
from fcc.collaboration.recording import SessionRecorder
recorder = SessionRecorder(bus)
# Captures COLLABORATION_SESSION_CREATED, COLLABORATION_TURN_TAKEN, etc.
Plugin Subscribers¶
Plugins can register event subscribers that are automatically wired to the
EventBus when discovered. See the
Plugin Development Guide for details on the
EventSubscriberPlugin ABC.
from fcc.messaging.plugin_bridge import EventSubscriberPlugin
class MyLoggerPlugin(EventSubscriberPlugin):
def plugin_meta(self) -> PluginMeta: ...
def get_subscribers(self) -> list[tuple[Subscriber, EventFilter | None]]:
return [
(self._log_governance, EventFilter(
event_types=(EventType.GOVERNANCE_GATE_FAILED,)
)),
(self._log_all, None), # global subscriber
]
def _log_governance(self, event: Event) -> None:
print(f"GATE FAILED: {event.payload}")
def _log_all(self, event: Event) -> None:
print(f"[{event.timestamp}] {event.event_type.value}")
Thread Safety¶
The EventBus is fully thread-safe:
- All subscription/unsubscription operations are protected by a
threading.Lock. - History recording is also lock-protected.
- Event delivery happens outside the lock to avoid deadlocks.
- Subscriber exceptions are silently caught to prevent one subscriber from breaking other subscribers or the publisher.
Best Practices¶
- Use
correlation_idto link related events across a workflow execution. - Use
start_recording()during development to capture event traces for debugging. - Serialize event histories to JSON files for audit trails and test fixtures.
- Keep subscriber handlers fast; long-running work should be dispatched to a background thread or queue.
- Use
EventFilterto reduce noise -- global subscribers receive every event. - Prefer the single-type fast path when subscribing to exactly one event type.
- Create fresh
EventBusinstances in tests to avoid state leakage between test cases.