Messaging API Reference

Module: fcc.messaging

The messaging module provides an in-process event bus, event models, serialization, and replay for the FCC framework.

flowchart LR
    subgraph Publishers
        SE[SimulationEngine]
        AE[ActionEngine]
        CE[CollaborationEngine]
    end
    subgraph EventBus
        EB[EventBus] --> EF{EventFilter}
    end
    subgraph Subscribers
        S1[Subscriber A]
        S2[Subscriber B]
        S3[EventSubscriberPlugin]
    end
    SE -->|publish Event| EB
    AE -->|publish Event| EB
    CE -->|publish Event| EB
    EF -->|matches| S1
    EF -->|matches| S2
    EF -->|matches| S3

Module Structure

| Module | Description |
| --- | --- |
| fcc.messaging.events | Event and EventType definitions |
| fcc.messaging.bus | EventBus, EventFilter, Subscriber |
| fcc.messaging.serialization | EventSerializer, EventReplay |
| fcc.messaging.plugin_bridge | EventSubscriberPlugin (10th plugin type) |

EventType

from fcc.messaging.events import EventType

Enum with 25 values. See Event Bus Specification for the full list.

Key members: WORKFLOW_STARTED, WORKFLOW_COMPLETED, WORKFLOW_FAILED, NODE_ENTERED, NODE_EXITED, PERSONA_ACTIVATED, PERSONA_DEACTIVATED, SIMULATION_STARTED, SIMULATION_STEP, SIMULATION_COMPLETED, SIMULATION_FAILED, GOVERNANCE_GATE_EVALUATED, GOVERNANCE_GATE_PASSED, GOVERNANCE_GATE_FAILED, DELIVERABLE_CREATED, DELIVERABLE_REVIEWED, ACTION_STARTED, ACTION_COMPLETED, ACTION_FAILED, COLLABORATION_SESSION_CREATED, COLLABORATION_SESSION_STARTED, COLLABORATION_SESSION_COMPLETED, COLLABORATION_TURN_TAKEN, PLUGIN_LOADED, PLUGIN_ERROR.

Event

from fcc.messaging.events import Event

Frozen dataclass. All events are immutable after creation.

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| to_dict() | () -> dict[str, Any] | dict | Serialize to JSON-compatible dictionary |
| from_dict() | (data: dict) -> Event | Event | Class method to deserialize |

Constructor parameters: event_type: EventType, source: str, payload: dict[str, Any] (defaults to an empty dict), correlation_id: str | None = None, event_id: str (auto-generated UUID when omitted), timestamp: str (auto-generated ISO 8601 string when omitted).

Usage

from fcc.messaging.events import Event, EventType

event = Event(
    event_type=EventType.WORKFLOW_STARTED,
    source="MyEngine",
    payload={"workflow_id": "base_sequence"},
)
print(event.event_id)    # UUID
print(event.timestamp)   # ISO 8601
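
The immutability and dictionary round trip described above can be illustrated with a stand-in class. `SketchEvent` below is hypothetical and is not the `fcc.messaging.events.Event` implementation. It assumes the ID is a UUID4 string, the timestamp is a UTC ISO 8601 string, and, for brevity, that the event type is a plain string rather than an `EventType` member.

```python
from __future__ import annotations

import dataclasses
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


@dataclass(frozen=True)
class SketchEvent:
    """Hypothetical stand-in for Event; field names follow the reference above."""

    event_type: str  # assumption: plain string instead of the EventType enum
    source: str
    payload: dict[str, Any] = field(default_factory=dict)
    correlation_id: str | None = None
    # Assumption: auto-generated values are a UUID4 and a UTC timestamp.
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

    def to_dict(self) -> dict[str, Any]:
        # asdict() copies nested containers, so the result is independent.
        return dataclasses.asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> SketchEvent:
        return cls(**data)


event = SketchEvent(event_type="workflow_started", source="MyEngine")

# Round trip: the restored event compares equal field by field.
restored = SketchEvent.from_dict(event.to_dict())
round_trip_ok = restored == event

# Frozen dataclasses reject attribute assignment after creation.
try:
    event.source = "Other"
    mutation_rejected = False
except dataclasses.FrozenInstanceError:
    mutation_rejected = True
```

Because the event is frozen, a subscriber cannot alter it on its way to the next subscriber. Changing a field means constructing a new event.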

EventBus

from fcc.messaging.bus import EventBus

Thread-safe publish/subscribe broker. It is not a singleton: each EventBus() call creates an independent bus.

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| subscribe() | (subscriber, event_filter=None) | None | Register a subscriber |
| unsubscribe() | (subscriber) | bool | Remove subscriber; returns True if found |
| publish() | (event) | int | Publish event; returns delivery count |
| start_recording() | () | None | Start recording events to history |
| stop_recording() | () | None | Stop recording |
| get_history() | () | list[Event] | Return copy of recorded history |
| clear_history() | () | None | Clear recorded history |

Usage

from fcc.messaging.bus import EventBus, EventFilter
from fcc.messaging.events import Event, EventType

bus = EventBus()

received = []
bus.subscribe(lambda e: received.append(e))
bus.publish(Event(event_type=EventType.WORKFLOW_STARTED, source="test"))
assert len(received) == 1
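
To make the method contract above concrete, here is a minimal sketch of a bus with the same method names. `SketchBus` is hypothetical and is not the `fcc.messaging.bus.EventBus` implementation. It assumes a single lock guards the subscriber list and history, that callbacks run outside the lock, and that a filter is any predicate callable. Events are plain strings to keep the example self-contained.

```python
import threading


class SketchBus:
    """Hypothetical stand-in showing the return values listed above."""

    def __init__(self):
        self._lock = threading.Lock()
        self._subscribers = []  # list of (subscriber, event_filter) pairs
        self._history = []
        self._recording = False

    def subscribe(self, subscriber, event_filter=None):
        with self._lock:
            self._subscribers.append((subscriber, event_filter))

    def unsubscribe(self, subscriber):
        with self._lock:
            kept = [pair for pair in self._subscribers if pair[0] != subscriber]
            found = len(kept) != len(self._subscribers)
            self._subscribers = kept
            return found

    def publish(self, event):
        with self._lock:
            if self._recording:
                self._history.append(event)
            targets = list(self._subscribers)  # snapshot, then release the lock
        delivered = 0
        for subscriber, event_filter in targets:
            if event_filter is None or event_filter(event):
                subscriber(event)
                delivered += 1
        return delivered

    def start_recording(self):
        with self._lock:
            self._recording = True

    def stop_recording(self):
        with self._lock:
            self._recording = False

    def get_history(self):
        with self._lock:
            return list(self._history)  # copy, so callers cannot mutate it

    def clear_history(self):
        with self._lock:
            self._history.clear()


bus = SketchBus()
received = []
handler = received.append
bus.subscribe(handler)
bus.subscribe(lambda event: None, event_filter=lambda event: event == "b")

bus.start_recording()
counts = [bus.publish("a"), bus.publish("b")]
bus.stop_recording()
bus.publish("c")  # still delivered, but no longer recorded

history = bus.get_history()
removed = bus.unsubscribe(handler)
removed_again = bus.unsubscribe(handler)
```

Invoking callbacks outside the lock is a design choice in this sketch: it lets a subscriber call `subscribe()` or `publish()` from inside its own handler without deadlocking.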

EventFilter

from fcc.messaging.bus import EventFilter

Frozen dataclass for selecting events.

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| matches() | (event: Event) | bool | True if event passes this filter |

Constructor parameters: event_types: tuple[EventType, ...] = (), sources: tuple[str, ...] = (), correlation_ids: tuple[str, ...] = ().
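
The reference does not state how the three criteria combine. The sketch below shows one plausible reading, under two loudly labeled assumptions: an empty tuple places no constraint on that field, and all non-empty criteria must hold together. `SketchFilter` is hypothetical, and the events are `SimpleNamespace` stand-ins with string event types.

```python
from dataclasses import dataclass
from types import SimpleNamespace


@dataclass(frozen=True)
class SketchFilter:
    """Hypothetical stand-in; the combination rules here are assumptions."""

    event_types: tuple = ()
    sources: tuple = ()
    correlation_ids: tuple = ()

    def matches(self, event) -> bool:
        # Assumption: an empty tuple acts as a wildcard for that field.
        if self.event_types and event.event_type not in self.event_types:
            return False
        if self.sources and event.source not in self.sources:
            return False
        if self.correlation_ids and event.correlation_id not in self.correlation_ids:
            return False
        # Assumption: every non-empty criterion must match (logical AND).
        return True


started = SimpleNamespace(
    event_type="workflow_started", source="MyEngine", correlation_id="run-1"
)
failed = SimpleNamespace(
    event_type="workflow_failed", source="Other", correlation_id=None
)

match_all = SketchFilter()
only_started = SketchFilter(event_types=("workflow_started",))
started_from_other = SketchFilter(
    event_types=("workflow_started",), sources=("Other",)
)
```

Under these assumptions, `match_all` accepts both events, `only_started` accepts only `started`, and `started_from_other` accepts neither, because `started` has the right type but the wrong source.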

Subscriber

from fcc.messaging.bus import Subscriber

Type alias: Callable[[Event], Any]. Any callable that accepts an Event.

EventSerializer

from fcc.messaging.serialization import EventSerializer

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| to_json() | (event: Event) | str | Single event to JSON string |
| from_json() | (data: str) | Event | Single event from JSON string |
| to_json_batch() | (events: list[Event]) | str | Batch to JSON string |
| from_json_batch() | (data: str) | list[Event] | Batch from JSON string |
| save() | (events, path) | int | Save to file; returns count |
| load() | (path) | list[Event] | Load from file |
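
The batch and file methods can be pictured as a thin layer over the standard `json` module. The sketch below is not the `EventSerializer` implementation. It assumes a batch is stored as a single JSON array of event dictionaries, which the reference does not confirm, and it works on plain dictionaries (the shape `Event.to_dict()` would produce) rather than `Event` objects.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path


def to_json_batch(events: list[dict]) -> str:
    """Encode event dictionaries as one JSON array (assumed file layout)."""
    return json.dumps(events, indent=2)


def from_json_batch(data: str) -> list[dict]:
    """Decode a JSON array back into a list of event dictionaries."""
    return json.loads(data)


def save(events: list[dict], path: Path) -> int:
    """Write the batch to disk and return the number of events written."""
    path.write_text(to_json_batch(events), encoding="utf-8")
    return len(events)


def load(path: Path) -> list[dict]:
    """Read a batch previously written by save()."""
    return from_json_batch(path.read_text(encoding="utf-8"))


events = [
    {"event_type": "workflow_started", "source": "MyEngine", "payload": {}},
    {"event_type": "workflow_completed", "source": "MyEngine", "payload": {"ok": True}},
]

with tempfile.TemporaryDirectory() as tmp:
    target = Path(tmp) / "events.json"
    saved_count = save(events, target)
    loaded = load(target)
```

Returning the count from `save()` mirrors the table above and gives callers a cheap check that the number of events written matches what they expected.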

EventReplay

from fcc.messaging.serialization import EventReplay

| Method | Signature | Returns | Description |
| --- | --- | --- | --- |
| replay() | (events: list[Event]) | int | Replay events; returns total deliveries |

Usage

from fcc.messaging.bus import EventBus
from fcc.messaging.serialization import EventReplay, EventSerializer

bus = EventBus()  # target bus for the replayed events

events = EventSerializer.load("events.json")
replay = EventReplay(bus)
total = replay.replay(events)
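
The "total deliveries" return value can be read as the sum of the per-event delivery counts that `publish()` reports. The sketch below illustrates that reading with hypothetical stand-ins. It assumes replay re-publishes events in their recorded order, and the `publish` function here is a toy that delivers every event to every subscriber.

```python
def replay(events, publish):
    """Re-publish events in order; publish(event) returns a delivery count."""
    return sum(publish(event) for event in events)


received = []
subscribers = [received.append, lambda event: None]


def publish(event):
    # Toy publisher (assumption): no filtering, every subscriber gets the event.
    for subscriber in subscribers:
        subscriber(event)
    return len(subscribers)


total = replay(["e1", "e2", "e3"], publish)
```

With three events and two subscribers the total is six deliveries, and `received` holds the events in their original order.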