Event Bus Integration Guide

The FCC Event Bus provides thread-safe, in-process publish/subscribe messaging for workflow events. It enables observability, collaboration recording, plugin integration, and debugging through event replay.

Overview

  Publisher ──publish()──▶ EventBus ──deliver()──▶ Subscriber(s)
                            ├── Global subscribers (receive all events)
                            ├── Type subscribers (fast path, single type)
                            └── Filtered subscribers (complex criteria)

Key classes:

Class            Module                       Purpose
Event            fcc.messaging.events         Immutable event record
EventType        fcc.messaging.events         Enum of 81 event types
EventBus         fcc.messaging.bus            Thread-safe pub/sub hub
EventFilter      fcc.messaging.bus            Subscriber delivery criteria
EventSerializer  fcc.messaging.serialization  JSON serialization
EventReplay      fcc.messaging.serialization  Replay recorded events

Event Types

The framework defines 81 event types organized into 10 categories. The tables below list 25 of those types, across nine categories:

Workflow events

EventType           Value               Emitted when
WORKFLOW_STARTED    workflow.started    A workflow graph begins execution
WORKFLOW_COMPLETED  workflow.completed  A workflow graph finishes successfully
WORKFLOW_FAILED     workflow.failed     A workflow graph terminates with an error

Node events

EventType     Value         Emitted when
NODE_ENTERED  node.entered  Execution enters a workflow graph node
NODE_EXITED   node.exited   Execution exits a workflow graph node

Persona events

EventType            Value                Emitted when
PERSONA_ACTIVATED    persona.activated    A persona begins processing
PERSONA_DEACTIVATED  persona.deactivated  A persona finishes processing

Simulation events

EventType             Value                 Emitted when
SIMULATION_STARTED    simulation.started    A simulation run begins
SIMULATION_STEP       simulation.step       A single simulation step completes
SIMULATION_COMPLETED  simulation.completed  A simulation run finishes
SIMULATION_FAILED     simulation.failed     A simulation run fails

Governance events

EventType                  Value                      Emitted when
GOVERNANCE_GATE_EVALUATED  governance.gate.evaluated  A quality gate is evaluated
GOVERNANCE_GATE_PASSED     governance.gate.passed     A quality gate passes
GOVERNANCE_GATE_FAILED     governance.gate.failed     A quality gate fails

Deliverable events

EventType             Value                 Emitted when
DELIVERABLE_CREATED   deliverable.created   A new deliverable artifact is produced
DELIVERABLE_REVIEWED  deliverable.reviewed  A deliverable is reviewed

Action events

EventType         Value             Emitted when
ACTION_STARTED    action.started    A workflow action begins
ACTION_COMPLETED  action.completed  A workflow action finishes
ACTION_FAILED     action.failed     A workflow action fails

Collaboration events

EventType                        Value                            Emitted when
COLLABORATION_SESSION_CREATED    collaboration.session.created    A human-AI session is created
COLLABORATION_SESSION_STARTED    collaboration.session.started    A collaboration session begins
COLLABORATION_SESSION_COMPLETED  collaboration.session.completed  A collaboration session ends
COLLABORATION_TURN_TAKEN         collaboration.turn.taken         A turn is taken in a session

Plugin events

EventType      Value          Emitted when
PLUGIN_LOADED  plugin.loaded  A plugin is successfully loaded
PLUGIN_ERROR   plugin.error   A plugin encounters an error

Creating and Publishing Events

from fcc.messaging.events import Event, EventType
from fcc.messaging.bus import EventBus

bus = EventBus()

# Create and publish an event
event = Event(
    event_type=EventType.WORKFLOW_STARTED,
    source="my_engine",
    payload={"workflow_id": "fcc_base_sequence", "input": "..."},
    correlation_id="run-001",
)

delivered = bus.publish(event)
print(f"Delivered to {delivered} subscribers")

The Event dataclass is frozen (immutable) and auto-generates two fields:

  • event_id -- a UUID string
  • timestamp -- an ISO 8601 UTC timestamp
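
To make the frozen and auto-generated behavior concrete, here is a minimal standard-library sketch of a dataclass with the same shape. SketchEvent and its helper functions are hypothetical stand-ins written for this guide, not the fcc.messaging.events source, and the field defaults are assumptions.

```python
import uuid
from dataclasses import FrozenInstanceError, dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict, Optional


def _new_id() -> str:
    return str(uuid.uuid4())


def _utc_now() -> str:
    return datetime.now(timezone.utc).isoformat()


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for Event (assumed field layout)."""

    event_type: str
    source: str
    payload: Dict[str, Any] = field(default_factory=dict)
    correlation_id: Optional[str] = None
    # default_factory runs once per instance, so each event gets fresh values.
    event_id: str = field(default_factory=_new_id)
    timestamp: str = field(default_factory=_utc_now)


event = SketchEvent(event_type="workflow.started", source="my_engine")

try:
    event.source = "other"  # frozen dataclasses reject attribute assignment
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Note that in Python, frozen only blocks rebinding attributes. A dict held in payload can still be modified in place, so treat payloads as read-only by convention.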

Subscribing to Events

Global subscriber (receives everything)

def my_handler(event: Event) -> None:
    print(f"[{event.event_type.value}] {event.source}: {event.payload}")

bus.subscribe(my_handler)

Type-filtered subscriber (fast path)

When you only care about one event type, use the fast path:

from fcc.messaging.bus import EventFilter

bus.subscribe(
    my_handler,
    EventFilter(event_types=(EventType.WORKFLOW_COMPLETED,)),
)
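
One plausible reason a single-type subscription can be a fast path is that handlers can be found with one dictionary lookup instead of a filter evaluation per event. The sketch below illustrates that idea only. SketchRouter and its attributes are hypothetical and are not taken from the EventBus source.

```python
from collections import defaultdict
from typing import Any, Callable, DefaultDict, Dict, List, Tuple

EventDict = Dict[str, Any]
Handler = Callable[[EventDict], None]
Predicate = Callable[[EventDict], bool]


class SketchRouter:
    """Hypothetical routing table with three kinds of subscriber."""

    def __init__(self) -> None:
        self.global_handlers: List[Handler] = []
        self.by_type: DefaultDict[str, List[Handler]] = defaultdict(list)
        self.filtered: List[Tuple[Predicate, Handler]] = []

    def handlers_for(self, event: EventDict) -> List[Handler]:
        matched = list(self.global_handlers)  # always delivered
        matched += self.by_type.get(event["event_type"], [])  # one dict lookup
        matched += [h for pred, h in self.filtered if pred(event)]  # checked per filter
        return matched


router = SketchRouter()
calls: List[str] = []
router.global_handlers.append(lambda e: calls.append("global"))
router.by_type["workflow.completed"].append(lambda e: calls.append("typed"))
router.filtered.append(
    (lambda e: e["source"] == "QualityGateRunner", lambda e: calls.append("filtered"))
)

event = {"event_type": "workflow.completed", "source": "my_engine"}
for handler in router.handlers_for(event):
    handler(event)
```

Here the global and typed handlers run, while the filtered handler is skipped because the event's source does not match its predicate.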

Multi-criteria filter

# Named gate_filter to avoid shadowing Python's built-in filter()
gate_filter = EventFilter(
    event_types=(EventType.GOVERNANCE_GATE_PASSED, EventType.GOVERNANCE_GATE_FAILED),
    sources=("QualityGateRunner",),
)
bus.subscribe(my_handler, gate_filter)

Correlation-based filter

Track related events across a workflow run:

# Named run_filter to avoid shadowing Python's built-in filter()
run_filter = EventFilter(correlation_ids=("run-001",))
bus.subscribe(my_handler, run_filter)

Unsubscribing

bus.unsubscribe(my_handler)  # returns True if found

EventFilter Reference

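EventFilter is a frozen dataclass with three optional fields. Criteria are AND-combined across fields, and each field lists the accepted values for that attribute. An empty tuple places no constraint on its field, so a filter with all three fields empty matches every event: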

Field            Type                   Behavior
event_types      tuple[EventType, ...]  Match events of these types
sources          tuple[str, ...]        Match events from these source components
correlation_ids  tuple[str, ...]        Match events with these correlation IDs

# All governance events from the quality gate runner
EventFilter(
    event_types=(
        EventType.GOVERNANCE_GATE_EVALUATED,
        EventType.GOVERNANCE_GATE_PASSED,
        EventType.GOVERNANCE_GATE_FAILED,
    ),
    sources=("QualityGateRunner",),
)
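
The matching rule described in this section can be sketched in a few lines of standard-library Python. SketchFilter and its matches method are hypothetical names used for illustration. The real EventFilter may expose its matching logic differently, and event types are modeled as plain strings here.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class SketchFilter:
    """Hypothetical model of EventFilter using the three documented field names."""

    event_types: Tuple[str, ...] = ()
    sources: Tuple[str, ...] = ()
    correlation_ids: Tuple[str, ...] = ()

    def matches(self, event_type: str, source: str, correlation_id: Optional[str]) -> bool:
        # An empty tuple places no constraint on that field.
        type_ok = not self.event_types or event_type in self.event_types
        source_ok = not self.sources or source in self.sources
        corr_ok = not self.correlation_ids or correlation_id in self.correlation_ids
        # Criteria are AND-combined: every non-empty field must match.
        return type_ok and source_ok and corr_ok


gate_filter = SketchFilter(
    event_types=("governance.gate.passed", "governance.gate.failed"),
    sources=("QualityGateRunner",),
)
hit = gate_filter.matches("governance.gate.failed", "QualityGateRunner", "run-001")
wrong_source = gate_filter.matches("governance.gate.failed", "my_engine", "run-001")
match_all = SketchFilter().matches("plugin.loaded", "anything", None)
```

In this sketch, hit is true because both non-empty fields match, wrong_source is false because the source criterion fails, and match_all is true because a filter with no criteria accepts everything.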

Recording and Replay

The EventBus supports recording published events for later replay, which is useful for debugging, testing, and audit trails.

Recording events

bus.start_recording()

# ... run your workflow, events are captured ...

bus.stop_recording()
history = bus.get_history()  # list[Event]
print(f"Captured {len(history)} events")

Serialization

Save and load events to/from JSON files:

from fcc.messaging.serialization import EventSerializer

# Save to file
EventSerializer.save(history, "debug/events.json")

# Load from file
events = EventSerializer.load("debug/events.json")

# Single event JSON
json_str = EventSerializer.to_json(event)
restored = EventSerializer.from_json(json_str)

# Batch JSON
json_batch = EventSerializer.to_json_batch(events)
restored_batch = EventSerializer.from_json_batch(json_batch)
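
JSON has no native representation for enum members, so any serializer has to convert them on the way out and back. The sketch below shows that round trip with the standard library, assuming an event can be modeled as a flat dict. SketchType, to_json, and from_json are hypothetical, and the actual on-disk format used by EventSerializer is not documented in this guide.

```python
import json
from enum import Enum
from typing import Any, Dict


class SketchType(Enum):
    WORKFLOW_STARTED = "workflow.started"


def to_json(event: Dict[str, Any]) -> str:
    # Enum members are not JSON-serializable, so store the string value.
    data = {**event, "event_type": event["event_type"].value}
    return json.dumps(data, sort_keys=True)


def from_json(text: str) -> Dict[str, Any]:
    data = json.loads(text)
    # Look the member back up by value; raises ValueError for unknown types.
    data["event_type"] = SketchType(data["event_type"])
    return data


original = {
    "event_type": SketchType.WORKFLOW_STARTED,
    "source": "my_engine",
    "payload": {"workflow_id": "fcc_base_sequence"},
    "correlation_id": "run-001",
}
restored = from_json(to_json(original))
```

A practical consequence is that payload values should themselves be JSON-serializable if you intend to save event histories.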

Replay

Replay recorded events through a new bus (e.g., with different subscribers):

from fcc.messaging.serialization import EventReplay

replay_bus = EventBus()
replay_bus.subscribe(my_analysis_handler)  # any callable that accepts an Event

replayer = EventReplay(replay_bus)

# Replay all events
total_deliveries = replayer.replay(events)

# Replay from file
total_deliveries = replayer.replay_from_file("debug/events.json")

# Replay only events matching criteria
total_deliveries = replayer.replay_filtered(
    events,
    correlation_id="run-001",
    source="SimulationEngine",
)
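
Filtered replay can be pictured as a loop that re-delivers matching events in their recorded order. The sketch below assumes the two criteria are AND-combined and that the return value counts handler invocations. Both are assumptions, and replay_filtered_sketch is a hypothetical function operating on plain dicts rather than the EventReplay implementation.

```python
from typing import Any, Callable, Dict, Iterable, List, Optional

EventDict = Dict[str, Any]
Handler = Callable[[EventDict], None]


def replay_filtered_sketch(
    events: Iterable[EventDict],
    handlers: List[Handler],
    correlation_id: Optional[str] = None,
    source: Optional[str] = None,
) -> int:
    """Re-deliver matching events in order; return the number of handler calls."""
    deliveries = 0
    for event in events:
        if correlation_id is not None and event.get("correlation_id") != correlation_id:
            continue
        if source is not None and event.get("source") != source:
            continue
        for handler in handlers:
            handler(event)
            deliveries += 1
    return deliveries


recorded = [
    {"source": "SimulationEngine", "correlation_id": "run-001", "step": 1},
    {"source": "SimulationEngine", "correlation_id": "run-002", "step": 1},
    {"source": "QualityGateRunner", "correlation_id": "run-001", "step": 2},
]
seen: List[EventDict] = []
total = replay_filtered_sketch(
    recorded, [seen.append], correlation_id="run-001", source="SimulationEngine"
)
```

Only the first recorded event satisfies both criteria, so a single delivery reaches the one handler.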

Integration with FCC Components

Simulation engine

The SimulationEngine accepts an optional event_bus parameter:

from fcc.simulation.engine import SimulationEngine

bus = EventBus()
engine = SimulationEngine(event_bus=bus)
# Engine emits SIMULATION_STARTED, SIMULATION_STEP, SIMULATION_COMPLETED

Quality gate runner

The QualityGateRunner.run_gate() method accepts an optional event_bus:

from fcc.governance.quality_gates import QualityGateRunner

# gates_path, gate, and artifact are supplied by the caller (not defined in this snippet)
runner = QualityGateRunner.from_yaml(gates_path)
result = runner.run_gate(gate, artifact, event_bus=bus)
# Emits GOVERNANCE_GATE_PASSED or GOVERNANCE_GATE_FAILED

Observability integration

The observability layer can instrument engines to emit tracing spans alongside events:

from fcc.observability.integration import (
    instrument_simulation_engine,
    instrument_action_engine,
)

Collaboration recording

The SessionRecorder uses the event bus to record collaboration sessions:

from fcc.collaboration.recording import SessionRecorder

recorder = SessionRecorder(bus)
# Captures COLLABORATION_SESSION_CREATED, COLLABORATION_TURN_TAKEN, etc.

Plugin Subscribers

Plugins can register event subscribers that are automatically wired to the EventBus when discovered. See the Plugin Development Guide for details on the EventSubscriberPlugin ABC.

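from fcc.messaging.bus import EventFilter
from fcc.messaging.events import Event, EventType
from fcc.messaging.plugin_bridge import EventSubscriberPlugin
# PluginMeta and Subscriber must also be imported; their modules are not shown in this guide.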

class MyLoggerPlugin(EventSubscriberPlugin):
    def plugin_meta(self) -> PluginMeta: ...

    def get_subscribers(self) -> list[tuple[Subscriber, EventFilter | None]]:
        return [
            (self._log_governance, EventFilter(
                event_types=(EventType.GOVERNANCE_GATE_FAILED,)
            )),
            (self._log_all, None),  # global subscriber
        ]

    def _log_governance(self, event: Event) -> None:
        print(f"GATE FAILED: {event.payload}")

    def _log_all(self, event: Event) -> None:
        print(f"[{event.timestamp}] {event.event_type.value}")

Thread Safety

The EventBus is fully thread-safe:

  • All subscription/unsubscription operations are protected by a threading.Lock.
  • History recording is also lock-protected.
  • Event delivery happens outside the lock to avoid deadlocks.
  • Subscriber exceptions are caught and suppressed, so one failing subscriber cannot break other subscribers or the publisher. Because the exceptions are not re-raised, handlers should log their own errors.
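
The pattern behind these guarantees (copy the subscriber list under the lock, deliver outside it, isolate handler failures) can be sketched as follows. SketchBus is a hypothetical minimal bus written for illustration, not the fcc.messaging.bus.EventBus source. Counting only successful handler calls in the return value is an assumption.

```python
import threading
from typing import Any, Callable, List

Handler = Callable[[Any], None]


class SketchBus:
    """Hypothetical minimal bus illustrating snapshot-then-deliver."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._handlers: List[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        with self._lock:
            self._handlers.append(handler)

    def unsubscribe(self, handler: Handler) -> bool:
        with self._lock:
            if handler in self._handlers:
                self._handlers.remove(handler)
                return True
            return False

    def publish(self, event: Any) -> int:
        # Copy the subscriber list under the lock, then release it.
        with self._lock:
            snapshot = list(self._handlers)
        delivered = 0
        for handler in snapshot:
            try:
                handler(event)  # runs outside the lock, so it may call unsubscribe()
                delivered += 1
            except Exception:
                pass  # suppressed so the remaining subscribers still run
        return delivered


bus = SketchBus()
seen: List[Any] = []


def broken(event: Any) -> None:
    raise RuntimeError("boom")


def one_shot(event: Any) -> None:
    seen.append(event)
    bus.unsubscribe(one_shot)  # would deadlock if publish() still held the lock


bus.subscribe(broken)
bus.subscribe(one_shot)
first = bus.publish("e1")
second = bus.publish("e2")
```

The first publish reaches one_shot even though broken raised before it. The second publish finds only broken subscribed, because one_shot removed itself during delivery without deadlocking on the non-reentrant lock.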

Best Practices

  • Use correlation_id to link related events across a workflow execution.
  • Use start_recording() during development to capture event traces for debugging.
  • Serialize event histories to JSON files for audit trails and test fixtures.
  • Keep subscriber handlers fast; long-running work should be dispatched to a background thread or queue.
  • Use EventFilter to reduce noise -- global subscribers receive every event.
  • Prefer the single-type fast path when subscribing to exactly one event type.
  • Create fresh EventBus instances in tests to avoid state leakage between test cases.
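
As an illustration of keeping handlers fast, the subscriber callback below only enqueues the event, and a worker thread does the slow part. This is a standard-library sketch. enqueue_handler, worker, and the sentinel-based shutdown are illustrative choices, and in real use you would pass enqueue_handler to bus.subscribe() instead of calling it directly.

```python
import queue
import threading
from typing import Any, List

work_queue: "queue.Queue[Any]" = queue.Queue()
processed: List[Any] = []
_STOP = object()  # sentinel that tells the worker to exit


def enqueue_handler(event: Any) -> None:
    """Subscriber callback: returns immediately after queuing the event."""
    work_queue.put(event)


def worker() -> None:
    while True:
        item = work_queue.get()
        if item is _STOP:
            break
        processed.append(item)  # placeholder for slow work (I/O, network calls)


thread = threading.Thread(target=worker, daemon=True)
thread.start()

# Stand-in for events arriving from the bus.
for n in range(3):
    enqueue_handler({"step": n})

work_queue.put(_STOP)
thread.join(timeout=5)
```

A single worker preserves event order. Using several workers increases throughput at the cost of ordering guarantees.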