WebSocket Architecture¶
This guide describes the three-layer real-time messaging architecture in FCC: the synchronous EventBus, the WebSocket protocol layer, and the async transport bridges. It covers connection lifecycle management, subscription filtering, SSE fallback, and performance considerations.
Table of Contents¶
- Three-Layer Messaging Model
- WSProtocol: Message Types and Handling
- WSMessage and WSSubscription Dataclasses
- AsyncEventBusAdapter
- Connection Lifecycle
- SSEStreamBridge: HTTP-Only Fallback
- Event Stream Configuration
- Error Handling and Reconnection
- Performance
- Related Documentation
Three-Layer Messaging Model¶
FCC real-time messaging is organized in three layers, each with a clear responsibility:
Layer 3: Transport WebSocketBridge / SSEStreamBridge
(network I/O, client management)
|
Layer 2: Protocol WSProtocol
(message parsing, subscription matching,
request/response routing)
|
Layer 1: Event System EventBus / AsyncEventBusAdapter
(in-process pub/sub, 81 event types,
thread-safe, filtering)
Module locations¶
| Layer | Module | Key Classes |
|---|---|---|
| Transport | src/fcc/protocols/ws_bridge.py |
WebSocketBridge |
| Transport | src/fcc/messaging/sse.py |
SSEStreamBridge |
| Protocol | src/fcc/messaging/ws_protocol.py |
WSProtocol, WSMessage, WSSubscription |
| Config | src/fcc/protocols/streaming.py |
EventStreamConfig, StreamFilter |
| Async | src/fcc/messaging/async_adapter.py |
AsyncEventBusAdapter |
| Events | src/fcc/messaging/bus.py |
EventBus |
WSProtocol: Message Types and Handling¶
The WSProtocol class in src/fcc/messaging/ws_protocol.py manages
subscriptions and provides message parsing, routing, and response generation.
Nine Message Types¶
class WSMessageType(Enum):
# Client -> Server
SUBSCRIBE = "subscribe"
UNSUBSCRIBE = "unsubscribe"
REQUEST = "request"
PING = "ping"
# Server -> Client
EVENT = "event"
RESPONSE = "response"
ERROR = "error"
PONG = "pong"
# Bidirectional
ACK = "ack"
Message Handling¶
from fcc.messaging.ws_protocol import WSProtocol
protocol = WSProtocol()
# Handle a ping
response = protocol.handle_message('{"type": "ping"}')
# response.type == WSMessageType.PONG
# Handle a subscribe
response = protocol.handle_message(json.dumps({
"type": "subscribe",
"payload": {
"subscription_id": "sub-001",
"event_types": ["workflow.started", "workflow.completed"],
}
}))
# response.type == WSMessageType.ACK
# response.payload == {"subscription_id": "sub-001", "status": "subscribed"}
# Handle an unsubscribe
response = protocol.handle_message(json.dumps({
"type": "unsubscribe",
"payload": {"subscription_id": "sub-001"}
}))
# response.type == WSMessageType.ACK
Request Actions¶
The protocol supports two built-in request actions:
# List active subscriptions
response = protocol.handle_message(json.dumps({
"type": "request",
"payload": {"action": "list_subscriptions"}
}))
# response.payload == {"subscriptions": [...]}
# Get subscription count
response = protocol.handle_message(json.dumps({
"type": "request",
"payload": {"action": "subscription_count"}
}))
# response.payload == {"count": 0}
Unknown actions return an ERROR response.
Event Delivery Filtering¶
# Check which subscriptions should receive an event
matching_ids = protocol.should_deliver({
"event_type": "workflow.started",
"source": "simulation.engine",
})
# Wrap an event for delivery
ws_message = protocol.create_event_message(event_data)
WSMessage and WSSubscription Dataclasses¶
WSMessage¶
A frozen dataclass representing a single WebSocket message.
from fcc.messaging.ws_protocol import WSMessage, WSMessageType
msg = WSMessage(
type=WSMessageType.EVENT,
payload={"event_type": "workflow.started", "data": {"step": 1}},
message_id="msg-uuid-here",
timestamp="2026-03-29T12:00:00+00:00",
correlation_id="corr-uuid",
)
| Field | Type | Description |
|---|---|---|
type |
WSMessageType |
One of the 9 message types |
payload |
dict[str, Any] |
Arbitrary message data |
message_id |
str |
Auto-generated UUID |
timestamp |
str |
ISO 8601 UTC timestamp |
correlation_id |
str or None |
Links request/response pairs |
Serialization:
json_str = msg.to_json()
restored = WSMessage.from_json(json_str)
msg_dict = msg.to_dict()
restored = WSMessage.from_dict(msg_dict)
WSSubscription¶
A frozen dataclass representing a client's event subscription with filtering.
from fcc.messaging.ws_protocol import WSSubscription
sub = WSSubscription(
subscription_id="sub-001",
event_types=("workflow.started", "workflow.completed"),
sources=("simulation.engine",),
correlation_id=None,
)
| Field | Type | Description |
|---|---|---|
subscription_id |
str |
Unique subscription ID |
event_types |
tuple[str, ...] |
Filter by event type (empty = all) |
sources |
tuple[str, ...] |
Filter by source (empty = all) |
correlation_id |
str or None |
Filter by correlation ID |
Matching:
event_data = {"event_type": "workflow.started", "source": "simulation.engine"}
assert sub.matches(event_data) is True
event_data = {"event_type": "persona.loaded", "source": "registry"}
assert sub.matches(event_data) is False
AsyncEventBusAdapter¶
The AsyncEventBusAdapter in src/fcc/messaging/async_adapter.py wraps the
synchronous EventBus for asyncio usage. It uses asyncio.to_thread() to
avoid blocking the event loop.
Basic Usage¶
import asyncio
from fcc.messaging.bus import EventBus
from fcc.messaging.async_adapter import AsyncEventBusAdapter
from fcc.messaging.events import Event, EventType
bus = EventBus()
adapter = AsyncEventBusAdapter(bus)
async def main():
# Publish asynchronously (delegates to thread)
count = await adapter.publish(
Event(event_type=EventType.WORKFLOW_STARTED, source="demo")
)
# Register an async subscriber
async def on_event(event: Event) -> None:
print(f"Received: {event.event_type}")
adapter.subscribe_async(on_event)
Queue-Based Forwarding¶
The adapter can forward all synchronous EventBus events to an async queue:
async def consumer():
await adapter.start_forwarding()
while True:
event = await adapter.get_event(timeout=5.0)
if event is None:
break
print(f"Forwarded event: {event.event_type}")
adapter.stop_forwarding()
Properties¶
| Property | Type | Description |
|---|---|---|
bus |
EventBus |
The underlying synchronous bus |
queue_size |
int |
Events waiting in the async queue |
async_subscriber_count |
int |
Registered async subscribers |
running |
bool |
Whether forwarding is active |
Connection Lifecycle¶
The WebSocketBridge in src/fcc/protocols/ws_bridge.py manages the full
WebSocket connection lifecycle.
Browser Client WebSocketBridge EventBus
| | |
|-- connect ------------->| |
| |-- register(ws) --------> |
| | |
| (subscribe via | |
| WSProtocol) | |
| | |
| |<-- event published -------|
| |-- enqueue_event() ------> |
| | |
|<-- broadcast(json) -----| |
| | |
|-- disconnect ---------->| |
| |-- unregister(ws) -------> |
Starting the Bridge¶
from fcc.protocols.ws_bridge import WebSocketBridge
bridge = WebSocketBridge(host="localhost", port=8765)
# Wire to EventBus
subscriber = bridge.create_event_bus_subscriber()
bus.subscribe(subscriber)
# Start serving (async)
await bridge.start()
EventBus Integration¶
The create_event_bus_subscriber() method returns a synchronous callable
suitable for EventBus.subscribe(). It serializes each event to a dict and
enqueues it for async broadcast.
def subscriber(event: Event) -> None:
if hasattr(event, "to_dict"):
bridge.enqueue_event(event.to_dict())
elif isinstance(event, dict):
bridge.enqueue_event(event)
Properties¶
| Property | Type | Description |
|---|---|---|
host |
str |
Bound hostname |
port |
int |
Bound port |
client_count |
int |
Connected clients |
is_running |
bool |
Server status |
SSEStreamBridge: HTTP-Only Fallback¶
For environments where WebSocket is unavailable (proxies, firewalls), the
SSEStreamBridge in src/fcc/messaging/sse.py provides a Server-Sent Events
alternative.
SSE Message Format¶
Each event is formatted per the SSE specification:
event: workflow.started
data: {"event_id": "...", "event_type": "workflow.started", ...}
id: evt-uuid
Usage¶
from fcc.messaging.sse import SSEStreamBridge
sse = SSEStreamBridge(max_buffer=1000)
# Wire to EventBus
bus.subscribe(sse.create_subscriber())
# Later, stream buffered events to an HTTP response
for chunk in sse.stream():
response.write(chunk)
HTTP Headers¶
headers = sse.headers()
# {
# "Content-Type": "text/event-stream",
# "Cache-Control": "no-cache",
# "Connection": "keep-alive",
# "X-Accel-Buffering": "no",
# }
Properties¶
| Property | Type | Description |
|---|---|---|
buffer_size |
int |
Current buffered messages |
event_count |
int |
Total events ever added |
content_type |
str |
"text/event-stream" |
Utility Functions¶
from fcc.messaging.sse import format_sse_event, parse_sse_line
# Format a single event
sse_str = format_sse_event(event, event_name="custom.name")
# Parse a single SSE line
parsed = parse_sse_line("event: workflow.started")
# {"event": "workflow.started"}
Event Stream Configuration¶
The EventStreamConfig and StreamFilter dataclasses in
src/fcc/protocols/streaming.py provide configuration for event streams.
StreamFilter¶
from fcc.protocols.streaming import StreamFilter
# Match only workflow events from the simulation engine
f = StreamFilter(
event_types=("workflow.started", "workflow.completed"),
sources=("simulation.engine",),
)
assert f.matches({"event_type": "workflow.started", "source": "simulation.engine"})
assert not f.matches({"event_type": "persona.loaded", "source": "registry"})
An empty StreamFilter() matches every event.
EventStreamConfig¶
from fcc.protocols.streaming import EventStreamConfig
config = EventStreamConfig(
stream_id="stream-001",
format="websocket", # or "sse"
filter=StreamFilter(event_types=("workflow.started",)),
buffer_size=500,
heartbeat_interval_seconds=15.0,
reconnect_delay_seconds=2.0,
)
Both classes support to_dict() / from_dict() round-trip serialization.
Error Handling and Reconnection¶
WebSocket Errors¶
The WebSocketBridge handles transport errors gracefully:
- Failed sends: When
broadcast()fails to send to a client, the client is automatically unregistered. The error is logged but does not propagate. - Queue overflow: When the event queue is full,
enqueue_event()drops the event and logs a warning. This prevents memory exhaustion under burst load. - Connection closure: The
_handlermethod catches all exceptions during the connection lifecycle and callsunregister()in afinallyblock.
Client Reconnection¶
Clients should implement exponential backoff when reconnecting. The
EventStreamConfig.reconnect_delay_seconds field provides a server-
recommended starting delay.
// Client-side reconnection (JavaScript)
let delay = config.reconnect_delay_seconds * 1000;
const maxDelay = 30000;
function reconnect() {
setTimeout(() => {
const ws = new WebSocket(url);
ws.onclose = () => {
delay = Math.min(delay * 2, maxDelay);
reconnect();
};
ws.onopen = () => {
delay = config.reconnect_delay_seconds * 1000;
};
}, delay);
}
SSE Reconnection¶
SSE has built-in reconnection via the id: field. Browsers automatically
reconnect and send the Last-Event-ID header. The SSEStreamBridge includes
the event ID in every formatted message.
Performance¶
Message Batching¶
For high-throughput scenarios, batch events before broadcasting:
# Collect events over a time window
batch: list[dict] = []
batch_interval = 0.1 # seconds
async def batch_processor():
while True:
await asyncio.sleep(batch_interval)
if batch:
message = json.dumps({"type": "batch", "events": batch.copy()})
await bridge.broadcast(message)
batch.clear()
Selective Forwarding¶
Use StreamFilter to reduce traffic to each client:
from fcc.protocols.streaming import StreamFilter
# Only forward workflow events
workflow_filter = StreamFilter(
event_types=("workflow.started", "workflow.completed", "workflow.step"),
)
def filtered_subscriber(event):
event_data = event.to_dict()
if workflow_filter.matches(event_data):
bridge.enqueue_event(event_data)
WSProtocol Subscription Filtering¶
The WSProtocol.should_deliver() method efficiently checks which
subscriptions match an event, avoiding unnecessary serialization and
transmission.
# Server-side: only send to interested subscriptions
matching = protocol.should_deliver(event_data)
if matching:
ws_msg = protocol.create_event_message(event_data)
for sub_id in matching:
# Route to the specific client's WebSocket
await send_to_subscriber(sub_id, ws_msg.to_json())
Buffer Management¶
Both SSEStreamBridge and EventStreamConfig support configurable buffer
sizes. Choose buffer sizes based on expected event rates:
| Scenario | Recommended Buffer |
|---|---|
| Development / low traffic | 100-500 |
| Production / moderate traffic | 1,000 |
| High-throughput simulation | 5,000-10,000 |
Related Documentation¶
- Protocol Bridge Patterns -- routing protocol messages
- Extending A2A Agent Cards -- persona-to-card mapping
- Custom MCP Tools -- MCP tool and resource definitions
- D3 Visualization Patterns -- consuming events in the frontend
- Understanding the FCC Ecosystem -- ecosystem overview