Messaging Realtime Demo¶
This demo walks through the three layers of FCC real-time messaging: the in-process EventBus, the WebSocket protocol, and the SSE (Server-Sent Events) bridge. Together they form a pipeline that streams FCC workflow events from Python to the browser.
Table of Contents¶
- Introduction: Three Layers
- Layer 1: EventBus
- Layer 2: WSProtocol
- Layer 3: SSEStreamBridge
- AsyncEventBusAdapter
- EventBus-to-Browser Pipeline
- ProtocolBridge Routing
Introduction: Three Layers¶
FCC messaging is organized in three layers, each building on the previous:
Layer 3: SSE Bridge   -----> HTTP (text/event-stream) -----> Dashboard
Layer 2: WS Protocol  -----> WebSocket                -----> React Frontend
Layer 1: EventBus     -----> In-process pub/sub       -----> Python consumers
| Layer | Module | Transport | Use Case |
|---|---|---|---|
| 1 | fcc.messaging.bus | In-process | Python-to-Python event dispatch |
| 2 | fcc.messaging.ws_protocol | WebSocket | Real-time browser streaming |
| 3 | fcc.messaging.sse | HTTP SSE | Lightweight dashboard feeds |
Each layer can be used independently or composed into a full pipeline.
Layer 1: EventBus¶
The EventBus (src/fcc/messaging/bus.py) is a thread-safe in-process
pub/sub system.
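To make "thread-safe in-process pub/sub" concrete, the pattern can be sketched in a few lines. This is not the FCC implementation: MiniBus and its methods are hypothetical names, and the sketch assumes handlers are plain callables invoked synchronously on the publishing thread.

```python
import threading
from typing import Any, Callable

Handler = Callable[[Any], None]


class MiniBus:
    """Hypothetical stand-in for a thread-safe in-process bus."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._handlers: list[Handler] = []

    def subscribe(self, handler: Handler) -> None:
        with self._lock:
            self._handlers.append(handler)

    def publish(self, event: Any) -> int:
        # Copy the handler list under the lock, then call handlers outside it
        # so a handler can subscribe or publish without deadlocking.
        with self._lock:
            handlers = list(self._handlers)
        for handler in handlers:
            handler(event)
        return len(handlers)


received: list[str] = []
mini_bus = MiniBus()
mini_bus.subscribe(received.append)
delivered = mini_bus.publish("workflow.started")
print(delivered, received)
```

Snapshotting the handler list before dispatch is one common design choice; it keeps the lock held only briefly, at the cost of a handler possibly receiving one more event after it has been removed.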
Creating an EventBus¶
from fcc.messaging.bus import EventBus

bus = EventBus()

The EventBus is not a singleton: tests create fresh instances, and engines accept an
optional bus parameter.
Subscribing¶
from fcc.messaging.events import Event

def my_handler(event: Event) -> None:
    print(f"Received: {event.event_type.value} from {event.source}")

bus.subscribe(my_handler)
Publishing¶
from fcc.messaging.events import Event, EventType
event = Event(
    event_type=EventType.WORKFLOW_STARTED,
    source="demo",
    payload={"workflow_id": "wf-001"},
)
count = bus.publish(event)
print(f"Delivered to {count} subscriber(s)")
Filtering by EventType¶
from fcc.messaging.bus import EventFilter
# Only receive workflow events
workflow_filter = EventFilter(
    event_types=(EventType.WORKFLOW_STARTED, EventType.WORKFLOW_COMPLETED),
)
bus.subscribe(my_handler, event_filter=workflow_filter)
Filtering by Source¶
# Only receive events from a specific component
source_filter = EventFilter(sources=("simulation_engine",))
bus.subscribe(my_handler, event_filter=source_filter)
Filtering by Correlation ID¶
# Only receive events belonging to a specific run
correlation_filter = EventFilter(correlation_ids=("run-42",))
bus.subscribe(my_handler, event_filter=correlation_filter)
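The source does not spell out how the three filter fields combine, so the following is only a sketch of one plausible rule: an empty tuple places no constraint on a field, values within a field are alternatives, and all constrained fields must match. SketchFilter is a hypothetical class, not the real EventFilter.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SketchFilter:
    """Hypothetical filter; an empty tuple places no constraint on that field."""

    event_types: tuple[str, ...] = ()
    sources: tuple[str, ...] = ()
    correlation_ids: tuple[str, ...] = ()

    def matches(self, event_type: str, source: str,
                correlation_id: Optional[str] = None) -> bool:
        # Each non-empty field must contain the event's value (AND across fields).
        if self.event_types and event_type not in self.event_types:
            return False
        if self.sources and source not in self.sources:
            return False
        if self.correlation_ids and correlation_id not in self.correlation_ids:
            return False
        return True


sketch = SketchFilter(event_types=("workflow.started",), sources=("simulation_engine",))
hit = sketch.matches("workflow.started", "simulation_engine")
wrong_source = sketch.matches("workflow.started", "demo")
unfiltered = SketchFilter().matches("node.entered", "demo")
print(hit, wrong_source, unfiltered)
```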
Unsubscribing¶
Recording and Replay¶
# Start recording
bus.start_recording()
# Publish several events
bus.publish(Event(event_type=EventType.WORKFLOW_STARTED, source="demo"))
bus.publish(Event(event_type=EventType.NODE_ENTERED, source="demo"))
bus.publish(Event(event_type=EventType.WORKFLOW_COMPLETED, source="demo"))
# Stop recording
bus.stop_recording()
# Retrieve recorded history
history = bus.get_history()
print(f"Recorded {len(history)} events")
for event in history:
    print(f"  {event.event_type.value} at {event.timestamp}")
# Clear history
bus.clear_history()
Subscriber Count¶
print(f"Subscribers: {bus.subscriber_count}")
Event Types¶
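The FCC framework defines 81 event types across 10 categories. The table below lists 37 of them, with the protocol and simulation-budget types broken out into their own rows: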
| Category | Event Types | Count |
|---|---|---|
| Workflow | workflow.started, workflow.completed, workflow.failed | 3 |
| Node | node.entered, node.exited | 2 |
| Persona | persona.activated, persona.deactivated | 2 |
| Simulation | simulation.started, simulation.step, simulation.completed, simulation.failed | 4 |
| Governance | governance.gate.evaluated, governance.gate.passed, governance.gate.failed | 3 |
| Deliverable | deliverable.created, deliverable.reviewed | 2 |
| Action | action.started, action.completed, action.failed | 3 |
| Collaboration | collaboration.session.created, collaboration.session.started, collaboration.session.completed, collaboration.turn.taken | 4 |
| Plugin | plugin.loaded, plugin.error | 2 |
| Protocol A2A | protocol.a2a.task.received, protocol.a2a.task.completed, protocol.a2a.task.failed, protocol.a2a.card.requested | 4 |
| Protocol MCP | protocol.mcp.tool.called, protocol.mcp.tool.completed, protocol.mcp.tool.failed, protocol.mcp.resource.read, protocol.mcp.prompt.rendered | 5 |
| Protocol Bridge | protocol.bridge.error | 1 |
| Simulation Budget | simulation.budget.warning, simulation.budget.exceeded | 2 |
Clearing All State¶
Layer 2: WSProtocol¶
The WSProtocol (src/fcc/messaging/ws_protocol.py) defines a structured
WebSocket message protocol for real-time communication.
Message Types¶
The protocol supports 10 message types:
| Direction | Type | Purpose |
|---|---|---|
| Client to Server | subscribe | Register for event delivery |
| Client to Server | unsubscribe | Remove a subscription |
| Client to Server | request | Query subscriptions |
| Client to Server | ping | Keepalive check |
| Server to Client | event | Deliver a matched event |
| Server to Client | response | Reply to a request |
| Server to Client | error | Report an error |
| Server to Client | pong | Keepalive response |
| Bidirectional | ack | Acknowledge subscribe/unsubscribe |
WSMessage Dataclass¶
from fcc.messaging.ws_protocol import WSMessage, WSMessageType
msg = WSMessage(
    type=WSMessageType.SUBSCRIBE,
    payload={"event_types": ["workflow.started", "workflow.completed"]},
)
# Serialize
json_str = msg.to_json()
msg_dict = msg.to_dict()
# Deserialize
restored = WSMessage.from_json(json_str)
restored = WSMessage.from_dict(msg_dict)
# Fields
print(f"Type: {msg.type.value}")
print(f"Message ID: {msg.message_id}") # Auto-generated UUID
print(f"Timestamp: {msg.timestamp}") # Auto-generated ISO 8601
print(f"Correlation ID: {msg.correlation_id}") # None or linked ID
WSSubscription Dataclass¶
from fcc.messaging.ws_protocol import WSSubscription
sub = WSSubscription(
    subscription_id="sub-001",
    event_types=("workflow.started", "simulation.completed"),
    sources=("simulation_engine",),
    correlation_id=None,
)
# Check if an event matches this subscription
event_data = {
    "event_type": "workflow.started",
    "source": "simulation_engine",
}
print(f"Matches: {sub.matches(event_data)}") # True
# Serialize
sub_dict = sub.to_dict()
restored_sub = WSSubscription.from_dict(sub_dict)
Using WSProtocol¶
The WSProtocol class manages subscriptions and routes messages:
from fcc.messaging.ws_protocol import WSProtocol
protocol = WSProtocol()
# Handle a subscribe message
response = protocol.handle_message('{"type": "subscribe", "payload": {"subscription_id": "s1", "event_types": ["workflow.started"]}}')
print(f"Response type: {response.type.value}") # ack
print(f"Subscription count: {protocol.subscription_count}") # 1
# Handle a ping
response = protocol.handle_message('{"type": "ping"}')
print(f"Response type: {response.type.value}") # pong
# Handle an unsubscribe
response = protocol.handle_message('{"type": "unsubscribe", "payload": {"subscription_id": "s1"}}')
print(f"Response type: {response.type.value}") # ack
print(f"Subscription count: {protocol.subscription_count}") # 0
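The request/response cycle above can be pictured as a small JSON dispatcher. The sketch below is a guess at the general shape, not the real WSProtocol: SketchProtocol, its plain-dict responses, and the "Unsupported type" message are all hypothetical, and only the subscribe, unsubscribe, and ping types are handled.

```python
import json


class SketchProtocol:
    """Hypothetical dispatcher; message shapes follow the examples above."""

    def __init__(self) -> None:
        self.subscriptions: dict[str, dict] = {}

    def handle_message(self, raw: str) -> dict:
        # Anything that is not a JSON object with a "type" key is rejected.
        try:
            msg = json.loads(raw)
            msg_type = msg["type"]
        except (json.JSONDecodeError, KeyError, TypeError):
            return {"type": "error", "payload": {"error": "Invalid message format"}}

        payload = msg.get("payload") or {}
        if msg_type == "ping":
            return {"type": "pong", "payload": {}}
        if msg_type == "subscribe":
            sub_id = payload.get("subscription_id", f"sub-{len(self.subscriptions) + 1}")
            self.subscriptions[sub_id] = payload
            return {"type": "ack", "payload": {"subscription_id": sub_id}}
        if msg_type == "unsubscribe":
            self.subscriptions.pop(payload.get("subscription_id"), None)
            return {"type": "ack", "payload": {}}
        return {"type": "error", "payload": {"error": f"Unsupported type: {msg_type}"}}


proto = SketchProtocol()
ack = proto.handle_message('{"type": "subscribe", "payload": {"subscription_id": "s1"}}')
pong = proto.handle_message('{"type": "ping"}')
err = proto.handle_message("not valid json")
print(ack["type"], pong["type"], err["type"], len(proto.subscriptions))
```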
Subscription Management¶
# List active subscriptions
subs = protocol.list_subscriptions()
# Look up by ID
sub = protocol.get_subscription("s1")
# Check which subscriptions should receive an event
event_data = {"event_type": "workflow.started", "source": "engine"}
matching_ids = protocol.should_deliver(event_data)
# Create an event message for delivery
event_msg = protocol.create_event_message(event_data)
print(f"Event message type: {event_msg.type.value}") # event
Request Handling¶
The protocol supports two request actions:
# List all subscriptions
response = protocol.handle_message('{"type": "request", "payload": {"action": "list_subscriptions"}}')
print(response.payload) # {"subscriptions": [...]}
# Get subscription count
response = protocol.handle_message('{"type": "request", "payload": {"action": "subscription_count"}}')
print(response.payload) # {"count": 0}
Error Handling¶
Invalid messages return error responses:
response = protocol.handle_message("not valid json")
print(f"Type: {response.type.value}") # error
print(f"Error: {response.payload['error']}") # "Invalid message format"
# Unknown request action
response = protocol.handle_message('{"type": "request", "payload": {"action": "unknown"}}')
print(f"Error: {response.payload['error']}") # "Unknown action: unknown"
Layer 3: SSEStreamBridge¶
The SSEStreamBridge (src/fcc/messaging/sse.py) formats events as
Server-Sent Events for HTTP streaming.
Formatting Events¶
from fcc.messaging.sse import format_sse_event
from fcc.messaging.events import Event, EventType
event = Event(
    event_type=EventType.WORKFLOW_STARTED,
    source="demo",
    payload={"workflow_id": "wf-001"},
)
sse_text = format_sse_event(event)
print(sse_text)
Output:
event: workflow.started
data: {"event_id": "...", "event_type": "workflow.started", ...}
id: <event_id>
Each message ends with a blank line as required by the SSE specification.
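The wire format shown above is simple enough to build by hand. The function below is an illustrative sketch of that layout, not the source of format_sse_event; sketch_format_sse and its parameters are hypothetical, and it takes a plain dict rather than an Event.

```python
import json
from typing import Optional


def sketch_format_sse(data: dict, event_name: str, event_id: Optional[str] = None) -> str:
    """Hypothetical formatter producing one SSE message as text."""
    lines = [f"event: {event_name}"]
    # json.dumps without indent emits a single line, so one data: field suffices.
    lines.append(f"data: {json.dumps(data)}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    # The trailing blank line terminates the message.
    return "\n".join(lines) + "\n\n"


text = sketch_format_sse({"workflow_id": "wf-001"}, "workflow.started", "evt-1")
print(text, end="")
```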
Custom Event Names¶
sse_text = format_sse_event(event, event_name="custom.event.name")
# event: custom.event.name
# data: {...}
# id: ...
Parsing SSE Lines¶
from fcc.messaging.sse import parse_sse_line
parsed = parse_sse_line("event: workflow.started")
print(parsed) # {"event": "workflow.started"}
parsed = parse_sse_line("data: {\"key\": \"value\"}")
print(parsed) # {"data": "{\"key\": \"value\"}"}
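parse_sse_line handles one line at a time. For context, here is a sketch of how a whole SSE message block could be parsed following the field rules of the SSE format (split on the first colon, drop one leading space, skip comment lines, join repeated data fields with newlines). sketch_parse_sse_message is a hypothetical helper, not part of fcc.messaging.sse.

```python
def sketch_parse_sse_message(block: str) -> dict[str, str]:
    """Hypothetical parser for one SSE message (the lines before the blank line)."""
    fields: dict[str, str] = {}
    for line in block.splitlines():
        if not line or line.startswith(":"):
            continue  # blank line or comment
        name, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # only a single leading space is stripped
        if name == "data" and "data" in fields:
            fields["data"] += "\n" + value  # repeated data fields are joined
        else:
            fields[name] = value
    return fields


parsed_msg = sketch_parse_sse_message('event: workflow.started\ndata: {"k": 1}\nid: 7\n\n')
multi = sketch_parse_sse_message(": keepalive\ndata: a\ndata: b\n")
print(parsed_msg)
print(multi)
```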
SSEStreamBridge Buffering¶
from fcc.messaging.sse import SSEStreamBridge
bridge = SSEStreamBridge(max_buffer=1000)
# Add events
bridge.add_event(event)
print(f"Buffer size: {bridge.buffer_size}") # 1
print(f"Event count: {bridge.event_count}") # 1
# Stream buffered events
for sse_msg in bridge.stream():
    print(sse_msg)
# Clear buffer (event_count is not reset)
bridge.clear()
print(f"Buffer size after clear: {bridge.buffer_size}") # 0
print(f"Event count after clear: {bridge.event_count}") # 1
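The difference between buffer_size and event_count can be illustrated with a bounded buffer plus a running counter. This sketch assumes that max_buffer evicts the oldest entries once the limit is reached, which the source does not confirm; SketchBuffer is a hypothetical class.

```python
from collections import deque


class SketchBuffer:
    """Hypothetical bounded buffer with a lifetime event counter."""

    def __init__(self, max_buffer: int = 1000) -> None:
        # Assumption: the oldest message is dropped when the buffer is full.
        self._buffer: deque[str] = deque(maxlen=max_buffer)
        self.event_count = 0

    def add(self, message: str) -> None:
        self._buffer.append(message)
        self.event_count += 1  # counts every event ever added

    @property
    def buffer_size(self) -> int:
        return len(self._buffer)

    def clear(self) -> None:
        self._buffer.clear()  # event_count is deliberately left alone


buf = SketchBuffer(max_buffer=2)
for name in ("a", "b", "c"):
    buf.add(name)
size_before, count_before = buf.buffer_size, buf.event_count
buf.clear()
print(size_before, count_before, buf.buffer_size, buf.event_count)
```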
HTTP Headers¶
headers = bridge.headers()
print(headers)
# {
# 'Content-Type': 'text/event-stream',
# 'Cache-Control': 'no-cache',
# 'Connection': 'keep-alive',
# 'X-Accel-Buffering': 'no',
# }
EventBus Integration¶
Wire the bridge to an EventBus so events are automatically formatted and buffered:
from fcc.messaging.bus import EventBus
bus = EventBus()
bridge = SSEStreamBridge()
# Create a subscriber that forwards events to the bridge
subscriber = bridge.create_subscriber()
bus.subscribe(subscriber)
# Now every event published to the bus is also buffered in the bridge
bus.publish(Event(
    event_type=EventType.SIMULATION_STARTED,
    source="engine",
))
print(f"Bridge buffer size: {bridge.buffer_size}") # 1
AsyncEventBusAdapter¶
The AsyncEventBusAdapter (src/fcc/messaging/async_adapter.py) wraps the
synchronous EventBus for asyncio usage.
Creating the Adapter¶
import asyncio
from fcc.messaging.async_adapter import AsyncEventBusAdapter
from fcc.messaging.bus import EventBus
bus = EventBus()
adapter = AsyncEventBusAdapter(bus)
# Access the underlying bus
print(f"Bus subscriber count: {adapter.bus.subscriber_count}")
Async Publishing¶
async def demo_publish():
    event = Event(
        event_type=EventType.WORKFLOW_STARTED,
        source="async_demo",
    )
    count = await adapter.publish(event)
    print(f"Delivered to {count} sync subscriber(s)")
The publish method:
- Publishes to the synchronous bus via asyncio.to_thread().
- Delivers to all registered async subscribers.
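The two steps listed above can be sketched as follows. This is a simplified illustration rather than the adapter's code: sync_publish stands in for the blocking bus publish, sketch_publish is a hypothetical name, and awaiting async handlers one after another is an assumption about ordering.

```python
import asyncio
from typing import Any, Awaitable, Callable

sync_seen: list[Any] = []
async_seen: list[Any] = []


def sync_publish(event: Any) -> int:
    """Stand-in for a blocking bus publish; returns the delivery count."""
    sync_seen.append(event)
    return 1


async def async_handler(event: Any) -> None:
    async_seen.append(event)


async def sketch_publish(event: Any,
                         async_handlers: list[Callable[[Any], Awaitable[None]]]) -> int:
    # Step 1: run the blocking publish in a worker thread.
    count = await asyncio.to_thread(sync_publish, event)
    # Step 2: deliver to each async subscriber in turn.
    for handler in async_handlers:
        await handler(event)
    return count


count = asyncio.run(sketch_publish("workflow.started", [async_handler]))
print(count, sync_seen, async_seen)
```

Running the synchronous publish in a thread keeps slow sync subscribers from blocking the event loop.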
Async Subscribers¶
async def my_async_handler(event: Event) -> None:
    print(f"Async received: {event.event_type.value}")
adapter.subscribe_async(my_async_handler)
print(f"Async subscribers: {adapter.async_subscriber_count}") # 1
# Remove async subscriber
adapter.unsubscribe_async(my_async_handler)
Sync Subscribers Through the Adapter¶
def my_sync_handler(event: Event) -> None:
    print(f"Sync received: {event.event_type.value}")
adapter.subscribe_sync(my_sync_handler)
# Delegates to the underlying bus.subscribe()
Queue-Based Forwarding¶
The adapter can forward synchronous bus events to an asyncio.Queue for
consumption in async code:
async def demo_forwarding():
    await adapter.start_forwarding()
    print(f"Forwarding active: {adapter.running}")  # True
    # Events published anywhere on the bus are queued
    adapter.bus.publish(Event(
        event_type=EventType.NODE_ENTERED,
        source="engine",
    ))
    # Consume from the queue
    event = await adapter.get_event(timeout=1.0)
    if event:
        print(f"Got event: {event.event_type.value}")
    else:
        print("Timeout -- no event received")
    print(f"Queue size: {adapter.queue_size}")
    # Stop forwarding
    adapter.stop_forwarding()
    print(f"Forwarding active: {adapter.running}")  # False
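Handing events from a synchronous publisher to an asyncio.Queue needs some care, because asyncio.Queue is not thread-safe. The sketch below shows one standard way to do it with loop.call_soon_threadsafe; whether the adapter uses this exact mechanism is an assumption, and forward and main are hypothetical names.

```python
import asyncio
import threading
from typing import Any, Optional


async def main() -> Optional[Any]:
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def forward(event: Any) -> None:
        # Schedule the put on the loop thread instead of touching the queue directly.
        loop.call_soon_threadsafe(queue.put_nowait, event)

    # Simulate a synchronous publisher running on another thread.
    worker = threading.Thread(target=forward, args=("node.entered",))
    worker.start()
    worker.join()

    # Mirror get_event(timeout=...): return None if nothing arrives in time.
    try:
        return await asyncio.wait_for(queue.get(), timeout=1.0)
    except asyncio.TimeoutError:
        return None


result = asyncio.run(main())
print(result)
```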
Consuming Events in a Loop¶
async def event_consumer():
    await adapter.start_forwarding()
    try:
        while True:
            event = await adapter.get_event(timeout=5.0)
            if event is None:
                continue  # Timeout, keep waiting
            print(f"Processing: {event.event_type.value}")
    finally:
        adapter.stop_forwarding()
EventBus-to-Browser Pipeline¶
The three layers combine into a full pipeline that streams events from Python to the browser.
Pipeline 1: EventBus to WebSocket to React¶
Python:                                       Browser:
EventBus.publish()                            useWebSocket hook
      |                                             ^
      v                                             |
WSProtocol.create_event_message()                   |
      |                                             |
      v                                             |
WebSocket server sends JSON         ----->    ws.onmessage()
Implementation:
from fcc.messaging.bus import EventBus
from fcc.messaging.events import Event
from fcc.messaging.ws_protocol import WSProtocol

bus = EventBus()
protocol = WSProtocol()

def ws_forwarder(event: Event) -> None:
    event_data = event.to_dict()
    matching_subs = protocol.should_deliver(event_data)
    if matching_subs:
        ws_message = protocol.create_event_message(event_data)
        # Send ws_message.to_json() to all matching WebSocket clients.
        # send_to_client is a placeholder for the WebSocket server's own send function.
        for sub_id in matching_subs:
            send_to_client(sub_id, ws_message.to_json())

bus.subscribe(ws_forwarder)
On the React side, the useWebSocket hook handles connection, parsing, and
auto-reconnect:
const { isConnected, messages, lastMessage } = useWebSocket(
'ws://localhost:8765/ws/events'
)
// Auto-reconnects with 3s delay, up to 10 attempts
// Messages are parsed as WebSocketMessage objects
// isConnected drives the status indicator on Dashboard
Pipeline 2: EventBus to SSE to Dashboard¶
Python:                                       Browser:
EventBus.publish()                            EventSource
      |                                             ^
      v                                             |
SSEStreamBridge.add_event()                         |
      |                                             |
      v                                             |
HTTP response (text/event-stream)    ----->   onmessage()
Implementation:
from flask import Response  # Flask shown here; FastAPI would use StreamingResponse instead
from fcc.messaging.sse import SSEStreamBridge

bridge = SSEStreamBridge()
subscriber = bridge.create_subscriber()
bus.subscribe(subscriber)

# In an HTTP handler (e.g. Flask, FastAPI):
def sse_endpoint():
    headers = bridge.headers()

    def generate():
        yield from bridge.stream()

    return Response(generate(), headers=headers)

Choosing Between WebSocket and SSE¶
| Factor | WebSocket | SSE |
|---|---|---|
| Direction | Bidirectional | Server to client only |
| Subscriptions | Dynamic (subscribe/unsubscribe) | All events |
| Reconnection | Manual (useWebSocket handles it) | Built into EventSource |
| Protocol overhead | Custom WSMessage format | Standard SSE format |
| Best for | Interactive React frontend | Read-only dashboards |
ProtocolBridge Routing¶
The ProtocolBridge (src/fcc/protocols/bridge.py) acts as a central
dispatcher, routing protocol messages through the EventBus.
Creating the Bridge¶
from fcc.messaging.bus import EventBus
from fcc.protocols.bridge import ProtocolBridge
bus = EventBus()
bridge = ProtocolBridge.create_default(bus)
The default bridge registers two stub handlers:
- a2a -- Returns {"status": "accepted", "protocol": "a2a", ...}
- mcp -- Returns {"status": "accepted", "protocol": "mcp", ...}
Routing Messages¶
# Route an A2A task
result = bridge.route("a2a", {"task_id": "t1", "skill": "review"})
print(result)
# {'status': 'accepted', 'protocol': 'a2a', 'message_id': 't1'}
# Route an MCP tool call
result = bridge.route("mcp", {"tool": "persona_lookup", "args": {"id": "RC"}})
print(result)
# {'status': 'accepted', 'protocol': 'mcp', 'tool': 'persona_lookup'}
Event Publishing¶
When a message is routed, the bridge automatically publishes events:
received_events = []
def event_listener(event: Event) -> None:
    received_events.append(event)
bus.subscribe(event_listener)
bridge.route("a2a", {"task_id": "t2", "skill": "analyze"})
for event in received_events:
print(f" {event.event_type.value}: {event.payload}")
# protocol.a2a.task.received: {'protocol': 'a2a', 'message': {'task_id': 't2', ...}}
Protocol-specific entry events:
| Protocol | Entry Event Type |
|---|---|
| a2a | protocol.a2a.task.received |
| mcp | protocol.mcp.tool.called |
| Unknown | protocol.bridge.error |
Custom Route Handlers¶
Replace the default stubs with real implementations:
def custom_a2a_handler(message: dict) -> dict:
    task_id = message.get("task_id", "unknown")
    # Process the A2A task...
    return {"status": "completed", "task_id": task_id, "result": "..."}
bridge.register_route("a2a", custom_a2a_handler)
Route Management¶
# Check if a route exists
print(bridge.has_route("a2a")) # True
print(bridge.has_route("grpc")) # False
# List all registered routes
print(bridge.list_routes()) # ['a2a', 'mcp']
Error Handling¶
If a handler raises an exception or the protocol is unknown, the bridge
publishes a PROTOCOL_BRIDGE_ERROR event:
# Unknown protocol
result = bridge.route("unknown_protocol", {"data": "test"})
print(result)
# {'status': 'error', 'error': "No route registered for protocol 'unknown_protocol'"}
# Handler exception
def broken_handler(msg: dict) -> dict:
    raise ValueError("Something went wrong")
bridge.register_route("broken", broken_handler)
result = bridge.route("broken", {"data": "test"})
print(result)
# {'status': 'error', 'error': 'Something went wrong'}
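The routing and error behaviour described in this section can be summarised in a small dispatcher. This is a sketch under assumptions, not the ProtocolBridge source: SketchBridge and the publish callback are hypothetical, and it is a guess that protocols without a known entry event publish nothing before the handler runs.

```python
from typing import Any, Callable

Handler = Callable[[dict], dict]

# Entry events taken from the table above.
ENTRY_EVENTS = {
    "a2a": "protocol.a2a.task.received",
    "mcp": "protocol.mcp.tool.called",
}


class SketchBridge:
    """Hypothetical dispatcher that publishes an event name and payload per route."""

    def __init__(self, publish: Callable[[str, dict], Any]) -> None:
        self._publish = publish
        self._routes: dict[str, Handler] = {}

    def register_route(self, protocol: str, handler: Handler) -> None:
        self._routes[protocol] = handler

    def route(self, protocol: str, message: dict) -> dict:
        handler = self._routes.get(protocol)
        if handler is None:
            error = f"No route registered for protocol '{protocol}'"
            self._publish("protocol.bridge.error", {"protocol": protocol, "error": error})
            return {"status": "error", "error": error}
        entry_event = ENTRY_EVENTS.get(protocol)
        if entry_event is not None:
            self._publish(entry_event, {"protocol": protocol, "message": message})
        try:
            return handler(message)
        except Exception as exc:
            # Handler failures become an error event plus an error result.
            self._publish("protocol.bridge.error", {"protocol": protocol, "error": str(exc)})
            return {"status": "error", "error": str(exc)}


def broken(msg: dict) -> dict:
    raise ValueError("Something went wrong")


events: list[str] = []
sketch_bridge = SketchBridge(lambda name, payload: events.append(name))
sketch_bridge.register_route("a2a", lambda msg: {"status": "accepted", "protocol": "a2a"})
sketch_bridge.register_route("broken", broken)
ok = sketch_bridge.route("a2a", {"task_id": "t1"})
missing = sketch_bridge.route("grpc", {})
failed = sketch_bridge.route("broken", {})
print(ok["status"], missing["status"], failed["error"])
print(events)
```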
Full Pipeline: Protocol to Browser¶
Combining all layers, a protocol event flows from the bridge to the browser:
1. bridge.route("a2a", message)
|
2. ProtocolBridge publishes PROTOCOL_A2A_TASK_RECEIVED to EventBus
|
3. EventBus delivers to ws_forwarder subscriber
|
4. WSProtocol.create_event_message() wraps the event
|
5. WebSocket server sends JSON to connected clients
|
6. React useWebSocket hook receives and parses the message
|
7. Protocol Explorer page renders the event in the live feed
See Also¶
- Web Frontend Guided Demo -- React frontend consuming WebSocket events
- Open Science Demo -- governance gate events through the pipeline
- Distiller Bridge Demo -- bridge adapter integration
- src/fcc/messaging/bus.py -- EventBus source
- src/fcc/messaging/ws_protocol.py -- WSProtocol source
- src/fcc/messaging/sse.py -- SSEStreamBridge source
- src/fcc/messaging/async_adapter.py -- AsyncEventBusAdapter source
- src/fcc/protocols/bridge.py -- ProtocolBridge source