Creating Event Subscribers¶
The EventSubscriberPlugin is the 10th plugin type in FCC, bridging the plugin system and the event bus. Subscriber plugins contribute event handlers that are automatically wired to the EventBus when the plugin is discovered. This tutorial shows how to create a custom subscriber that logs events, filter for specific event types, and package it as an installable plugin.
The Subscriber Protocol¶
The EventSubscriberPlugin ABC requires two methods:
from abc import ABC, abstractmethod
from fcc.messaging.bus import EventFilter, Subscriber
from fcc.plugins.base import PluginMeta
class EventSubscriberPlugin(ABC):
    """Abstract base class for plugins that contribute event subscribers.

    A concrete plugin must implement both methods declared here.
    """

    @abstractmethod
    def plugin_meta(self) -> PluginMeta:
        """Return the ``PluginMeta`` record describing this plugin."""

    @abstractmethod
    def get_subscribers(self) -> list[tuple[Subscriber, EventFilter | None]]:
        """Return the plugin's ``(subscriber, filter)`` pairs.

        Each pair holds a subscriber callable and either an ``EventFilter``
        or ``None`` when no filter is attached to that subscriber.
        """
Each pair returned by get_subscribers consists of:
- A subscriber callable -- any function that accepts an `Event` and returns nothing.
- An optional `EventFilter` -- restricts which events are delivered. `None` means the subscriber receives all events.
Step 1: Create a Logging Subscriber¶
Here is a complete subscriber plugin that appends every event to a JSONL file and additionally logs governance events at WARNING level:
import json
import logging
from pathlib import Path
from fcc.messaging.bus import EventFilter, Subscriber
from fcc.messaging.events import Event, EventType
from fcc.messaging.plugin_bridge import EventSubscriberPlugin
from fcc.plugins.base import PluginMeta, PluginType
logger = logging.getLogger("fcc.audit")
class AuditLogSubscriber(EventSubscriberPlugin):
    """Subscriber plugin that writes audit-grade event logs.

    Contributes two subscribers: one appends every event to a JSONL file,
    the other reports governance-gate events through the ``fcc.audit``
    logger at WARNING level.
    """

    def __init__(self, log_path: Path | str = "fcc_audit.jsonl") -> None:
        """Create the plugin.

        Args:
            log_path: Destination of the JSONL audit log. Defaults to
                ``fcc_audit.jsonl`` (relative to the current working
                directory), so zero-argument construction behaves as before.
        """
        self._log_path = Path(log_path)

    def plugin_meta(self) -> PluginMeta:
        """Return the metadata record that identifies this plugin."""
        return PluginMeta(
            id="fcc-audit-logger",
            name="FCC Audit Logger",
            version="0.1.0",
            plugin_type=PluginType.SUBSCRIBERS,
            description="Writes all FCC events to a JSONL audit log",
            author="Your Name",
            source_package="fcc-audit-logger",
        )

    def get_subscribers(self) -> list[tuple[Subscriber, EventFilter | None]]:
        """Return the (subscriber, filter) pairs this plugin contributes."""
        governance_only = EventFilter(
            event_types=(
                EventType.GOVERNANCE_GATE_EVALUATED,
                EventType.GOVERNANCE_GATE_PASSED,
                EventType.GOVERNANCE_GATE_FAILED,
            ),
        )
        return [
            (self._log_all_events, None),  # Global: receives everything
            (self._log_governance_events, governance_only),
        ]

    def _log_all_events(self, event: Event) -> None:
        """Append every event to the JSONL audit log."""
        # One JSON object per line. The encoding is pinned so the file's
        # bytes do not depend on the platform's locale default.
        with self._log_path.open("a", encoding="utf-8") as f:
            f.write(json.dumps(event.to_dict()) + "\n")

    def _log_governance_events(self, event: Event) -> None:
        """Log governance events at WARNING level for visibility."""
        logger.warning(
            "Governance event: %s from %s — %s",
            event.event_type.value,
            event.source,
            event.payload,
        )
Step 2: Register as an Entry Point¶
In your package's pyproject.toml:
[project.entry-points."fcc.plugins.subscribers"]
audit-logger = "fcc_audit_logger.plugin:AuditLogSubscriber"
Step 3: Wire Subscribers to the Event Bus¶
When your application discovers plugins, wire the subscribers to the bus:
from fcc.messaging.bus import EventBus
from fcc.plugins.registry import PluginRegistry
from fcc.plugins.base import PluginType
# Discover every available plugin.
plugin_registry = PluginRegistry()
plugin_registry.discover()

# The bus the discovered subscribers are attached to.
bus = EventBus()

# Attach each (subscriber, filter) pair offered by the subscriber plugins.
subscriber_plugins = plugin_registry.get_plugins(PluginType.SUBSCRIBERS)
for plugin in subscriber_plugins:
    for handler, handler_filter in plugin.get_subscribers():
        bus.subscribe(handler, handler_filter)
        print(f"Wired subscriber from {plugin.plugin_meta().name}")
Using Subscribers Without the Plugin System¶
You can also use the subscriber pattern directly without packaging as a plugin:
from fcc.messaging.bus import EventBus, EventFilter
from fcc.messaging.events import Event, EventType
bus = EventBus()

# Running tally of completed actions, keyed by persona id.
action_counts: dict[str, int] = {}


def count_actions(event: Event) -> None:
    """Add one to the tally for the persona named in the event payload."""
    persona = event.payload.get("persona_id", "unknown")
    seen_so_far = action_counts.get(persona, 0)
    action_counts[persona] = seen_so_far + 1


# Deliver only action-completion events to the counter.
bus.subscribe(
    count_actions,
    EventFilter(event_types=(EventType.ACTION_COMPLETED,)),
)

# Publish a handful of sample events.
for persona_id in ["RC", "BC", "RC", "DE", "RC"]:
    completed = Event(
        event_type=EventType.ACTION_COMPLETED,
        source="ActionEngine",
        payload={"persona_id": persona_id, "action_type": "scaffold"},
    )
    bus.publish(completed)

print(action_counts)
# {'RC': 3, 'BC': 1, 'DE': 1}
Advanced: Multi-Filter Subscribers¶
A single plugin can return multiple subscriber/filter pairs to handle different event categories with different logic:
def get_subscribers(self) -> list[tuple[Subscriber, EventFilter | None]]:
    """Return one (handler, filter) pair per event category handled here."""
    # Simulation lifecycle: start and completion.
    simulation_lifecycle = EventFilter(
        event_types=(
            EventType.SIMULATION_STARTED,
            EventType.SIMULATION_COMPLETED,
        ),
    )
    # Collaboration turns.
    collaboration_turns = EventFilter(
        event_types=(EventType.COLLABORATION_TURN_TAKEN,),
    )
    # Plugin errors, from any source.
    plugin_errors = EventFilter(
        event_types=(EventType.PLUGIN_ERROR,),
    )
    return [
        (self._on_simulation, simulation_lifecycle),
        (self._on_collaboration, collaboration_turns),
        (self._on_plugin_error, plugin_errors),
    ]
Advanced: Source Filtering¶
Filter events by the component that emitted them:
# Restrict delivery to events whose source is the CollaborationEngine.
collab_filter = EventFilter(sources=("CollaborationEngine",))


def on_collab_event(event: Event) -> None:
    """Print the event type and session id carried by a collaboration event."""
    session_id = event.payload.get("session_id")
    event_name = event.event_type.value
    print(f"Collaboration [{event_name}]: session {session_id}")


bus.subscribe(on_collab_event, collab_filter)
Testing Subscribers¶
Test your subscriber by creating a bus, wiring the subscriber, and publishing events:
def test_audit_subscriber():
    """Publish one governance event and check that it reaches the audit log."""
    import json
    from pathlib import Path

    from fcc.messaging.bus import EventBus
    from fcc.messaging.events import Event, EventType

    def read_entries(path: Path) -> list[str]:
        """Return the non-blank lines of *path*, or [] if it does not exist."""
        if not path.exists():
            return []
        text = path.read_text(encoding="utf-8")
        return [line for line in text.splitlines() if line.strip()]

    log_path = Path("fcc_audit.jsonl")
    # The plugin appends to its log, so a file left by an earlier run may
    # already exist. Record its length so only this run's output is judged.
    entries_before = len(read_entries(log_path))

    bus = EventBus()
    plugin = AuditLogSubscriber()

    # Wire subscribers
    for subscriber, event_filter in plugin.get_subscribers():
        bus.subscribe(subscriber, event_filter)

    # Publish a test event
    bus.publish(Event(
        event_type=EventType.GOVERNANCE_GATE_PASSED,
        source="test",
        payload={"gate_id": "gate-001", "score": 4.5},
    ))

    # Verify the audit log was written by this run. Comparing against the
    # earlier count avoids the vacuous check that "".split("\n") == [""]
    # would otherwise satisfy.
    assert log_path.exists()
    entries = read_entries(log_path)
    assert len(entries) > entries_before
    entry = json.loads(entries[-1])
    assert entry["event_type"] == "governance.gate.passed"
Next Steps¶
- Understanding the Event Bus -- Event model and pub/sub patterns
- Building Your First FCC Plugin -- Plugin packaging fundamentals
- Adding Observability to Workflows -- Connect events to tracing