Protocol Bridge Patterns

This guide covers the architecture and usage of the FCC Protocol Bridge system, which routes protocol messages (A2A, MCP, and custom protocols) through the central EventBus. It is aimed at developers who need to integrate new protocols or extend existing bridge adapters.


Table of Contents

  1. Architecture Overview
  2. ProtocolBridge Core API
  3. Creating a New Bridge: Step-by-Step
  4. Reference Implementations
  5. Event Flow
  6. Error Handling
  7. Testing Patterns
  8. Configuration Patterns
  9. Related Documentation

Architecture Overview

The FCC Protocol Bridge sits between external protocol transports and the internal EventBus. Every inbound protocol message is routed through a registered handler, and the bridge publishes lifecycle events so that the rest of the system can observe protocol traffic.

External System          ProtocolBridge          EventBus
    |                         |                      |
    |-- protocol message ---->|                      |
    |                         |-- entry event ------>|
    |                         |                      |
    |                         |-- handler(msg) ----->|
    |                         |<-- result -----------|
    |                         |                      |
    |<-- response ------------|                      |

The bridge is implemented in src/fcc/protocols/bridge.py as the ProtocolBridge class.

Key principles:

  • Handler isolation -- each protocol registers a single handler callable. The bridge never parses protocol-specific payloads; it delegates entirely.
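  • Event-first observability -- every message routed for a built-in protocol produces an entry event (PROTOCOL_A2A_TASK_RECEIVED or PROTOCOL_MCP_TOOL_CALLED), and every routing failure produces a PROTOCOL_BRIDGE_ERROR event. Custom protocols publish their own events from the handler.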
  • Fail-safe dispatch -- handler exceptions are caught and converted to error responses; the caller always gets a dict back.
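
The three principles above can be illustrated with a small, self-contained sketch. MiniBus and MiniBridge are hypothetical stand-ins written for this guide, not the classes in src/fcc/protocols/bridge.py, and the event names are plain strings rather than EventType members.

```python
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], dict[str, Any]]


class MiniBus:
    """Hypothetical stand-in for EventBus: records published events."""

    def __init__(self) -> None:
        self.events: list[tuple[str, dict[str, Any]]] = []

    def publish(self, name: str, payload: dict[str, Any]) -> None:
        self.events.append((name, payload))


class MiniBridge:
    """Hypothetical stand-in for ProtocolBridge (illustration only)."""

    def __init__(self, bus: MiniBus) -> None:
        self._bus = bus
        self._routes: dict[str, Handler] = {}

    def register_route(self, protocol: str, handler: Handler) -> None:
        self._routes[protocol] = handler

    def route(self, protocol: str, message: dict[str, Any]) -> dict[str, Any]:
        handler = self._routes.get(protocol)
        if handler is None:
            error = f"No route registered for protocol '{protocol}'"
            return self._fail(protocol, error, message)
        # Event-first observability: announce the message before dispatch.
        self._bus.publish(f"{protocol}_received", message)
        try:
            # Handler isolation: the payload is passed through untouched.
            return handler(message)
        except Exception as exc:
            # Fail-safe dispatch: the caller still receives a dict.
            return self._fail(protocol, str(exc), message)

    def _fail(
        self, protocol: str, error: str, message: dict[str, Any]
    ) -> dict[str, Any]:
        payload = {"protocol": protocol, "error": error, "message": message}
        self._bus.publish("bridge_error", payload)
        return {"status": "error", "error": error}


bus = MiniBus()
bridge = MiniBridge(bus)
bridge.register_route("echo", lambda msg: {"status": "ok", "echo": msg})

ok = bridge.route("echo", {"n": 1})   # dispatched to the handler
missing = bridge.route("nope", {})    # no handler: error dict, no exception
```

In this sketch the bridge never inspects the message contents, which is the property that lets new protocols be added without touching the routing code.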

ProtocolBridge Core API

Constructor

from fcc.messaging.bus import EventBus
from fcc.protocols.bridge import ProtocolBridge

bus = EventBus()
bridge = ProtocolBridge(event_bus=bus)

Route Registration

bridge.register_route("my_protocol", handler)

Where handler has the signature:

Callable[[dict[str, Any]], dict[str, Any]]

Every handler receives a single message dict and must return a response dict.

Route Inspection

bridge.has_route("a2a")        # True / False
bridge.list_routes()           # ["a2a", "mcp", ...]

Message Routing

result = bridge.route("a2a", {"task_id": "t1", "skill": "review"})
# result: {"status": "accepted", "protocol": "a2a", "message_id": "t1"}

Factory

bridge = ProtocolBridge.create_default(bus)

create_default pre-registers stub handlers for "a2a" and "mcp" that return simple acknowledgement dicts. Replace them with production handlers via register_route.
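
As a rough idea of what an acknowledgement-only stub can look like, the sketch below builds a handler that returns the response shape shown under Message Routing. The helper make_stub_handler and its id_field parameter are hypothetical; the stubs that create_default actually registers may be written differently.

```python
from typing import Any, Callable

Handler = Callable[[dict[str, Any]], dict[str, Any]]


def make_stub_handler(protocol: str, id_field: str) -> Handler:
    """Build an acknowledgement-only handler (hypothetical helper)."""

    def stub(message: dict[str, Any]) -> dict[str, Any]:
        # No real work is done: the stub only echoes back an identifier.
        return {
            "status": "accepted",
            "protocol": protocol,
            "message_id": message.get(id_field),
        }

    return stub


# Assumption: A2A messages carry their identifier in "task_id".
a2a_stub = make_stub_handler("a2a", id_field="task_id")
ack = a2a_stub({"task_id": "t1", "skill": "review"})
```

A stub like this keeps callers and tests working end to end while the production handler is still being written.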


Creating a New Bridge: Step-by-Step

Follow these four steps to integrate a custom protocol.

Step 1: Define the handler signature

Every handler must accept a dict[str, Any] and return a dict[str, Any]. The bridge does not impose any structure on the payload; that is the handler's responsibility.

from typing import Any


def my_protocol_handler(message: dict[str, Any]) -> dict[str, Any]:
    """Handle an incoming my_protocol message."""
    command = message.get("command", "")
    if command == "ping":
        return {"status": "ok", "reply": "pong"}
    return {"status": "ok", "command": command, "processed": True}

Step 2: Register with the bridge

from fcc.messaging.bus import EventBus
from fcc.protocols.bridge import ProtocolBridge

bus = EventBus()
bridge = ProtocolBridge(event_bus=bus)
bridge.register_route("my_protocol", my_protocol_handler)

Once registered, the bridge will dispatch any message routed to "my_protocol" through your handler.

Step 3: Handle errors

Return an error dict from the handler when the message is malformed or processing fails. The bridge also catches unhandled exceptions and wraps them in a PROTOCOL_BRIDGE_ERROR event, but explicit error returns give callers a cleaner contract.

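from typing import Any


def my_protocol_handler(message: dict[str, Any]) -> dict[str, Any]:
    # Reject malformed input with an explicit error dict.
    if "command" not in message:
        return {"status": "error", "error": "Missing required field: command"}
    try:
        # _do_work stands in for your protocol-specific logic.
        result = _do_work(message)
        return {"status": "ok", "result": result}
    except ValueError as exc:
        # Expected failures become error dicts; anything else reaches the bridge.
        return {"status": "error", "error": str(exc)}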

Step 4: Publish events

For protocol-level observability, publish domain events from your handler. The bridge already publishes entry events for the built-in "a2a" and "mcp" protocols. For custom protocols, publish events explicitly.

from typing import Any

from fcc.messaging.events import Event, EventType


def my_protocol_handler(message: dict[str, Any]) -> dict[str, Any]:
    # `bus` is the EventBus instance created in Step 2.
    # Publish a domain event for tracking.
    bus.publish(
        Event(
            event_type=EventType.WORKFLOW_STARTED,
            source="my_protocol.handler",
            payload={"command": message.get("command")},
        )
    )
    return {"status": "ok"}

To add a dedicated EventType for your protocol, extend the EventType enum in src/fcc/messaging/events.py.
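
Extending the enum amounts to adding one member. The sketch below uses a stand-in EventType so that it runs on its own; the string values and the member name PROTOCOL_MY_PROTOCOL_RECEIVED are assumptions, so follow whatever naming and value convention the real enum in src/fcc/messaging/events.py already uses.

```python
from enum import Enum


class EventType(Enum):
    """Stand-in for the enum in src/fcc/messaging/events.py (values assumed)."""

    WORKFLOW_STARTED = "workflow_started"
    PROTOCOL_A2A_TASK_RECEIVED = "protocol_a2a_task_received"
    PROTOCOL_MCP_TOOL_CALLED = "protocol_mcp_tool_called"
    PROTOCOL_BRIDGE_ERROR = "protocol_bridge_error"
    # New member for the custom protocol (hypothetical name and value).
    PROTOCOL_MY_PROTOCOL_RECEIVED = "protocol_my_protocol_received"


# The handler can then publish the dedicated type instead of a generic one.
new_type = EventType.PROTOCOL_MY_PROTOCOL_RECEIVED
```

A dedicated member lets subscribers filter on your protocol's traffic without inspecting event payloads.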


Reference Implementations

The FCC codebase ships three bridge implementations that demonstrate different integration patterns.

bridge.py -- Cross-Protocol Router

Location: src/fcc/protocols/bridge.py

The central ProtocolBridge class. It manages route registration, message dispatch, and EventBus integration, and serves as the single entry point for all protocol messages in the FCC framework.

Key pattern: the _publish_entry_event method maps known protocol names ("a2a", "mcp") to specific EventType values.

event_type_map: dict[str, EventType] = {
    "a2a": EventType.PROTOCOL_A2A_TASK_RECEIVED,
    "mcp": EventType.PROTOCOL_MCP_TOOL_CALLED,
}

distiller_bridge.py -- External System Adapter

Location: src/fcc/protocols/distiller_bridge.py

Demonstrates the optional dependency pattern. The distiller_available() sentinel checks whether the distiller_ext package is installed at runtime. When it is not, the adapter returns deterministic mock data.

This module provides three adapters:

Adapter                     Purpose
NanoCubeQueryAdapter        Hierarchical data queries with caching
FabricationBridgeAdapter    Synthetic data generation requests
DistillerBridgeAdapter      High-level facade combining both

Key pattern: every adapter has both a _query_live / _request_live method (for the real engine) and a _query_mock / _request_mock fallback.
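
The live/mock split can be sketched as follows. This is an illustration of the optional dependency pattern, not the code in distiller_bridge.py: SketchQueryAdapter, its use_live parameter, and the shape of the mock rows are hypothetical, and the real distiller_available() may detect the package differently.

```python
import importlib.util
from typing import Any, Optional


def distiller_available() -> bool:
    """Return True when the optional distiller_ext package can be found."""
    return importlib.util.find_spec("distiller_ext") is not None


class SketchQueryAdapter:
    """Hypothetical adapter showing the live/mock split (not the FCC class)."""

    def __init__(self, use_live: Optional[bool] = None) -> None:
        # Default to auto-detection; tests can force mock mode explicitly.
        self._use_live = distiller_available() if use_live is None else use_live

    def query(self, dimensions: tuple[str, ...]) -> list[dict[str, Any]]:
        if self._use_live:
            return self._query_live(dimensions)
        return self._query_mock(dimensions)

    def _query_live(self, dimensions: tuple[str, ...]) -> list[dict[str, Any]]:
        # Imported lazily so the module loads without the optional package.
        import distiller_ext  # noqa: F401

        # The real engine call would go here; its API is not shown in this guide.
        raise NotImplementedError("live queries are outside this sketch")

    def _query_mock(self, dimensions: tuple[str, ...]) -> list[dict[str, Any]]:
        # Deterministic: the same input always yields the same single row.
        return [{dim: f"{dim}-0" for dim in dimensions}]


adapter = SketchQueryAdapter(use_live=False)
rows = adapter.query(("region",))
```

Keeping the availability check in one function means every adapter makes the same decision, and the lazy import keeps the optional package out of the import path entirely when it is absent.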

ws_bridge.py -- WebSocket Transport

Location: src/fcc/protocols/ws_bridge.py

Bridges EventBus events to browser clients over WebSocket. Demonstrates the async transport pattern: events are enqueued from synchronous code via put_nowait, then broadcast to connected clients in an asyncio loop.

Key pattern: create_event_bus_subscriber() returns a plain callable that the EventBus can invoke synchronously, yet it enqueues events for async delivery.
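
The sync-to-async handoff can be sketched with asyncio.Queue alone. EventRelay, its method names, and the fake_send callable are hypothetical and stand in for the WebSocket machinery; the actual ws_bridge.py may manage clients and queue overflow differently.

```python
import asyncio
from typing import Any, Awaitable, Callable

Send = Callable[[dict[str, Any]], Awaitable[None]]


class EventRelay:
    """Hypothetical sync-to-async relay (not the ws_bridge.py implementation)."""

    def __init__(self, maxsize: int = 100) -> None:
        self._queue = asyncio.Queue(maxsize=maxsize)
        self.dropped = 0

    def create_subscriber(self) -> Callable[[dict[str, Any]], None]:
        def on_event(event: dict[str, Any]) -> None:
            try:
                # Non-blocking, so synchronous publish code never waits.
                self._queue.put_nowait(event)
            except asyncio.QueueFull:
                # Drop the event rather than block the publisher.
                self.dropped += 1

        return on_event

    async def broadcast(self, send: Send, count: int) -> None:
        """Deliver `count` queued events through the async `send` callable."""
        for _ in range(count):
            event = await self._queue.get()
            await send(event)


async def demo() -> list[dict[str, Any]]:
    relay = EventRelay()  # created inside the running event loop
    subscriber = relay.create_subscriber()
    delivered: list[dict[str, Any]] = []

    async def fake_send(event: dict[str, Any]) -> None:
        delivered.append(event)  # a real bridge would write to a socket

    subscriber({"type": "a"})  # plain synchronous calls
    subscriber({"type": "b"})
    await relay.broadcast(fake_send, count=2)
    return delivered


delivered = asyncio.run(demo())
```

Note that asyncio.Queue is not thread-safe. The sketch assumes the subscriber is called on the event loop's thread; a publisher running on another thread would need to hand events over with loop.call_soon_threadsafe.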


Event Flow

The following ASCII diagram shows the full lifecycle of a protocol message flowing through the bridge, EventBus, and downstream subscribers.

 Caller                   ProtocolBridge              EventBus          Subscribers
   |                           |                         |                  |
   |-- route("a2a", msg) ----->|                         |                  |
   |                           |                         |                  |
   |                           |-- publish(              |                  |
   |                           |     A2A_TASK_RECEIVED,  |                  |
   |                           |     payload=msg) ------>|                  |
   |                           |                         |-- notify(sub) -->|
   |                           |                         |<-----------------|
   |                           |                         |                  |
   |                           |-- handler(msg) -------->|                  |
   |                           |<-- result --------------|                  |
   |                           |                         |                  |
   |<-- result ----------------|                         |                  |
   |                           |                         |                  |

 On error:
   |                           |                         |                  |
   |                           |-- publish(              |                  |
   |                           |     BRIDGE_ERROR,       |                  |
   |                           |     payload={error}) -->|                  |
   |                           |                         |-- notify(sub) -->|
   |<-- {"status":"error"} ----|                         |                  |

Error Handling

The bridge handles two categories of errors:

Unknown Protocol

When route() is called with a protocol that has no registered handler, the bridge publishes a PROTOCOL_BRIDGE_ERROR event and returns an error dict.

result = bridge.route("unknown", {"key": "value"})
# result: {"status": "error", "error": "No route registered for protocol 'unknown'"}

Handler Exception

When a registered handler raises an exception, the bridge catches it, publishes a PROTOCOL_BRIDGE_ERROR event with the exception message, and returns an error dict.

def bad_handler(msg: dict) -> dict:
    raise RuntimeError("Something went wrong")

bridge.register_route("bad", bad_handler)
result = bridge.route("bad", {})
# result: {"status": "error", "error": "Something went wrong"}

In both cases the error payload published to the EventBus includes the protocol name, the error message, and the original message dict for diagnostic purposes.
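
Because route() reports failures through the returned dict, callers have to check the status field themselves. For code that prefers exceptions, a small caller-side helper is one option. BridgeError and unwrap are hypothetical names, not part of the FCC API; the sketch relies only on the {"status": "error", "error": ...} shape shown above.

```python
from typing import Any


class BridgeError(RuntimeError):
    """Hypothetical exception for callers that prefer raising over checking."""


def unwrap(result: dict[str, Any]) -> dict[str, Any]:
    """Return the result unchanged, or raise if it is an error dict."""
    if result.get("status") == "error":
        raise BridgeError(result.get("error", "unknown bridge error"))
    return result


# Usage with a successful response: the dict passes through untouched.
reply = unwrap({"status": "ok", "reply": "pong"})
```

Wrapping calls as unwrap(bridge.route(...)) keeps the bridge's fail-safe contract intact while letting application code use ordinary try/except blocks.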


Testing Patterns

Mock EventBus

Create a test EventBus and capture published events:

from fcc.messaging.bus import EventBus
from fcc.messaging.events import EventType
from fcc.protocols.bridge import ProtocolBridge


def test_bridge_publishes_entry_event():
    bus = EventBus()
    captured = []
    bus.subscribe(lambda event: captured.append(event))

    bridge = ProtocolBridge(event_bus=bus)
    bridge.register_route("a2a", lambda msg: {"status": "ok"})

    bridge.route("a2a", {"task_id": "t1"})

    assert len(captured) == 1
    assert captured[0].event_type == EventType.PROTOCOL_A2A_TASK_RECEIVED

Verify Error Publication

def test_bridge_error_for_unknown_protocol():
    bus = EventBus()
    errors = []
    bus.subscribe(lambda e: errors.append(e))

    bridge = ProtocolBridge(event_bus=bus)
    result = bridge.route("nonexistent", {"data": 1})

    assert result["status"] == "error"
    assert len(errors) == 1
    assert errors[0].event_type == EventType.PROTOCOL_BRIDGE_ERROR

Handler Isolation

Test handlers in isolation from the bridge by calling them directly:

def test_handler_directly():
    result = my_protocol_handler({"command": "ping"})
    assert result == {"status": "ok", "reply": "pong"}

Distiller Bridge Mock Mode

The distiller bridge can be exercised in tests without distiller_ext installed: when the package is missing, the adapters fall back to deterministic mock data.

from fcc.protocols.distiller_bridge import (
    DistillerBridgeAdapter,
    NanoCubeQuery,
)


def test_nanocube_mock():
    bridge = DistillerBridgeAdapter()
    query = NanoCubeQuery(dimensions=("region",))
    result = bridge.query_nanocube(query)
    assert result.total_count == 1
    assert "region" in result.data[0]

Configuration Patterns

Enable/Disable Bridges via Config

Use environment variables or a config dict to selectively enable bridges:

import os

from fcc.messaging.bus import EventBus
from fcc.protocols.bridge import ProtocolBridge


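def create_configured_bridge(bus: EventBus) -> ProtocolBridge:
    """Build a bridge whose routes are selected by FCC_ENABLE_* variables."""
    bridge = ProtocolBridge(event_bus=bus)

    # A2A and MCP are enabled unless explicitly switched off.
    if os.getenv("FCC_ENABLE_A2A", "true").lower() == "true":
        from fcc.protocols.a2a.handler import handle_a2a_task
        bridge.register_route("a2a", handle_a2a_task)

    if os.getenv("FCC_ENABLE_MCP", "true").lower() == "true":
        # _mcp_handler is a placeholder for your MCP handler.
        bridge.register_route("mcp", _mcp_handler)

    # The distiller route is opt-in and also requires the optional package.
    if os.getenv("FCC_ENABLE_DISTILLER", "false").lower() == "true":
        from fcc.protocols.distiller_bridge import distiller_available
        if distiller_available():
            # _distiller_handler is a placeholder for your distiller handler.
            bridge.register_route("distiller", _distiller_handler)

    return bridge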
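
The repeated os.getenv(...).lower() == "true" comparison can be factored into a helper. The env_flag function below is a hypothetical convenience, not part of FCC. Unlike the comparison above, it also accepts "1", "yes", and "on", which is a deliberate change in behavior that you may or may not want.

```python
import os

# Values treated as "enabled" (an assumption; adjust to your conventions).
_TRUTHY = {"1", "true", "yes", "on"}


def env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean feature flag from the environment."""
    raw = os.getenv(name)
    if raw is None:
        return default  # variable not set: fall back to the default
    return raw.strip().lower() in _TRUTHY


# Example: enabled by default, switched off only by an explicit falsy value.
os.environ["FCC_ENABLE_A2A"] = "Yes"
a2a_enabled = env_flag("FCC_ENABLE_A2A", default=True)
```

Centralizing the parsing avoids subtle mismatches, such as one flag accepting "TRUE " with trailing whitespace while another does not.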

Route Replacement

Routes can be replaced at any time by calling register_route with the same protocol name. This is useful for hot-swapping between mock and production handlers during development.

# Start with stub
bridge = ProtocolBridge.create_default(bus)

# Replace with production handler when ready
bridge.register_route("a2a", production_a2a_handler)