Messaging & Real-Time -- Phase 14 Addendum¶
This addendum extends the Messaging Real-Time Demo with Phase 14 evaluation and compliance event types that flow through the EventBus and WebSocket infrastructure.
New Event Types¶
Phase 14 introduces 7 new event types to the FCC event bus. These events
are emitted by the BenchmarkRunner and CompliancePipeline and can
be consumed by any EventBus subscriber.
Benchmark Events¶
| Event Type | Source | Payload Fields |
|---|---|---|
benchmark.started |
BenchmarkRunner | suite, spec_count |
benchmark.completed |
BenchmarkRunner | suite, total, passed, failed |
benchmark.regression |
BenchmarkRunner | suite, regressions (list of spec names) |
Compliance Events¶
| Event Type | Source | Payload Fields |
|---|---|---|
compliance.audit.started |
CompliancePipeline | regulation, persona_count |
compliance.finding.raised |
CompliancePipeline | requirement_id, status |
compliance.remediation.required |
CompliancePipeline | requirement_id, action_id, priority |
compliance.audit.completed |
CompliancePipeline | total_checks, passed, failed, warnings, duration_ms |
Subscribing to Phase 14 Events¶
All Benchmark Events¶
from fcc.messaging.bus import EventBus
from fcc.messaging.events import EventType
bus = EventBus()
# Subscribe to all benchmark events
for event_name in ["benchmark.started", "benchmark.completed", "benchmark.regression"]:
bus.subscribe(
EventType(event_name),
lambda e: print(f"Benchmark: {e.event_type.value} -> {e.payload}"),
)
All Compliance Events¶
for event_name in [
"compliance.audit.started",
"compliance.finding.raised",
"compliance.remediation.required",
"compliance.audit.completed",
]:
bus.subscribe(
EventType(event_name),
lambda e: print(f"Compliance: {e.event_type.value} -> {e.payload}"),
)
Filtered Subscription¶
Subscribe only to high-priority compliance findings:
from fcc.messaging.bus import EventFilter
bus.subscribe(
EventType("compliance.remediation.required"),
lambda e: notify_slack(e.payload),
event_filter=EventFilter(
payload_match={"priority": "high"},
),
)
Event Serialization and Replay¶
Phase 14 events support full serialization and replay using the existing
EventSerializer and EventReplay infrastructure:
from fcc.messaging.serialization import EventSerializer, EventReplay
serializer = EventSerializer()
# Capture events
captured = []
bus.subscribe_all(lambda e: captured.append(e))
# Run benchmark suite (events are captured)
runner.run_suite(suite)
# Serialize for storage
data = serializer.serialize_many(captured)
with open("benchmark_events.json", "w") as f:
f.write(data)
# Replay later
replay = EventReplay()
replayed = replay.load("benchmark_events.json")
for event in replayed:
print(f" {event.event_type.value}: {event.timestamp}")
WebSocket Streaming¶
Phase 14 events are automatically streamed to connected WebSocket clients. The frontend event feed (Protocol Explorer) now displays benchmark and compliance events alongside protocol events.
Event Categories in the Frontend¶
| Category | Event Pattern | Colour |
|---|---|---|
| Protocol | protocol.* |
Blue |
| Simulation | simulation.* |
Green |
| Benchmark | benchmark.* |
Orange |
| Compliance | compliance.* |
Red |
Event-Driven Audit Trail¶
Phase 14 events form a complete audit trail when combined with existing governance events:
Timeline:
t0: compliance.audit.started (regulation=EU_AI_ACT, personas=102)
t1: compliance.finding.raised (req=ART9-1, status=pass)
t2: compliance.finding.raised (req=ART12-1, status=warning)
t3: compliance.remediation.required (action=REM-ART12-DGS, priority=high)
...
tN: compliance.audit.completed (checks=1224, passed=1180, warnings=44)
Save this trail using the SessionRecorder for audit evidence:
from fcc.collaboration.recording import SessionRecorder
recorder = SessionRecorder()
# Events are recorded as part of the session context
Plugin Subscriber Support¶
Phase 14 events can be consumed by EventSubscriberPlugin instances
(the 10th plugin type). Create a custom subscriber that reacts to
compliance findings:
from fcc.messaging.plugin_bridge import EventSubscriberPlugin
class ComplianceAlertPlugin(EventSubscriberPlugin):
name = "compliance-alert"
event_types = [
"compliance.finding.raised",
"compliance.remediation.required",
]
def handle(self, event):
if event.payload.get("priority") == "high":
self.send_alert(event.payload)
Demo Walkthrough Updates¶
The messaging real-time demo now includes:
Step 6: Benchmark Event Streaming - Runs a benchmark suite with the event bus connected - Shows benchmark.started, benchmark.completed events in real time - Demonstrates regression event emission
Step 7: Compliance Event Streaming - Runs a compliance pipeline with event bus connected - Shows the full event flow: started, finding.raised, remediation.required, completed - Demonstrates event serialization and replay
Tips¶
- Use event filters to focus on specific regulations or priority levels
- Serialize Phase 14 events alongside protocol events for a complete system audit trail
- Connect compliance events to external alerting systems (Slack, PagerDuty) via custom EventSubscriberPlugin implementations