Performance Tuning Guide

This guide covers optimization strategies for FCC deployments, from simulation batch sizing to knowledge graph query tuning and RAG pipeline configuration.


Simulation Batch Size Optimization

The simulation engine processes workflow nodes sequentially, making an API call at each step. For AI-powered simulations (non-mock mode), the primary bottlenecks are API latency and token consumption.

Token Budget Management

Use SimulationBudget to control costs:

budget = SimulationBudget(
    max_tokens_per_turn=2048,    # Reduce for faster turns
    max_total_tokens=50_000,     # Total budget cap
    max_turns=30,                # Limit iteration count
    retry_limit=2,               # Fewer retries
    retry_backoff_seconds=0.5,   # Shorter backoff
)

Workflow Graph Selection

Graph                     Nodes   API Calls (approx.)   Use Case
base_sequence.json        5       5                     Quick iteration, testing
extended_sequence.json    20      20                    Standard workflow
complete_sequence.json    24      24                    Full pipeline
extended_84.json          55      55                    Comprehensive evaluation

Choose the smallest graph that satisfies your evaluation requirements.
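
To sanity-check a graph choice against the token budget, multiply the expected number of API calls by max_tokens_per_turn. A rough back-of-the-envelope calculation using the figures above:

# One API call per node, each capped at max_tokens_per_turn
nodes = 20                                          # extended_sequence.json
max_tokens_per_turn = 2048
estimated_ceiling = nodes * max_tokens_per_turn     # 40,960 tokens
assert estimated_ceiling <= 50_000                  # fits the max_total_tokens cap above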

Mock Mode for Development

Use mock mode (--mock) during development to avoid API calls entirely. Mock simulation uses deterministic responses and runs in milliseconds.


Event Bus Threading Configuration

The EventBus is thread-safe and uses in-process pub/sub. For high-throughput scenarios, consider these patterns.

Subscriber Performance

Each subscriber callback blocks the publish call. For slow subscribers, offload work to a separate thread:

import threading
from queue import Queue

from fcc.messaging.bus import EventBus  # assumed to live alongside EventFilter (see below)

bus = EventBus()
work_queue = Queue()

def fast_subscriber(event):
    # Keep the publish path fast: hand the event off and return immediately
    work_queue.put_nowait(event)

def worker():
    while True:
        event = work_queue.get()
        # Slow processing here
        work_queue.task_done()

threading.Thread(target=worker, daemon=True).start()
bus.subscribe(fast_subscriber)

Event Filtering

Apply EventFilter to reduce the number of events each subscriber processes:

from fcc.messaging.bus import EventFilter

# Only receive simulation lifecycle events
sim_filter = EventFilter(
    event_types={EventType.SIMULATION_STARTED, EventType.SIMULATION_COMPLETED}
)
bus.subscribe(handler, event_filter=sim_filter)

Replay Performance

When replaying large event logs (thousands of events), publish in fixed-size groups so downstream subscribers can drain between batches and memory pressure stays bounded:

from fcc.messaging.serialization import EventReplay

replay = EventReplay.from_file("large_session.json")

# Publish in groups; the batch boundary gives queue-based subscribers a chance to catch up
batch_size = 500
for start in range(0, len(replay.events), batch_size):
    for event in replay.events[start:start + batch_size]:
        bus.publish(event)

Embedding Throughput Tuning

Provider Selection

Provider                     Throughput  Quality                       Dependencies
MockEmbeddingProvider        Very high   Deterministic (testing only)  None
SentenceTransformerProvider  Medium      Production-grade              sentence-transformers

Batch Embedding

Always prefer embed_batch() over multiple embed() calls. The SentenceTransformerProvider processes batches in a single GPU/CPU pass:

# Slow: N separate calls
for text in texts:
    provider.embed(text)

# Fast: 1 batched call
provider.embed_batch(texts)

Backend Selection

Backend          Best For                  Memory                        Search Speed
InMemoryBackend  < 10,000 docs             O(n) vectors in Python dicts  O(n) per query
NumpyBackend     10,000 -- 1,000,000 docs  Compact numpy array           O(n) with vectorized ops

For datasets larger than 1 million documents, consider an external vector database and implement a custom SearchBackend.
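
For example, a mid-sized corpus can opt into the numpy backend when constructing the index. This is a minimal sketch: the backend keyword and the NumpyBackend import path are assumptions, so check the actual SearchIndex signature in your version:

from fcc.search.index import SearchIndex
from fcc.search.backends import NumpyBackend   # import path assumed

# Vectorized similarity search, suitable for 10,000 -- 1,000,000 documents
index = SearchIndex(backend=NumpyBackend())    # keyword name assumed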

Embedding Cache

SearchIndex automatically caches embeddings by content hash. Re-indexing unchanged documents reuses cached vectors:

index.add_document("doc1", "Same text", {"v": 1})  # Embeds and caches the vector
# Adding the same text again reuses the cached embedding
index.add_document("doc1", "Same text", {"v": 2})  # No re-embedding

Knowledge Graph Query Optimization

Adjacency-Based Traversal

KnowledgeGraph uses adjacency lists for O(1) neighbor lookup. Prefer neighbors() over filtering all_edges():

# Fast: O(degree) lookup
neighbors = graph.neighbors("RC")

# Slow: O(E) full scan
edges = [e for e in graph.all_edges() if e.source_id == "RC"]

Type-Filtered Queries

Use nodes_by_type() and edges_by_type() for filtered access. These scan the full collections but are efficient for small-to-medium graphs:

personas = graph.nodes_by_type(NodeType.PERSONA)
governs = graph.edges_by_type(EdgeType.GOVERNS)

Subgraph Extraction

Extract minimal subgraphs before running expensive operations (serialization, analysis):

# Extract only the nodes you need
relevant_ids = {"RC", "SA", "cat_core"}
sub = graph.subgraph(relevant_ids)
ttl = serialize_turtle(sub)  # Much faster than serializing full graph

Serialization Performance

Format     Relative Speed         Output Size
N-Triples  Fastest                Largest
Turtle     Fast                   Medium
JSON-LD    Medium                 Medium
SKOS       Slowest (extra logic)  Medium

For large graphs (10,000+ nodes), prefer N-Triples for bulk export.
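
A minimal sketch of a bulk export, assuming an N-Triples serializer named serialize_ntriples alongside serialize_turtle (the actual function name and module may differ):

# serialize_ntriples is assumed as the N-Triples counterpart of serialize_turtle
with open("graph_export.nt", "w", encoding="utf-8") as f:
    f.write(serialize_ntriples(graph))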


RAG Pipeline Tuning

Chunk Size vs Quality Trade-offs

Chunk Size  Retrieval Precision  Context Window Usage  Best For
128 chars   High precision       Low                   Short Q&A
512 chars   Balanced             Medium                General use
1024 chars  More context         High                  Deep analysis
2048 chars  Broad context        Very high             Summary tasks

Strategy Selection by Content

Content Type       Strategy                 Rationale
Persona YAML       YAML_BLOCK               Each top-level key is a natural unit
Python source      CODE_FUNCTION            Functions/classes are self-contained
Documentation      PARAGRAPH                Paragraphs preserve semantic units
Large manuals      FIXED_SIZE with overlap  Guaranteed coverage
Hierarchical docs  PARENT_CHILD             Preserves section context
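
As a sketch, strategy and chunk size are usually fixed when the pipeline is built. The RagPipeline constructor, ChunkingStrategy enum, and keyword names below are illustrative assumptions, not the confirmed configuration surface:

# Illustrative configuration only -- class and parameter names are assumptions
pipeline = RagPipeline(
    chunking_strategy=ChunkingStrategy.PARAGRAPH,   # documentation content
    chunk_size=512,                                 # balanced precision vs. context
    chunk_overlap=64,                               # mainly relevant for FIXED_SIZE
)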

Retrieval Depth

Increasing k improves recall but adds noise. Start with k=5 and increase only if answers miss relevant context:

result = pipeline.query("What is governance?", k=5)   # Default
result = pipeline.query("What is governance?", k=10)   # More context

Memory Management

Large Persona Registries

PersonaRegistry loads all personas into memory. For registries with 100+ personas, memory usage is typically under 50 MB. If memory is constrained:

  • Load only the categories you need using by_category() (see the sketch below)
  • Avoid keeping multiple registry copies
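
A minimal sketch of category-scoped loading (how the registry itself is obtained is left to your deployment; by_category() is the call referenced above):

# Pull only one category instead of touching every persona in the registry
core_personas = registry.by_category("core")   # category name is an example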

SearchIndex Persistence

Save and load indexes instead of re-embedding on each startup:

# First run: build and save
index = SearchIndex()
# ... add documents ...
index.save("persona_index.json")

# Subsequent runs: load from disk
index = SearchIndex.load("persona_index.json")

Profiling with cProfile

Use Python's built-in profiler to identify bottlenecks:

import cProfile
import pstats

profiler = cProfile.Profile()
profiler.enable()

# Code to profile
result = pipeline.query("What is the FCC workflow?", k=10)

profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats("cumulative")
stats.print_stats(20)

Common Hotspots

Hotspot              Cause                  Mitigation
embed() calls        Embedding computation  Use batch embedding, caching
cosine_similarity()  O(n) backend search    Switch to NumpyBackend
serialize_*()        Graph serialization    Extract subgraph first
API calls            LLM inference latency  Use mock mode, reduce k

Observability Overhead

The observability layer (tracing, metrics) adds minimal overhead when using console exporters. For production:

  • Disable span export if not needed: do not call instrument_simulation_engine()
  • Use JSON file exporters for async, non-blocking output
  • OpenTelemetry integration adds approximately 1-2% overhead per instrumented function

from fcc.observability.tracing import FccTracer

# Lightweight: only trace when needed
tracer = FccTracer(service_name="fcc-production")

@tracer.traced
def critical_function():
    pass

Benchmarking Methodology

When benchmarking FCC components:

  1. Warm up -- Run the operation once before timing to populate caches
  2. Multiple iterations -- Run at least 10 iterations and report median
  3. Isolate variables -- Test one component at a time
  4. Use mock mode -- Remove API latency from non-API benchmarks
  5. Report environment -- Python version, CPU, RAM, OS

import timeit

setup = """
from fcc.search.index import SearchIndex
from fcc.search.embeddings import MockEmbeddingProvider
index = SearchIndex(provider=MockEmbeddingProvider())
for i in range(1000):
    index.add_document(f"doc_{i}", f"Document text number {i}")
"""

result = timeit.timeit("index.search('research', k=10)", setup=setup, number=100)
print(f"Average search time: {result / 100:.4f}s")

See Also