Messaging & Real-Time -- Phase 15 Addendum¶
This addendum extends the Messaging Real-Time Demo and Phase 14 Addendum with Phase 15 messaging enhancements: priority queues, dead-letter queues (DLQ), and circuit breaker patterns for resilient event delivery.
Priority Queues¶
Overview¶
Phase 15 introduces priority-based event processing. Events carry a
priority field (0 = highest, 9 = lowest) that controls processing
order when multiple events are queued.
Priority Levels¶
| Priority | Level | Use Case |
|---|---|---|
| 0 | Critical | Compliance gate failures, security events |
| 1 | High | Audit findings, benchmark regressions |
| 3 | Normal | Standard workflow events (default) |
| 5 | Low | Informational events, metrics |
| 9 | Background | Housekeeping, cache invalidation |
Usage¶
from fcc.messaging.bus import EventBus
from fcc.messaging.events import Event, EventType
bus = EventBus()
# High-priority compliance event
bus.publish(Event(
event_type=EventType.COMPLIANCE_AUDIT_COMPLETED,
source="compliance_pipeline",
payload={"regulation": "EU_AI_ACT", "priority": 0},
))
# Normal-priority workflow event
bus.publish(Event(
event_type=EventType.SIMULATION_STARTED,
source="simulation_engine",
payload={"scenario": "baseline", "priority": 3},
))
Dead-Letter Queue (DLQ)¶
Overview¶
Events that cannot be delivered to any subscriber -- due to subscriber errors, filter mismatches with required delivery, or timeout -- are routed to the dead-letter queue for inspection and reprocessing.
DLQ Architecture¶
EventBus ──publish──> Subscriber A ── OK
└─> Subscriber B ── ERROR ──> DLQ
└─> Subscriber C ── OK
DLQ:
└─ Event + Error + Timestamp + Retry Count
Inspecting the DLQ¶
# List unprocessed dead-letter events
dlq_events = bus.dead_letter_queue()
for entry in dlq_events:
print(f" Event: {entry.event.event_type.value}")
print(f" Error: {entry.error}")
print(f" Retries: {entry.retry_count}")
Reprocessing¶
# Retry all dead-letter events
retried = bus.retry_dead_letters()
print(f"Retried: {retried} events")
Circuit Breaker¶
Overview¶
The circuit breaker pattern prevents cascading failures when a subscriber repeatedly fails. After a configurable number of consecutive failures, the circuit opens and events bypass the failing subscriber.
Circuit States¶
| State | Description | Behaviour |
|---|---|---|
| Closed | Normal operation | Events delivered to subscriber |
| Open | Subscriber failing | Events bypass subscriber, routed to DLQ |
| Half-Open | Testing recovery | Single event delivered; success closes, failure re-opens |
Configuration¶
bus.subscribe(
EventType.COMPLIANCE_AUDIT_COMPLETED,
handler,
circuit_breaker={
"failure_threshold": 3, # Open after 3 consecutive failures
"recovery_timeout_ms": 5000, # Try half-open after 5 seconds
"success_threshold": 2, # Close after 2 consecutive successes
},
)
Circuit Breaker Events¶
| Event Type | Payload |
|---|---|
messaging.circuit.opened |
subscriber_id, failure_count |
messaging.circuit.half_opened |
subscriber_id, elapsed_ms |
messaging.circuit.closed |
subscriber_id, success_count |
Event Bus Metrics¶
Phase 15 adds built-in metrics for event bus health:
| Metric | Type | Description |
|---|---|---|
events.published |
Counter | Total events published |
events.delivered |
Counter | Total successful deliveries |
events.dropped |
Counter | Events with no matching subscriber |
events.dlq.size |
Gauge | Current DLQ depth |
events.circuit.open |
Gauge | Number of open circuits |
events.delivery.latency_ms |
Histogram | End-to-end delivery time |
Demo Walkthrough Updates¶
The messaging real-time demo now includes:
Step 8: Priority Queue Ordering - Publishes events with mixed priorities - Shows that high-priority events are processed first - Demonstrates priority-based subscriber filtering
Step 9: Dead-Letter Queue - Registers a failing subscriber - Shows events routed to the DLQ - Demonstrates DLQ inspection and retry
Step 10: Circuit Breaker - Registers a subscriber with circuit breaker configuration - Triggers repeated failures to open the circuit - Shows recovery through the half-open state
Tips¶
- Set priority 0 for compliance and security events to ensure they are always processed first
- Monitor DLQ depth as a health indicator -- growing DLQ suggests subscriber issues that need investigation
- Use circuit breakers for external integrations (Slack, PagerDuty) where transient failures are expected
Related¶
- Messaging Real-Time Demo -- Base demo
- Messaging Phase 14 Addendum -- Compliance events
- Messaging Patterns Demo -- Core patterns walkthrough
- Compliance Pipeline Demo -- Event-driven auditing