
Understanding the Event Bus

The FCC event bus is a thread-safe, in-process pub/sub system that lets framework components communicate without tight coupling. Engines, plugins, and custom code can publish events and subscribe to them with optional filtering. This tutorial covers the event model, subscription patterns, and recording/replay.

Event Types

FCC defines 81 event types organized into ten categories. The table below shows nine of these categories with a selection of their event types:

| Category | Event Types | Description |
|---|---|---|
| Workflow | workflow.started, workflow.completed, workflow.failed | Lifecycle of a workflow execution |
| Node | node.entered, node.exited | Persona nodes in the workflow graph |
| Persona | persona.activated, persona.deactivated | Persona activation lifecycle |
| Simulation | simulation.started, simulation.step, simulation.completed, simulation.failed | Simulation engine events |
| Governance | governance.gate.evaluated, governance.gate.passed, governance.gate.failed | Quality gate evaluations |
| Deliverable | deliverable.created, deliverable.reviewed | Artifact production |
| Action | action.started, action.completed, action.failed | Action engine events |
| Collaboration | collaboration.session.created, collaboration.session.started, collaboration.session.completed, collaboration.turn.taken | Human-agent collaboration |
| Plugin | plugin.loaded, plugin.error | Plugin lifecycle |

Creating and Publishing Events

Every event is an immutable Event dataclass with a unique ID, timestamp, type, source, and payload:

from fcc.messaging.events import Event, EventType
from fcc.messaging.bus import EventBus

bus = EventBus()

# Create and publish an event
event = Event(
    event_type=EventType.WORKFLOW_STARTED,
    source="my_application",
    payload={"workflow_id": "wf-001", "scenario": "GEN-001"},
)

delivered = bus.publish(event)
print(f"Delivered to {delivered} subscribers")

Event Immutability

Events are frozen dataclasses. Once created, their fields cannot be modified. This ensures that subscribers always see consistent data, even when events are shared across threads.
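
The effect of freezing can be seen with a stand-alone dataclass. This is a minimal sketch using only the standard library; `SketchEvent` is a hypothetical stand-in and not FCC's `Event` class. It shows that reassigning a field on a frozen instance raises `dataclasses.FrozenInstanceError`.

```python
from dataclasses import dataclass, field, FrozenInstanceError


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for an immutable event (not FCC's Event)."""
    event_type: str
    source: str
    payload: dict = field(default_factory=dict)


event = SketchEvent("workflow.started", "my_application", {"workflow_id": "wf-001"})

try:
    event.source = "someone_else"  # reassigning a field is rejected
    mutation_blocked = False
except FrozenInstanceError:
    mutation_blocked = True

print(f"Mutation blocked: {mutation_blocked}")
```

In plain Python, freezing a dataclass prevents field reassignment but does not make a `dict` stored in a field read-only. Whether FCC adds protection for the payload is not covered here, so it is safest to treat payloads as read-only in subscribers.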

Subscribing to Events

Global Subscribers

A global subscriber receives every event published to the bus:

received_events = []

def my_handler(event: Event) -> None:
    received_events.append(event)
    print(f"[{event.event_type.value}] from {event.source}: {event.payload}")

bus.subscribe(my_handler)

# This event will be delivered to my_handler
bus.publish(Event(
    event_type=EventType.SIMULATION_STARTED,
    source="engine",
    payload={"scenario_id": "GEN-001"},
))

Filtered Subscribers

Use EventFilter to receive only specific events:

from fcc.messaging.bus import EventFilter

# Subscribe only to action events
action_filter = EventFilter(
    event_types=(
        EventType.ACTION_STARTED,
        EventType.ACTION_COMPLETED,
        EventType.ACTION_FAILED,
    ),
)

action_events = []
bus.subscribe(lambda e: action_events.append(e), action_filter)

Filtering by Source

Receive events only from a specific component:

engine_filter = EventFilter(sources=("ActionEngine",))
bus.subscribe(lambda e: print(f"Engine: {e.event_type.value}"), engine_filter)

Filtering by Correlation ID

Correlation IDs link related events together. For example, all events in a single simulation run share one correlation ID:

run_id = "run-abc-123"

# Subscribe only to events from this run
run_filter = EventFilter(correlation_ids=(run_id,))
bus.subscribe(lambda e: print(f"Run event: {e.event_type.value}"), run_filter)

# Publish a correlated event
bus.publish(Event(
    event_type=EventType.SIMULATION_STEP,
    source="engine",
    payload={"step": 1, "persona_id": "RC"},
    correlation_id=run_id,
))
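
The examples above each set a single filter field. How `EventFilter` combines several fields is not stated in this tutorial; the sketch below shows one plausible matching rule, under the assumptions that an empty tuple places no restriction and that all non-empty criteria must match. `SketchEvent` and `SketchFilter` are hypothetical stand-ins, not FCC classes.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for an event (not FCC's Event)."""
    event_type: str
    source: str
    correlation_id: Optional[str] = None


@dataclass(frozen=True)
class SketchFilter:
    """Hypothetical stand-in for a filter (not FCC's EventFilter)."""
    event_types: Tuple[str, ...] = ()
    sources: Tuple[str, ...] = ()
    correlation_ids: Tuple[str, ...] = ()

    def matches(self, event: SketchEvent) -> bool:
        # Assumption: an empty tuple means "no restriction" for that field,
        # and every non-empty criterion must match (logical AND).
        if self.event_types and event.event_type not in self.event_types:
            return False
        if self.sources and event.source not in self.sources:
            return False
        if self.correlation_ids and event.correlation_id not in self.correlation_ids:
            return False
        return True


run_filter = SketchFilter(sources=("engine",), correlation_ids=("run-abc-123",))
in_run = SketchEvent("simulation.step", "engine", "run-abc-123")
other_run = SketchEvent("simulation.step", "engine", "run-xyz-999")

print(run_filter.matches(in_run))
print(run_filter.matches(other_run))
```

Check the behavior of the real `EventFilter` before relying on combined criteria.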

Unsubscribing

Remove a subscriber when it is no longer needed:

def temporary_handler(event: Event) -> None:
    print(f"Temp: {event.event_type.value}")

bus.subscribe(temporary_handler)
removed = bus.unsubscribe(temporary_handler)
print(f"Subscriber removed: {removed}")  # True

Recording and Replaying Events

The event bus can record all published events for later analysis or replay:

from fcc.messaging.serialization import EventSerializer, EventReplay

# Start recording
bus.start_recording()

# Publish some events
bus.publish(Event(
    event_type=EventType.WORKFLOW_STARTED,
    source="engine",
    payload={"graph": "base_sequence"},
))
bus.publish(Event(
    event_type=EventType.SIMULATION_STEP,
    source="engine",
    payload={"step": 1, "persona_id": "RC"},
))
bus.publish(Event(
    event_type=EventType.WORKFLOW_COMPLETED,
    source="engine",
    payload={"steps": 5},
))

# Stop recording and retrieve history
bus.stop_recording()
history = bus.get_history()
print(f"Recorded {len(history)} events")

# Save to a file
EventSerializer.save(history, "events.json")

# Load and replay through a fresh bus
replay_bus = EventBus()
replay_events = []
replay_bus.subscribe(lambda e: replay_events.append(e))

replayer = EventReplay(replay_bus)
deliveries = replayer.replay_from_file("events.json")
print(f"Replayed {len(replay_events)} events, {deliveries} total deliveries")

Filtered Replay

Replay only a subset of recorded events:

replayer = EventReplay(replay_bus)

# Replay only events from "engine"
deliveries = replayer.replay_filtered(
    history,
    source="engine",
)

# Or replay only events with a specific correlation ID
deliveries = replayer.replay_filtered(
    history,
    correlation_id="run-abc-123",
)

Serialization

Events can be serialized to JSON for storage, transmission, or debugging:

from fcc.messaging.serialization import EventSerializer

event = Event(
    event_type=EventType.ACTION_COMPLETED,
    source="ActionEngine",
    payload={"persona_id": "RC", "action_type": "scaffold"},
)

# Single event
json_str = EventSerializer.to_json(event)
restored = EventSerializer.from_json(json_str)
assert restored.event_type == event.event_type

# Batch operations
batch_json = EventSerializer.to_json_batch([event])
restored_batch = EventSerializer.from_json_batch(batch_json)
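
To make the round trip concrete, here is a stand-alone sketch of JSON serialization for a frozen dataclass, using only the standard library. The class `SketchEvent`, its field names (`event_id`, `timestamp`), and the helpers `to_json` and `from_json` are assumptions for illustration; the JSON layout that `EventSerializer` actually produces may differ.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for an event (not FCC's Event)."""
    event_type: str
    source: str
    payload: dict = field(default_factory=dict)
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


def to_json(event: SketchEvent) -> str:
    # asdict() turns the dataclass into a plain dict that json can encode.
    return json.dumps(asdict(event), sort_keys=True)


def from_json(text: str) -> SketchEvent:
    # Rebuild the event, reusing the stored ID and timestamp.
    return SketchEvent(**json.loads(text))


original = SketchEvent("action.completed", "ActionEngine", {"persona_id": "RC"})
restored = from_json(to_json(original))

print(restored == original)
```

In this sketch the payload must contain only JSON-encodable values (strings, numbers, booleans, lists, dicts, `None`); keeping real payloads to those types is a reasonable precaution.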

Thread Safety

The EventBus uses a threading.Lock to protect subscriber lists and history. It is safe to publish and subscribe from multiple threads simultaneously. Subscriber callbacks are invoked outside the lock, so long-running subscribers will not block other publishers.
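
The locking pattern described above can be sketched as follows. `SketchBus` is a hypothetical, minimal bus written for illustration, not FCC's `EventBus`: it copies the subscriber list while holding the lock, then calls the handlers after releasing it.

```python
import threading
from typing import Any, Callable, List


class SketchBus:
    """Hypothetical minimal bus showing the locking pattern (not FCC's EventBus)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers: List[Callable[[Any], None]] = []

    def subscribe(self, handler: Callable[[Any], None]) -> None:
        with self._lock:
            self._subscribers.append(handler)

    def publish(self, event: Any) -> int:
        # Copy the subscriber list while holding the lock...
        with self._lock:
            snapshot = list(self._subscribers)
        # ...then call handlers after releasing it, so a slow handler
        # does not block other threads from publishing or subscribing.
        for handler in snapshot:
            handler(event)
        return len(snapshot)


sketch_bus = SketchBus()
received: List[int] = []
received_lock = threading.Lock()


def collect(event: int) -> None:
    # Handlers may run on several publisher threads at once,
    # so this one guards its own shared state.
    with received_lock:
        received.append(event)


sketch_bus.subscribe(collect)

threads = [threading.Thread(target=sketch_bus.publish, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"Received {len(received)} events")
```

In this sketch, handlers run on whichever thread called `publish`, so a handler that touches shared state needs its own synchronization. If FCC's bus behaves the same way, that applies to your subscribers too.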

Subscriber Exceptions

If a subscriber raises an exception, the event bus catches it silently and continues delivering to the remaining subscribers. This prevents one broken subscriber from disrupting the entire event pipeline. Because the bus does not report these failures, add your own logging or error handling inside each subscriber if you need visibility into errors.
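
One way to surface failures is to wrap each handler so that it logs before the exception reaches the bus. The decorator below is a hypothetical helper (`log_errors` is not part of FCC) built only on the standard library. The final `try`/`except` simulates a bus that swallows the error.

```python
import functools
import logging
from typing import Any, Callable

logger = logging.getLogger("event_subscribers")


def log_errors(handler: Callable[[Any], None]) -> Callable[[Any], None]:
    """Hypothetical helper: log any exception a handler raises, then re-raise."""

    @functools.wraps(handler)
    def wrapper(event: Any) -> None:
        try:
            handler(event)
        except Exception:
            # logger.exception records the message plus the traceback.
            logger.exception("Subscriber %s failed", handler.__name__)
            raise  # let the caller (the bus) decide what to do next

    return wrapper


@log_errors
def fragile_handler(event: Any) -> None:
    raise ValueError("cannot process event")


# Simulate a bus that swallows subscriber errors.
try:
    fragile_handler({"step": 1})
    handler_failed = False
except ValueError:
    handler_failed = True

print(f"Handler failed: {handler_failed}")
```

With FCC, the wrapped function would be what you pass to `bus.subscribe`. Keep a reference to it, since `bus.unsubscribe` would presumably need that same wrapped object rather than the original function.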

Introspection

print(f"Total subscribers: {bus.subscriber_count}")
print(f"Recorded events: {len(bus.get_history())}")

# Clear everything
bus.clear()
print(f"After clear: {bus.subscriber_count} subscribers")

Next Steps