Compliance Pipeline Demo¶
This page walks through the automated compliance pipeline demo, an interactive demonstration of the end-to-end compliance workflow with event bus integration, real-time event streaming, and evidence graph construction.
Table of Contents¶
- Introduction and Prerequisites
- Launching the Demo
- Pipeline Architecture
- Event Stream Walkthrough
- Finding and Remediation Events
- Evidence Graph Output
- Dual Pipeline Execution
- Pipeline Result Summary
Introduction and Prerequisites¶
System Requirements¶
- Python 3.10+ with FCC installed (pip install -e ".[dev]")
- No API key required
What This Demo Shows¶
The Compliance Pipeline Demo shows how CompliancePipeline orchestrates
the full audit lifecycle -- from classification through evidence graph
construction -- while emitting structured events to the EventBus. This
enables real-time dashboards, audit log archiving, and CI notifications.
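The publish/subscribe pattern described above can be sketched in a few lines. This is a minimal illustration, not FCC's EventBus: `DemoBus` and `DemoEvent` are hypothetical stand-ins, and only the `subscribe_all` method name is borrowed from the demo code later on this page.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass(frozen=True)
class DemoEvent:
    """Hypothetical stand-in for a structured event (not FCC's event class)."""
    event_type: str
    payload: dict[str, Any] = field(default_factory=dict)


class DemoBus:
    """Hypothetical in-process bus that fans each event out to every subscriber."""

    def __init__(self) -> None:
        self._handlers: list[Callable[[DemoEvent], None]] = []

    def subscribe_all(self, handler: Callable[[DemoEvent], None]) -> None:
        self._handlers.append(handler)

    def publish(self, event: DemoEvent) -> None:
        # Iterate over a copy so a handler may subscribe others while running.
        for handler in list(self._handlers):
            handler(event)


dashboard: list[str] = []       # stands in for a live dashboard feed
archive: list[DemoEvent] = []   # stands in for an audit log archive

demo_bus = DemoBus()
demo_bus.subscribe_all(lambda e: dashboard.append(e.event_type))
demo_bus.subscribe_all(archive.append)

demo_bus.publish(DemoEvent("compliance.audit.started", {"regulation": "EU_AI_ACT"}))
demo_bus.publish(DemoEvent("compliance.audit.completed", {"failed": 0}))

print(dashboard)
print(len(archive))
```

Because every subscriber receives the same event object, a dashboard, an archiver, and a CI notifier can each consume the stream without the pipeline knowing about any of them.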
Launching the Demo¶
The pipeline can also be constructed programmatically:
from fcc.compliance.pipeline import CompliancePipeline
from fcc.compliance.auditor import ComplianceAuditor
from fcc.compliance.classifier import AIActClassifier
from fcc.compliance.requirements import RequirementRegistry
from fcc.governance.constitution_registry import ConstitutionRegistry
from fcc.personas.registry import PersonaRegistry
from fcc.messaging.bus import EventBus
registry = PersonaRegistry.from_data_dir()
const_reg = ConstitutionRegistry.from_registry(registry)
req_reg = RequirementRegistry.from_package_data()
classifier = AIActClassifier(constitution_registry=const_reg)
auditor = ComplianceAuditor(
    requirement_registry=req_reg,
    classifier=classifier,
    constitution_registry=const_reg,
)
bus = EventBus()
pipeline = CompliancePipeline(
    auditor=auditor,
    event_bus=bus,
    persona_registry=registry,
)
Pipeline Architecture¶
The compliance pipeline follows a four-stage architecture. Each stage emits events that subscribers can consume for dashboards, notifications, or archiving.
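The idea that each stage emits events can be sketched as a runner that wraps every stage in a started/completed pair. This page does not name the four stages, so the names `stage_1` to `stage_4` are placeholders. The `run_stages` helper and the `.started`/`.completed` suffixes are also assumptions made for illustration, not FCC behaviour.

```python
from typing import Callable

Stage = Callable[[dict], dict]
Emit = Callable[[str, dict], None]


def run_stages(stages: list[tuple[str, Stage]], emit: Emit) -> dict:
    """Run stages in order, emitting one event before and one after each stage."""
    context: dict = {}
    for name, stage in stages:
        emit(f"{name}.started", {})
        context = stage(context)
        emit(f"{name}.completed", {"context_keys": sorted(context)})
    return context


def make_stage(label: str) -> Stage:
    """Build a placeholder stage that only records that it ran."""
    def stage(context: dict) -> dict:
        return {**context, label: True}
    return stage


emitted: list[str] = []
stages = [(f"stage_{i}", make_stage(f"stage_{i}_done")) for i in range(1, 5)]
final_context = run_stages(stages, lambda event_type, payload: emitted.append(event_type))

print(emitted)
```

Passing `emit` in as a callable keeps the stages unaware of who is listening, which is the property that lets dashboards and archivers be added without touching the pipeline.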
Event Stream Walkthrough¶
Subscribe to all pipeline events:
events_log = []
bus.subscribe_all(lambda e: events_log.append(e))
result = pipeline.run_full_pipeline("EU_AI_ACT")
print(f"Events captured: {len(events_log)}")
for e in events_log[:5]:
    print(f"  {e.event_type.value}: {e.payload}")
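Once events are captured, a per-type tally is often more useful than the raw list. The sketch below assumes only what the snippet above shows, namely that each event exposes `event_type.value` and `payload`. `DemoEventType` and `DemoEvent` are hypothetical stand-ins so the example runs without FCC installed.

```python
from collections import Counter
from dataclasses import dataclass
from enum import Enum


class DemoEventType(Enum):
    """Hypothetical enum; the string values mirror the event names used on this page."""
    AUDIT_STARTED = "compliance.audit.started"
    FINDING_RAISED = "compliance.finding.raised"
    AUDIT_COMPLETED = "compliance.audit.completed"


@dataclass
class DemoEvent:
    event_type: DemoEventType
    payload: dict


# Sample log standing in for what a catch-all subscriber would collect.
sample_log = [
    DemoEvent(DemoEventType.AUDIT_STARTED, {"regulation": "EU_AI_ACT"}),
    DemoEvent(DemoEventType.FINDING_RAISED, {"status": "warning"}),
    DemoEvent(DemoEventType.FINDING_RAISED, {"status": "warning"}),
    DemoEvent(DemoEventType.AUDIT_COMPLETED, {"failed": 0}),
]

# Count events per type, most frequent first.
counts = Counter(e.event_type.value for e in sample_log)
for event_type, count in counts.most_common():
    print(f"{count:>3}  {event_type}")
```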
Finding and Remediation Events¶
The pipeline emits events for each finding and remediation:
Event: compliance.audit.started
  payload: {regulation: EU_AI_ACT, persona_count: 102}
Event: compliance.finding.raised
  payload: {requirement_id: EU-AI-ACT-ART12-1, status: warning}
Event: compliance.remediation.required
  payload: {requirement_id: EU-AI-ACT-ART12-1,
            action_id: REM-ART12-DGS-const, priority: high}
Event: compliance.audit.completed
  payload: {total_checks: 1224, passed: 1180, failed: 0,
            warnings: 44, duration_ms: 120.5}
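A quick sanity check on the sample payloads above confirms that the outcome counts add up to the total. The figures are copied from the listing. The per-persona figure is only an observation about these sample numbers, not a documented property of the pipeline.

```python
# Payloads copied from the sample event listing.
started = {"regulation": "EU_AI_ACT", "persona_count": 102}
completed = {
    "total_checks": 1224,
    "passed": 1180,
    "failed": 0,
    "warnings": 44,
    "duration_ms": 120.5,
}

# Every check should land in exactly one outcome bucket.
outcome_total = completed["passed"] + completed["failed"] + completed["warnings"]
counts_consistent = outcome_total == completed["total_checks"]

# Observation only: the sample total divides evenly across the personas.
checks_per_persona = completed["total_checks"] / started["persona_count"]

print(f"outcomes sum to total: {counts_consistent}")
print(f"checks per persona: {checks_per_persona:g}")
```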
Evidence Graph Output¶
The pipeline automatically builds an evidence graph. The graph connects:
- Requirement nodes (CONCEPT)
- Evidence nodes (DELIVERABLE)
- Persona nodes (PERSONA)
- Constitution nodes (CONSTITUTION)
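A typed graph of this shape can be sketched with a node table and an adjacency map. `DemoEvidenceGraph` is a hypothetical stand-in, not FCC's graph class. The four node type labels come from the list above. The node ids other than `EU-AI-ACT-ART12-1` and the edge directions are assumptions made for illustration.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Node:
    node_id: str
    node_type: str  # CONCEPT, DELIVERABLE, PERSONA, or CONSTITUTION


class DemoEvidenceGraph:
    """Hypothetical directed graph keyed by node id."""

    def __init__(self) -> None:
        self.nodes: dict[str, Node] = {}
        self.edges: defaultdict[str, set[str]] = defaultdict(set)

    def add_node(self, node_id: str, node_type: str) -> None:
        self.nodes[node_id] = Node(node_id, node_type)

    def add_edge(self, source: str, target: str) -> None:
        # Refuse edges that point at nodes the graph does not know about.
        for node_id in (source, target):
            if node_id not in self.nodes:
                raise KeyError(f"unknown node: {node_id}")
        self.edges[source].add(target)

    def count_by_type(self) -> Counter:
        return Counter(node.node_type for node in self.nodes.values())


graph = DemoEvidenceGraph()
graph.add_node("EU-AI-ACT-ART12-1", "CONCEPT")       # requirement
graph.add_node("evidence-001", "DELIVERABLE")        # hypothetical id
graph.add_node("persona-001", "PERSONA")             # hypothetical id
graph.add_node("constitution-001", "CONSTITUTION")   # hypothetical id

# Assumed directions: evidence supports a requirement, and so on.
graph.add_edge("evidence-001", "EU-AI-ACT-ART12-1")
graph.add_edge("persona-001", "evidence-001")
graph.add_edge("constitution-001", "persona-001")

type_counts = graph.count_by_type()
print(dict(type_counts))
```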
Dual Pipeline Execution¶
Run both EU AI Act and NIST AI RMF pipelines:
eu_result, nist_result = pipeline.run_dual_pipeline()
print(f"EU AI Act: {eu_result.report.passed}/{eu_result.report.total_checks}")
print(f"NIST AI RMF: {nist_result.report.passed}/{nist_result.report.total_checks}")
print(f"Total duration: {eu_result.duration_ms + nist_result.duration_ms:.0f} ms")
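The same summary can be extended with pass rates. The sketch below uses hypothetical stand-in dataclasses that expose only the attributes read in the snippet above (`report.passed`, `report.total_checks`, `duration_ms`). The EU figures reuse the sample audit payload, while the NIST figures and the `NIST_AI_RMF` identifier are made-up placeholders.

```python
from dataclasses import dataclass


@dataclass
class DemoReport:
    passed: int
    total_checks: int


@dataclass
class DemoResult:
    regulation: str
    report: DemoReport
    duration_ms: float


def summarize(results: list[DemoResult]) -> list[str]:
    """Return one line per regulation plus a combined duration line."""
    lines = []
    for r in results:
        total = r.report.total_checks
        rate = r.report.passed / total if total else 0.0  # guard against empty reports
        lines.append(f"{r.regulation}: {r.report.passed}/{total} ({rate:.1%})")
    lines.append(f"Total duration: {sum(r.duration_ms for r in results):.0f} ms")
    return lines


demo_results = [
    DemoResult("EU_AI_ACT", DemoReport(passed=1180, total_checks=1224), 120.5),
    DemoResult("NIST_AI_RMF", DemoReport(passed=90, total_checks=100), 75.25),  # placeholder numbers
]

summary_lines = summarize(demo_results)
print("\n".join(summary_lines))
```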
Pipeline Result Summary¶
The PipelineResult dataclass captures the full execution context:
| Field | Description |
|---|---|
| regulation | Which regulation was audited |
| report | The ComplianceReport with all findings |
| duration_ms | Execution time in milliseconds |
| findings_raised | Number of FINDING_RAISED events |
| remediations_required | Number of REMEDIATION_REQUIRED events |
| evidence_graph_nodes | Node count in the evidence graph |
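As a rough picture of the shape, the fields in the table could be modelled as follows. `DemoPipelineResult` is a hypothetical mirror of the table, not the real class. The field names come from the table, but the types, the defaults, and the use of a plain dict for `report` are assumptions, and the values are placeholders.

```python
import json
from dataclasses import asdict, dataclass, field, fields


@dataclass
class DemoPipelineResult:
    """Hypothetical mirror of the table above; types are assumed."""
    regulation: str
    report: dict = field(default_factory=dict)  # stand-in for a ComplianceReport
    duration_ms: float = 0.0
    findings_raised: int = 0
    remediations_required: int = 0
    evidence_graph_nodes: int = 0


demo_result = DemoPipelineResult(
    regulation="EU_AI_ACT",
    report={"total_checks": 1224, "passed": 1180},
    duration_ms=120.5,
    findings_raised=2,        # placeholder value
    remediations_required=1,  # placeholder value
    evidence_graph_nodes=4,   # placeholder value
)

field_names = [f.name for f in fields(demo_result)]
encoded = json.dumps(asdict(demo_result), indent=2)
print(encoded)
```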
Tips¶
- Subscribe to specific event types for targeted notifications
- Use result.report.to_dict() for JSON export
- Integrate with the collaboration engine for human-in-the-loop review of WARNING findings
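This page does not show the API for subscribing to specific event types. One way to get targeted notifications from a catch-all subscription is to wrap the handler in a filter. The `only` helper below is a hypothetical utility, and the events are plain dicts so the sketch runs on its own.

```python
from typing import Any, Callable, Iterable


def only(
    event_types: Iterable[str],
    handler: Callable[[Any], None],
    get_type: Callable[[Any], str] = lambda e: e["event_type"],
) -> Callable[[Any], None]:
    """Wrap handler so it only sees events whose type is in event_types."""
    wanted = frozenset(event_types)

    def filtered(event: Any) -> None:
        if get_type(event) in wanted:
            handler(event)

    return filtered


notifications: list[dict] = []
notify = only({"compliance.remediation.required"}, notifications.append)

# Sample stream using the event names and payload keys shown on this page.
stream = [
    {"event_type": "compliance.audit.started", "payload": {"regulation": "EU_AI_ACT"}},
    {"event_type": "compliance.remediation.required", "payload": {"priority": "high"}},
    {"event_type": "compliance.audit.completed", "payload": {"failed": 0}},
]
for event in stream:
    notify(event)

print(notifications)
```

With real pipeline events, a `get_type` such as `lambda e: e.event_type.value` would presumably be needed, matching the attribute access used in the walkthrough, and the wrapped handler could then be passed to `bus.subscribe_all`.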