Protocol Bridge Patterns¶
This guide covers the architecture and usage of the FCC Protocol Bridge system, which routes protocol messages (A2A, MCP, and custom protocols) through the central EventBus. It is aimed at developers who need to integrate new protocols or extend existing bridge adapters.
Table of Contents¶
- Architecture Overview
- ProtocolBridge Core API
- Creating a New Bridge: Step-by-Step
- Reference Implementations
- Event Flow
- Error Handling
- Testing Patterns
- Configuration Patterns
- Related Documentation
Architecture Overview¶
The FCC Protocol Bridge sits between external protocol transports and the internal EventBus. Every inbound protocol message is routed through a registered handler, and the bridge publishes lifecycle events so that the rest of the system can observe protocol traffic.
External System ProtocolBridge EventBus
| | |
|-- protocol message ---->| |
| |-- entry event ------>|
| | |
| |-- handler(msg) ----->|
| |<-- result -----------|
| | |
|<-- response ------------| |
The bridge is implemented in src/fcc/protocols/bridge.py as the
ProtocolBridge class.
Key principles:
- Handler isolation -- each protocol registers a single handler callable. The bridge never parses protocol-specific payloads; it delegates entirely.
- Event-first observability -- every routed message produces at least one
EventBus event (
PROTOCOL_A2A_TASK_RECEIVED,PROTOCOL_MCP_TOOL_CALLED, orPROTOCOL_BRIDGE_ERROR). - Fail-safe dispatch -- handler exceptions are caught and converted to error responses; the caller always gets a dict back.
ProtocolBridge Core API¶
Constructor¶
from fcc.messaging.bus import EventBus
from fcc.protocols.bridge import ProtocolBridge
bus = EventBus()
bridge = ProtocolBridge(event_bus=bus)
Route Registration¶
Where handler has the signature:
Every handler receives a single message dict and must return a response dict.
Route Inspection¶
Message Routing¶
result = bridge.route("a2a", {"task_id": "t1", "skill": "review"})
# result: {"status": "accepted", "protocol": "a2a", "message_id": "t1"}
Factory¶
create_default pre-registers stub handlers for "a2a" and "mcp" that
return simple acknowledgement dicts. Replace them with production handlers
via register_route.
Creating a New Bridge: Step-by-Step¶
Follow these four steps to integrate a custom protocol.
Step 1: Define the handler signature¶
Every handler must accept a dict[str, Any] and return a dict[str, Any].
The bridge does not impose any structure on the payload; that is the handler's
responsibility.
from typing import Any
def my_protocol_handler(message: dict[str, Any]) -> dict[str, Any]:
"""Handle an incoming my_protocol message."""
command = message.get("command", "")
if command == "ping":
return {"status": "ok", "reply": "pong"}
return {"status": "ok", "command": command, "processed": True}
Step 2: Register with the bridge¶
from fcc.messaging.bus import EventBus
from fcc.protocols.bridge import ProtocolBridge
bus = EventBus()
bridge = ProtocolBridge(event_bus=bus)
bridge.register_route("my_protocol", my_protocol_handler)
Once registered, the bridge will dispatch any message routed to
"my_protocol" through your handler.
Step 3: Handle errors¶
Return an error dict from the handler when the message is malformed or
processing fails. The bridge also catches unhandled exceptions and wraps
them in a PROTOCOL_BRIDGE_ERROR event, but explicit error returns give
callers a cleaner contract.
def my_protocol_handler(message: dict[str, Any]) -> dict[str, Any]:
if "command" not in message:
return {"status": "error", "error": "Missing required field: command"}
try:
result = _do_work(message)
return {"status": "ok", "result": result}
except ValueError as exc:
return {"status": "error", "error": str(exc)}
Step 4: Publish events¶
For protocol-level observability, publish domain events from your handler.
The bridge already publishes entry events for the built-in "a2a" and "mcp"
protocols. For custom protocols, publish events explicitly.
from fcc.messaging.events import Event, EventType
def my_protocol_handler(message: dict[str, Any]) -> dict[str, Any]:
# Publish a domain event for tracking
bus.publish(
Event(
event_type=EventType.WORKFLOW_STARTED,
source="my_protocol.handler",
payload={"command": message.get("command")},
)
)
return {"status": "ok"}
To add a dedicated EventType for your protocol, extend the EventType enum
in src/fcc/messaging/events.py.
Reference Implementations¶
The FCC codebase ships three bridge implementations that demonstrate different integration patterns.
bridge.py -- Cross-Protocol Router¶
Location: src/fcc/protocols/bridge.py
The central ProtocolBridge class. Manages route registration, message
dispatch, and EventBus integration. Serves as the single entry-point for
all protocol messages in the FCC framework.
Key pattern: the _publish_entry_event method maps known protocol names
("a2a", "mcp") to specific EventType values.
event_type_map: dict[str, EventType] = {
"a2a": EventType.PROTOCOL_A2A_TASK_RECEIVED,
"mcp": EventType.PROTOCOL_MCP_TOOL_CALLED,
}
distiller_bridge.py -- External System Adapter¶
Location: src/fcc/protocols/distiller_bridge.py
Demonstrates the optional dependency pattern. The distiller_available()
sentinel checks whether the distiller_ext package is installed at runtime.
When it is not, the adapter returns deterministic mock data.
This module provides three adapters:
| Adapter | Purpose |
|---|---|
NanoCubeQueryAdapter |
Hierarchical data queries with caching |
FabricationBridgeAdapter |
Synthetic data generation requests |
DistillerBridgeAdapter |
High-level facade combining both |
Key pattern: every adapter has both a _query_live / _request_live method
(for the real engine) and a _query_mock / _request_mock fallback.
ws_bridge.py -- WebSocket Transport¶
Location: src/fcc/protocols/ws_bridge.py
Bridges EventBus events to browser clients over WebSocket. Demonstrates
the async transport pattern: events are enqueued from synchronous code
via put_nowait, then broadcast to connected clients in an asyncio loop.
Key pattern: create_event_bus_subscriber() returns a plain callable that
the EventBus can invoke synchronously, yet it enqueues events for async
delivery.
Event Flow¶
The following ASCII diagram shows the full lifecycle of a protocol message flowing through the bridge, EventBus, and downstream subscribers.
Caller ProtocolBridge EventBus Subscribers
| | | |
|-- route("a2a", msg) ----->| | |
| | | |
| |-- publish( | |
| | A2A_TASK_RECEIVED, | |
| | payload=msg) ------>| |
| | |-- notify(sub) -->|
| | |<-----------------|
| | | |
| |-- handler(msg) -------->| |
| |<-- result --------------| |
| | | |
|<-- result ----------------| | |
| | | |
On error:
| | | |
| |-- publish( | |
| | BRIDGE_ERROR, | |
| | payload={error}) -->| |
| | |-- notify(sub) -->|
|<-- {"status":"error"} ----| | |
Error Handling¶
The bridge handles two categories of errors:
Unknown Protocol¶
When route() is called with a protocol that has no registered handler, the
bridge publishes a PROTOCOL_BRIDGE_ERROR event and returns an error dict.
result = bridge.route("unknown", {"key": "value"})
# result: {"status": "error", "error": "No route registered for protocol 'unknown'"}
Handler Exception¶
When a registered handler raises an exception, the bridge catches it, publishes
a PROTOCOL_BRIDGE_ERROR event with the exception message, and returns an
error dict.
def bad_handler(msg: dict) -> dict:
raise RuntimeError("Something went wrong")
bridge.register_route("bad", bad_handler)
result = bridge.route("bad", {})
# result: {"status": "error", "error": "Something went wrong"}
In both cases the error payload published to the EventBus includes the protocol name, the error message, and the original message dict for diagnostic purposes.
Testing Patterns¶
Mock EventBus¶
Create a test EventBus and capture published events:
from fcc.messaging.bus import EventBus
from fcc.messaging.events import EventType
from fcc.protocols.bridge import ProtocolBridge
def test_bridge_publishes_entry_event():
bus = EventBus()
captured = []
bus.subscribe(lambda event: captured.append(event))
bridge = ProtocolBridge(event_bus=bus)
bridge.register_route("a2a", lambda msg: {"status": "ok"})
bridge.route("a2a", {"task_id": "t1"})
assert len(captured) == 1
assert captured[0].event_type == EventType.PROTOCOL_A2A_TASK_RECEIVED
Verify Error Publication¶
def test_bridge_error_for_unknown_protocol():
bus = EventBus()
errors = []
bus.subscribe(lambda e: errors.append(e))
bridge = ProtocolBridge(event_bus=bus)
result = bridge.route("nonexistent", {"data": 1})
assert result["status"] == "error"
assert len(errors) == 1
assert errors[0].event_type == EventType.PROTOCOL_BRIDGE_ERROR
Handler Isolation¶
Test handlers in isolation from the bridge by calling them directly:
def test_handler_directly():
result = my_protocol_handler({"command": "ping"})
assert result == {"status": "ok", "reply": "pong"}
Distiller Bridge Mock Mode¶
The distiller bridge always works in tests because it falls back to mock
mode when distiller_ext is not installed:
from fcc.protocols.distiller_bridge import (
DistillerBridgeAdapter,
NanoCubeQuery,
)
def test_nanocube_mock():
bridge = DistillerBridgeAdapter()
query = NanoCubeQuery(dimensions=("region",))
result = bridge.query_nanocube(query)
assert result.total_count == 1
assert "region" in result.data[0]
Configuration Patterns¶
Enable/Disable Bridges via Config¶
Use environment variables or a config dict to selectively enable bridges:
import os
from fcc.messaging.bus import EventBus
from fcc.protocols.bridge import ProtocolBridge
def create_configured_bridge(bus: EventBus) -> ProtocolBridge:
bridge = ProtocolBridge(event_bus=bus)
if os.getenv("FCC_ENABLE_A2A", "true").lower() == "true":
from fcc.protocols.a2a.handler import handle_a2a_task
bridge.register_route("a2a", handle_a2a_task)
if os.getenv("FCC_ENABLE_MCP", "true").lower() == "true":
bridge.register_route("mcp", _mcp_handler)
if os.getenv("FCC_ENABLE_DISTILLER", "false").lower() == "true":
from fcc.protocols.distiller_bridge import distiller_available
if distiller_available():
bridge.register_route("distiller", _distiller_handler)
return bridge
Route Replacement¶
Routes can be replaced at any time by calling register_route with the same
protocol name. This is useful for hot-swapping between mock and production
handlers during development.
# Start with stub
bridge = ProtocolBridge.create_default(bus)
# Replace with production handler when ready
bridge.register_route("a2a", production_a2a_handler)
Related Documentation¶
- Extending A2A Agent Cards -- mapping personas to A2A cards
- Custom MCP Tools -- adding tools and resources to the MCP layer
- WebSocket Architecture -- real-time messaging via WebSocket
- D3 Visualization Patterns -- frontend data visualization
- Understanding the FCC Ecosystem -- project overview