Skip to content

Creating Event Subscribers

The EventSubscriberPlugin is the 10th plugin type in FCC, bridging the plugin system and the event bus. Subscriber plugins contribute event handlers that are automatically wired to the EventBus when the plugin is discovered. This tutorial shows how to create a custom subscriber that logs events, filter for specific event types, and package it as an installable plugin.

The Subscriber Protocol

The EventSubscriberPlugin ABC requires two methods:

from abc import ABC, abstractmethod
from fcc.messaging.bus import EventFilter, Subscriber
from fcc.plugins.base import PluginMeta


class EventSubscriberPlugin(ABC):
    """Abstract base class for plugins that contribute event subscribers.

    Concrete plugins must implement both methods declared here.
    """

    @abstractmethod
    def plugin_meta(self) -> PluginMeta:
        """Return the PluginMeta record describing this plugin."""

    @abstractmethod
    def get_subscribers(self) -> list[tuple[Subscriber, EventFilter | None]]:
        """Return (subscriber_callable, optional_filter) pairs.

        A filter of None means the subscriber receives all events.
        """

Each pair returned by get_subscribers consists of:

  1. A subscriber callable -- any function that accepts an Event and returns nothing.
  2. An optional EventFilter -- restricts which events are delivered. None means the subscriber receives all events.

Step 1: Create a Logging Subscriber

Here is a complete subscriber plugin that logs events to a file:

import json
import logging
from pathlib import Path

from fcc.messaging.bus import EventFilter, Subscriber
from fcc.messaging.events import Event, EventType
from fcc.messaging.plugin_bridge import EventSubscriberPlugin
from fcc.plugins.base import PluginMeta, PluginType

logger = logging.getLogger("fcc.audit")


class AuditLogSubscriber(EventSubscriberPlugin):
    """Subscriber plugin that writes audit-grade event logs.

    Contributes two subscribers: one unfiltered subscriber that appends
    every event to a JSONL file, and one that reports governance gate
    events through the ``fcc.audit`` logger at WARNING level.
    """

    def __init__(self, log_path: Path | str = "fcc_audit.jsonl") -> None:
        """Create the subscriber.

        Args:
            log_path: Location of the JSONL audit log. Defaults to
                ``fcc_audit.jsonl`` relative to the current working
                directory, so the plugin can still be constructed with
                no arguments.
        """
        self._log_path = Path(log_path)

    def plugin_meta(self) -> PluginMeta:
        """Return the metadata record identifying this plugin."""
        return PluginMeta(
            id="fcc-audit-logger",
            name="FCC Audit Logger",
            version="0.1.0",
            plugin_type=PluginType.SUBSCRIBERS,
            description="Writes all FCC events to a JSONL audit log",
            author="Your Name",
            source_package="fcc-audit-logger",
        )

    def get_subscribers(self) -> list[tuple[Subscriber, EventFilter | None]]:
        """Return the (subscriber, filter) pairs contributed by this plugin."""
        return [
            (self._log_all_events, None),  # No filter: receives everything
            (self._log_governance_events, EventFilter(
                event_types=(
                    EventType.GOVERNANCE_GATE_EVALUATED,
                    EventType.GOVERNANCE_GATE_PASSED,
                    EventType.GOVERNANCE_GATE_FAILED,
                ),
            )),
        ]

    def _log_all_events(self, event: Event) -> None:
        """Append one event to the JSONL audit log as a single line."""
        # Serialize before opening so a failing event never touches the file.
        record = json.dumps(event.to_dict())
        # Pin the encoding so the log format does not depend on the locale.
        with self._log_path.open("a", encoding="utf-8") as f:
            f.write(record + "\n")

    def _log_governance_events(self, event: Event) -> None:
        """Log governance events at WARNING level for visibility."""
        # Arguments are passed lazily; the ": " keeps source and payload apart.
        logger.warning(
            "Governance event: %s from %s: %s",
            event.event_type.value,
            event.source,
            event.payload,
        )

Step 2: Register as an Entry Point

In your package's pyproject.toml:

[project.entry-points."fcc.plugins.subscribers"]
audit-logger = "fcc_audit_logger.plugin:AuditLogSubscriber"

Step 3: Wire Subscribers to the Event Bus

When your application discovers plugins, wire the subscribers to the bus:

from fcc.messaging.bus import EventBus
from fcc.plugins.registry import PluginRegistry
from fcc.plugins.base import PluginType

# Run plugin discovery
plugin_registry = PluginRegistry()
plugin_registry.discover()

# The bus that the discovered subscribers are attached to
bus = EventBus()

# Attach every (subscriber, filter) pair contributed by a subscriber plugin
subscriber_plugins = plugin_registry.get_plugins(PluginType.SUBSCRIBERS)
contributions = (
    (owner, handler, handler_filter)
    for owner in subscriber_plugins
    for handler, handler_filter in owner.get_subscribers()
)
for owner, handler, handler_filter in contributions:
    bus.subscribe(handler, handler_filter)
    print(f"Wired subscriber from {owner.plugin_meta().name}")

Using Subscribers Without the Plugin System

You can also use the subscriber pattern directly, without packaging it as a plugin:

from fcc.messaging.bus import EventBus, EventFilter
from fcc.messaging.events import Event, EventType

# Standalone bus: the subscriber below is attached to it directly
bus = EventBus()

# Running tally of handled events, keyed by persona id
action_counts: dict[str, int] = {}

def count_actions(event: Event) -> None:
    """Increment the tally for the persona named in the event payload.

    Events whose payload has no "persona_id" key are counted under
    "unknown".
    """
    key = event.payload.get("persona_id", "unknown")
    action_counts.setdefault(key, 0)
    action_counts[key] += 1

# Deliver only action completion events to the counter
completed_only = EventFilter(event_types=(EventType.ACTION_COMPLETED,))
bus.subscribe(count_actions, completed_only)

# Publish five sample completion events
for persona in ("RC", "BC", "RC", "DE", "RC"):
    sample = Event(
        event_type=EventType.ACTION_COMPLETED,
        source="ActionEngine",
        payload={"persona_id": persona, "action_type": "scaffold"},
    )
    bus.publish(sample)

print(action_counts)
# {'RC': 3, 'BC': 1, 'DE': 1}

Advanced: Multi-Filter Subscribers

A single plugin can return multiple subscriber/filter pairs to handle different event categories with different logic:

def get_subscribers(self) -> list[tuple[Subscriber, EventFilter | None]]:
    """Return one (subscriber, filter) pair per event category handled."""
    # Simulation lifecycle: start and completion
    simulation = (
        self._on_simulation,
        EventFilter(
            event_types=(
                EventType.SIMULATION_STARTED,
                EventType.SIMULATION_COMPLETED,
            ),
        ),
    )
    # Collaboration turns
    collaboration = (
        self._on_collaboration,
        EventFilter(event_types=(EventType.COLLABORATION_TURN_TAKEN,)),
    )
    # Plugin errors (filtered by event type only)
    plugin_errors = (
        self._on_plugin_error,
        EventFilter(event_types=(EventType.PLUGIN_ERROR,)),
    )
    return [simulation, collaboration, plugin_errors]

Advanced: Source Filtering

Filter events by the component that emitted them:

# Restrict delivery to events whose source is the CollaborationEngine
collab_filter = EventFilter(sources=("CollaborationEngine",))

def on_collab_event(event: Event) -> None:
    """Print the event type and session id of a collaboration event."""
    session = event.payload.get("session_id")
    kind = event.event_type.value
    print(f"Collaboration [{kind}]: session {session}")

bus.subscribe(on_collab_event, collab_filter)

Testing Subscribers

Test your subscriber by creating a bus, wiring the subscriber, and publishing events:

def test_audit_subscriber():
    """Check that a published governance event reaches the audit log.

    The log is redirected into a temporary directory so the test neither
    leaves a file in the working directory nor passes because of a stale
    log left by an earlier run.
    """
    import json
    import tempfile
    from pathlib import Path

    from fcc.messaging.bus import EventBus
    from fcc.messaging.events import Event, EventType

    with tempfile.TemporaryDirectory() as tmp_dir:
        log_path = Path(tmp_dir) / "fcc_audit.jsonl"

        bus = EventBus()
        plugin = AuditLogSubscriber()
        # The subscriber reads _log_path on every write, so redirecting it
        # after construction is enough to isolate the test.
        plugin._log_path = log_path

        # Wire subscribers
        for subscriber, event_filter in plugin.get_subscribers():
            bus.subscribe(subscriber, event_filter)

        # Publish a test event
        bus.publish(Event(
            event_type=EventType.GOVERNANCE_GATE_PASSED,
            source="test",
            payload={"gate_id": "gate-001", "score": 4.5},
        ))

        # Verify the audit log holds exactly the event published above
        assert log_path.exists()
        lines = log_path.read_text(encoding="utf-8").splitlines()
        assert len(lines) == 1
        entry = json.loads(lines[0])
        assert entry["event_type"] == "governance.gate.passed"

Next Steps