Messaging & Real-Time -- Phase 14 Addendum

This addendum extends the Messaging Real-Time Demo with Phase 14 evaluation and compliance event types that flow through the EventBus and WebSocket infrastructure.


New Event Types

Phase 14 introduces seven new event types to the FCC event bus: three emitted by the BenchmarkRunner and four by the CompliancePipeline. Any EventBus subscriber can consume them.

Benchmark Events

| Event Type | Source | Payload Fields |
|---|---|---|
| benchmark.started | BenchmarkRunner | suite, spec_count |
| benchmark.completed | BenchmarkRunner | suite, total, passed, failed |
| benchmark.regression | BenchmarkRunner | suite, regressions (list of spec names) |

Compliance Events

| Event Type | Source | Payload Fields |
|---|---|---|
| compliance.audit.started | CompliancePipeline | regulation, persona_count |
| compliance.finding.raised | CompliancePipeline | requirement_id, status |
| compliance.remediation.required | CompliancePipeline | requirement_id, action_id, priority |
| compliance.audit.completed | CompliancePipeline | total_checks, passed, failed, warnings, duration_ms |
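
When writing a subscriber, it can help to check incoming payloads against the documented fields before using them. The following is a minimal sketch that only restates the two tables above as a lookup. `REQUIRED_FIELDS` and `missing_fields` are hypothetical names for illustration and are not part of the fcc package.

```python
# Documented payload fields per Phase 14 event type, copied from the tables above.
REQUIRED_FIELDS = {
    "benchmark.started": {"suite", "spec_count"},
    "benchmark.completed": {"suite", "total", "passed", "failed"},
    "benchmark.regression": {"suite", "regressions"},
    "compliance.audit.started": {"regulation", "persona_count"},
    "compliance.finding.raised": {"requirement_id", "status"},
    "compliance.remediation.required": {"requirement_id", "action_id", "priority"},
    "compliance.audit.completed": {
        "total_checks", "passed", "failed", "warnings", "duration_ms",
    },
}


def missing_fields(event_type: str, payload: dict) -> set:
    """Return the documented fields that are absent from payload.

    Hypothetical helper: raises ValueError for event types outside Phase 14.
    """
    if event_type not in REQUIRED_FIELDS:
        raise ValueError(f"not a Phase 14 event type: {event_type}")
    return REQUIRED_FIELDS[event_type] - set(payload)


# A finding without its requirement_id is reported as incomplete.
print(missing_fields("compliance.finding.raised", {"status": "pass"}))
```

A handler could call `missing_fields(e.event_type.value, e.payload)` and skip or log events for which the result is non-empty.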

Subscribing to Phase 14 Events

All Benchmark Events

from fcc.messaging.bus import EventBus
from fcc.messaging.events import EventType

bus = EventBus()

# Subscribe to all benchmark events
for event_name in ["benchmark.started", "benchmark.completed", "benchmark.regression"]:
    bus.subscribe(
        EventType(event_name),
        lambda e: print(f"Benchmark: {e.event_type.value} -> {e.payload}"),
    )

All Compliance Events

for event_name in [
    "compliance.audit.started",
    "compliance.finding.raised",
    "compliance.remediation.required",
    "compliance.audit.completed",
]:
    bus.subscribe(
        EventType(event_name),
        lambda e: print(f"Compliance: {e.event_type.value} -> {e.payload}"),
    )

Filtered Subscription

Subscribe only to high-priority remediation events (priority is a payload field of compliance.remediation.required, not of compliance.finding.raised):

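from fcc.messaging.bus import EventFilter

# notify_slack is a placeholder for your own alerting callable;
# it is not defined in this snippet.
bus.subscribe(
    EventType("compliance.remediation.required"),
    lambda e: notify_slack(e.payload),
    # Deliver only events whose payload has priority == "high"
    event_filter=EventFilter(
        payload_match={"priority": "high"},
    ),
)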
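
One way to read the filter above is as an exact-match predicate over the payload. The sketch below illustrates that reading with a hypothetical `payload_matches` helper. It is an assumption about how `payload_match` behaves, not the EventFilter implementation, which may support richer matching.

```python
def payload_matches(payload: dict, payload_match: dict) -> bool:
    """True when every key in payload_match is present in payload with an equal value.

    Hypothetical stand-in for the matching step; assumes exact equality.
    """
    return all(
        key in payload and payload[key] == expected
        for key, expected in payload_match.items()
    )


# Sample payloads shaped like compliance.remediation.required events.
payloads = [
    {"requirement_id": "ART12-1", "action_id": "REM-ART12-DGS", "priority": "high"},
    {"requirement_id": "ART9-1", "action_id": "REM-ART9-DOC", "priority": "low"},
]

# Only the first payload passes the {"priority": "high"} filter.
high_priority = [p for p in payloads if payload_matches(p, {"priority": "high"})]
print([p["action_id"] for p in high_priority])  # prints ['REM-ART12-DGS']
```

Under this reading, a payload that lacks the `priority` key never matches, so events of other types would be dropped by the same filter.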

Event Serialization and Replay

Phase 14 events support full serialization and replay using the existing EventSerializer and EventReplay infrastructure:

from fcc.messaging.serialization import EventSerializer, EventReplay

serializer = EventSerializer()

# Capture events
captured = []
bus.subscribe_all(lambda e: captured.append(e))

# Run the benchmark suite (events are captured).
# `runner` and `suite` are assumed to be set up earlier, with `runner` connected to `bus`.
runner.run_suite(suite)

# Serialize for storage
data = serializer.serialize_many(captured)
with open("benchmark_events.json", "w") as f:
    f.write(data)

# Replay later
replay = EventReplay()
replayed = replay.load("benchmark_events.json")
for event in replayed:
    print(f"  {event.event_type.value}: {event.timestamp}")

WebSocket Streaming

Phase 14 events are automatically streamed to connected WebSocket clients. The frontend event feed (Protocol Explorer) now displays benchmark and compliance events alongside protocol events.

Event Categories in the Frontend

| Category | Event Pattern | Colour |
|---|---|---|
| Protocol | protocol.* | Blue |
| Simulation | simulation.* | Green |
| Benchmark | benchmark.* | Orange |
| Compliance | compliance.* | Red |
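
The pattern-to-category mapping in the table can be reproduced in a few lines, for example when colouring events in a terminal log. This is a minimal sketch using the standard library's `fnmatch` module. `CATEGORY_STYLES` and `categorize` are hypothetical names, and the Protocol Explorer frontend may implement the mapping differently.

```python
from fnmatch import fnmatchcase

# (pattern, category, colour) rows copied from the table above.
CATEGORY_STYLES = [
    ("protocol.*", "Protocol", "Blue"),
    ("simulation.*", "Simulation", "Green"),
    ("benchmark.*", "Benchmark", "Orange"),
    ("compliance.*", "Compliance", "Red"),
]


def categorize(event_type: str):
    """Return (category, colour) for the first matching pattern, or None."""
    for pattern, category, colour in CATEGORY_STYLES:
        if fnmatchcase(event_type, pattern):
            return category, colour
    return None


print(categorize("benchmark.regression"))      # prints ('Benchmark', 'Orange')
print(categorize("compliance.audit.started"))  # prints ('Compliance', 'Red')
```

Event types that match none of the four patterns return `None`, so a caller can fall back to a default style.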

Event-Driven Audit Trail

Phase 14 events form a complete audit trail when combined with existing governance events:

Timeline:
  t0: compliance.audit.started (regulation=EU_AI_ACT, personas=102)
  t1: compliance.finding.raised (req=ART9-1, status=pass)
  t2: compliance.finding.raised (req=ART12-1, status=warning)
  t3: compliance.remediation.required (action=REM-ART12-DGS, priority=high)
  ...
  tN: compliance.audit.completed (checks=1224, passed=1180, warnings=44)
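
A trail like the one above can be checked for internal consistency by comparing the individual findings with the closing summary. The sketch below works on plain `(event_type, payload)` tuples rather than real event objects. It assumes that each check emits exactly one compliance.finding.raised event and that the status values are "pass", "warning", and "fail" (only the first two appear in this document). `summary_matches_findings` is a hypothetical helper.

```python
from collections import Counter


def summary_matches_findings(events: list) -> bool:
    """Check that the audit.completed totals agree with the raised findings.

    events is a list of (event_type, payload) tuples. Returns False when the
    trail does not contain exactly one compliance.audit.completed event.
    """
    counts = Counter(
        payload["status"]
        for event_type, payload in events
        if event_type == "compliance.finding.raised"
    )
    summaries = [
        payload
        for event_type, payload in events
        if event_type == "compliance.audit.completed"
    ]
    if len(summaries) != 1:
        return False
    summary = summaries[0]
    return (
        summary["total_checks"] == sum(counts.values())
        and summary["passed"] == counts["pass"]
        and summary["warnings"] == counts["warning"]
        and summary["failed"] == counts["fail"]  # "fail" is an assumed status value
    )


# A three-check trail in the shape of the timeline above.
trail = [
    ("compliance.audit.started", {"regulation": "EU_AI_ACT", "persona_count": 1}),
    ("compliance.finding.raised", {"requirement_id": "ART9-1", "status": "pass"}),
    ("compliance.finding.raised", {"requirement_id": "ART12-1", "status": "warning"}),
    ("compliance.finding.raised", {"requirement_id": "ART13-1", "status": "pass"}),
    ("compliance.audit.completed",
     {"total_checks": 3, "passed": 2, "failed": 0, "warnings": 1, "duration_ms": 12}),
]
print(summary_matches_findings(trail))  # prints True
```

A mismatch would suggest that events were dropped or filtered before the trail was captured.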

Save this trail using the SessionRecorder for audit evidence:

from fcc.collaboration.recording import SessionRecorder

recorder = SessionRecorder()
# Events are recorded as part of the session context

Plugin Subscriber Support

Phase 14 events can be consumed by EventSubscriberPlugin instances (the 10th plugin type). Create a custom subscriber that reacts to compliance findings:

from fcc.messaging.plugin_bridge import EventSubscriberPlugin

class ComplianceAlertPlugin(EventSubscriberPlugin):
    name = "compliance-alert"
    event_types = [
        "compliance.finding.raised",
        "compliance.remediation.required",
    ]

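    def handle(self, event):
        # Only compliance.remediation.required payloads carry a priority field,
        # so finding.raised events fall through without alerting.
        if event.payload.get("priority") == "high":
            self.send_alert(event.payload)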

Demo Walkthrough Updates

The messaging real-time demo now includes:

Step 6: Benchmark Event Streaming

  • Runs a benchmark suite with the event bus connected
  • Shows benchmark.started and benchmark.completed events in real time
  • Demonstrates regression event emission

Step 7: Compliance Event Streaming

  • Runs a compliance pipeline with the event bus connected
  • Shows the full event flow: started, finding.raised, remediation.required, completed
  • Demonstrates event serialization and replay


Tips

  • Use event filters to focus on specific regulations or priority levels
  • Serialize Phase 14 events alongside protocol events for a complete system audit trail
  • Connect compliance events to external alerting systems (Slack, PagerDuty) via custom EventSubscriberPlugin implementations