
Messaging & Real-Time -- Phase 15 Addendum

This addendum extends the Messaging Real-Time Demo and the Phase 14 Addendum with Phase 15 messaging enhancements: priority queues, dead-letter queues (DLQ), and circuit breaker patterns for resilient event delivery.


Priority Queues

Overview

Phase 15 introduces priority-based event processing. Each event carries a priority from 0 (highest) to 9 (lowest) that controls processing order when multiple events are queued; in the usage example, it is passed as the "priority" key of the event payload.

Priority Levels

Priority  Level       Use case
0         Critical    Compliance gate failures, security events
1         High        Audit findings, benchmark regressions
3         Normal      Standard workflow events (default)
5         Low         Informational events, metrics
9         Background  Housekeeping, cache invalidation

Usage

from fcc.messaging.bus import EventBus
from fcc.messaging.events import Event, EventType

bus = EventBus()

# Critical-priority (0) compliance event
bus.publish(Event(
    event_type=EventType.COMPLIANCE_AUDIT_COMPLETED,
    source="compliance_pipeline",
    payload={"regulation": "EU_AI_ACT", "priority": 0},
))

# Normal-priority workflow event
bus.publish(Event(
    event_type=EventType.SIMULATION_STARTED,
    source="simulation_engine",
    payload={"scenario": "baseline", "priority": 3},
))
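The ordering rule can be illustrated with a standard binary heap. The sketch below is a minimal, self-contained model, not the fcc.messaging.bus internals: it assumes lower numbers dequeue first and that events of equal priority keep their publish order, which a per-queue sequence counter provides. PriorityEventQueue, Priority and the plain-string event names are hypothetical stand-ins for EventBus and Event.

```python
import heapq
import itertools
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any


class Priority(IntEnum):
    # Levels from the priority table; a lower value is processed sooner.
    CRITICAL = 0
    HIGH = 1
    NORMAL = 3  # default
    LOW = 5
    BACKGROUND = 9


@dataclass(order=True)
class _QueuedEvent:
    priority: int
    seq: int                           # insertion counter: FIFO among equal priorities
    event: Any = field(compare=False)  # never compared, so events need no ordering


class PriorityEventQueue:
    """Hypothetical min-heap queue, not the fcc.messaging implementation."""

    def __init__(self) -> None:
        self._heap: list = []
        self._counter = itertools.count()

    def push(self, event: Any, priority: int = Priority.NORMAL) -> None:
        if not 0 <= priority <= 9:
            raise ValueError("priority must be between 0 and 9")
        heapq.heappush(self._heap, _QueuedEvent(int(priority), next(self._counter), event))

    def pop(self) -> Any:
        # Smallest (priority, seq) first; raises IndexError when empty.
        return heapq.heappop(self._heap).event

    def __len__(self) -> int:
        return len(self._heap)


q = PriorityEventQueue()
q.push("cache.invalidate", Priority.BACKGROUND)
q.push("simulation.started")                       # NORMAL by default
q.push("compliance.gate_failed", Priority.CRITICAL)
q.push("simulation.step", Priority.NORMAL)
print([q.pop() for _ in range(len(q))])
# ['compliance.gate_failed', 'simulation.started', 'simulation.step', 'cache.invalidate']
```

Breaking ties on the sequence number also means the heap never has to compare two event objects, which may not define an ordering of their own.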

Dead-Letter Queue (DLQ)

Overview

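Events that cannot be delivered to a subscriber -- because the subscriber raised an error, because no filter matched an event that requires delivery, or because delivery timed out -- are routed to the dead-letter queue for inspection and reprocessing. A failure at one subscriber does not affect delivery to the others.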

DLQ Architecture

EventBus ──publish──┬─> Subscriber A ── OK
                    ├─> Subscriber B ── ERROR ──> DLQ
                    └─> Subscriber C ── OK

DLQ:
  └─ Event + Error + Timestamp + Retry Count
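To make the per-subscriber routing in the diagram concrete, here is a minimal sketch of a fan-out loop that isolates each subscriber and dead-letters only the failing one. FanOutBus and DeadLetter are hypothetical names, not the fcc API; the sketch covers only the exception path and does not model timeouts or required-delivery filter mismatches.

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class DeadLetter:
    # Mirrors the diagram: Event + Error + Timestamp + Retry Count.
    event: Any
    subscriber: str
    error: str
    timestamp: float = field(default_factory=time.time)
    retry_count: int = 0


class FanOutBus:
    """Hypothetical bus: delivers to every subscriber, dead-letters failures."""

    def __init__(self) -> None:
        self.subscribers: dict = {}
        self.dlq: list = []

    def subscribe(self, name: str, handler: Callable[[Any], None]) -> None:
        self.subscribers[name] = handler

    def publish(self, event: Any) -> int:
        """Deliver to all subscribers; return the number of successful deliveries."""
        delivered = 0
        for name, handler in self.subscribers.items():
            try:
                handler(event)
                delivered += 1
            except Exception as exc:  # one failing subscriber must not block the rest
                self.dlq.append(DeadLetter(event, name, repr(exc)))
        return delivered


def failing(event: Any) -> None:
    raise RuntimeError("downstream unavailable")


received: list = []
bus = FanOutBus()
bus.subscribe("A", received.append)
bus.subscribe("B", failing)
bus.subscribe("C", received.append)
print(bus.publish({"type": "demo"}))               # 2
print([(d.subscriber, d.error) for d in bus.dlq])
# [('B', "RuntimeError('downstream unavailable')")]
```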

Inspecting the DLQ

# List unprocessed dead-letter events
dlq_events = bus.dead_letter_queue()
for entry in dlq_events:
    print(f"  Event: {entry.event.event_type.value}")
    print(f"  Error: {entry.error}")
    print(f"  Retries: {entry.retry_count}")

Reprocessing

# Retry all dead-letter events
retried = bus.retry_dead_letters()
print(f"Retried: {retried} events")
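The section does not spell out what retry_dead_letters does internally. One plausible shape, sketched here under stated assumptions, is a single pass that redelivers each entry to its original subscriber, drops the entries that now succeed, and bumps retry_count on the rest. The max_retries cap and the handler stored on each entry are hypothetical additions; the return value is assumed to be the number of events re-attempted, matching the "Retried: N events" message above.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class DeadLetter:
    event: Any
    handler: Callable[[Any], None]  # subscriber whose delivery originally failed
    error: str
    retry_count: int = 0


def retry_dead_letters(dlq: list, max_retries: int = 3) -> int:
    """Hypothetical retry pass: redeliver each eligible entry once.

    Returns the number of entries re-attempted. Successes leave the queue;
    failures stay with retry_count incremented and the latest error recorded.
    Entries already at max_retries are skipped so a permanently broken
    subscriber cannot be retried forever.
    """
    attempted = 0
    remaining = []
    for entry in dlq:
        if entry.retry_count >= max_retries:
            remaining.append(entry)
            continue
        attempted += 1
        try:
            entry.handler(entry.event)
        except Exception as exc:
            entry.retry_count += 1
            entry.error = repr(exc)
            remaining.append(entry)
    dlq[:] = remaining  # update in place so other holders of the list see it
    return attempted


delivered = []


def broken(event: Any) -> None:
    raise RuntimeError("bad payload")


dlq = [
    DeadLetter("e1", delivered.append, "TimeoutError()"),     # subscriber has recovered
    DeadLetter("e2", broken, "RuntimeError('bad payload')"),  # still failing
]
retried = retry_dead_letters(dlq)
print(f"Retried: {retried} events")                        # Retried: 2 events
print(delivered, [(d.event, d.retry_count) for d in dlq])  # ['e1'] [('e2', 1)]
```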

Circuit Breaker

Overview

The circuit breaker pattern prevents cascading failures when a subscriber repeatedly fails. After a configurable number of consecutive failures (failure_threshold), the circuit opens: events bypass the failing subscriber and are routed to the DLQ until the circuit closes again.

Circuit States

State      Description         Behaviour
Closed     Normal operation    Events delivered to subscriber
Open       Subscriber failing  Events bypass subscriber and are routed to the DLQ
Half-Open  Testing recovery    After recovery_timeout_ms, trial events are delivered; success_threshold consecutive successes close the circuit, any failure re-opens it

Configuration

bus.subscribe(
    EventType.COMPLIANCE_AUDIT_COMPLETED,
    handler,
    circuit_breaker={
        "failure_threshold": 3,       # Open after 3 consecutive failures
        "recovery_timeout_ms": 5000,  # Try half-open after 5 seconds
        "success_threshold": 2,       # Close after 2 consecutive successes
    },
)
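The state table and configuration map onto a small state machine. This is a sketch assuming one breaker per subscriber and the three settings from the subscribe() example; CircuitBreaker, allow(), record_success(), record_failure() and the injectable clock are hypothetical, not part of the documented API. It follows the configuration comments: while half-open, trial events are let through, success_threshold consecutive successes close the circuit, and any failure re-opens it. Each transition is recorded with the event type and payload fields from the Circuit Breaker Events table.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Hypothetical per-subscriber breaker driven by the three settings above."""

    def __init__(self, subscriber_id: str, failure_threshold: int = 3,
                 recovery_timeout_ms: int = 5000, success_threshold: int = 2,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.subscriber_id = subscriber_id
        self.failure_threshold = failure_threshold
        self.recovery_timeout_s = recovery_timeout_ms / 1000
        self.success_threshold = success_threshold
        self.clock = clock                  # seconds; injectable for tests
        self.state = "closed"
        self.failures = 0                   # consecutive failures
        self.successes = 0                  # consecutive successes while half-open
        self.opened_at: Optional[float] = None
        self.transitions: list = []         # (event type, payload) pairs

    def allow(self) -> bool:
        """True if the next event may go to the subscriber; False means send it to the DLQ."""
        if self.state == "open":
            elapsed = self.clock() - self.opened_at
            if elapsed < self.recovery_timeout_s:
                return False
            self._move("half_open", "messaging.circuit.half_opened",
                       elapsed_ms=int(elapsed * 1000))
        return True

    def record_success(self) -> None:
        if self.state == "half_open":
            self.successes += 1
            if self.successes >= self.success_threshold:
                self._move("closed", "messaging.circuit.closed",
                           success_count=self.successes)
        else:
            self.failures = 0               # a success breaks the failure streak

    def record_failure(self) -> None:
        self.failures += 1
        # Any failure while half-open re-opens the circuit immediately.
        if self.state == "half_open" or self.failures >= self.failure_threshold:
            self.opened_at = self.clock()
            self._move("open", "messaging.circuit.opened",
                       failure_count=self.failures)

    def _move(self, state: str, event_type: str, **payload) -> None:
        self.state = state
        self.transitions.append(
            (event_type, {"subscriber_id": self.subscriber_id, **payload}))
        self.failures = 0                   # every transition starts fresh counts
        self.successes = 0


now = [0.0]                                 # fake clock, in seconds
cb = CircuitBreaker("audit-handler", clock=lambda: now[0])
for _ in range(3):
    cb.record_failure()
print(cb.state, cb.allow())                 # open False
now[0] = 5.0                                # recovery_timeout_ms has elapsed
allowed = cb.allow()
print(allowed, cb.state)                    # True half_open
cb.record_success()
cb.record_success()
print(cb.state)                             # closed
print([event_type for event_type, _ in cb.transitions])
# ['messaging.circuit.opened', 'messaging.circuit.half_opened', 'messaging.circuit.closed']
```

Injecting the clock keeps the example deterministic; outside tests, time.monotonic is the usual choice because it is unaffected by wall-clock adjustments.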

Circuit Breaker Events

Event type                     Payload
messaging.circuit.opened       subscriber_id, failure_count
messaging.circuit.half_opened  subscriber_id, elapsed_ms
messaging.circuit.closed       subscriber_id, success_count

Event Bus Metrics

Phase 15 adds built-in metrics for event bus health:

Metric                      Type       Description
events.published            Counter    Total events published
events.delivered            Counter    Total successful deliveries
events.dropped              Counter    Events with no matching subscriber
events.dlq.size             Gauge      Current DLQ depth
events.circuit.open         Gauge      Number of open circuits
events.delivery.latency_ms  Histogram  End-to-end delivery time (ms)
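The metric types in the table behave differently: counters only increase, gauges are overwritten with the current value, and a histogram summarises a distribution. The following is a minimal in-process sketch that assumes nothing about how fcc actually collects or exports these metrics; BusMetrics is a hypothetical name, and a sorted sample list stands in for a bucketed histogram.

```python
import bisect
import math
from collections import Counter
from dataclasses import dataclass, field


@dataclass
class BusMetrics:
    """Hypothetical in-process collector for the Phase 15 metric names."""
    counters: Counter = field(default_factory=Counter)
    gauges: dict = field(default_factory=dict)
    latencies_ms: list = field(default_factory=list)  # sorted samples stand in for a histogram

    def inc(self, name: str, amount: int = 1) -> None:
        self.counters[name] += amount

    def set_gauge(self, name: str, value: float) -> None:
        self.gauges[name] = value

    def observe_latency(self, ms: float) -> None:
        bisect.insort(self.latencies_ms, ms)  # keep samples ordered on insert

    def percentile(self, p: float) -> float:
        """Nearest-rank percentile of recorded delivery latencies, 0 < p <= 100."""
        if not self.latencies_ms:
            raise ValueError("no latency samples recorded")
        rank = max(1, math.ceil(p / 100 * len(self.latencies_ms)))
        return self.latencies_ms[rank - 1]


m = BusMetrics()
m.inc("events.published", 3)
m.inc("events.delivered", 2)
m.inc("events.dropped")
m.set_gauge("events.dlq.size", 0)
m.set_gauge("events.circuit.open", 0)
for ms in (12.0, 3.5, 7.25, 40.0):
    m.observe_latency(ms)
print(m.percentile(50), m.percentile(95))  # 7.25 40.0
```

A real exporter would typically use fixed buckets rather than keep every sample, but the percentile lookup shows what the latency histogram is for.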

Demo Walkthrough Updates

The messaging real-time demo now includes:

Step 8: Priority Queue Ordering
  • Publishes events with mixed priorities
  • Shows that high-priority events are processed first
  • Demonstrates priority-based subscriber filtering

Step 9: Dead-Letter Queue
  • Registers a failing subscriber
  • Shows events routed to the DLQ
  • Demonstrates DLQ inspection and retry

Step 10: Circuit Breaker
  • Registers a subscriber with circuit breaker configuration
  • Triggers repeated failures to open the circuit
  • Shows recovery through the half-open state


Tips

  • Set priority 0 for compliance and security events to ensure they are always processed first
  • Monitor DLQ depth as a health indicator -- a growing DLQ suggests subscriber issues that need investigation
  • Use circuit breakers for external integrations (Slack, PagerDuty) where transient failures are expected
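As one way to act on the DLQ-depth tip, the following sketch flags a queue whose depth rises across every sample in a short window. DLQGrowthMonitor and the strict-growth rule are assumptions for illustration only; a production alert might instead compare depth against a threshold or fit a trend.

```python
from collections import deque


class DLQGrowthMonitor:
    """Hypothetical health check that flags a steadily growing DLQ.

    Feed it periodic samples of the events.dlq.size gauge. It reports growth
    only when every sample in a full window exceeds the one before it.
    """

    def __init__(self, window: int = 5) -> None:
        if window < 2:
            raise ValueError("window must hold at least two samples")
        self.samples = deque(maxlen=window)  # oldest sample drops off automatically

    def add(self, depth: int) -> bool:
        """Record one depth sample; return True if the DLQ is growing."""
        self.samples.append(depth)
        if len(self.samples) < self.samples.maxlen:
            return False  # not enough history yet
        s = list(self.samples)
        return all(later > earlier for earlier, later in zip(s, s[1:]))


mon = DLQGrowthMonitor(window=3)
print([mon.add(depth) for depth in (0, 2, 2, 5, 9)])
# [False, False, False, False, True]
```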