Event Bus¶
The FCC messaging system provides a thread-safe, in-process event bus with 81 event types, flexible filtering, serialization, and replay capabilities. It serves as the central nervous system connecting workflow execution, simulation, governance, collaboration, and plugin subsystems.
```mermaid
sequenceDiagram
    participant Pub as Publisher
    participant Bus as EventBus
    participant Filt as EventFilter
    participant Sub1 as Subscriber A
    participant Sub2 as Subscriber B
    Pub->>Bus: publish(Event)
    Bus->>Bus: start_recording()
    Bus->>Filt: matches(Event)?
    Filt-->>Bus: true
    Bus->>Sub1: deliver(Event)
    Bus->>Filt: matches(Event)?
    Filt-->>Bus: false
    Note right of Sub2: filtered out
    Bus-->>Pub: delivery count = 1
```
EventType¶
The EventType enum defines 81 event types organized into 10 categories:
| Category | Event Types | Description |
|---|---|---|
| Workflow | WORKFLOW_STARTED, WORKFLOW_COMPLETED, WORKFLOW_FAILED | Lifecycle of workflow graph traversals |
| Node | NODE_ENTERED, NODE_EXITED | Individual workflow node transitions |
| Persona | PERSONA_ACTIVATED, PERSONA_DEACTIVATED | Persona activation during workflows |
| Simulation | SIMULATION_STARTED, SIMULATION_STEP, SIMULATION_COMPLETED, SIMULATION_FAILED | Simulation engine lifecycle |
| Governance | GOVERNANCE_GATE_EVALUATED, GOVERNANCE_GATE_PASSED, GOVERNANCE_GATE_FAILED | Quality gate evaluations |
| Deliverable | DELIVERABLE_CREATED, DELIVERABLE_REVIEWED | Artifact creation and review |
| Action | ACTION_STARTED, ACTION_COMPLETED, ACTION_FAILED | Action engine lifecycle |
| Collaboration | COLLABORATION_SESSION_CREATED, COLLABORATION_SESSION_STARTED, COLLABORATION_SESSION_COMPLETED, COLLABORATION_TURN_TAKEN | Human-agent session events |
| Plugin | PLUGIN_LOADED, PLUGIN_ERROR | Plugin discovery and loading |
Event¶
The Event is a frozen (immutable) dataclass representing a single event in the system:
| Field | Type | Description |
|---|---|---|
| event_type | EventType | Category of this event |
| source | str | Component that emitted the event (e.g. "SimulationEngine") |
| payload | dict[str, Any] | Arbitrary event data |
| correlation_id | str \| None | Links related events (e.g. all events in one simulation run) |
| event_id | str | Unique UUID assigned automatically |
| timestamp | str | ISO 8601 timestamp assigned automatically |
Creating Events¶
```python
from fcc.messaging.events import Event, EventType

event = Event(
    event_type=EventType.SIMULATION_STARTED,
    source="SimulationEngine",
    payload={"scenario_id": "GEN-001", "mode": "mock"},
    correlation_id="run-abc-123",
)
```
Serialization¶
Events serialize to and from dictionaries for JSON storage.
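The round trip can be illustrated with a minimal, self-contained sketch. It does not use the real `Event` class: `SketchEvent`, `SketchEventType`, the `to_dict` / `from_dict` method names, and the lowercase enum value are all assumptions made for illustration, so check the actual API before relying on them.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional


class SketchEventType(Enum):
    # Stand-in for EventType; the string value is an assumption.
    SIMULATION_STARTED = "simulation_started"


@dataclass(frozen=True)
class SketchEvent:
    # Stand-in for Event, mirroring the six documented fields.
    event_type: SketchEventType
    source: str
    payload: dict[str, Any] = field(default_factory=dict)
    correlation_id: Optional[str] = None
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_dict(self) -> dict[str, Any]:
        # Hypothetical method name: store the enum as its plain string value.
        data = asdict(self)
        data["event_type"] = self.event_type.value
        return data

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "SketchEvent":
        # Hypothetical method name: rebuild the enum member from its value.
        fields = dict(data)
        fields["event_type"] = SketchEventType(fields["event_type"])
        return cls(**fields)


original = SketchEvent(
    event_type=SketchEventType.SIMULATION_STARTED,
    source="SimulationEngine",
    payload={"scenario_id": "GEN-001", "mode": "mock"},
    correlation_id="run-abc-123",
)
# Round-trip through a JSON string, as JSON storage would.
restored = SketchEvent.from_dict(json.loads(json.dumps(original.to_dict())))
```

Because `event_id` and `timestamp` travel inside the dictionary, the restored event compares equal to the original instead of receiving fresh values.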
EventBus¶
The EventBus is the core pub/sub broker. It is thread-safe and deliberately not a singleton: tests create fresh instances, and engines accept an optional bus parameter.
Subscribe¶
```python
from fcc.messaging.bus import EventBus, EventFilter
from fcc.messaging.events import EventType

bus = EventBus()

# Global subscriber: receives ALL events
def log_all(event):
    print(f"[{event.event_type.value}] {event.source}")

bus.subscribe(log_all)

# Filtered subscriber: only workflow events
def on_workflow(event):
    print(f"Workflow: {event.payload}")

bus.subscribe(on_workflow, EventFilter(
    event_types=(EventType.WORKFLOW_STARTED, EventType.WORKFLOW_COMPLETED),
))

# Placeholder callback used by the two remaining examples
def handler(event):
    print(f"Handled: {event.event_id}")

# Source-filtered subscriber
bus.subscribe(handler, EventFilter(sources=("SimulationEngine",)))

# Correlation-filtered subscriber
bus.subscribe(handler, EventFilter(correlation_ids=("run-abc-123",)))
```
Publish¶
The bus delivers events synchronously. If a subscriber raises an exception, it is silently caught so that other subscribers and the publisher are not affected.
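The delivery behaviour described above can be sketched with a small stand-in bus. `SketchBus` is hypothetical and is not the FCC implementation; in particular, counting only the callbacks that returned without raising is an assumption about how the delivery count is computed.

```python
import threading
from typing import Any, Callable


class SketchBus:
    """Hypothetical stand-in showing synchronous, exception-isolated delivery."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers: list[Callable[[Any], None]] = []

    def subscribe(self, callback: Callable[[Any], None]) -> None:
        with self._lock:
            self._subscribers.append(callback)

    def publish(self, event: Any) -> int:
        # Snapshot under the lock, then call outside it so a callback
        # can subscribe or publish without deadlocking.
        with self._lock:
            subscribers = list(self._subscribers)
        delivered = 0
        for callback in subscribers:
            try:
                callback(event)
            except Exception:
                # Swallow the error so later subscribers still run.
                continue
            delivered += 1  # Assumption: failed callbacks are not counted.
        return delivered


def broken(event: Any) -> None:
    raise RuntimeError("subscriber bug")


received: list[Any] = []
bus = SketchBus()
bus.subscribe(broken)
bus.subscribe(received.append)
count = bus.publish("hello")  # returns only after every callback has run
```

The failing subscriber does not stop the second one from receiving the event, and the publisher never sees the exception. Silent handling keeps the publisher isolated, at the cost that subscriber bugs go unnoticed unless the subscriber logs them itself.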
Unsubscribe¶
Recording and History¶
The bus can record all published events for later inspection or replay:
```python
bus.start_recording()
# ... publish events ...
history = bus.get_history()  # Returns list[Event]
bus.stop_recording()
bus.clear_history()
```
EventFilter¶
The EventFilter is a frozen dataclass that selects which events a subscriber receives:
| Field | Type | Description |
|---|---|---|
| event_types | tuple[EventType, ...] | Only deliver events of these types |
| sources | tuple[str, ...] | Only deliver events from these sources |
| correlation_ids | tuple[str, ...] | Only deliver events with these correlation IDs |
All filter fields are optional. An empty filter matches all events. When multiple fields are set, all must match (AND logic).
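```python
from fcc.messaging.bus import EventFilter
from fcc.messaging.events import EventType

filt = EventFilter(
    event_types=(EventType.SIMULATION_STEP,),
    sources=("SimulationEngine",),
)
# `event` is any Event instance; matches() is True only if every set field matches
assert filt.matches(event)
```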
The bus optimizes single-event-type subscriptions into a fast-path lookup by event type, avoiding filter evaluation overhead for the common case.
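One way such a fast path could work is sketched below. All names (`SketchEvent`, `SketchFilter`, `SketchDispatcher`) are hypothetical, event types are plain strings for brevity, and the rule that the fast path applies only when `sources` and `correlation_ids` are empty is an assumption rather than documented behaviour.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import NamedTuple, Optional


class SketchEvent(NamedTuple):
    event_type: str
    source: str
    correlation_id: Optional[str] = None


@dataclass(frozen=True)
class SketchFilter:
    event_types: tuple[str, ...] = ()
    sources: tuple[str, ...] = ()
    correlation_ids: tuple[str, ...] = ()

    def matches(self, event: SketchEvent) -> bool:
        # An empty field places no constraint; set fields are ANDed.
        return (
            (not self.event_types or event.event_type in self.event_types)
            and (not self.sources or event.source in self.sources)
            and (not self.correlation_ids
                 or event.correlation_id in self.correlation_ids)
        )


class SketchDispatcher:
    def __init__(self) -> None:
        self._by_type = defaultdict(list)  # fast path: event type -> callbacks
        self._filtered = []                # slow path: (callback, filter) pairs

    def subscribe(self, callback, filt: Optional[SketchFilter] = None) -> None:
        is_single_type = (
            filt is not None
            and len(filt.event_types) == 1
            and not filt.sources
            and not filt.correlation_ids
        )
        if is_single_type:
            self._by_type[filt.event_types[0]].append(callback)
        else:
            self._filtered.append((callback, filt))

    def publish(self, event: SketchEvent) -> int:
        delivered = 0
        # Fast path: one dictionary lookup, no filter evaluation.
        for callback in self._by_type.get(event.event_type, ()):
            callback(event)
            delivered += 1
        # Slow path: evaluate each remaining filter (None matches everything).
        for callback, filt in self._filtered:
            if filt is None or filt.matches(event):
                callback(event)
                delivered += 1
        return delivered


steps, engine_steps = [], []
dispatcher = SketchDispatcher()
dispatcher.subscribe(steps.append, SketchFilter(event_types=("SIMULATION_STEP",)))
dispatcher.subscribe(
    engine_steps.append,
    SketchFilter(event_types=("SIMULATION_STEP",), sources=("SimulationEngine",)),
)
n1 = dispatcher.publish(SketchEvent("SIMULATION_STEP", "SimulationEngine"))
n2 = dispatcher.publish(SketchEvent("SIMULATION_STEP", "OtherEngine"))
n3 = dispatcher.publish(SketchEvent("PLUGIN_LOADED", "SimulationEngine"))
```

In this sketch, fast-path subscribers are called before filtered ones, so delivery order can differ from subscription order. Whether the real bus preserves subscription order is not stated here.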
EventSerializer¶
The EventSerializer provides JSON serialization for events, supporting single and batch operations plus file persistence:
```python
from fcc.messaging.serialization import EventSerializer

# Single event
json_str = EventSerializer.to_json(event)
restored = EventSerializer.from_json(json_str)

# Batch
json_str = EventSerializer.to_json_batch(events)
restored = EventSerializer.from_json_batch(json_str)

# File I/O
count = EventSerializer.save(events, "output/events.json")
loaded = EventSerializer.load("output/events.json")
```
The file format wraps events in a {"event_count": N, "events": [...]} envelope.
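To make the envelope concrete, the following sketch writes and reads that shape using only the standard library. The helpers `save_envelope` and `load_envelope` are hypothetical and operate on plain dictionaries; creating missing parent directories and validating `event_count` on load are assumptions, not documented behaviour of `EventSerializer`.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


def save_envelope(event_dicts: list[dict[str, Any]], path: str) -> int:
    """Write events inside the envelope and return how many were written."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    envelope = {"event_count": len(event_dicts), "events": event_dicts}
    target.write_text(json.dumps(envelope, indent=2), encoding="utf-8")
    return len(event_dicts)


def load_envelope(path: str) -> list[dict[str, Any]]:
    """Read the envelope back and return the list of event dictionaries."""
    envelope = json.loads(Path(path).read_text(encoding="utf-8"))
    events = envelope["events"]
    # Assumption: a count mismatch is treated as a corrupt file.
    if envelope["event_count"] != len(events):
        raise ValueError("event_count does not match the events list")
    return events


sample = [
    {"event_type": "simulation_started", "source": "SimulationEngine"},
    {"event_type": "simulation_completed", "source": "SimulationEngine"},
]
with tempfile.TemporaryDirectory() as tmp:
    path = str(Path(tmp) / "output" / "events.json")
    count = save_envelope(sample, path)
    loaded = load_envelope(path)
```

Storing the count alongside the list gives a cheap integrity check for truncated or hand-edited files.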
EventReplay¶
The EventReplay class replays recorded events through an EventBus, enabling testing, debugging, and workflow re-execution:
```python
from fcc.messaging.events import EventType
from fcc.messaging.serialization import EventReplay, EventSerializer

replay = EventReplay(bus)

# Replay a list of events in order
total_deliveries = replay.replay(events)

# Load and replay from a file
events = EventSerializer.load("output/events.json")
replay.replay(events)

# Replay from a filtered subset
filtered = [e for e in events if e.event_type == EventType.SIMULATION_STEP]
replay.replay(filtered)
```
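Conceptually, a replay amounts to re-publishing each event in order and summing the delivery counts. The sketch below shows that idea with hypothetical names (`replay_events`, `CountingBus`); it assumes `publish` returns a delivery count and is not the `EventReplay` implementation.

```python
from typing import Any, Iterable, Protocol


class Publisher(Protocol):
    """Anything with a publish() that returns a delivery count."""

    def publish(self, event: Any) -> int: ...


def replay_events(bus: Publisher, events: Iterable[Any]) -> int:
    """Publish events in their given order and return total deliveries."""
    total = 0
    for event in events:
        total += bus.publish(event)
    return total


class CountingBus:
    """Test double that pretends every event reaches N subscribers."""

    def __init__(self, subscriber_count: int) -> None:
        self.subscriber_count = subscriber_count
        self.seen: list[Any] = []

    def publish(self, event: Any) -> int:
        self.seen.append(event)
        return self.subscriber_count


stub = CountingBus(subscriber_count=2)
total_deliveries = replay_events(stub, ["first", "second", "third"])
```

Because the replay only needs a `publish` method, a stub bus like this one is enough to test replay logic without wiring up real subscribers.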
Integration Points¶
The event bus is integrated with these FCC subsystems:
| Subsystem | Events Published | When |
|---|---|---|
| SimulationEngine | SIMULATION_*, NODE_*, PERSONA_* | During simulation runs |
| ActionEngine | ACTION_* | During action execution |
| CollaborationEngine | COLLABORATION_* | During human-agent sessions |
| GovernanceGates | GOVERNANCE_* | During quality gate evaluation |
| PluginRegistry | PLUGIN_* | During plugin discovery |
EventSubscriberPlugin¶
The EventSubscriberPlugin (the 10th plugin type) connects the plugin system with the event bus. Subscriber plugins can register callbacks for specific event types, enabling external packages to react to FCC workflow events without modifying core code.
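The general shape of such a plugin can be sketched as follows. Everything here is hypothetical: the `subscriptions()` method, the `register_plugin` helper, and the string-keyed `TinyBus` are illustrative stand-ins and do not reflect the actual `EventSubscriberPlugin` interface.

```python
from typing import Any, Callable, Protocol

Callback = Callable[[Any], None]


class SubscriberPluginSketch(Protocol):
    """Hypothetical plugin contract: declare callbacks per event type."""

    def subscriptions(self) -> dict[str, Callback]: ...


class FailureAuditPlugin:
    """Example external plugin that collects workflow failures."""

    def __init__(self) -> None:
        self.failures: list[Any] = []

    def subscriptions(self) -> dict[str, Callback]:
        return {"WORKFLOW_FAILED": self.failures.append}


class TinyBus:
    """Minimal stand-in bus keyed by event type name."""

    def __init__(self) -> None:
        self._callbacks: dict[str, list[Callback]] = {}

    def subscribe(self, event_type: str, callback: Callback) -> None:
        self._callbacks.setdefault(event_type, []).append(callback)

    def publish(self, event_type: str, payload: Any) -> int:
        callbacks = self._callbacks.get(event_type, [])
        for callback in callbacks:
            callback(payload)
        return len(callbacks)


def register_plugin(bus: TinyBus, plugin: SubscriberPluginSketch) -> None:
    # Core code only iterates what the plugin declares; it never
    # needs to know the plugin's concrete class.
    for event_type, callback in plugin.subscriptions().items():
        bus.subscribe(event_type, callback)


plugin = FailureAuditPlugin()
tiny_bus = TinyBus()
register_plugin(tiny_bus, plugin)
tiny_bus.publish("WORKFLOW_FAILED", {"workflow_id": "wf-1"})
tiny_bus.publish("WORKFLOW_COMPLETED", {"workflow_id": "wf-2"})
```

The plugin reacts only to the event types it declared, which is what allows an external package to observe workflow events without changes to core code.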