Skip to content

Adding Observability to Workflows

The FCC observability layer provides structured tracing and metrics collection for workflow executions. It uses an OpenTelemetry-compatible span model with a built-in fallback implementation, so you get full observability without requiring external dependencies. This tutorial covers the FccTracer, FccMetrics, the @traced decorator, and exporters.

Architecture Overview

┌─────────────────┐     ┌───────────────┐
│   FccTracer     │     │  FccMetrics   │
│  (span model)   │     │  (counters/   │
│                 │     │   gauges)     │
└────────┬────────┘     └──────┬────────┘
         │                     │
         ▼                     ▼
┌────────────────────────────────────────┐
│            Exporters                   │
│  ConsoleSpanExporter  │ JsonFileSpan   │
│  ConsoleMetricExporter│ JsonFileMetric │
└────────────────────────────────────────┘

Creating a Tracer

The FccTracer collects spans -- structured records of operations with start/end times, status, and arbitrary attributes:

from fcc.observability.tracing import FccTracer

tracer = FccTracer(service_name="my-fcc-app")

# Check if OpenTelemetry is available (optional enhancement)
print(f"OTel available: {tracer.otel_available}")

Recording Spans

Use the span context manager to record an operation:

with tracer.span("simulation.run", attributes={"scenario_id": "GEN-001"}) as ctx:
    # Do work...
    ctx.set_attribute("persona_count", 5)
    ctx.set_attribute("workflow", "base_sequence")

# Inspect completed spans
spans = tracer.completed_spans
print(f"Completed spans: {len(spans)}")

span = spans[0]
print(f"Name: {span.name}")
print(f"Status: {span.status}")
print(f"Duration: {span.duration_ms:.1f}ms")
print(f"Attributes: {span.attributes}")

Expected output:

Completed spans: 1
Name: simulation.run
Status: ok
Duration: 2.3ms
Attributes: {'scenario_id': 'GEN-001', 'persona_count': 5, 'workflow': 'base_sequence'}

Nested Spans

Spans nest automatically. Child spans record the parent's span ID:

with tracer.span("workflow.execute") as parent:
    parent.set_attribute("graph", "base_sequence")

    with tracer.span("node.process", attributes={"persona_id": "RC"}) as child1:
        child1.set_attribute("phase", "find")

    with tracer.span("node.process", attributes={"persona_id": "BC"}) as child2:
        child2.set_attribute("phase", "create")

# All 3 spans share the same trace_id
spans = tracer.completed_spans
trace_ids = {s.trace_id for s in spans}
print(f"Unique traces: {len(trace_ids)}")  # 1

# Children reference the parent
for s in spans:
    parent_info = f"parent={s.parent_span_id[:8]}..." if s.parent_span_id else "root"
    print(f"  {s.name} ({parent_info})")

Expected output:

Unique traces: 1
  node.process (parent=a1b2c3d4...)
  node.process (parent=a1b2c3d4...)
  workflow.execute (root)

Error Tracking

When an exception occurs inside a span, the status is automatically set to "error":

try:
    with tracer.span("risky.operation") as ctx:
        ctx.set_attribute("step", "validation")
        raise ValueError("Missing required field")
except ValueError:
    pass

error_span = tracer.completed_spans[-1]
print(f"Status: {error_span.status}")
print(f"Error: {error_span.attributes.get('error.message')}")
Status: error
Error: Missing required field

The @traced Decorator

For functions that should always be traced, use the @traced decorator. The decorated function must accept a tracer keyword argument:

from fcc.observability.tracing import traced

@traced("research.analyze")
def analyze_sources(sources: list[str], tracer: FccTracer = None) -> dict:
    return {"source_count": len(sources), "quality": "high"}

# With tracing
result = analyze_sources(["doc1", "doc2"], tracer=tracer)

# Without tracing (tracer=None, runs normally)
result = analyze_sources(["doc1", "doc2"])

Collecting Metrics

FccMetrics collects counter and gauge data points with labels:

from fcc.observability.metrics import FccMetrics

metrics = FccMetrics()

# Generic counter increment
metrics.increment("fcc.requests.total", labels={"endpoint": "/simulate"})

# Generic gauge observation
metrics.observe("fcc.queue.depth", 42.0, labels={"queue": "actions"})

# Pre-defined FCC metrics
metrics.record_simulation_step(step=1, node_id="node-RC")
metrics.record_persona_activation("RC")
metrics.record_gate_result("gate-research", passed=True)
metrics.record_deliverable("RC", "scaffold")
metrics.record_action_execution("RC", "scaffold", success=True, duration_ms=150.0)
metrics.record_ai_call(
    provider="anthropic",
    model="claude-sonnet-4-20250514",
    latency_ms=1200.0,
    tokens=2048,
)

Querying Metrics

# All collected points
all_points = metrics.points
print(f"Total data points: {len(all_points)}")

# Filter by metric name
sim_steps = metrics.points_by_name("fcc.simulation.step")
print(f"Simulation steps recorded: {len(sim_steps)}")

# Sum a counter
total_activations = metrics.total("fcc.persona.activation")
print(f"Total persona activations: {total_activations}")

Exporting Spans and Metrics

Console Exporters

Print spans and metrics to stderr (or any file-like object):

import io
from fcc.observability.exporters import ConsoleSpanExporter, ConsoleMetricExporter

# Export spans to a string buffer for inspection
buf = io.StringIO()
span_exporter = ConsoleSpanExporter(file=buf)
exported = span_exporter.export(tracer.completed_spans)
print(f"Exported {exported} spans:")
print(buf.getvalue())

Output format:

[SPAN] simulation.run trace=a1b2c3d4... 2.3ms status=ok scenario_id=GEN-001
[SPAN] node.process trace=a1b2c3d4... 0.5ms status=ok persona_id=RC

JSON File Exporters

Persist spans and metrics to JSON files for later analysis:

from fcc.observability.exporters import JsonFileSpanExporter, JsonFileMetricExporter

# Export spans
span_exporter = JsonFileSpanExporter("traces/spans.json")
span_exporter.export(tracer.completed_spans)

# Export metrics
metric_exporter = JsonFileMetricExporter("traces/metrics.json")
metric_exporter.export(metrics.points)

The JSON span file format:

{
  "span_count": 3,
  "spans": [
    {
      "span_id": "...",
      "trace_id": "...",
      "name": "simulation.run",
      "start_time": "2026-03-26T...",
      "end_time": "2026-03-26T...",
      "status": "ok",
      "attributes": {"scenario_id": "GEN-001"}
    }
  ]
}

Instrumenting FCC Engines

The integration module provides helper functions to attach observability to FCC engines:

from fcc.observability.integration import (
    instrument_simulation_engine,
    instrument_action_engine,
)
from fcc.observability.tracing import FccTracer
from fcc.observability.metrics import FccMetrics
from fcc.messaging.bus import EventBus

tracer = FccTracer(service_name="fcc-production")
metrics = FccMetrics()
bus = EventBus()

# Instrument a simulation engine
# engine = AISimulationEngine(...)
# instrument_simulation_engine(engine, tracer=tracer, metrics=metrics, event_bus=bus)

# Instrument an action engine
# action_engine = ActionEngine(...)
# instrument_action_engine(action_engine, tracer=tracer, metrics=metrics, event_bus=bus)

What Instrumentation Does

instrument_simulation_engine and instrument_action_engine set the _tracer, _metrics, and _event_bus attributes on the engine instance. The engines check for these at runtime to emit spans, record metrics, and publish events.

Complete Example

from fcc.observability.tracing import FccTracer
from fcc.observability.metrics import FccMetrics
from fcc.observability.exporters import (
    ConsoleSpanExporter,
    JsonFileSpanExporter,
    ConsoleMetricExporter,
)

# Set up observability
tracer = FccTracer(service_name="tutorial")
metrics = FccMetrics()

# Simulate a workflow with tracing
with tracer.span("tutorial.workflow") as root:
    root.set_attribute("graph", "base_sequence")

    for i, persona in enumerate(["RC", "BC", "DE", "RB", "UG"]):
        with tracer.span(f"node.{persona}") as node:
            node.set_attribute("persona_id", persona)
            node.set_attribute("step", i + 1)
            metrics.record_persona_activation(persona)
            metrics.record_simulation_step(i + 1, f"node-{persona}")

# Export results
print(f"\nSpans: {len(tracer.completed_spans)}")
print(f"Metrics: {len(metrics.points)}")

ConsoleSpanExporter().export(tracer.completed_spans)
JsonFileSpanExporter("tutorial_spans.json").export(tracer.completed_spans)

Next Steps