Messaging API Reference¶
Module: fcc.messaging
The messaging module provides an in-process event bus, event models, serialization, and replay for the FCC framework.
flowchart LR
subgraph Publishers
SE[SimulationEngine]
AE[ActionEngine]
CE[CollaborationEngine]
end
subgraph EventBus
EB[EventBus] --> EF{EventFilter}
end
subgraph Subscribers
S1[Subscriber A]
S2[Subscriber B]
S3[EventSubscriberPlugin]
end
SE -->|publish Event| EB
AE -->|publish Event| EB
CE -->|publish Event| EB
EF -->|matches| S1
EF -->|matches| S2
EF -->|matches| S3
Module Structure¶
| Module | Description |
|---|---|
fcc.messaging.events |
Event and EventType definitions |
fcc.messaging.bus |
EventBus, EventFilter, Subscriber |
fcc.messaging.serialization |
EventSerializer, EventReplay |
fcc.messaging.plugin_bridge |
EventSubscriberPlugin (10th plugin type) |
EventType¶
Enum with 25 values. See Event Bus Specification for the full list.
Key members: WORKFLOW_STARTED, WORKFLOW_COMPLETED, WORKFLOW_FAILED, NODE_ENTERED, NODE_EXITED, PERSONA_ACTIVATED, PERSONA_DEACTIVATED, SIMULATION_STARTED, SIMULATION_STEP, SIMULATION_COMPLETED, SIMULATION_FAILED, GOVERNANCE_GATE_EVALUATED, GOVERNANCE_GATE_PASSED, GOVERNANCE_GATE_FAILED, DELIVERABLE_CREATED, DELIVERABLE_REVIEWED, ACTION_STARTED, ACTION_COMPLETED, ACTION_FAILED, COLLABORATION_SESSION_CREATED, COLLABORATION_SESSION_STARTED, COLLABORATION_SESSION_COMPLETED, COLLABORATION_TURN_TAKEN, PLUGIN_LOADED, PLUGIN_ERROR.
Event¶
Frozen dataclass. All events are immutable after creation.
| Method | Signature | Returns | Description |
|---|---|---|---|
to_dict() |
() -> dict[str, Any] |
dict | Serialize to JSON-compatible dictionary |
from_dict() |
(data: dict) -> Event |
Event | Class method to deserialize |
Constructor parameters: event_type: EventType, source: str, payload: dict[str, Any] = {}, correlation_id: str | None = None, event_id: str = auto, timestamp: str = auto.
Usage¶
from fcc.messaging.events import Event, EventType
event = Event(
event_type=EventType.WORKFLOW_STARTED,
source="MyEngine",
payload={"workflow_id": "base_sequence"},
)
print(event.event_id) # UUID
print(event.timestamp) # ISO 8601
EventBus¶
Thread-safe pub/sub broker. Not a singleton.
| Method | Signature | Returns | Description |
|---|---|---|---|
subscribe() |
(subscriber, event_filter=None) |
None | Register a subscriber |
unsubscribe() |
(subscriber) |
bool | Remove subscriber; returns True if found |
publish() |
(event) |
int | Publish event; returns delivery count |
start_recording() |
() |
None | Start recording events to history |
stop_recording() |
() |
None | Stop recording |
get_history() |
() |
list[Event] | Return copy of recorded history |
clear_history() |
() |
None | Clear recorded history |
Usage¶
from fcc.messaging.bus import EventBus, EventFilter
from fcc.messaging.events import Event, EventType
bus = EventBus()
received = []
bus.subscribe(lambda e: received.append(e))
bus.publish(Event(event_type=EventType.WORKFLOW_STARTED, source="test"))
assert len(received) == 1
EventFilter¶
Frozen dataclass for selecting events.
| Method | Signature | Returns | Description |
|---|---|---|---|
matches() |
(event: Event) |
bool | True if event passes this filter |
Constructor parameters: event_types: tuple[EventType, ...] = (), sources: tuple[str, ...] = (), correlation_ids: tuple[str, ...] = ().
Subscriber¶
Type alias: Callable[[Event], Any]. Any callable that accepts an Event.
EventSerializer¶
| Method | Signature | Returns | Description |
|---|---|---|---|
to_json() |
(event: Event) |
str | Single event to JSON string |
from_json() |
(data: str) |
Event | Single event from JSON string |
to_json_batch() |
(events: list[Event]) |
str | Batch to JSON string |
from_json_batch() |
(data: str) |
list[Event] | Batch from JSON string |
save() |
(events, path) |
int | Save to file; returns count |
load() |
(path) |
list[Event] | Load from file |
EventReplay¶
| Method | Signature | Returns | Description |
|---|---|---|---|
replay() |
(events: list[Event]) |
int | Replay events; returns total deliveries |