
Chapter 23: Advanced Messaging

The FCC event bus provides the foundation for asynchronous communication between framework components. This chapter covers advanced messaging patterns including priority queues, dead-letter queues, circuit breakers, and replay strategies.

The sequence diagram below shows how events flow through the priority queue, circuit breaker, and dead-letter queue to either a handler or quarantine, depending on breaker state and handler success.

sequenceDiagram
    participant P as Publisher
    participant PQ as PriorityQueue
    participant EB as EventBus
    participant H as Handler
    participant CB as CircuitBreaker
    participant DLQ as Dead-Letter Queue

    P->>PQ: push(event, priority=CRITICAL)
    P->>PQ: push(event, priority=NORMAL)
    PQ->>EB: pop() delivers CRITICAL first
    EB->>CB: check state
    alt Circuit CLOSED
        CB->>H: deliver(event)
        alt Handler succeeds
            H-->>EB: ok
        else Handler fails
            H-->>CB: error
            CB->>CB: increment failures
            CB->>DLQ: add(event, error)
        end
    else Circuit OPEN
        CB->>DLQ: bypass to DLQ
    end
    Note over DLQ: Retry later or<br/>inspect forensically

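Together, the three mechanisms keep the bus responsive under partial failure. The priority queue delivers critical events first, the circuit breaker stops calling a handler that keeps failing, and the dead-letter queue retains failed events instead of silently dropping them.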

Event Bus Refresher

The core EventBus in fcc.messaging.bus provides:

  • Thread-safe publish/subscribe.
  • 81 event types covering all framework operations.
  • Event filtering by type, source, and custom predicates.
  • Serialization and replay via EventSerializer and EventReplay.

from fcc.messaging.bus import EventBus
from fcc.messaging.events import Event, EventType

bus = EventBus()

def handler(event: Event) -> None:
    print(f"Received: {event.event_type.value}")

bus.subscribe(EventType.SIMULATION_STARTED, handler)
bus.publish(Event(event_type=EventType.SIMULATION_STARTED))
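
The refresher lists thread-safe publish/subscribe and filtering by type, source, and custom predicates, but this chapter does not show the fcc filtering API. The following is a minimal sketch of how such a bus can work, assuming a hypothetical `MiniEventBus` and a stand-in `Event` with string event types. It is not the fcc `EventBus`. Handlers subscribe per event type, and an optional predicate (here, a source filter) narrows delivery further.

```python
import threading
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Callable, Optional


@dataclass
class Event:
    """Hypothetical stand-in for fcc.messaging.events.Event."""
    event_type: str
    source: str = ""
    payload: dict = field(default_factory=dict)


Handler = Callable[[Event], None]
Predicate = Callable[[Event], bool]


class MiniEventBus:
    """Hypothetical sketch of type- and predicate-filtered pub/sub; not the fcc EventBus."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subs = defaultdict(list)  # event_type -> [(handler, predicate or None)]

    def subscribe(self, event_type: str, handler: Handler,
                  predicate: Optional[Predicate] = None) -> None:
        with self._lock:
            self._subs[event_type].append((handler, predicate))

    def publish(self, event: Event) -> int:
        """Deliver to matching subscribers; return how many handlers were called."""
        # Copy the subscriber list under the lock, then call handlers outside it,
        # so a handler that subscribes or publishes cannot deadlock the bus.
        with self._lock:
            subs = list(self._subs.get(event.event_type, []))
        called = 0
        for handler, predicate in subs:
            if predicate is None or predicate(event):
                handler(event)
                called += 1
        return called


bus = MiniEventBus()
received: list = []
# Filter by type ("simulation_started") and by source via a predicate.
bus.subscribe("simulation_started", lambda e: received.append(e.source),
              predicate=lambda e: e.source == "scheduler")
bus.publish(Event("simulation_started", source="scheduler"))  # delivered
bus.publish(Event("simulation_started", source="cli"))        # filtered out
```

Releasing the lock before invoking handlers is a deliberate choice. It keeps slow handlers from blocking other publishers and allows re-entrant publishing.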

Priority Queues

Not all events are equal. Some require immediate processing (e.g., compliance violations) while others can be deferred (e.g., metrics updates).

Priority Levels

| Priority | Name | Use case |
|---|---|---|
| 0 | CRITICAL | Compliance violations, hard-stop rules |
| 1 | HIGH | Approval gate failures, session errors |
| 2 | NORMAL | Standard workflow events |
| 3 | LOW | Metrics, logging, non-critical updates |
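
The four levels map naturally onto an `IntEnum`. The name `Priority` is an assumption for illustration. The chapter's code passes plain integers, while the sequence diagram's `priority=CRITICAL` suggests named constants. Because `IntEnum` members are ints, they can be passed anywhere an integer priority is expected, and lower values sort first.

```python
from enum import IntEnum


class Priority(IntEnum):
    """Hypothetical enum mirroring the priority table; lower value = served first."""
    CRITICAL = 0  # compliance violations, hard-stop rules
    HIGH = 1      # approval gate failures, session errors
    NORMAL = 2    # standard workflow events
    LOW = 3       # metrics, logging, non-critical updates


levels = sorted([Priority.LOW, Priority.CRITICAL, Priority.NORMAL, Priority.HIGH])
```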

Implementation Pattern

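import heapq
import itertools
from dataclasses import dataclass, field

from fcc.messaging.events import Event

@dataclass(order=True)
class PrioritizedEvent:
    priority: int                          # 0 = CRITICAL ... 3 = LOW; lower pops first
    sequence: int                          # insertion counter: FIFO order within a priority
    event: Event = field(compare=False)    # the payload itself is never compared

class PriorityEventQueue:
    """Min-heap of events. Not thread-safe; guard with a lock if shared across threads."""

    def __init__(self) -> None:
        self._queue: list[PrioritizedEvent] = []
        self._counter = itertools.count()

    def push(self, event: Event, priority: int = 2) -> None:
        """Enqueue an event; the default priority 2 is NORMAL."""
        heapq.heappush(self._queue, PrioritizedEvent(priority, next(self._counter), event))

    def pop(self) -> Event:
        """Remove and return the highest-priority event; raises IndexError if empty."""
        return heapq.heappop(self._queue).event

    def __len__(self) -> int:
        return len(self._queue)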
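
The diagram's `pop()` step amounts to draining the queue in priority order. The following is a minimal, self-contained sketch of that drain loop, in the spirit of Lab 23.1. It uses a hypothetical `Event` stand-in and a tuple-based heap in place of `PrioritizedEvent`, and processes six events of mixed priority.

```python
import heapq
import itertools
from dataclasses import dataclass
from typing import Callable


@dataclass
class Event:
    """Hypothetical stand-in for fcc.messaging.events.Event."""
    name: str


CRITICAL, HIGH, NORMAL, LOW = 0, 1, 2, 3  # values from the priority table

heap: list = []               # entries are (priority, sequence, event)
sequence = itertools.count()  # tie-breaker: equal priorities pop in FIFO order


def push(event: Event, priority: int = NORMAL) -> None:
    heapq.heappush(heap, (priority, next(sequence), event))


def drain(handler: Callable[[Event], None]) -> int:
    """Pop until the heap is empty, delivering each event; return the number delivered."""
    delivered = 0
    while heap:
        _priority, _seq, event = heapq.heappop(heap)
        handler(event)
        delivered += 1
    return delivered


for name, priority in [("metrics-1", LOW), ("step-1", NORMAL), ("violation", CRITICAL),
                       ("step-2", NORMAL), ("gate-failure", HIGH), ("metrics-2", LOW)]:
    push(Event(name), priority)

order: list = []
delivered = drain(lambda e: order.append(e.name))
```

The sequence number also prevents equal-priority entries from comparing `Event` objects. A plain `(priority, event)` tuple would raise `TypeError` on a tie, because the dataclass defines no ordering.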

Dead-Letter Queues

When an event handler fails, the event should not be lost. A dead-letter queue (DLQ) captures failed events for later inspection and replay.

DLQ Pattern

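from typing import Callable

from fcc.messaging.events import Event

class DeadLetterQueue:
    def __init__(self, max_retries: int = 3):
        # Each entry: (event, last error, number of failed attempts so far)
        self._failed: list[tuple[Event, Exception, int]] = []
        self._max_retries = max_retries

    def add(self, event: Event, error: Exception, attempt: int) -> None:
        self._failed.append((event, error, attempt))

    def retry_all(self, handler: Callable[[Event], None]) -> int:
        """Retry each event that still has attempts left; return how many succeeded.

        Retrying through the handler (rather than re-publishing on the bus) lets the
        DLQ count attempts, avoids re-adding entries while this list is being iterated,
        and does not re-deliver the event to subscribers that already handled it.
        """
        succeeded = 0
        remaining: list[tuple[Event, Exception, int]] = []
        for event, error, attempt in self._failed:
            if attempt >= self._max_retries:
                remaining.append((event, error, attempt))  # exhausted: keep for inspection
                continue
            try:
                handler(event)
                succeeded += 1
            except Exception as e:
                remaining.append((event, e, attempt + 1))  # failed again: count the attempt
        self._failed = remaining
        return succeeded

    @property
    def count(self) -> int:
        return len(self._failed)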

Integration with EventBus

Wrap subscriber handlers with DLQ-aware error handling:

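dlq = DeadLetterQueue()

def safe_handler(event: Event) -> None:
    try:
        process_event(event)  # placeholder for your application's handler logic
    except Exception as e:
        # Do not let the handler error escape to the bus; park the event instead.
        dlq.add(event, e, attempt=1)  # first failed delivery

bus.subscribe(EventType.COMPLIANCE_CHECK_COMPLETED, safe_handler)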
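
The `safe_handler` wrapper can be generalized so that every handler gets the same treatment without repeating the try/except. The following sketch uses a decorator factory. `dlq_guard` and the list used as the sink are hypothetical. With the `DeadLetterQueue` above, `sink.append(...)` would become `dlq.add(...)`.

```python
import functools
from typing import Any, Callable

FailedEntry = tuple[Any, Exception, int]  # (event, error, attempt), same shape as the DLQ entries


def dlq_guard(sink: list) -> Callable[[Callable[[Any], None]], Callable[[Any], None]]:
    """Hypothetical decorator factory: route any handler exception into `sink`."""
    def decorate(handler: Callable[[Any], None]) -> Callable[[Any], None]:
        @functools.wraps(handler)  # keep the handler's name for logs and debugging
        def wrapper(event: Any) -> None:
            try:
                handler(event)
            except Exception as exc:  # deliberately broad: the bus never sees handler errors
                sink.append((event, exc, 1))  # first live delivery counts as attempt 1
        return wrapper
    return decorate


failed: list = []


@dlq_guard(failed)
def flaky_handler(event: Any) -> None:
    if event == "bad":
        raise ValueError("cannot process")


flaky_handler("ok")   # succeeds, nothing recorded
flaky_handler("bad")  # raises inside, captured in `failed`
```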

Circuit Breakers

When a downstream service is unavailable, a circuit breaker prevents cascading failures by short-circuiting event delivery.

Circuit Breaker States

CLOSED ──[failure threshold reached]──► OPEN
   ▲                                    │  ▲
   │                  [timeout elapsed] │  │ [trial call fails]
   │                                    ▼  │
   └──────[trial call succeeds]───── HALF_OPEN

| State | Behavior |
|---|---|
| CLOSED | Normal operation; events are delivered to handlers. |
| OPEN | Events bypass the handler, which is not called. |
| HALF_OPEN | One trial event is allowed through; success closes the breaker, failure re-opens it. |
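
The state diagram and table together define a small state machine. Writing the transitions as data makes them easy to review and test on their own. The following sketch uses trigger names that are hypothetical labels for the arrows, not fcc identifiers.

```python
# Hypothetical trigger names for the arrows in the state diagram.
TRANSITIONS = {
    ("closed", "failure_threshold_reached"): "open",
    ("open", "timeout_elapsed"): "half_open",
    ("half_open", "trial_succeeded"): "closed",
    ("half_open", "trial_failed"): "open",
}


def next_state(state: str, trigger: str) -> str:
    """Return the state after `trigger`; a trigger with no matching arrow leaves the state unchanged."""
    return TRANSITIONS.get((state, trigger), state)


# Walk one full cycle: trip, cool down, fail the trial, cool down again, recover.
path = ["closed"]
for trigger in ["failure_threshold_reached", "timeout_elapsed", "trial_failed",
                "timeout_elapsed", "trial_succeeded"]:
    path.append(next_state(path[-1], trigger))
```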

Implementation Pattern

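import time
from typing import Any, Callable

class CircuitBreaker:
    def __init__(self, failure_threshold: int = 5, timeout: float = 60.0):
        self._failures = 0                 # consecutive failures
        self._threshold = failure_threshold
        self._timeout = timeout            # seconds to stay OPEN before a trial call
        self._state = "closed"
        self._opened_at: float = 0.0

    @property
    def state(self) -> str:
        # An OPEN breaker becomes HALF_OPEN as soon as the timeout has elapsed,
        # so the transition is observable before the next call arrives.
        if self._state == "open" and time.monotonic() - self._opened_at >= self._timeout:
            self._state = "half_open"
        return self._state

    def call(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
        if self.state == "open":
            raise RuntimeError("Circuit breaker is open")

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._failures += 1
            # A failed trial call re-opens immediately; in CLOSED, open only at the threshold.
            if self._state == "half_open" or self._failures >= self._threshold:
                self._state = "open"
                self._opened_at = time.monotonic()  # monotonic: immune to wall-clock jumps
            raise

        # Any success (including the HALF_OPEN trial) closes the breaker and resets the count.
        self._state = "closed"
        self._failures = 0
        return result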
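
Exercising the timeout with a breaker that reads the system clock means sleeping for real. A common alternative is to inject the clock. The following is a hypothetical trimmed variant: `ClockedBreaker`, `deliver`, and the `(event, reason)` dead-letter tuples are assumptions, not fcc APIs. It also wires breaker, handler, and dead-letter list together the way the sequence diagram at the start of the chapter does.

```python
from typing import Any, Callable


class ClockedBreaker:
    """Hypothetical trimmed breaker: the clock is injected so tests need not sleep."""

    def __init__(self, clock: Callable[[], float], threshold: int = 5, timeout: float = 60.0):
        self.clock = clock
        self.threshold = threshold
        self.timeout = timeout
        self.failures = 0  # consecutive failures
        self.state = "closed"
        self.opened_at = 0.0

    def allow(self) -> bool:
        """Return True if a call may go through; OPEN becomes HALF_OPEN after the timeout."""
        if self.state == "open" and self.clock() - self.opened_at >= self.timeout:
            self.state = "half_open"
        return self.state != "open"

    def record(self, ok: bool) -> None:
        """Update the breaker with the outcome of a call that allow() let through."""
        if ok:
            self.state, self.failures = "closed", 0
            return
        self.failures += 1
        if self.state == "half_open" or self.failures >= self.threshold:
            self.state, self.opened_at = "open", self.clock()


def deliver(event: Any, handler: Callable[[Any], None],
            breaker: ClockedBreaker, dlq: list) -> None:
    """One pass through the sequence diagram: breaker check, handler, DLQ on failure."""
    if not breaker.allow():
        dlq.append((event, "circuit open"))  # OPEN: bypass the handler entirely
        return
    try:
        handler(event)
    except Exception as exc:
        breaker.record(ok=False)
        dlq.append((event, repr(exc)))  # handler failed: keep the event for retry
    else:
        breaker.record(ok=True)


def failing_handler(event: Any) -> None:
    raise ConnectionError("downstream unavailable")


now = [0.0]  # fake clock; set now[0] to simulate elapsed seconds
breaker = ClockedBreaker(clock=lambda: now[0], threshold=2, timeout=10.0)
dlq: list = []

for name in ["a", "b", "c"]:
    deliver(name, failing_handler, breaker, dlq)
state_after_failures = breaker.state     # "open" after two consecutive failures

now[0] = 10.0                            # timeout elapsed
allowed_after_timeout = breaker.allow()  # True: breaker is now half_open
deliver("d", lambda event: None, breaker, dlq)  # trial call succeeds, breaker closes
```

Event `"c"` never reaches the handler. It goes straight to the dead-letter list while the breaker is open, matching the diagram's "bypass to DLQ" branch.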

Event Replay Strategies

Full Replay

Replay all events in order for debugging or state reconstruction:

from fcc.messaging.serialization import EventReplay

replay = EventReplay.from_file("events.jsonl")
for event in replay:
    bus.publish(event)
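
The `.jsonl` extension conventionally denotes JSON Lines, with one serialized record per line. How `EventReplay` parses the file is not shown in this chapter. The following hypothetical reader illustrates the format-level idea and treats state reconstruction as a fold that applies each event, in file order, to an accumulator. The field names `event_type` and `timestamp` are assumptions about the serialized form.

```python
import io
import json
from collections import Counter
from typing import Any, Iterator, TextIO


def read_jsonl(stream: TextIO) -> Iterator[dict[str, Any]]:
    """Yield one decoded record per non-blank line, preserving file order."""
    for line_no, line in enumerate(stream, start=1):
        if not line.strip():
            continue  # tolerate blank lines, e.g. a trailing newline
        try:
            yield json.loads(line)
        except json.JSONDecodeError as exc:
            raise ValueError(f"line {line_no}: malformed event record") from exc


# Hypothetical records; the real fields written by EventSerializer may differ.
log = io.StringIO(
    '{"event_type": "simulation_started", "timestamp": "2026-03-31T11:00:00Z"}\n'
    '{"event_type": "compliance_check_started", "timestamp": "2026-03-31T11:05:00Z"}\n'
    '\n'
    '{"event_type": "compliance_check_completed", "timestamp": "2026-03-31T11:06:00Z"}\n'
)

# State reconstruction as a fold: here the "state" is simply a count per event type.
counts = Counter()
for record in read_jsonl(log):
    counts[record["event_type"]] += 1
```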

Filtered Replay

Replay only events matching specific criteria:

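# Assumes `replay` can be iterated more than once; if it is single-pass,
# reload it with EventReplay.from_file before filtering.
compliance_events = [
    e for e in replay
    if e.event_type in (
        EventType.COMPLIANCE_CHECK_STARTED,
        EventType.COMPLIANCE_CHECK_COMPLETED,
    )
]
for event in compliance_events:
    bus.publish(event)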

Point-in-Time Replay

Replay events up to a specific timestamp:

cutoff = "2026-03-31T12:00:00Z"
events_before_cutoff = [e for e in replay if e.timestamp < cutoff]
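
If `e.timestamp` is a string, the cutoff test above is a lexicographic comparison. That is only safe when every timestamp uses exactly the same format; fractional seconds or a non-`Z` offset break the ordering. The following sketch parses timestamps into timezone-aware UTC datetimes before comparing. `parse_utc` is a hypothetical helper, and the sample timestamps are illustrative.

```python
from datetime import datetime, timezone


def parse_utc(ts: str) -> datetime:
    """Parse an ISO-8601 timestamp into an aware UTC datetime.

    A trailing "Z" is rewritten to "+00:00" because datetime.fromisoformat
    only accepts "Z" from Python 3.11 onward.
    """
    if ts.endswith("Z"):
        ts = ts[:-1] + "+00:00"
    dt = datetime.fromisoformat(ts)
    if dt.tzinfo is None:
        raise ValueError(f"timestamp has no UTC offset: {ts!r}")
    return dt.astimezone(timezone.utc)


cutoff = parse_utc("2026-03-31T12:00:00Z")
stamps = ["2026-03-31T11:59:59Z", "2026-03-31T12:00:00.500Z", "2026-03-31T13:30:00+02:00"]

# Plain string comparison is fooled by the fractional seconds and the +02:00 offset.
naive = [s for s in stamps if s < "2026-03-31T12:00:00Z"]
parsed = [s for s in stamps if parse_utc(s) < cutoff]
```

`naive` wrongly keeps `12:00:00.500Z` (`.` sorts before `Z`) and drops `13:30:00+02:00`, which is 11:30 UTC; `parsed` gets both right.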

Messaging Anti-Patterns

| Anti-pattern | Problem | Solution |
|---|---|---|
| Fire-and-forget without a DLQ | Events are lost when a handler fails | Route handler failures to a DLQ |
| Unbounded subscriber lists | Memory grows without limit | Unsubscribe inactive handlers |
| Synchronous cross-service calls | Blocking and fragile coupling | Communicate through async events |
| No event versioning | Schema changes break consumers | Add a version field to events |
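
A version field pays off when consumers can upgrade old records on read. The following sketch shows one common approach, an "upcaster" chain that converts a stored payload one version at a time. The schema history (v1 `user` renamed to v2 `actor`, plus a new `reason` field), the event type string, and all function names are hypothetical.

```python
from typing import Any, Callable


def _v1_to_v2(payload: dict[str, Any]) -> dict[str, Any]:
    """Hypothetical migration: rename "user" to "actor" and add "reason"."""
    upgraded = {k: v for k, v in payload.items() if k != "user"}  # copy; never mutate input
    upgraded["actor"] = payload["user"]
    upgraded["reason"] = None
    upgraded["version"] = 2
    return upgraded


UPCASTERS: dict[int, Callable[[dict[str, Any]], dict[str, Any]]] = {1: _v1_to_v2}
CURRENT_VERSION = 2


def upcast(payload: dict[str, Any]) -> dict[str, Any]:
    """Upgrade a stored event payload step by step to CURRENT_VERSION."""
    version = payload.get("version", 1)  # records written before versioning count as v1
    while version < CURRENT_VERSION:
        payload = UPCASTERS[version](payload)
        version = payload["version"]
    return payload


old = {"event_type": "approval_gate_failed", "user": "alice"}
new = upcast(old)
```

Keeping each migration to a single version step means adding v3 later only requires registering a `2 -> 3` function; older records pass through the whole chain.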

Lab Exercise

Lab 23.1: Implement a PriorityEventQueue and process 10 events of mixed priority. Verify that CRITICAL events are processed first.

Lab 23.2: Create a DeadLetterQueue, simulate 5 handler failures, and retry them. Verify that events exceeding max retries stay in the DLQ.

Lab 23.3: Build a simple circuit breaker. Trigger 5 failures to open it, wait for the timeout, and verify it enters HALF_OPEN state.

Summary

  • Priority queues ensure critical events (compliance violations, errors) are processed before routine events.
  • Dead-letter queues capture failed events for retry and forensic analysis.
  • Circuit breakers prevent cascading failures when downstream services are unavailable.
  • Event replay supports debugging, state reconstruction, and compliance auditing through full, filtered, and point-in-time strategies.