Understanding the Event Bus
The FCC event bus is a thread-safe, in-process pub/sub system that lets framework components communicate without tight coupling. Engines, plugins, and custom code can publish events and subscribe to them with optional filtering. This tutorial covers the event model, subscription patterns, and recording/replay.
Event Types
FCC defines 81 event types organized into ten categories. The table below lists a selection of them, covering nine categories:
| Category | Event Types | Description |
|---|---|---|
| Workflow | workflow.started, workflow.completed, workflow.failed | Lifecycle of a workflow execution |
| Node | node.entered, node.exited | Persona nodes in the workflow graph |
| Persona | persona.activated, persona.deactivated | Persona activation lifecycle |
| Simulation | simulation.started, simulation.step, simulation.completed, simulation.failed | Simulation engine events |
| Governance | governance.gate.evaluated, governance.gate.passed, governance.gate.failed | Quality gate evaluations |
| Deliverable | deliverable.created, deliverable.reviewed | Artifact production |
| Action | action.started, action.completed, action.failed | Action engine events |
| Collaboration | collaboration.session.created, collaboration.session.started, collaboration.session.completed, collaboration.turn.taken | Human-agent collaboration |
| Plugin | plugin.loaded, plugin.error | Plugin lifecycle |
Creating and Publishing Events
Every event is an immutable Event dataclass with a unique ID, timestamp, type, source, and payload:
from fcc.messaging.events import Event, EventType
from fcc.messaging.bus import EventBus
bus = EventBus()
# Create and publish an event
event = Event(
    event_type=EventType.WORKFLOW_STARTED,
    source="my_application",
    payload={"workflow_id": "wf-001", "scenario": "GEN-001"},
)
delivered = bus.publish(event)
print(f"Delivered to {delivered} subscribers")
Event Immutability
Events are frozen dataclasses. Once created, their fields cannot be modified. This ensures that subscribers always see consistent data, even when events are shared across threads.
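The effect of freezing can be seen with a small stand-in class. This is a minimal sketch that assumes only standard `dataclasses` semantics; `DemoEvent` is a hypothetical class for illustration, not FCC's `Event`. It also shows a caveat of plain frozen dataclasses: freezing is shallow, so a dict stored in a field can still be mutated in place unless the class takes extra steps to prevent it.

```python
from dataclasses import FrozenInstanceError, dataclass, field, replace


@dataclass(frozen=True)
class DemoEvent:
    """Hypothetical stand-in for an immutable event; not FCC's Event class."""

    source: str
    payload: dict = field(default_factory=dict)


original = DemoEvent(source="engine", payload={"step": 1})

# Assigning to a field of a frozen dataclass raises FrozenInstanceError.
try:
    original.source = "other"
    assignment_blocked = False
except FrozenInstanceError:
    assignment_blocked = True

# To "change" a field, build a new instance with dataclasses.replace.
updated = replace(original, source="other")

# Freezing is shallow: the dict held by the field is still mutable.
original.payload["step"] = 2
```

Whether FCC's `Event` guards its payload against in-place mutation is not stated here, so it is safest to treat payloads as read-only inside subscribers.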
Subscribing to Events
Global Subscribers
A global subscriber receives every event published to the bus:
received_events = []
def my_handler(event: Event) -> None:
    received_events.append(event)
    print(f"[{event.event_type.value}] from {event.source}: {event.payload}")

bus.subscribe(my_handler)
# This event will be delivered to my_handler
bus.publish(Event(
    event_type=EventType.SIMULATION_STARTED,
    source="engine",
    payload={"scenario_id": "GEN-001"},
))
Filtered Subscribers
Use EventFilter to receive only specific events:
from fcc.messaging.bus import EventFilter
# Subscribe only to action events
action_filter = EventFilter(
    event_types=(
        EventType.ACTION_STARTED,
        EventType.ACTION_COMPLETED,
        EventType.ACTION_FAILED,
    ),
)
action_events = []
bus.subscribe(lambda e: action_events.append(e), action_filter)
Filtering by Source
Receive events only from a specific component:
engine_filter = EventFilter(sources=("ActionEngine",))
bus.subscribe(lambda e: print(f"Engine: {e.event_type.value}"), engine_filter)
Filtering by Correlation ID
Correlation IDs link related events together. For example, all events in a single simulation run share one correlation ID:
run_id = "run-abc-123"
# Subscribe only to events from this run
run_filter = EventFilter(correlation_ids=(run_id,))
bus.subscribe(lambda e: print(f"Run event: {e.event_type.value}"), run_filter)
# Publish a correlated event
bus.publish(Event(
    event_type=EventType.SIMULATION_STEP,
    source="engine",
    payload={"step": 1, "persona_id": "RC"},
    correlation_id=run_id,
))
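The filters shown so far select on event type, source, and correlation ID. If you need a condition those fields do not express, such as a value inside the payload, one option is to wrap the handler in a predicate check before subscribing it. The helper below is a hypothetical sketch, not part of FCC; it relies only on plain Python callables.

```python
from typing import Any, Callable


def only_when(
    predicate: Callable[[Any], bool],
    handler: Callable[[Any], None],
) -> Callable[[Any], None]:
    """Return a handler that forwards an event only if predicate(event) is true.

    Hypothetical helper for illustration; not an FCC API.
    """

    def wrapper(event: Any) -> None:
        if predicate(event):
            handler(event)

    return wrapper
```

It could then be used as `bus.subscribe(only_when(lambda e: e.payload.get("persona_id") == "RC", my_handler))`. Keep a reference to the returned wrapper if you intend to unsubscribe later, since the wrapper, not `my_handler`, is the object that was subscribed.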
Unsubscribing
Remove a subscriber when it is no longer needed:
def temporary_handler(event: Event) -> None:
    print(f"Temp: {event.event_type.value}")

bus.subscribe(temporary_handler)
removed = bus.unsubscribe(temporary_handler)
print(f"Subscriber removed: {removed}") # True
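For short-lived subscriptions, pairing `subscribe` and `unsubscribe` by hand is easy to get wrong when an exception interrupts the code in between. A context manager can guarantee the cleanup. The `subscribed` helper below is a hypothetical sketch, not an FCC API; it assumes only that the bus object exposes the `subscribe(handler)` and `unsubscribe(handler)` methods used above.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator


@contextmanager
def subscribed(bus: Any, handler: Callable[[Any], None]) -> Iterator[None]:
    """Subscribe handler for the duration of a with-block, then remove it.

    Hypothetical helper; works with any object that has subscribe()
    and unsubscribe() methods taking the handler.
    """
    bus.subscribe(handler)
    try:
        yield
    finally:
        # Runs even if the with-block raises, so the handler never leaks.
        bus.unsubscribe(handler)
```

Inside `with subscribed(bus, temporary_handler): ...` the handler receives events; once the block exits, it is removed.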
Recording and Replaying Events
The event bus can record all published events for later analysis or replay:
from fcc.messaging.serialization import EventSerializer, EventReplay
# Start recording
bus.start_recording()
# Publish some events
bus.publish(Event(
    event_type=EventType.WORKFLOW_STARTED,
    source="engine",
    payload={"graph": "base_sequence"},
))
bus.publish(Event(
    event_type=EventType.SIMULATION_STEP,
    source="engine",
    payload={"step": 1, "persona_id": "RC"},
))
bus.publish(Event(
    event_type=EventType.WORKFLOW_COMPLETED,
    source="engine",
    payload={"steps": 5},
))
# Stop recording and retrieve history
bus.stop_recording()
history = bus.get_history()
print(f"Recorded {len(history)} events")
# Save to a file
EventSerializer.save(history, "events.json")
# Load and replay through a fresh bus
replay_bus = EventBus()
replay_events = []
replay_bus.subscribe(lambda e: replay_events.append(e))
replayer = EventReplay(replay_bus)
deliveries = replayer.replay_from_file("events.json")
print(f"Replayed {len(replay_events)} events, {deliveries} total deliveries")
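Before saving or replaying a recording, it can help to check what it contains. The sketch below tallies events per type using only the `event_type.value` attribute that the handlers above already read; `summarize_by_type` is a hypothetical helper name, not part of FCC.

```python
from collections import Counter
from typing import Any, Iterable


def summarize_by_type(events: Iterable[Any]) -> Counter:
    """Count events per event-type string, reading event.event_type.value.

    Hypothetical helper for inspecting a recorded history.
    """
    return Counter(event.event_type.value for event in events)
```

Calling `summarize_by_type(bus.get_history())` returns a `Counter` keyed by event-type string, which makes missing or unexpectedly frequent event types easy to spot.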
Filtered Replay
Replay only a subset of recorded events:
replayer = EventReplay(replay_bus)
# Replay only events from "engine"
deliveries = replayer.replay_filtered(
    history,
    source="engine",
)
# Or replay only events with a specific correlation ID
deliveries = replayer.replay_filtered(
    history,
    correlation_id="run-abc-123",
)
Serialization
Events can be serialized to JSON for storage, transmission, or debugging:
from fcc.messaging.serialization import EventSerializer
event = Event(
    event_type=EventType.ACTION_COMPLETED,
    source="ActionEngine",
    payload={"persona_id": "RC", "action_type": "scaffold"},
)
# Single event
json_str = EventSerializer.to_json(event)
restored = EventSerializer.from_json(json_str)
assert restored.event_type == event.event_type
# Batch operations
batch_json = EventSerializer.to_json_batch([event])
restored_batch = EventSerializer.from_json_batch(batch_json)
Thread Safety
The EventBus uses a threading.Lock to protect subscriber lists and history. It is safe to publish and subscribe from multiple threads simultaneously. Subscriber callbacks are invoked outside the lock, so long-running subscribers will not block other publishers.
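The "invoke outside the lock" pattern described above can be illustrated with a small stand-in. This is a sketch of the general technique, not FCC's actual implementation; `MiniBus` and its internals are hypothetical. The bus copies the subscriber list while holding the lock, releases the lock, and only then calls the handlers.

```python
import threading
from typing import Any, Callable, List


class MiniBus:
    """Illustrative stand-in for the locking pattern; not FCC's EventBus."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers: List[Callable[[Any], None]] = []

    def subscribe(self, handler: Callable[[Any], None]) -> None:
        with self._lock:
            self._subscribers.append(handler)

    def publish(self, event: Any) -> int:
        # Copy the subscriber list while holding the lock...
        with self._lock:
            snapshot = list(self._subscribers)
        # ...then call handlers after releasing it, so a slow handler
        # never keeps other threads waiting on the lock.
        for handler in snapshot:
            handler(event)
        return len(snapshot)
```

A side effect of this design is that a handler may itself call `subscribe` during delivery without deadlocking, because the non-reentrant lock is no longer held. The new subscriber takes effect from the next `publish` call.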
Subscriber Exceptions
If a subscriber raises an exception, the event bus catches it silently and continues delivering to the remaining subscribers. This prevents one broken subscriber from disrupting the entire event pipeline. Because the bus does not report these failures, add logging or error handling inside each subscriber if you need visibility into errors.
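Since the bus swallows subscriber exceptions, one way to keep them visible is to wrap each handler so that failures are logged before they propagate. The decorator below is a hypothetical sketch built on the standard `logging` module; the names `report_errors` and `subscriber_errors` are illustrative and not part of FCC.

```python
import logging
from functools import wraps
from typing import Any, Callable

# Hypothetical logger name chosen for this example.
logger = logging.getLogger("subscriber_errors")


def report_errors(handler: Callable[[Any], None]) -> Callable[[Any], None]:
    """Wrap a handler so any exception is logged with a traceback.

    The exception is re-raised afterwards, leaving the bus's own
    error handling unchanged.
    """

    @wraps(handler)
    def wrapper(event: Any) -> None:
        try:
            handler(event)
        except Exception:
            name = getattr(handler, "__name__", repr(handler))
            logger.exception("Subscriber %s failed", name)
            raise

    return wrapper
```

Subscribing with `bus.subscribe(report_errors(my_handler))` leaves delivery behavior as it was while sending each failure to your logging configuration.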
Introspection
print(f"Total subscribers: {bus.subscriber_count}")
print(f"Recorded events: {len(bus.get_history())}")
# Clear everything
bus.clear()
print(f"After clear: {bus.subscriber_count} subscribers")
Next Steps
- Your First Collaboration Session -- See events in a collaboration workflow
- Creating Event Subscribers -- Build a subscriber plugin
- Adding Observability to Workflows -- Connect events to tracing