Chapter 23: Advanced Messaging
The FCC event bus provides the foundation for asynchronous communication between framework components. This chapter covers advanced messaging patterns including priority queues, dead-letter queues, circuit breakers, and replay strategies.
The sequence diagram below shows how events flow through the priority queue, circuit breaker, and dead-letter queue to either a handler or quarantine, depending on breaker state and handler success.
sequenceDiagram
    participant P as Publisher
    participant PQ as PriorityQueue
    participant EB as EventBus
    participant H as Handler
    participant CB as CircuitBreaker
    participant DLQ as Dead-Letter Queue
    P->>PQ: push(event, priority=CRITICAL)
    P->>PQ: push(event, priority=NORMAL)
    PQ->>EB: pop() delivers CRITICAL first
    EB->>CB: check state
    alt Circuit CLOSED
        CB->>H: deliver(event)
        alt Handler succeeds
            H-->>EB: ok
        else Handler fails
            H-->>CB: error
            CB->>CB: increment failures
            CB->>DLQ: add(event, error)
        end
    else Circuit OPEN
        CB->>DLQ: bypass to DLQ
    end
    Note over DLQ: Retry later or<br/>inspect forensically
Combining all three is what keeps the bus responsive under partial failure without silently dropping events.
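The flow in the diagram can be sketched end to end in a few lines. This is a minimal illustration under stated assumptions, not the FCC implementation: plain strings stand in for events, `SimpleBreaker` and `dispatch` are hypothetical names, and the breaker has no timeout or HALF_OPEN state.

```python
import heapq
import itertools


class SimpleBreaker:
    """Hypothetical stripped-down breaker: opens after N failures, never recovers."""

    def __init__(self, failure_threshold: int = 2) -> None:
        self.failures = 0
        self.threshold = failure_threshold

    @property
    def is_open(self) -> bool:
        return self.failures >= self.threshold


def dispatch(queue, handler, breaker, dlq):
    """Drain the heap in priority order, routing failures to the DLQ."""
    delivered = []
    while queue:
        _priority, _seq, event = heapq.heappop(queue)
        if breaker.is_open:
            # Circuit OPEN: skip the handler and quarantine the event.
            dlq.append((event, "circuit open"))
            continue
        try:
            handler(event)
            delivered.append(event)
        except Exception as exc:
            # Handler failed: count the failure and keep the event for later.
            breaker.failures += 1
            dlq.append((event, repr(exc)))
    return delivered


def flaky_handler(event: str) -> None:
    if event.startswith("bad"):
        raise ValueError(f"cannot process {event}")


seq = itertools.count()  # tie-breaker so equal priorities stay in push order
queue = []
for priority, name in [(2, "ok-1"), (0, "ok-0"), (0, "bad-1"), (0, "bad-2"), (3, "ok-2")]:
    heapq.heappush(queue, (priority, next(seq), name))

dlq = []
delivered = dispatch(queue, flaky_handler, SimpleBreaker(), dlq)
```

Here only `ok-0` reaches the handler successfully. The two failing CRITICAL events trip the breaker, so the remaining events land in `dlq` without the handler being called.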
Event Bus Refresher
The core EventBus in fcc.messaging.bus provides:
- Thread-safe publish/subscribe.
- 81 event types covering all framework operations.
- Event filtering by type, source, and custom predicates.
- Serialization and replay via EventSerializer and EventReplay.
from fcc.messaging.bus import EventBus
from fcc.messaging.events import Event, EventType

bus = EventBus()

def handler(event: Event) -> None:
    print(f"Received: {event.event_type.value}")

bus.subscribe(EventType.SIMULATION_STARTED, handler)
bus.publish(Event(event_type=EventType.SIMULATION_STARTED))
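Filtering by a custom predicate can also be done on the subscriber side, without relying on any particular bus API. The sketch below assumes nothing about `EventBus` itself: `when` is a hypothetical helper, and dictionaries stand in for events.

```python
from typing import Any, Callable


def when(predicate: Callable[[Any], bool],
         handler: Callable[[Any], None]) -> Callable[[Any], None]:
    """Return a handler that only runs when predicate(event) is true."""
    def filtered(event: Any) -> None:
        if predicate(event):
            handler(event)
    return filtered


seen = []
# Only events whose source is "engine" reach the wrapped handler.
only_engine = when(lambda e: e["source"] == "engine", seen.append)

for event in [
    {"source": "engine", "id": 1},
    {"source": "ui", "id": 2},
    {"source": "engine", "id": 3},
]:
    only_engine(event)
```

A wrapped handler like `only_engine` could then be passed to `bus.subscribe` in place of the raw handler.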
Priority Queues
Not all events are equal. Some require immediate processing (e.g., compliance violations) while others can be deferred (e.g., metrics updates).
Priority Levels
| Priority | Name | Use Case |
|---|---|---|
| 0 | CRITICAL | Compliance violations, hard-stop rules |
| 1 | HIGH | Approval gate failures, session errors |
| 2 | NORMAL | Standard workflow events |
| 3 | LOW | Metrics, logging, non-critical updates |
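The levels in the table map naturally onto an integer enum. The `Priority` class below is a hypothetical sketch mirroring the table, not a confirmed part of `fcc.messaging`.

```python
from enum import IntEnum


class Priority(IntEnum):
    """Hypothetical enum mirroring the table; lower value = processed first."""
    CRITICAL = 0
    HIGH = 1
    NORMAL = 2
    LOW = 3


# IntEnum members compare as integers, so they can be used directly as heap keys.
ordered = sorted([Priority.LOW, Priority.CRITICAL, Priority.NORMAL, Priority.HIGH])
```

Using named members instead of bare integers keeps call sites such as `push(event, priority=Priority.CRITICAL)` self-explanatory.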
Implementation Pattern
import heapq
from dataclasses import dataclass, field

from fcc.messaging.events import Event

@dataclass(order=True)
class PrioritizedEvent:
    priority: int                        # lower value = higher priority
    event: Event = field(compare=False)  # payload is excluded from ordering

class PriorityEventQueue:
    def __init__(self) -> None:
        self._queue: list[PrioritizedEvent] = []

    def push(self, event: Event, priority: int = 2) -> None:
        heapq.heappush(self._queue, PrioritizedEvent(priority, event))

    def pop(self) -> Event:
        # Raises IndexError when the queue is empty.
        return heapq.heappop(self._queue).event

    def __len__(self) -> int:
        return len(self._queue)
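One property the pattern above does not guarantee is FIFO order among events of equal priority, because a binary heap is not a stable structure. A common remedy is a monotonically increasing sequence number as a secondary sort key. The sketch below is a hypothetical variant (`SequencedEvent`, `StablePriorityQueue`) with strings standing in for events.

```python
import heapq
import itertools
from dataclasses import dataclass, field
from typing import Any


@dataclass(order=True)
class SequencedEvent:
    priority: int
    seq: int                           # insertion order, breaks priority ties
    event: Any = field(compare=False)  # payload never takes part in ordering


class StablePriorityQueue:
    """Hypothetical variant of PriorityEventQueue that is FIFO within a priority."""

    def __init__(self) -> None:
        self._heap: list[SequencedEvent] = []
        self._counter = itertools.count()

    def push(self, event: Any, priority: int = 2) -> None:
        heapq.heappush(self._heap, SequencedEvent(priority, next(self._counter), event))

    def pop(self) -> Any:
        return heapq.heappop(self._heap).event

    def __len__(self) -> int:
        return len(self._heap)


q = StablePriorityQueue()
q.push("metrics-1", priority=3)
q.push("workflow-1")               # NORMAL (2) by default
q.push("violation-1", priority=0)
q.push("workflow-2")
q.push("violation-2", priority=0)
drained = [q.pop() for _ in range(len(q))]
```

Both violations come out first, in the order they were pushed, followed by the two workflow events and finally the metrics event.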
Dead-Letter Queues
When an event handler fails, the event should not be lost. A dead-letter queue (DLQ) captures failed events for later inspection and replay.
DLQ Pattern
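class DeadLetterQueue:
    def __init__(self, max_retries: int = 3):
        # Each entry is (event, error, attempt number of the failed delivery).
        self._failed: list[tuple[Event, Exception, int]] = []
        self._max_retries = max_retries

    def add(self, event: Event, error: Exception, attempt: int) -> None:
        self._failed.append((event, error, attempt))

    def retry_all(self, bus: EventBus) -> int:
        """Republish events that still have retries left; return how many."""
        # Swap the list out first: a handler that fails again may call add()
        # while we iterate, and those entries must not be lost or re-looped.
        pending, self._failed = self._failed, []
        retried = 0
        for event, error, attempt in pending:
            if attempt < self._max_retries:
                bus.publish(event)
                retried += 1
            else:
                # Retries exhausted: keep the event for inspection.
                self._failed.append((event, error, attempt))
        return retried

    @property
    def count(self) -> int:
        return len(self._failed)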
Integration with EventBus
Wrap subscriber handlers with DLQ-aware error handling:
dlq = DeadLetterQueue()

def safe_handler(event: Event) -> None:
    try:
        process_event(event)  # the application's own handler logic
    except Exception as e:
        # Quarantine the event instead of letting the failure propagate.
        dlq.add(event, e, attempt=1)

bus.subscribe(EventType.COMPLIANCE_CHECK_COMPLETED, safe_handler)
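The wrapper above always records `attempt=1`, so an event that fails again after a retry re-enters the DLQ with its count reset and never reaches `max_retries`. One way to carry the count across retries is to let the DLQ invoke the handler directly and increment the attempt itself. The sketch below assumes that design. `RetryingDLQ` is a hypothetical class, and strings stand in for events.

```python
from typing import Any, Callable


class RetryingDLQ:
    """Hypothetical DLQ that keeps the attempt count with each failed event."""

    def __init__(self, max_retries: int = 3) -> None:
        self._max_retries = max_retries
        self._failed: list[tuple[Any, Exception, int]] = []

    def add(self, event: Any, error: Exception, attempt: int = 1) -> None:
        self._failed.append((event, error, attempt))

    def retry_all(self, handler: Callable[[Any], None]) -> int:
        """Retry each eligible event once; return how many succeeded."""
        pending, self._failed = self._failed, []
        succeeded = 0
        for event, error, attempt in pending:
            if attempt >= self._max_retries:
                # Retries exhausted: keep the event for inspection.
                self._failed.append((event, error, attempt))
                continue
            try:
                handler(event)
                succeeded += 1
            except Exception as exc:
                # Failed again: keep the event and bump its attempt count.
                self._failed.append((event, exc, attempt + 1))
        return succeeded

    @property
    def count(self) -> int:
        return len(self._failed)


def always_fails(event: Any) -> None:
    raise RuntimeError("downstream unavailable")


dlq = RetryingDLQ(max_retries=3)
dlq.add("event-a", RuntimeError("first failure"))
results = [dlq.retry_all(always_fails) for _ in range(5)]
```

After the second retry the event has reached attempt 3, so later calls to `retry_all` leave it in the queue untouched instead of retrying forever.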
Circuit Breakers
When a downstream service is unavailable, a circuit breaker prevents cascading failures by short-circuiting event delivery.
Circuit Breaker States
CLOSED ──[failure threshold reached]──► OPEN
  ▲                                       │
  │                                       │
  └──[success after half-open]─ HALF_OPEN ◄──[timeout elapsed]
| State | Behavior |
|---|---|
| CLOSED | Normal operation; events delivered to handlers |
| OPEN | Events bypassed; handler not called |
| HALF_OPEN | One test event allowed; success closes, failure re-opens |
Implementation Pattern
import time

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: float = 60.0):
        self._failures = 0
        self._threshold = failure_threshold
        self._timeout = timeout  # seconds to stay open before a test call
        self._state = "closed"
        self._opened_at: float = 0.0

    def call(self, func, *args, **kwargs):
        if self._state == "open":
            # time.monotonic() is unaffected by system clock adjustments.
            if time.monotonic() - self._opened_at > self._timeout:
                self._state = "half_open"  # timeout elapsed: allow one test call
            else:
                raise RuntimeError("Circuit breaker is open")
        try:
            result = func(*args, **kwargs)
            if self._state == "half_open":
                # Test call succeeded: close the circuit and start counting afresh.
                self._state = "closed"
                self._failures = 0
            return result
        except Exception:
            self._failures += 1
            if self._failures >= self._threshold:
                self._state = "open"
                self._opened_at = time.monotonic()
            raise

    @property
    def state(self) -> str:
        return self._state
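Exercising the timeout path with a real clock means sleeping for the full timeout. A common alternative is to inject the clock so that tests can advance time by hand. The sketch below is a hypothetical variant (`ClockedBreaker`), not the chapter's class. It also differs in two deliberate ways: any success resets the failure count, and a failed test call re-opens the circuit explicitly.

```python
import time
from typing import Callable


class ClockedBreaker:
    """Hypothetical breaker whose clock is injectable so tests need no sleep()."""

    def __init__(self, failure_threshold: int = 5, timeout: float = 60.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._threshold = failure_threshold
        self._timeout = timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = 0.0
        self.state = "closed"

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            if self._clock() - self._opened_at > self._timeout:
                self.state = "half_open"   # timeout elapsed: allow one test call
            else:
                raise RuntimeError("Circuit breaker is open")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self.state == "half_open" or self._failures >= self._threshold:
                self.state = "open"
                self._opened_at = self._clock()
            raise
        self.state = "closed"              # any success closes the circuit
        self._failures = 0
        return result


def failing():
    raise ConnectionError("service down")


now = [0.0]                                # fake clock, advanced by hand
breaker = ClockedBreaker(failure_threshold=2, timeout=30.0, clock=lambda: now[0])

states = []
for _ in range(2):                         # two failures reach the threshold
    try:
        breaker.call(failing)
    except ConnectionError:
        pass
states.append(breaker.state)

try:
    breaker.call(lambda: "ignored")        # rejected without calling the function
except RuntimeError:
    states.append("rejected")

now[0] = 31.0                              # move past the timeout
states.append(breaker.call(lambda: "pong"))
states.append(breaker.state)
```

The `states` list records the breaker opening, a call being rejected while open, the test call going through once the fake clock passes the timeout, and the circuit closing again.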
Event Replay Strategies
Full Replay
Replay all events in order for debugging or state reconstruction:
from fcc.messaging.serialization import EventReplay

replay = EventReplay.from_file("events.jsonl")
for event in replay:
    bus.publish(event)
Filtered Replay
Replay only events matching specific criteria:
compliance_events = [
    e for e in replay
    if e.event_type in (
        EventType.COMPLIANCE_CHECK_STARTED,
        EventType.COMPLIANCE_CHECK_COMPLETED,
    )
]
Point-in-Time Replay
Replay events up to a specific timestamp:
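A minimal sketch of the idea, assuming each recorded event exposes a timezone-aware `timestamp` and that the log is already in chronological order. `RecordedEvent` and `replay_until` are hypothetical stand-ins rather than part of `fcc.messaging`, and the timestamps are made-up sample data.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Iterable, Iterator


@dataclass(frozen=True)
class RecordedEvent:
    """Stand-in for a replayed event; the `timestamp` field is an assumption."""
    name: str
    timestamp: datetime


def replay_until(events: Iterable[RecordedEvent],
                 cutoff: datetime) -> Iterator[RecordedEvent]:
    """Yield events in their original order, stopping at the first one past the cutoff."""
    for event in events:
        if event.timestamp > cutoff:
            break  # relies on the log being ordered by timestamp
        yield event


log = [
    RecordedEvent("simulation_started", datetime(2024, 1, 1, 9, 0, tzinfo=timezone.utc)),
    RecordedEvent("compliance_check_started", datetime(2024, 1, 1, 9, 5, tzinfo=timezone.utc)),
    RecordedEvent("compliance_check_completed", datetime(2024, 1, 1, 9, 10, tzinfo=timezone.utc)),
]
cutoff = datetime(2024, 1, 1, 9, 5, tzinfo=timezone.utc)
replayed = [e.name for e in replay_until(log, cutoff)]
```

The cutoff is inclusive here, so an event stamped exactly at the cutoff is still replayed. For a log that is not sorted, a filter over all events would be needed instead of the early `break`.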
Messaging Anti-Patterns
| Anti-Pattern | Problem | Solution |
|---|---|---|
| Fire-and-forget without DLQ | Lost events on handler failure | Always use DLQ |
| Unbounded subscriber lists | Memory growth | Unsubscribe inactive handlers |
| Synchronous cross-service calls | Blocking and fragile | Use async events |
| No event versioning | Breaking changes | Add version field to events |
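The last row can be made concrete with an explicit version field and an upgrade step that runs before handling. Everything in this sketch is hypothetical: `VersionedEvent`, `upgrade`, and the `user` to `actor` rename are invented purely to illustrate the pattern.

```python
from dataclasses import dataclass, field
from typing import Any

CURRENT_VERSION = 2


@dataclass
class VersionedEvent:
    """Hypothetical event envelope carrying an explicit schema version."""
    event_type: str
    payload: dict[str, Any] = field(default_factory=dict)
    version: int = CURRENT_VERSION


def upgrade(event: VersionedEvent) -> VersionedEvent:
    """Bring older events up to the current schema before handling them."""
    if event.version == 1:
        # Illustrative change: version 1 used "user", version 2 uses "actor".
        payload = dict(event.payload)  # copy so the original event is untouched
        payload["actor"] = payload.pop("user", None)
        return VersionedEvent(event.event_type, payload, version=2)
    if event.version == CURRENT_VERSION:
        return event
    raise ValueError(f"unsupported event version: {event.version}")


old = VersionedEvent("session_error", {"user": "alice"}, version=1)
new = upgrade(old)
```

Handlers then only ever see the current schema, and an unknown version fails loudly instead of being silently misread.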
Lab Exercise
Lab 23.1: Implement a PriorityEventQueue and process 10 events of mixed priority. Verify that CRITICAL events are processed first.
Lab 23.2: Create a DeadLetterQueue, simulate 5 handler failures, and retry them. Verify that events that have reached max retries stay in the DLQ.
Lab 23.3: Build a simple circuit breaker. Trigger 5 failures to open it, wait for the timeout, and verify that the next call is let through as the HALF_OPEN test call.
Summary
- Priority queues ensure critical events (compliance violations, errors) are processed before routine events.
- Dead-letter queues capture failed events for retry and forensic analysis.
- Circuit breakers prevent cascading failures when downstream services are unavailable.
- Event replay supports debugging, state reconstruction, and compliance auditing through full, filtered, and point-in-time strategies.