Event Bus

The FCC messaging system provides a thread-safe, in-process event bus with 81 event types, flexible filtering, serialization, and replay capabilities. It serves as the central nervous system connecting workflow execution, simulation, governance, collaboration, and plugin subsystems.

sequenceDiagram
    participant Pub as Publisher
    participant Bus as EventBus
    participant Filt as EventFilter
    participant Sub1 as Subscriber A
    participant Sub2 as Subscriber B

    Pub->>Bus: publish(Event)
    Bus->>Bus: record Event (if recording is enabled)
    Bus->>Filt: matches(Event)?
    Filt-->>Bus: true
    Bus->>Sub1: deliver(Event)
    Bus->>Filt: matches(Event)?
    Filt-->>Bus: false
    Note right of Sub2: filtered out
    Bus-->>Pub: delivery count = 1

EventType

The EventType enum defines 81 event types organized into 10 categories:

| Category | Event Types | Description |
| --- | --- | --- |
| Workflow | WORKFLOW_STARTED, WORKFLOW_COMPLETED, WORKFLOW_FAILED | Lifecycle of workflow graph traversals |
| Node | NODE_ENTERED, NODE_EXITED | Individual workflow node transitions |
| Persona | PERSONA_ACTIVATED, PERSONA_DEACTIVATED | Persona activation during workflows |
| Simulation | SIMULATION_STARTED, SIMULATION_STEP, SIMULATION_COMPLETED, SIMULATION_FAILED | Simulation engine lifecycle |
| Governance | GOVERNANCE_GATE_EVALUATED, GOVERNANCE_GATE_PASSED, GOVERNANCE_GATE_FAILED | Quality gate evaluations |
| Deliverable | DELIVERABLE_CREATED, DELIVERABLE_REVIEWED | Artifact creation and review |
| Action | ACTION_STARTED, ACTION_COMPLETED, ACTION_FAILED | Action engine lifecycle |
| Collaboration | COLLABORATION_SESSION_CREATED, COLLABORATION_SESSION_STARTED, COLLABORATION_SESSION_COMPLETED, COLLABORATION_TURN_TAKEN | Human-agent session events |
| Plugin | PLUGIN_LOADED, PLUGIN_ERROR | Plugin discovery and loading |

Event

The Event is a frozen (immutable) dataclass representing a single event in the system:

| Field | Type | Description |
| --- | --- | --- |
| event_type | EventType | Category of this event |
| source | str | Component that emitted the event (e.g. "SimulationEngine") |
| payload | dict[str, Any] | Arbitrary event data |
| correlation_id | str \| None | Links related events (e.g. all events in one simulation run) |
| event_id | str | Unique UUID assigned automatically |
| timestamp | str | ISO 8601 timestamp assigned automatically |

Creating Events

from fcc.messaging.events import Event, EventType

event = Event(
    event_type=EventType.SIMULATION_STARTED,
    source="SimulationEngine",
    payload={"scenario_id": "GEN-001", "mode": "mock"},
    correlation_id="run-abc-123",
)

Serialization

Events serialize to and from dictionaries for JSON storage:

d = event.to_dict()
restored = Event.from_dict(d)
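
To make the round trip concrete, here is a minimal, self-contained sketch of how a frozen dataclass with these fields could serialize to a dictionary and back. `SketchEvent` and `SketchEventType` are hypothetical stand-ins, not the FCC classes, and the enum string values are assumptions; the real `to_dict()` output may use different keys or encodings.

```python
from __future__ import annotations

import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Any


class SketchEventType(Enum):
    """Hypothetical stand-in for two members of the EventType enum."""

    SIMULATION_STARTED = "simulation_started"  # assumed string value
    SIMULATION_COMPLETED = "simulation_completed"  # assumed string value


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in mirroring the documented Event fields."""

    event_type: SketchEventType
    source: str
    payload: dict[str, Any] = field(default_factory=dict)
    correlation_id: str | None = None
    # Assigned automatically when the caller does not supply them.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_dict(self) -> dict[str, Any]:
        data = asdict(self)
        # Store the enum by value so the dictionary is JSON-friendly.
        data["event_type"] = self.event_type.value
        return data

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> SketchEvent:
        fields = dict(data)  # copy so the caller's dictionary is untouched
        fields["event_type"] = SketchEventType(fields["event_type"])
        return cls(**fields)


original = SketchEvent(
    event_type=SketchEventType.SIMULATION_STARTED,
    source="SimulationEngine",
    payload={"scenario_id": "GEN-001", "mode": "mock"},
    correlation_id="run-abc-123",
)
restored = SketchEvent.from_dict(original.to_dict())
```

Because `event_id` and `timestamp` travel inside the dictionary, the restored object compares equal to the original rather than receiving fresh values.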

EventBus

The EventBus is the core pub/sub broker. It is thread-safe and is not a singleton: tests create fresh instances, and engines accept an optional bus parameter.

Subscribe

from fcc.messaging.bus import EventBus, EventFilter
from fcc.messaging.events import EventType

bus = EventBus()

# Global subscriber — receives ALL events
def log_all(event):
    print(f"[{event.event_type.value}] {event.source}")

bus.subscribe(log_all)

# Filtered subscriber — only workflow events
def on_workflow(event):
    print(f"Workflow: {event.payload}")

bus.subscribe(on_workflow, EventFilter(
    event_types=(EventType.WORKFLOW_STARTED, EventType.WORKFLOW_COMPLETED),
))

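# Placeholder callback used by the two filtered subscriptions below
def handler(event):
    print(f"{event.source}: {event.event_type.value}")

# Source-filtered subscriber
bus.subscribe(handler, EventFilter(sources=("SimulationEngine",)))

# Correlation-filtered subscriber
bus.subscribe(handler, EventFilter(correlation_ids=("run-abc-123",)))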

Publish

delivered = bus.publish(event)  # Returns number of subscribers that received it

The bus delivers events synchronously: publish() returns only after every matching subscriber has been called. If a subscriber raises an exception, the bus catches it silently so that other subscribers and the publisher are not affected.
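
The isolation behavior described above can be illustrated with a small stand-in bus. This is a sketch under stated assumptions, not the FCC implementation: `SketchBus` is hypothetical, and whether the real bus counts a subscriber that raised as "delivered" is not specified here, so the sketch simply chooses not to count it.

```python
from __future__ import annotations

import threading
from typing import Any, Callable

Handler = Callable[[Any], None]


class SketchBus:
    """Hypothetical synchronous bus that isolates subscriber failures."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._handlers: list[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        with self._lock:
            self._handlers.append(handler)

    def publish(self, event: Any) -> int:
        # Snapshot under the lock, then call handlers outside it so a
        # handler that subscribes or publishes does not deadlock.
        with self._lock:
            handlers = list(self._handlers)
        delivered = 0
        for handler in handlers:
            try:
                handler(event)
            except Exception:
                # Assumption: a failed delivery is swallowed and not counted.
                continue
            delivered += 1
        return delivered


def failing_subscriber(event: Any) -> None:
    raise RuntimeError("subscriber bug")


received: list[Any] = []
sketch_bus = SketchBus()
sketch_bus.subscribe(failing_subscriber)
sketch_bus.subscribe(received.append)
count = sketch_bus.publish("demo-event")
```

The second subscriber still receives the event even though the first one raised, and the publisher never sees the exception.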

Unsubscribe

removed = bus.unsubscribe(handler)  # Returns True if found and removed

Recording and History

The bus can record all published events for later inspection or replay:

bus.start_recording()

# ... publish events ...

history = bus.get_history()  # Returns list[Event]
bus.stop_recording()
bus.clear_history()
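
For intuition, the recording lifecycle can be sketched as a flag plus a list guarded by a lock. `RecorderSketch` is a hypothetical illustration; in particular, returning a copy from `get_history()` and keeping the history after `stop_recording()` are assumptions about reasonable behavior, not confirmed details of the FCC bus.

```python
from __future__ import annotations

import threading
from typing import Any


class RecorderSketch:
    """Hypothetical recorder showing start/stop/get/clear semantics."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._recording = False
        self._history: list[Any] = []

    def start_recording(self) -> None:
        with self._lock:
            self._recording = True

    def stop_recording(self) -> None:
        with self._lock:
            self._recording = False  # assumption: history is kept

    def record(self, event: Any) -> None:
        # Would be called from publish(); a no-op unless recording is on.
        with self._lock:
            if self._recording:
                self._history.append(event)

    def get_history(self) -> list[Any]:
        with self._lock:
            return list(self._history)  # copy so callers cannot mutate state

    def clear_history(self) -> None:
        with self._lock:
            self._history.clear()


recorder = RecorderSketch()
recorder.record("ignored")  # recording not started yet
recorder.start_recording()
recorder.record("first")
recorder.record("second")
recorder.stop_recording()
recorder.record("also ignored")
snapshot = recorder.get_history()
```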

EventFilter

The EventFilter is a frozen dataclass that selects which events a subscriber receives:

| Field | Type | Description |
| --- | --- | --- |
| event_types | tuple[EventType, ...] | Only deliver events of these types |
| sources | tuple[str, ...] | Only deliver events from these sources |
| correlation_ids | tuple[str, ...] | Only deliver events with these correlation IDs |

All filter fields are optional. An empty filter matches all events. When multiple fields are set, all must match (AND logic).

filt = EventFilter(
    event_types=(EventType.SIMULATION_STEP,),
    sources=("SimulationEngine",),
)
is_match = filt.matches(event)  # True only if the event satisfies every field that is set
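
The AND logic across fields can be sketched as follows. `FilterSketch` and `EventSketch` are hypothetical stand-ins that use plain strings for event types; the sketch assumes an empty tuple means "field not set" and that an event matches a field when its value is any one of the listed values.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import NamedTuple


class EventSketch(NamedTuple):
    """Hypothetical event carrying only the fields a filter inspects."""

    event_type: str
    source: str
    correlation_id: str | None = None


@dataclass(frozen=True)
class FilterSketch:
    """Hypothetical filter: every field that is set must match (AND)."""

    event_types: tuple[str, ...] = ()
    sources: tuple[str, ...] = ()
    correlation_ids: tuple[str, ...] = ()

    def matches(self, event: EventSketch) -> bool:
        # An unset (empty) field places no constraint on the event.
        if self.event_types and event.event_type not in self.event_types:
            return False
        if self.sources and event.source not in self.sources:
            return False
        if self.correlation_ids and event.correlation_id not in self.correlation_ids:
            return False
        return True


step_filter = FilterSketch(
    event_types=("SIMULATION_STEP",),
    sources=("SimulationEngine",),
)
step_from_sim = EventSketch("SIMULATION_STEP", "SimulationEngine")
step_from_action = EventSketch("SIMULATION_STEP", "ActionEngine")
```

Here `step_from_sim` satisfies both fields, while `step_from_action` has the right type but the wrong source and is rejected.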

The bus optimizes single-event-type subscriptions into a fast-path lookup by event type, avoiding filter evaluation overhead for the common case.
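
One way such a fast path might work is to index single-type subscriptions in a dictionary keyed by event type, so publishing needs one lookup instead of a filter check per subscriber. `FastPathSketch` is a hypothetical illustration of the idea, not the FCC implementation; it models filters as plain tuples of type names and ignores source and correlation filtering.

```python
from __future__ import annotations

from collections import defaultdict
from typing import Any, Callable

Handler = Callable[[Any], None]


class FastPathSketch:
    """Hypothetical bus that indexes single-type subscriptions by type."""

    def __init__(self) -> None:
        # Fast path: event type -> handlers subscribed to exactly that type.
        self._by_type: dict[str, list[Handler]] = defaultdict(list)
        # Slow path: handlers whose filter must be evaluated per event.
        self._filtered: list[tuple[Handler, tuple[str, ...]]] = []

    def subscribe(self, handler: Handler, event_types: tuple[str, ...] = ()) -> None:
        if len(event_types) == 1:
            self._by_type[event_types[0]].append(handler)
        else:
            self._filtered.append((handler, event_types))

    def publish(self, event_type: str, payload: Any) -> int:
        delivered = 0
        # One dictionary lookup covers every single-type subscriber.
        for handler in self._by_type.get(event_type, ()):
            handler(payload)
            delivered += 1
        # Remaining subscribers are checked one by one.
        for handler, allowed in self._filtered:
            if not allowed or event_type in allowed:
                handler(payload)
                delivered += 1
        return delivered


steps: list[Any] = []
everything: list[Any] = []
fast_bus = FastPathSketch()
fast_bus.subscribe(steps.append, ("SIMULATION_STEP",))
fast_bus.subscribe(everything.append)  # no filter: receives all events
```

A side effect of this particular sketch is that fast-path subscribers are always called before filtered ones, regardless of subscription order.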

EventSerializer

The EventSerializer provides JSON serialization for events, supporting single and batch operations plus file persistence:

from fcc.messaging.serialization import EventSerializer

# Single event
json_str = EventSerializer.to_json(event)
restored = EventSerializer.from_json(json_str)

# Batch
json_str = EventSerializer.to_json_batch(events)
restored = EventSerializer.from_json_batch(json_str)

# File I/O
count = EventSerializer.save(events, "output/events.json")
loaded = EventSerializer.load("output/events.json")

The file format wraps events in a {"event_count": N, "events": [...]} envelope.
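
As an illustration of the envelope, the sketch below writes and reads a file of that shape using only the standard library. `save_envelope` and `load_envelope` are hypothetical helpers operating on already-serialized event dictionaries; the count check on load is an assumption added for illustration, not documented behavior of EventSerializer.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path
from typing import Any


def save_envelope(event_dicts: list[dict[str, Any]], path: Path) -> int:
    """Write events inside an {"event_count": N, "events": [...]} envelope."""
    envelope = {"event_count": len(event_dicts), "events": event_dicts}
    path.write_text(json.dumps(envelope, indent=2), encoding="utf-8")
    return len(event_dicts)


def load_envelope(path: Path) -> list[dict[str, Any]]:
    """Read the envelope back and return the list of event dictionaries."""
    envelope = json.loads(path.read_text(encoding="utf-8"))
    events = envelope["events"]
    # Assumption: treat a count mismatch as a corrupted file.
    if envelope.get("event_count") != len(events):
        raise ValueError("event_count does not match number of events")
    return events


sample_events = [
    {"event_type": "simulation_started", "source": "SimulationEngine"},
    {"event_type": "simulation_completed", "source": "SimulationEngine"},
]

with tempfile.TemporaryDirectory() as tmp_dir:
    target = Path(tmp_dir) / "events.json"
    saved_count = save_envelope(sample_events, target)
    loaded_events = load_envelope(target)
```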

EventReplay

The EventReplay class replays recorded events through an EventBus, enabling testing, debugging, and workflow re-execution:

from fcc.messaging.events import EventType
from fcc.messaging.serialization import EventReplay, EventSerializer

replay = EventReplay(bus)

# Replay a list of events in order
total_deliveries = replay.replay(events)

# Load and replay from a file
events = EventSerializer.load("output/events.json")
replay.replay(events)

# Replay from a filtered subset
filtered = [e for e in events if e.event_type == EventType.SIMULATION_STEP]
replay.replay(filtered)
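
Conceptually, replay amounts to publishing each recorded event again in order and summing the delivery counts. The sketch below shows that idea with a hypothetical `ReplaySketch` that takes any publish callable returning a delivery count; it assumes the total is a plain sum, which matches the `total_deliveries` name above but is not confirmed for the real class.

```python
from __future__ import annotations

from typing import Any, Callable, Iterable


class ReplaySketch:
    """Hypothetical replayer: republish events in order, sum deliveries."""

    def __init__(self, publish: Callable[[Any], int]) -> None:
        self._publish = publish

    def replay(self, events: Iterable[Any]) -> int:
        total = 0
        for event in events:  # preserves the recorded order
            total += self._publish(event)
        return total


replayed: list[str] = []


def fake_publish(event: str) -> int:
    """Stand-in for bus.publish: records the event, pretends two deliveries."""
    replayed.append(event)
    return 2


sketch_replay = ReplaySketch(fake_publish)
total_deliveries = sketch_replay.replay(["started", "step", "completed"])
```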

Integration Points

The event bus is integrated with these FCC subsystems:

| Subsystem | Events Published | When |
| --- | --- | --- |
| SimulationEngine | SIMULATION_*, NODE_*, PERSONA_* | During simulation runs |
| ActionEngine | ACTION_* | During action execution |
| CollaborationEngine | COLLABORATION_* | During human-agent sessions |
| GovernanceGates | GOVERNANCE_* | During quality gate evaluation |
| PluginRegistry | PLUGIN_* | During plugin discovery |

EventSubscriberPlugin

The EventSubscriberPlugin (the 10th plugin type) connects the plugin system with the event bus. Subscriber plugins can register callbacks for specific event types, enabling external packages to react to FCC workflow events without modifying core code.