
Messaging Realtime Demo

This demo walks through the three layers of FCC real-time messaging: the in-process EventBus, the WebSocket protocol, and the SSE (Server-Sent Events) bridge. Together they form a pipeline that streams FCC workflow events from Python to the browser.


Table of Contents

  1. Introduction: Three Layers
  2. Layer 1: EventBus
  3. Layer 2: WSProtocol
  4. Layer 3: SSEStreamBridge
  5. AsyncEventBusAdapter
  6. EventBus-to-Browser Pipeline
  7. ProtocolBridge Routing

Introduction: Three Layers

FCC messaging is organized in three layers, each building on the previous:

Layer 3: SSE Bridge   -----> HTTP (text/event-stream)  -----> Dashboard
Layer 2: WS Protocol  -----> WebSocket                 -----> React Frontend
Layer 1: EventBus     -----> In-process pub/sub        -----> Python consumers

| Layer | Module | Transport | Use Case |
|-------|--------|-----------|----------|
| 1 | fcc.messaging.bus | In-process | Python-to-Python event dispatch |
| 2 | fcc.messaging.ws_protocol | WebSocket | Real-time browser streaming |
| 3 | fcc.messaging.sse | HTTP SSE | Lightweight dashboard feeds |

Each layer can be used independently or composed into a full pipeline.


Layer 1: EventBus

The EventBus (src/fcc/messaging/bus.py) is a thread-safe in-process pub/sub system.

Creating an EventBus

from fcc.messaging.bus import EventBus

bus = EventBus()

EventBus is not a singleton: tests create fresh instances, and engines accept an optional bus parameter.
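To make "thread-safe in-process pub/sub" concrete, here is a minimal standalone sketch of one common way to build such a bus. It is not the FCC implementation: `SketchBus` and its internals are hypothetical, and the sketch assumes the subscriber list is copied under a lock so that handlers run outside the lock.

```python
import threading
from typing import Callable, List


class SketchBus:
    """Hypothetical miniature pub/sub bus (not fcc.messaging.bus.EventBus)."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers: List[Callable[[str], None]] = []

    def subscribe(self, handler: Callable[[str], None]) -> None:
        with self._lock:
            self._subscribers.append(handler)

    def unsubscribe(self, handler: Callable[[str], None]) -> bool:
        with self._lock:
            if handler in self._subscribers:
                self._subscribers.remove(handler)
                return True
            return False

    def publish(self, event: str) -> int:
        # Snapshot under the lock, then call handlers without holding it,
        # so a handler may subscribe or unsubscribe without deadlocking.
        with self._lock:
            targets = list(self._subscribers)
        for handler in targets:
            handler(event)
        return len(targets)


bus = SketchBus()
received: List[str] = []


def collect(event: str) -> None:
    received.append(event)


bus.subscribe(collect)


def worker(name: str) -> None:
    for i in range(50):
        bus.publish(f"{name}-{i}")


# Four threads publish concurrently to the same bus.
threads = [threading.Thread(target=worker, args=(f"t{n}",)) for n in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

total = len(received)
removed = bus.unsubscribe(collect)
```

Because each instance owns its own lock and subscriber list, creating a fresh bus per test keeps tests isolated from one another.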

Subscribing

from fcc.messaging.events import Event

def my_handler(event: Event) -> None:
    print(f"Received: {event.event_type.value} from {event.source}")

bus.subscribe(my_handler)

Publishing

from fcc.messaging.events import Event, EventType

event = Event(
    event_type=EventType.WORKFLOW_STARTED,
    source="demo",
    payload={"workflow_id": "wf-001"},
)
count = bus.publish(event)
print(f"Delivered to {count} subscriber(s)")

Filtering by EventType

from fcc.messaging.bus import EventFilter

# Only receive workflow events
workflow_filter = EventFilter(
    event_types=(EventType.WORKFLOW_STARTED, EventType.WORKFLOW_COMPLETED),
)
bus.subscribe(my_handler, event_filter=workflow_filter)

Filtering by Source

# Only receive events from a specific component
source_filter = EventFilter(sources=("simulation_engine",))
bus.subscribe(my_handler, event_filter=source_filter)

Filtering by Correlation ID

# Only receive events belonging to a specific run
correlation_filter = EventFilter(correlation_ids=("run-42",))
bus.subscribe(my_handler, event_filter=correlation_filter)
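The three filter fields can also be combined. The standalone sketch below shows one plausible way such a filter could be evaluated. It is not FCC's code: `SketchFilter` is a hypothetical class, and it assumes that an empty field matches everything, that values within a field are alternatives, and that all non-empty fields must match.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class SketchFilter:
    """Hypothetical stand-in for an event filter (not the FCC EventFilter)."""

    event_types: Tuple[str, ...] = ()
    sources: Tuple[str, ...] = ()
    correlation_ids: Tuple[str, ...] = ()

    def matches(
        self,
        event_type: str,
        source: str,
        correlation_id: Optional[str] = None,
    ) -> bool:
        # Assumption: an empty tuple places no constraint on that field.
        if self.event_types and event_type not in self.event_types:
            return False
        if self.sources and source not in self.sources:
            return False
        if self.correlation_ids and correlation_id not in self.correlation_ids:
            return False
        return True


combined = SketchFilter(
    event_types=("workflow.started", "workflow.completed"),
    sources=("simulation_engine",),
)
accepted = combined.matches("workflow.started", "simulation_engine")
rejected = combined.matches("workflow.started", "demo")  # wrong source
```

Check the EventFilter source to confirm how the real class combines its fields before relying on this behavior.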

Unsubscribing

removed = bus.unsubscribe(my_handler)
print(f"Subscriber removed: {removed}")

Recording and Replay

# Start recording
bus.start_recording()

# Publish several events
bus.publish(Event(event_type=EventType.WORKFLOW_STARTED, source="demo"))
bus.publish(Event(event_type=EventType.NODE_ENTERED, source="demo"))
bus.publish(Event(event_type=EventType.WORKFLOW_COMPLETED, source="demo"))

# Stop recording
bus.stop_recording()

# Retrieve recorded history
history = bus.get_history()
print(f"Recorded {len(history)} events")
for event in history:
    print(f"  {event.event_type.value} at {event.timestamp}")

# Clear history
bus.clear_history()
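A recorded history is useful mainly because it can be replayed into a handler later, for example to rebuild state in a test. The sketch below illustrates the idea with a hypothetical `RecordingBus` and `replay` helper; neither is part of FCC, and events are plain dicts to keep the example self-contained.

```python
from typing import Callable, List


class RecordingBus:
    """Hypothetical miniature bus that records events while recording is on."""

    def __init__(self) -> None:
        self._subscribers: List[Callable[[dict], None]] = []
        self._history: List[dict] = []
        self._recording = False

    def subscribe(self, handler: Callable[[dict], None]) -> None:
        self._subscribers.append(handler)

    def start_recording(self) -> None:
        self._recording = True

    def stop_recording(self) -> None:
        self._recording = False

    def publish(self, event: dict) -> int:
        if self._recording:
            self._history.append(event)
        for handler in list(self._subscribers):
            handler(event)
        return len(self._subscribers)

    def get_history(self) -> List[dict]:
        # Return a copy so callers cannot mutate the recorded history.
        return list(self._history)


def replay(history: List[dict], handler: Callable[[dict], None]) -> int:
    """Feed recorded events to a handler in their original order."""
    for event in history:
        handler(event)
    return len(history)


bus = RecordingBus()
bus.start_recording()
bus.publish({"event_type": "workflow.started"})
bus.publish({"event_type": "workflow.completed"})
bus.stop_recording()
bus.publish({"event_type": "node.entered"})  # published after stop: not recorded

replayed: List[str] = []
count = replay(bus.get_history(), lambda e: replayed.append(e["event_type"]))
```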

Subscriber Count

print(f"Total subscribers: {bus.subscriber_count}")

Event Types

The FCC framework defines 81 event types across 10 categories. The table below shows a subset: 37 event types, grouped into 13 rows.

| Category | Event Types | Count |
|----------|-------------|-------|
| Workflow | workflow.started, workflow.completed, workflow.failed | 3 |
| Node | node.entered, node.exited | 2 |
| Persona | persona.activated, persona.deactivated | 2 |
| Simulation | simulation.started, simulation.step, simulation.completed, simulation.failed | 4 |
| Governance | governance.gate.evaluated, governance.gate.passed, governance.gate.failed | 3 |
| Deliverable | deliverable.created, deliverable.reviewed | 2 |
| Action | action.started, action.completed, action.failed | 3 |
| Collaboration | collaboration.session.created, collaboration.session.started, collaboration.session.completed, collaboration.turn.taken | 4 |
| Plugin | plugin.loaded, plugin.error | 2 |
| Protocol A2A | protocol.a2a.task.received, protocol.a2a.task.completed, protocol.a2a.task.failed, protocol.a2a.card.requested | 4 |
| Protocol MCP | protocol.mcp.tool.called, protocol.mcp.tool.completed, protocol.mcp.tool.failed, protocol.mcp.resource.read, protocol.mcp.prompt.rendered | 5 |
| Protocol Bridge | protocol.bridge.error | 1 |
| Simulation Budget | simulation.budget.warning, simulation.budget.exceeded | 2 |

Clearing All State

bus.clear()  # Removes all subscribers and clears history

Layer 2: WSProtocol

The WSProtocol (src/fcc/messaging/ws_protocol.py) defines a structured WebSocket message protocol for real-time communication.

Message Types

The protocol supports 10 message types. The table below lists nine of them:

| Direction | Type | Purpose |
|-----------|------|---------|
| Client to Server | subscribe | Register for event delivery |
| Client to Server | unsubscribe | Remove a subscription |
| Client to Server | request | Query subscriptions |
| Client to Server | ping | Keepalive check |
| Server to Client | event | Deliver a matched event |
| Server to Client | response | Reply to a request |
| Server to Client | error | Report an error |
| Server to Client | pong | Keepalive response |
| Bidirectional | ack | Acknowledge subscribe/unsubscribe |

WSMessage Dataclass

from fcc.messaging.ws_protocol import WSMessage, WSMessageType

msg = WSMessage(
    type=WSMessageType.SUBSCRIBE,
    payload={"event_types": ["workflow.started", "workflow.completed"]},
)

# Serialize
json_str = msg.to_json()
msg_dict = msg.to_dict()

# Deserialize
restored = WSMessage.from_json(json_str)
restored = WSMessage.from_dict(msg_dict)

# Fields
print(f"Type: {msg.type.value}")
print(f"Message ID: {msg.message_id}")      # Auto-generated UUID
print(f"Timestamp: {msg.timestamp}")         # Auto-generated ISO 8601
print(f"Correlation ID: {msg.correlation_id}")  # None or linked ID

WSSubscription Dataclass

from fcc.messaging.ws_protocol import WSSubscription

sub = WSSubscription(
    subscription_id="sub-001",
    event_types=("workflow.started", "simulation.completed"),
    sources=("simulation_engine",),
    correlation_id=None,
)

# Check if an event matches this subscription
event_data = {
    "event_type": "workflow.started",
    "source": "simulation_engine",
}
print(f"Matches: {sub.matches(event_data)}")  # True

# Serialize
sub_dict = sub.to_dict()
restored_sub = WSSubscription.from_dict(sub_dict)

Using WSProtocol

The WSProtocol class manages subscriptions and routes messages:

from fcc.messaging.ws_protocol import WSProtocol

protocol = WSProtocol()

# Handle a subscribe message
response = protocol.handle_message('{"type": "subscribe", "payload": {"subscription_id": "s1", "event_types": ["workflow.started"]}}')
print(f"Response type: {response.type.value}")  # ack
print(f"Subscription count: {protocol.subscription_count}")  # 1

# Handle a ping
response = protocol.handle_message('{"type": "ping"}')
print(f"Response type: {response.type.value}")  # pong

# Handle an unsubscribe
response = protocol.handle_message('{"type": "unsubscribe", "payload": {"subscription_id": "s1"}}')
print(f"Response type: {response.type.value}")  # ack
print(f"Subscription count: {protocol.subscription_count}")  # 0

Subscription Management

# List active subscriptions
subs = protocol.list_subscriptions()

# Look up by ID
sub = protocol.get_subscription("s1")

# Check which subscriptions should receive an event
event_data = {"event_type": "workflow.started", "source": "engine"}
matching_ids = protocol.should_deliver(event_data)

# Create an event message for delivery
event_msg = protocol.create_event_message(event_data)
print(f"Event message type: {event_msg.type.value}")  # event

Request Handling

The protocol supports two request actions:

# List all subscriptions
response = protocol.handle_message('{"type": "request", "payload": {"action": "list_subscriptions"}}')
print(response.payload)  # {"subscriptions": [...]}

# Get subscription count
response = protocol.handle_message('{"type": "request", "payload": {"action": "subscription_count"}}')
print(response.payload)  # {"count": 0}

Error Handling

Invalid messages return error responses:

response = protocol.handle_message("not valid json")
print(f"Type: {response.type.value}")  # error
print(f"Error: {response.payload['error']}")  # "Invalid message format"

# Unknown request action
response = protocol.handle_message('{"type": "request", "payload": {"action": "unknown"}}')
print(f"Error: {response.payload['error']}")  # "Unknown action: unknown"
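The message flow above (parse, dispatch on type, answer with ack, pong, or error) can be illustrated with a small standalone dispatcher. This is a sketch only: `SketchProtocol` is hypothetical, it handles just three message types, and its error texts other than "Invalid message format" are invented for the example.

```python
import json
from typing import Dict, List, Tuple


def _error(text: str) -> dict:
    return {"type": "error", "payload": {"error": text}}


class SketchProtocol:
    """Hypothetical dispatcher (not fcc.messaging.ws_protocol.WSProtocol)."""

    def __init__(self) -> None:
        # subscription_id -> event type names the client asked for
        self.subscriptions: Dict[str, Tuple[str, ...]] = {}

    def handle(self, raw: str) -> dict:
        try:
            message = json.loads(raw)
            msg_type = message["type"]
        except (ValueError, KeyError, TypeError):
            # Covers malformed JSON, a missing "type", and non-object JSON.
            return _error("Invalid message format")
        payload = message.get("payload") or {}
        if not isinstance(payload, dict):
            return _error("Invalid message format")
        sub_id = payload.get("subscription_id")
        has_id = isinstance(sub_id, str) and bool(sub_id)
        if msg_type == "ping":
            return {"type": "pong", "payload": {}}
        if msg_type == "subscribe" and has_id:
            self.subscriptions[sub_id] = tuple(payload.get("event_types", ()))
            return {"type": "ack", "payload": {"subscription_id": sub_id}}
        if msg_type == "unsubscribe" and has_id and sub_id in self.subscriptions:
            del self.subscriptions[sub_id]
            return {"type": "ack", "payload": {"subscription_id": sub_id}}
        return _error(f"Cannot handle message of type {msg_type!r}")

    def should_deliver(self, event_type: str) -> List[str]:
        # Assumption: a subscription with no event types receives everything.
        return [
            sid
            for sid, types in self.subscriptions.items()
            if not types or event_type in types
        ]


proto = SketchProtocol()
ack = proto.handle(
    '{"type": "subscribe", "payload": '
    '{"subscription_id": "s1", "event_types": ["workflow.started"]}}'
)
pong = proto.handle('{"type": "ping"}')
bad = proto.handle("not valid json")
targets = proto.should_deliver("workflow.started")
```

Returning an error message instead of raising keeps a single malformed frame from tearing down the connection handler.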

Layer 3: SSEStreamBridge

The SSEStreamBridge (src/fcc/messaging/sse.py) formats events as Server-Sent Events for HTTP streaming.

Formatting Events

from fcc.messaging.sse import format_sse_event
from fcc.messaging.events import Event, EventType

event = Event(
    event_type=EventType.WORKFLOW_STARTED,
    source="demo",
    payload={"workflow_id": "wf-001"},
)

sse_text = format_sse_event(event)
print(sse_text)

Output:

event: workflow.started
data: {"event_id": "...", "event_type": "workflow.started", ...}
id: <event_id>

Each message ends with a blank line as required by the SSE specification.
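For readers unfamiliar with the wire format, the standalone sketch below builds the same three-field frame using only the standard library. `sketch_format_sse` is a hypothetical helper, not the FCC `format_sse_event` function, and it takes plain values rather than an Event.

```python
import json


def sketch_format_sse(event_name: str, data: dict, event_id: str) -> str:
    """Build one SSE frame: event, data, and id fields plus a blank line."""
    lines = [
        f"event: {event_name}",
        # json.dumps escapes newlines, so the payload stays on one data line.
        f"data: {json.dumps(data)}",
        f"id: {event_id}",
    ]
    # The trailing blank line tells the client the frame is complete.
    return "\n".join(lines) + "\n\n"


frame = sketch_format_sse("workflow.started", {"workflow_id": "wf-001"}, "evt-1")
```

In the browser, the `id` field is what EventSource sends back as `Last-Event-ID` when it reconnects.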

Custom Event Names

sse_text = format_sse_event(event, event_name="custom.event.name")
# event: custom.event.name
# data: {...}
# id: ...

Parsing SSE Lines

from fcc.messaging.sse import parse_sse_line

parsed = parse_sse_line("event: workflow.started")
print(parsed)  # {"event": "workflow.started"}

parsed = parse_sse_line("data: {\"key\": \"value\"}")
print(parsed)  # {"data": "{\"key\": \"value\"}"}
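Parsing single lines is only half of reading a stream: a client also has to group lines into events at each blank line. The sketch below shows that grouping with a hypothetical `sketch_parse_sse` generator. It follows the general SSE rules (comment lines start with a colon, repeated data lines are joined with newlines) and does not use the FCC parser.

```python
from typing import Dict, Iterable, Iterator


def sketch_parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Group SSE lines into one dict per event, split on blank lines."""
    current: Dict[str, str] = {}
    for line in lines:
        line = line.rstrip("\r\n")
        if line == "":
            # A blank line dispatches the event collected so far.
            if current:
                yield current
                current = {}
            continue
        if line.startswith(":"):
            continue  # comment line, often used as a keepalive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space is not part of the value
        if field == "data" and "data" in current:
            current["data"] += "\n" + value
        else:
            current[field] = value


events = list(
    sketch_parse_sse(
        [
            "event: workflow.started",
            'data: {"key": "value"}',
            "id: evt-1",
            "",
            ": keepalive",
            "data: first",
            "data: second",
            "",
        ]
    )
)
```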

SSEStreamBridge Buffering

from fcc.messaging.sse import SSEStreamBridge

bridge = SSEStreamBridge(max_buffer=1000)

# Add events
bridge.add_event(event)
print(f"Buffer size: {bridge.buffer_size}")   # 1
print(f"Event count: {bridge.event_count}")   # 1

# Stream buffered events
for sse_msg in bridge.stream():
    print(sse_msg)

# Clear buffer (event_count is not reset)
bridge.clear()
print(f"Buffer size after clear: {bridge.buffer_size}")  # 0
print(f"Event count after clear: {bridge.event_count}")  # 1
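The difference between `buffer_size` and `event_count` is easiest to see in a bounded buffer. The sketch below is a hypothetical `SketchBuffer`, not the FCC class. It assumes that the oldest frame is dropped once `max_buffer` is reached and that streaming does not drain the buffer; the real bridge may behave differently on both points.

```python
from collections import deque
from typing import Deque, Iterator


class SketchBuffer:
    """Hypothetical bounded buffer of SSE frames with a lifetime counter."""

    def __init__(self, max_buffer: int = 1000) -> None:
        # deque(maxlen=...) silently discards the oldest item when full.
        self._buffer: Deque[str] = deque(maxlen=max_buffer)
        self.event_count = 0  # total frames ever added; never reset

    def add(self, frame: str) -> None:
        self._buffer.append(frame)
        self.event_count += 1

    @property
    def buffer_size(self) -> int:
        return len(self._buffer)

    def stream(self) -> Iterator[str]:
        # Iterate over a snapshot so adds during streaming are safe.
        yield from list(self._buffer)

    def clear(self) -> None:
        self._buffer.clear()


buf = SketchBuffer(max_buffer=2)
for name in ("a", "b", "c"):
    buf.add(name)

kept = list(buf.stream())
size_before = buf.buffer_size
buf.clear()
```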

HTTP Headers

headers = bridge.headers()
print(headers)
# {
#     'Content-Type': 'text/event-stream',
#     'Cache-Control': 'no-cache',
#     'Connection': 'keep-alive',
#     'X-Accel-Buffering': 'no',
# }

EventBus Integration

Wire the bridge to an EventBus so events are automatically formatted and buffered:

from fcc.messaging.bus import EventBus

bus = EventBus()
bridge = SSEStreamBridge()

# Create a subscriber that forwards events to the bridge
subscriber = bridge.create_subscriber()
bus.subscribe(subscriber)

# Now every event published to the bus is also buffered in the bridge
bus.publish(Event(
    event_type=EventType.SIMULATION_STARTED,
    source="engine",
))

print(f"Bridge buffer size: {bridge.buffer_size}")  # 1

AsyncEventBusAdapter

The AsyncEventBusAdapter (src/fcc/messaging/async_adapter.py) wraps the synchronous EventBus for asyncio usage.

Creating the Adapter

import asyncio
from fcc.messaging.async_adapter import AsyncEventBusAdapter
from fcc.messaging.bus import EventBus

bus = EventBus()
adapter = AsyncEventBusAdapter(bus)

# Access the underlying bus
print(f"Bus subscriber count: {adapter.bus.subscriber_count}")

Async Publishing

async def demo_publish():
    event = Event(
        event_type=EventType.WORKFLOW_STARTED,
        source="async_demo",
    )
    count = await adapter.publish(event)
    print(f"Delivered to {count} sync subscriber(s)")

The publish method:

  1. Publishes to the synchronous bus via asyncio.to_thread().
  2. Delivers to all registered async subscribers.
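The two steps above can be sketched without FCC as follows. `SketchAsyncAdapter` is hypothetical, events are plain strings, and running the async subscribers concurrently with `asyncio.gather` is an assumption; the real adapter may await them one by one.

```python
import asyncio
from typing import Awaitable, Callable, List, Tuple

SyncHandler = Callable[[str], None]
AsyncHandler = Callable[[str], Awaitable[None]]


class SketchAsyncAdapter:
    """Hypothetical adapter (not fcc.messaging.async_adapter)."""

    def __init__(self) -> None:
        self.sync_handlers: List[SyncHandler] = []
        self.async_handlers: List[AsyncHandler] = []

    def _publish_sync(self, event: str) -> int:
        for handler in self.sync_handlers:
            handler(event)
        return len(self.sync_handlers)

    async def publish(self, event: str) -> int:
        # Step 1: run the blocking sync delivery in a worker thread so the
        # event loop stays responsive.
        count = await asyncio.to_thread(self._publish_sync, event)
        # Step 2: deliver to async subscribers on the event loop.
        await asyncio.gather(*(handler(event) for handler in self.async_handlers))
        return count


seen: List[Tuple[str, str]] = []


async def on_event(event: str) -> None:
    seen.append(("async", event))


adapter = SketchAsyncAdapter()
adapter.sync_handlers.append(lambda event: seen.append(("sync", event)))
adapter.async_handlers.append(on_event)

delivered = asyncio.run(adapter.publish("workflow.started"))
```

As in the demo above, the return value counts only the synchronous subscribers.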

Async Subscribers

async def my_async_handler(event: Event) -> None:
    print(f"Async received: {event.event_type.value}")

adapter.subscribe_async(my_async_handler)
print(f"Async subscribers: {adapter.async_subscriber_count}")  # 1

# Remove async subscriber
adapter.unsubscribe_async(my_async_handler)

Sync Subscribers Through the Adapter

def my_sync_handler(event: Event) -> None:
    print(f"Sync received: {event.event_type.value}")

adapter.subscribe_sync(my_sync_handler)
# Delegates to the underlying bus.subscribe()

Queue-Based Forwarding

The adapter can forward synchronous bus events to an asyncio.Queue for consumption in async code:

async def demo_forwarding():
    await adapter.start_forwarding()
    print(f"Forwarding active: {adapter.running}")  # True

    # Events published anywhere on the bus are queued
    adapter.bus.publish(Event(
        event_type=EventType.NODE_ENTERED,
        source="engine",
    ))

    # Consume from the queue
    event = await adapter.get_event(timeout=1.0)
    if event is not None:
        print(f"Got event: {event.event_type.value}")
    else:
        print("Timeout -- no event received")

    print(f"Queue size: {adapter.queue_size}")

    # Stop forwarding
    adapter.stop_forwarding()
    print(f"Forwarding active: {adapter.running}")  # False
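Forwarding from a synchronous publisher into an asyncio.Queue needs care, because asyncio.Queue is not thread-safe. The sketch below shows one standard-library way to do the hand-off with `loop.call_soon_threadsafe`. All names here are hypothetical, and nothing is implied about how the FCC adapter does it internally.

```python
import asyncio
import threading
from typing import List, Optional


async def get_event(queue: "asyncio.Queue[str]", timeout: float) -> Optional[str]:
    """Return the next queued event, or None if the timeout expires."""
    try:
        return await asyncio.wait_for(queue.get(), timeout=timeout)
    except asyncio.TimeoutError:
        return None


async def forward_demo() -> List[Optional[str]]:
    loop = asyncio.get_running_loop()
    queue: "asyncio.Queue[str]" = asyncio.Queue()

    def on_sync_event(event: str) -> None:
        # Called from a non-loop thread: schedule the put on the loop thread.
        loop.call_soon_threadsafe(queue.put_nowait, event)

    def produce() -> None:
        for name in ("node.entered", "node.exited"):
            on_sync_event(name)

    producer = threading.Thread(target=produce)
    producer.start()
    results = [await get_event(queue, 1.0) for _ in range(2)]
    producer.join()
    # The queue is now empty, so this call times out and yields None.
    results.append(await get_event(queue, 0.05))
    return results


received = asyncio.run(forward_demo())
```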

Consuming Events in a Loop

async def event_consumer():
    await adapter.start_forwarding()
    try:
        while True:
            event = await adapter.get_event(timeout=5.0)
            if event is None:
                continue  # Timeout, keep waiting
            print(f"Processing: {event.event_type.value}")
    finally:
        adapter.stop_forwarding()

EventBus-to-Browser Pipeline

The three layers combine into a full pipeline that streams events from Python to the browser.

Pipeline 1: EventBus to WebSocket to React

Python:                          Browser:
EventBus.publish()               useWebSocket hook
    |                                ^
    v                                |
WSProtocol.create_event_message()    |
    |                                |
    v                                |
WebSocket server sends JSON   ----->  ws.onmessage()

Implementation:

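from fcc.messaging.bus import EventBus
from fcc.messaging.events import Event
from fcc.messaging.ws_protocol import WSProtocol

bus = EventBus()
protocol = WSProtocol()

def ws_forwarder(event: Event) -> None:
    """Forward a bus event to every WebSocket client whose subscription matches."""
    event_data = event.to_dict()
    matching_subs = protocol.should_deliver(event_data)
    if not matching_subs:
        return
    # Serialize once and reuse the JSON for every matching client.
    message_json = protocol.create_event_message(event_data).to_json()
    for sub_id in matching_subs:
        # send_to_client is a placeholder for your WebSocket server's send
        # function; it is not defined in this demo.
        send_to_client(sub_id, message_json)

bus.subscribe(ws_forwarder)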
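The forwarder calls `send_to_client`, which this demo leaves undefined. One simple way to supply it is a registry that maps subscription IDs to per-connection send callables. The sketch below is an assumption about how you might wire this up; `clients`, the list used as a fake socket, and the return value are all hypothetical.

```python
import json
from typing import Callable, Dict, List

# Hypothetical registry: subscription ID -> callable that writes to one socket.
clients: Dict[str, Callable[[str], None]] = {}


def send_to_client(sub_id: str, message_json: str) -> bool:
    """Send to the client that owns sub_id; report whether it was registered."""
    send = clients.get(sub_id)
    if send is None:
        return False  # client disconnected or never registered
    send(message_json)
    return True


# Stand-in for a real socket: collect outgoing frames in a list.
outbox: List[str] = []
clients["s1"] = outbox.append

frame = json.dumps({"type": "event", "payload": {"event_type": "workflow.started"}})
delivered = send_to_client("s1", frame)
missing = send_to_client("s2", frame)
```

With a real WebSocket library, the registered callable would wrap that connection's send method, and the entry would be removed when the connection closes.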

On the React side, the useWebSocket hook handles connection, parsing, and auto-reconnect:

const { isConnected, messages, lastMessage } = useWebSocket(
  'ws://localhost:8765/ws/events'
)

// Auto-reconnects with 3s delay, up to 10 attempts
// Messages are parsed as WebSocketMessage objects
// isConnected drives the status indicator on Dashboard

Pipeline 2: EventBus to SSE to Dashboard

Python:                            Browser:
EventBus.publish()                 EventSource
    |                                  ^
    v                                  |
SSEStreamBridge.add_event()            |
    |                                  |
    v                                  |
HTTP response (text/event-stream) --> onmessage()

Implementation:

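from flask import Response  # assumes Flask; with FastAPI, use StreamingResponse instead
from fcc.messaging.sse import SSEStreamBridge

bridge = SSEStreamBridge()
subscriber = bridge.create_subscriber()
bus.subscribe(subscriber)  # bus is the EventBus created in Pipeline 1

# In an HTTP handler (Flask shown here):
def sse_endpoint():
    headers = bridge.headers()
    def generate():
        # Yield each buffered event as an SSE-formatted string.
        yield from bridge.stream()
    return Response(generate(), headers=headers)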


Choosing Between WebSocket and SSE

| Factor | WebSocket | SSE |
|--------|-----------|-----|
| Direction | Bidirectional | Server to client only |
| Subscriptions | Dynamic (subscribe/unsubscribe) | All events |
| Reconnection | Manual (useWebSocket handles it) | Built into EventSource |
| Protocol overhead | Custom WSMessage format | Standard SSE format |
| Best for | Interactive React frontend | Read-only dashboards |

ProtocolBridge Routing

The ProtocolBridge (src/fcc/protocols/bridge.py) acts as a central dispatcher, routing protocol messages through the EventBus.

Creating the Bridge

from fcc.messaging.bus import EventBus
from fcc.protocols.bridge import ProtocolBridge

bus = EventBus()
bridge = ProtocolBridge.create_default(bus)

The default bridge registers two stub handlers:

  • a2a -- Returns {"status": "accepted", "protocol": "a2a", ...}
  • mcp -- Returns {"status": "accepted", "protocol": "mcp", ...}

Routing Messages

# Route an A2A task
result = bridge.route("a2a", {"task_id": "t1", "skill": "review"})
print(result)
# {'status': 'accepted', 'protocol': 'a2a', 'message_id': 't1'}

# Route an MCP tool call
result = bridge.route("mcp", {"tool": "persona_lookup", "args": {"id": "RC"}})
print(result)
# {'status': 'accepted', 'protocol': 'mcp', 'tool': 'persona_lookup'}

Event Publishing

When a message is routed, the bridge automatically publishes events:

received_events = []

def event_listener(event: Event) -> None:
    received_events.append(event)

bus.subscribe(event_listener)

bridge.route("a2a", {"task_id": "t2", "skill": "analyze"})

for event in received_events:
    print(f"  {event.event_type.value}: {event.payload}")
# protocol.a2a.task.received: {'protocol': 'a2a', 'message': {'task_id': 't2', ...}}

Protocol-specific entry events:

| Protocol | Entry Event Type |
|----------|------------------|
| a2a | protocol.a2a.task.received |
| mcp | protocol.mcp.tool.called |
| Unknown | protocol.bridge.error |

Custom Route Handlers

Replace the default stubs with real implementations:

def custom_a2a_handler(message: dict) -> dict:
    task_id = message.get("task_id", "unknown")
    # Process the A2A task...
    return {"status": "completed", "task_id": task_id, "result": "..."}

bridge.register_route("a2a", custom_a2a_handler)

Route Management

# Check if a route exists
print(bridge.has_route("a2a"))  # True
print(bridge.has_route("grpc"))  # False

# List all registered routes
print(bridge.list_routes())  # ['a2a', 'mcp']

Error Handling

If a handler raises an exception or the protocol is unknown, the bridge publishes a PROTOCOL_BRIDGE_ERROR event:

# Unknown protocol
result = bridge.route("unknown_protocol", {"data": "test"})
print(result)
# {'status': 'error', 'error': "No route registered for protocol 'unknown_protocol'"}

# Handler exception
def broken_handler(msg: dict) -> dict:
    raise ValueError("Something went wrong")

bridge.register_route("broken", broken_handler)
result = bridge.route("broken", {"data": "test"})
print(result)
# {'status': 'error', 'error': 'Something went wrong'}
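The routing and error behavior described in this section can be summarized in a standalone sketch. `SketchBridge` is hypothetical and is not the FCC ProtocolBridge: it takes a plain publish callback instead of an EventBus, and the payload shape of the error event is an assumption.

```python
from typing import Callable, Dict, List, Tuple

Handler = Callable[[dict], dict]

# Entry event names taken from the table in this section.
ENTRY_EVENTS = {
    "a2a": "protocol.a2a.task.received",
    "mcp": "protocol.mcp.tool.called",
}


class SketchBridge:
    """Hypothetical dispatcher that publishes an event name per routed message."""

    def __init__(self, publish: Callable[[str, dict], None]) -> None:
        self._routes: Dict[str, Handler] = {}
        self._publish = publish

    def register_route(self, protocol: str, handler: Handler) -> None:
        self._routes[protocol] = handler

    def route(self, protocol: str, message: dict) -> dict:
        handler = self._routes.get(protocol)
        if handler is None:
            error = f"No route registered for protocol '{protocol}'"
            self._publish("protocol.bridge.error", {"protocol": protocol, "error": error})
            return {"status": "error", "error": error}
        entry_event = ENTRY_EVENTS.get(protocol)
        if entry_event is not None:
            self._publish(entry_event, {"protocol": protocol, "message": message})
        try:
            return handler(message)
        except Exception as exc:
            # Convert handler failures into an error event and an error result.
            self._publish("protocol.bridge.error", {"protocol": protocol, "error": str(exc)})
            return {"status": "error", "error": str(exc)}


def broken(message: dict) -> dict:
    raise ValueError("Something went wrong")


published: List[Tuple[str, dict]] = []
sketch = SketchBridge(lambda name, payload: published.append((name, payload)))
sketch.register_route("a2a", lambda msg: {"status": "accepted", "protocol": "a2a"})
sketch.register_route("broken", broken)

ok = sketch.route("a2a", {"task_id": "t1"})
unknown = sketch.route("grpc", {})
failed = sketch.route("broken", {})
event_names = [name for name, _ in published]
```

Catching the exception inside `route` means a faulty handler produces an error result for its caller rather than propagating into whatever code invoked the bridge.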

Full Pipeline: Protocol to Browser

Combining all layers, a protocol event flows from the bridge to the browser:

1. bridge.route("a2a", message)
       |
2. ProtocolBridge publishes PROTOCOL_A2A_TASK_RECEIVED to EventBus
       |
3. EventBus delivers to ws_forwarder subscriber
       |
4. WSProtocol.create_event_message() wraps the event
       |
5. WebSocket server sends JSON to connected clients
       |
6. React useWebSocket hook receives and parses the message
       |
7. Protocol Explorer page renders the event in the live feed

See Also

  • Web Frontend Guided Demo -- React frontend consuming WebSocket events
  • Open Science Demo -- governance gate events through the pipeline
  • Distiller Bridge Demo -- bridge adapter integration
  • src/fcc/messaging/bus.py -- EventBus source
  • src/fcc/messaging/ws_protocol.py -- WSProtocol source
  • src/fcc/messaging/sse.py -- SSEStreamBridge source
  • src/fcc/messaging/async_adapter.py -- AsyncEventBusAdapter source
  • src/fcc/protocols/bridge.py -- ProtocolBridge source