Skip to content

Event Bus Publish and Subscribe

This diagram traces the two halves of FCC's messaging backbone: plugin-driven subscription at startup and per-event dispatch at runtime. The subscribe entry point is EventBus.subscribe(subscriber, event_filter) at src/fcc/messaging/bus.py:65, and the publish entry point is EventBus.publish(event) at src/fcc/messaging/bus.py:112. Developers read this trace to understand how EventSubscriberPlugin adapters inject handlers into the bus, how EventFilter narrows which subscribers are invoked, and how failed subscriber calls flow into the dead-letter queue. The bus is thread-safe and supports both sync and async subscribers via AsyncAdapter.

The sequence below shows plugin-driven subscription first, then a single publish dispatching to a matching subscriber and demonstrating the DLQ capture on exception.

sequenceDiagram
    participant Bootstrap
    participant PluginRegistry
    participant SubPlugin as EventSubscriberPlugin
    participant EventBus
    participant EventFilter
    participant Subscriber
    participant AsyncAdapter
    participant DLQ
    participant Publisher

    Bootstrap->>PluginRegistry: discover(SUBSCRIBERS)
    PluginRegistry-->>Bootstrap: list[EventSubscriberPlugin]
    loop for each plugin
        Bootstrap->>SubPlugin: get_subscribers()
        SubPlugin-->>Bootstrap: list[(subscriber, EventFilter)]
        Bootstrap->>EventBus: subscribe(subscriber, event_filter)
    end
    Note over EventBus: subscribers registered

    Publisher->>EventBus: publish(event)
    loop for each registered subscriber
        EventBus->>EventFilter: matches(event)
        alt matches
            EventFilter-->>EventBus: true
            alt async subscriber
                EventBus->>AsyncAdapter: dispatch(subscriber, event)
                AsyncAdapter-->>EventBus: ok
            else sync subscriber
                EventBus->>Subscriber: invoke(event)
                alt raises
                    Note over DLQ: captures failed invocation
                    EventBus->>DLQ: capture(event, subscriber, error)
                end
            end
        else no match
            EventFilter-->>EventBus: false
        end
    end
    EventBus-->>Publisher: ack

Failure modes are contained per-subscriber. A sync subscriber that raises is caught by the bus, the event plus error are captured in the DLQ, and dispatch continues to the next subscriber. Async subscribers surface failures through the adapter which wraps them in the same DLQ contract. A malformed EventFilter never raises — the matches method returns False on comparison errors, effectively dropping the subscriber for that event. The DLQ is inspectable via EventBus.dead_letters() and is the canonical diagnostic surface when a subscriber silently stops firing in production. Instrumentation typically tails the DLQ plus exports event_bus_publish_total and event_bus_dlq_total metrics.

Event construction carries event_type, source, payload, and correlation_id. The correlation_id is particularly load-bearing because it lets subscribers reconstruct causal chains across the 81 event types — for example stitching workflow.step events to the compliance.audited event they triggered. Subscribers should not mutate the event payload; the bus does not deep-copy and mutation has bitten integrators in the past.

Steps in detail

  1. Bootstrap to PluginRegistry: discover — At startup the registry is queried for all plugins of type SUBSCRIBERS.
  2. Bootstrap to EventSubscriberPlugin: get_subscribers — Each plugin returns a list of (subscriber, EventFilter) tuples defining what it wants to listen for.
  3. Bootstrap to EventBus: subscribe — Each tuple is registered; the bus stores them keyed for fast dispatch lookup.
  4. Publisher to EventBus: publish — A component constructs an Event and hands it to the bus; this call returns once enqueue is complete, not once all subscribers have run.
  5. EventBus to EventFilter: matches (loop) — For each registered subscriber the bus asks its filter whether the event is of interest, comparing event_types, sources, and correlation_ids.
  6. EventBus to AsyncAdapter: dispatch — If the subscriber is async the adapter schedules it on the bus's worker pool and returns an ack.
  7. EventBus to Subscriber: invoke — If the subscriber is sync the bus calls it inline with the event.
  8. EventBus to DLQ: capture — If the sync subscriber raises, the bus captures the event, subscriber reference, and exception in the dead-letter queue and continues dispatch.
  9. EventBus to Publisher: ack — After all matching subscribers have been dispatched the publish call returns.

See also

  • Subscribe entry point: src/fcc/messaging/bus.py:65
  • Publish entry point: src/fcc/messaging/bus.py:112
  • Related class diagram: ../class-diagrams/messaging-model.md
  • Related event types: src/fcc/messaging/events.pyEventType (all 81 members, discriminator for EventFilter.event_types)