Skip to content

WebSocket Architecture

This guide describes the three-layer real-time messaging architecture in FCC: the synchronous EventBus, the WebSocket protocol layer, and the async transport bridges. It covers connection lifecycle management, subscription filtering, SSE fallback, and performance considerations.


Table of Contents

  1. Three-Layer Messaging Model
  2. WSProtocol: Message Types and Handling
  3. WSMessage and WSSubscription Dataclasses
  4. AsyncEventBusAdapter
  5. Connection Lifecycle
  6. SSEStreamBridge: HTTP-Only Fallback
  7. Event Stream Configuration
  8. Error Handling and Reconnection
  9. Performance
  10. Related Documentation

Three-Layer Messaging Model

FCC real-time messaging is organized in three layers, each with a clear responsibility:

Layer 3: Transport        WebSocketBridge / SSEStreamBridge
                          (network I/O, client management)
                                     |
Layer 2: Protocol         WSProtocol
                          (message parsing, subscription matching,
                           request/response routing)
                                     |
Layer 1: Event System     EventBus / AsyncEventBusAdapter
                          (in-process pub/sub, 81 event types,
                           thread-safe, filtering)

Module locations

Layer Module Key Classes
Transport src/fcc/protocols/ws_bridge.py WebSocketBridge
Transport src/fcc/messaging/sse.py SSEStreamBridge
Protocol src/fcc/messaging/ws_protocol.py WSProtocol, WSMessage, WSSubscription
Config src/fcc/protocols/streaming.py EventStreamConfig, StreamFilter
Async src/fcc/messaging/async_adapter.py AsyncEventBusAdapter
Events src/fcc/messaging/bus.py EventBus

WSProtocol: Message Types and Handling

The WSProtocol class in src/fcc/messaging/ws_protocol.py manages subscriptions and provides message parsing, routing, and response generation.

Nine Message Types

class WSMessageType(Enum):
    # Client -> Server
    SUBSCRIBE = "subscribe"
    UNSUBSCRIBE = "unsubscribe"
    REQUEST = "request"
    PING = "ping"

    # Server -> Client
    EVENT = "event"
    RESPONSE = "response"
    ERROR = "error"
    PONG = "pong"

    # Bidirectional
    ACK = "ack"

Message Handling

from fcc.messaging.ws_protocol import WSProtocol

protocol = WSProtocol()

# Handle a ping
response = protocol.handle_message('{"type": "ping"}')
# response.type == WSMessageType.PONG

# Handle a subscribe
response = protocol.handle_message(json.dumps({
    "type": "subscribe",
    "payload": {
        "subscription_id": "sub-001",
        "event_types": ["workflow.started", "workflow.completed"],
    }
}))
# response.type == WSMessageType.ACK
# response.payload == {"subscription_id": "sub-001", "status": "subscribed"}

# Handle an unsubscribe
response = protocol.handle_message(json.dumps({
    "type": "unsubscribe",
    "payload": {"subscription_id": "sub-001"}
}))
# response.type == WSMessageType.ACK

Request Actions

The protocol supports two built-in request actions:

# List active subscriptions
response = protocol.handle_message(json.dumps({
    "type": "request",
    "payload": {"action": "list_subscriptions"}
}))
# response.payload == {"subscriptions": [...]}

# Get subscription count
response = protocol.handle_message(json.dumps({
    "type": "request",
    "payload": {"action": "subscription_count"}
}))
# response.payload == {"count": 0}

Unknown actions return an ERROR response.

Event Delivery Filtering

# Check which subscriptions should receive an event
matching_ids = protocol.should_deliver({
    "event_type": "workflow.started",
    "source": "simulation.engine",
})

# Wrap an event for delivery
ws_message = protocol.create_event_message(event_data)

WSMessage and WSSubscription Dataclasses

WSMessage

A frozen dataclass representing a single WebSocket message.

from fcc.messaging.ws_protocol import WSMessage, WSMessageType

msg = WSMessage(
    type=WSMessageType.EVENT,
    payload={"event_type": "workflow.started", "data": {"step": 1}},
    message_id="msg-uuid-here",
    timestamp="2026-03-29T12:00:00+00:00",
    correlation_id="corr-uuid",
)
Field Type Description
type WSMessageType One of the 9 message types
payload dict[str, Any] Arbitrary message data
message_id str Auto-generated UUID
timestamp str ISO 8601 UTC timestamp
correlation_id str or None Links request/response pairs

Serialization:

json_str = msg.to_json()
restored = WSMessage.from_json(json_str)

msg_dict = msg.to_dict()
restored = WSMessage.from_dict(msg_dict)

WSSubscription

A frozen dataclass representing a client's event subscription with filtering.

from fcc.messaging.ws_protocol import WSSubscription

sub = WSSubscription(
    subscription_id="sub-001",
    event_types=("workflow.started", "workflow.completed"),
    sources=("simulation.engine",),
    correlation_id=None,
)
Field Type Description
subscription_id str Unique subscription ID
event_types tuple[str, ...] Filter by event type (empty = all)
sources tuple[str, ...] Filter by source (empty = all)
correlation_id str or None Filter by correlation ID

Matching:

event_data = {"event_type": "workflow.started", "source": "simulation.engine"}
assert sub.matches(event_data) is True

event_data = {"event_type": "persona.loaded", "source": "registry"}
assert sub.matches(event_data) is False

AsyncEventBusAdapter

The AsyncEventBusAdapter in src/fcc/messaging/async_adapter.py wraps the synchronous EventBus for asyncio usage. It uses asyncio.to_thread() to avoid blocking the event loop.

Basic Usage

import asyncio
from fcc.messaging.bus import EventBus
from fcc.messaging.async_adapter import AsyncEventBusAdapter
from fcc.messaging.events import Event, EventType

bus = EventBus()
adapter = AsyncEventBusAdapter(bus)


async def main():
    # Publish asynchronously (delegates to thread)
    count = await adapter.publish(
        Event(event_type=EventType.WORKFLOW_STARTED, source="demo")
    )

    # Register an async subscriber
    async def on_event(event: Event) -> None:
        print(f"Received: {event.event_type}")

    adapter.subscribe_async(on_event)

Queue-Based Forwarding

The adapter can forward all synchronous EventBus events to an async queue:

async def consumer():
    await adapter.start_forwarding()

    while True:
        event = await adapter.get_event(timeout=5.0)
        if event is None:
            break
        print(f"Forwarded event: {event.event_type}")

    adapter.stop_forwarding()

Properties

Property Type Description
bus EventBus The underlying synchronous bus
queue_size int Events waiting in the async queue
async_subscriber_count int Registered async subscribers
running bool Whether forwarding is active

Connection Lifecycle

The WebSocketBridge in src/fcc/protocols/ws_bridge.py manages the full WebSocket connection lifecycle.

Browser Client           WebSocketBridge              EventBus
    |                         |                           |
    |-- connect ------------->|                           |
    |                         |-- register(ws) -------->  |
    |                         |                           |
    |   (subscribe via        |                           |
    |    WSProtocol)          |                           |
    |                         |                           |
    |                         |<-- event published -------|
    |                         |-- enqueue_event() ------> |
    |                         |                           |
    |<-- broadcast(json) -----|                           |
    |                         |                           |
    |-- disconnect ---------->|                           |
    |                         |-- unregister(ws) ------->  |

Starting the Bridge

from fcc.protocols.ws_bridge import WebSocketBridge

bridge = WebSocketBridge(host="localhost", port=8765)

# Wire to EventBus
subscriber = bridge.create_event_bus_subscriber()
bus.subscribe(subscriber)

# Start serving (async)
await bridge.start()

EventBus Integration

The create_event_bus_subscriber() method returns a synchronous callable suitable for EventBus.subscribe(). It serializes each event to a dict and enqueues it for async broadcast.

def subscriber(event: Event) -> None:
    if hasattr(event, "to_dict"):
        bridge.enqueue_event(event.to_dict())
    elif isinstance(event, dict):
        bridge.enqueue_event(event)

Properties

Property Type Description
host str Bound hostname
port int Bound port
client_count int Connected clients
is_running bool Server status

SSEStreamBridge: HTTP-Only Fallback

For environments where WebSocket is unavailable (proxies, firewalls), the SSEStreamBridge in src/fcc/messaging/sse.py provides a Server-Sent Events alternative.

SSE Message Format

Each event is formatted per the SSE specification:

event: workflow.started
data: {"event_id": "...", "event_type": "workflow.started", ...}
id: evt-uuid

Usage

from fcc.messaging.sse import SSEStreamBridge

sse = SSEStreamBridge(max_buffer=1000)

# Wire to EventBus
bus.subscribe(sse.create_subscriber())

# Later, stream buffered events to an HTTP response
for chunk in sse.stream():
    response.write(chunk)

HTTP Headers

headers = sse.headers()
# {
#   "Content-Type": "text/event-stream",
#   "Cache-Control": "no-cache",
#   "Connection": "keep-alive",
#   "X-Accel-Buffering": "no",
# }

Properties

Property Type Description
buffer_size int Current buffered messages
event_count int Total events ever added
content_type str "text/event-stream"

Utility Functions

from fcc.messaging.sse import format_sse_event, parse_sse_line

# Format a single event
sse_str = format_sse_event(event, event_name="custom.name")

# Parse a single SSE line
parsed = parse_sse_line("event: workflow.started")
# {"event": "workflow.started"}

Event Stream Configuration

The EventStreamConfig and StreamFilter dataclasses in src/fcc/protocols/streaming.py provide configuration for event streams.

StreamFilter

from fcc.protocols.streaming import StreamFilter

# Match only workflow events from the simulation engine
f = StreamFilter(
    event_types=("workflow.started", "workflow.completed"),
    sources=("simulation.engine",),
)

assert f.matches({"event_type": "workflow.started", "source": "simulation.engine"})
assert not f.matches({"event_type": "persona.loaded", "source": "registry"})

An empty StreamFilter() matches every event.

EventStreamConfig

from fcc.protocols.streaming import EventStreamConfig

config = EventStreamConfig(
    stream_id="stream-001",
    format="websocket",          # or "sse"
    filter=StreamFilter(event_types=("workflow.started",)),
    buffer_size=500,
    heartbeat_interval_seconds=15.0,
    reconnect_delay_seconds=2.0,
)

Both classes support to_dict() / from_dict() round-trip serialization.


Error Handling and Reconnection

WebSocket Errors

The WebSocketBridge handles transport errors gracefully:

  • Failed sends: When broadcast() fails to send to a client, the client is automatically unregistered. The error is logged but does not propagate.
  • Queue overflow: When the event queue is full, enqueue_event() drops the event and logs a warning. This prevents memory exhaustion under burst load.
  • Connection closure: The _handler method catches all exceptions during the connection lifecycle and calls unregister() in a finally block.

Client Reconnection

Clients should implement exponential backoff when reconnecting. The EventStreamConfig.reconnect_delay_seconds field provides a server- recommended starting delay.

// Client-side reconnection (JavaScript)
let delay = config.reconnect_delay_seconds * 1000;
const maxDelay = 30000;

function reconnect() {
    setTimeout(() => {
        const ws = new WebSocket(url);
        ws.onclose = () => {
            delay = Math.min(delay * 2, maxDelay);
            reconnect();
        };
        ws.onopen = () => {
            delay = config.reconnect_delay_seconds * 1000;
        };
    }, delay);
}

SSE Reconnection

SSE has built-in reconnection via the id: field. Browsers automatically reconnect and send the Last-Event-ID header. The SSEStreamBridge includes the event ID in every formatted message.


Performance

Message Batching

For high-throughput scenarios, batch events before broadcasting:

# Collect events over a time window
batch: list[dict] = []
batch_interval = 0.1  # seconds

async def batch_processor():
    while True:
        await asyncio.sleep(batch_interval)
        if batch:
            message = json.dumps({"type": "batch", "events": batch.copy()})
            await bridge.broadcast(message)
            batch.clear()

Selective Forwarding

Use StreamFilter to reduce traffic to each client:

from fcc.protocols.streaming import StreamFilter

# Only forward workflow events
workflow_filter = StreamFilter(
    event_types=("workflow.started", "workflow.completed", "workflow.step"),
)

def filtered_subscriber(event):
    event_data = event.to_dict()
    if workflow_filter.matches(event_data):
        bridge.enqueue_event(event_data)

WSProtocol Subscription Filtering

The WSProtocol.should_deliver() method efficiently checks which subscriptions match an event, avoiding unnecessary serialization and transmission.

# Server-side: only send to interested subscriptions
matching = protocol.should_deliver(event_data)
if matching:
    ws_msg = protocol.create_event_message(event_data)
    for sub_id in matching:
        # Route to the specific client's WebSocket
        await send_to_subscriber(sub_id, ws_msg.to_json())

Buffer Management

Both SSEStreamBridge and EventStreamConfig support configurable buffer sizes. Choose buffer sizes based on expected event rates:

Scenario Recommended Buffer
Development / low traffic 100-500
Production / moderate traffic 1,000
High-throughput simulation 5,000-10,000