Performance Tuning Guide¶
This guide covers optimization strategies for FCC deployments, from simulation batch sizing to knowledge graph query tuning and RAG pipeline configuration.
Simulation Batch Size Optimization¶
The simulation engine processes workflow nodes sequentially, making API calls at each step. For AI-powered simulations (non-mock mode), the primary bottleneck is API latency and token consumption.
Token Budget Management¶
Use SimulationBudget to control costs:
budget = SimulationBudget(
max_tokens_per_turn=2048, # Reduce for faster turns
max_total_tokens=50_000, # Total budget cap
max_turns=30, # Limit iteration count
retry_limit=2, # Fewer retries
retry_backoff_seconds=0.5, # Shorter backoff
)
Workflow Graph Selection¶
| Graph | Nodes | API Calls (approx.) | Use Case |
|---|---|---|---|
| base_sequence.json | 5 | 5 | Quick iteration, testing |
| extended_sequence.json | 20 | 20 | Standard workflow |
| complete_sequence.json | 24 | 24 | Full pipeline |
| extended_84.json | 55 | 55 | Comprehensive evaluation |
Choose the smallest graph that satisfies your evaluation requirements.
Mock Mode for Development¶
Use mock mode (--mock) during development to avoid API calls entirely. Mock simulation uses deterministic responses and runs in milliseconds.
Event Bus Threading Configuration¶
The EventBus is thread-safe and uses in-process pub/sub. For high-throughput scenarios, consider these patterns.
Subscriber Performance¶
Each subscriber callback blocks the publish call. For slow subscribers, offload work to a separate thread:
import threading
from queue import Queue
work_queue = Queue()
def fast_subscriber(event):
work_queue.put_nowait(event)
def worker():
while True:
event = work_queue.get()
# Slow processing here
work_queue.task_done()
threading.Thread(target=worker, daemon=True).start()
bus.subscribe(fast_subscriber)
Event Filtering¶
Apply EventFilter to reduce the number of events each subscriber processes:
from fcc.messaging.bus import EventFilter
# Only receive simulation events
sim_filter = EventFilter(event_types={EventType.SIMULATION_STARTED, EventType.SIMULATION_COMPLETED})
bus.subscribe(handler, event_filter=sim_filter)
Replay Performance¶
When replaying large event logs (thousands of events), batch the replay into groups to avoid memory pressure. The basic replay loop publishes every loaded event in sequence:
from fcc.messaging.serialization import EventReplay
replay = EventReplay.from_file("large_session.json")
for event in replay.events:
bus.publish(event)
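If the loaded event list is very large, a minimal sketch of chunked publishing (assuming replay.events is an in-memory sequence; the batch size is illustrative):
from itertools import islice
def publish_in_batches(bus, events, batch_size=500):
    # Publish in fixed-size batches so subscribers (and any queues they feed)
    # can drain between chunks instead of absorbing the whole log at once
    iterator = iter(events)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            break
        for event in batch:
            bus.publish(event)
publish_in_batches(bus, replay.events)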
Embedding Throughput Tuning¶
Provider Selection¶
| Provider | Throughput | Quality | Dependencies |
|---|---|---|---|
| MockEmbeddingProvider | Very high | Deterministic (testing only) | None |
| SentenceTransformerProvider | Medium | Production-grade | sentence-transformers |
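A sketch of swapping providers (the MockEmbeddingProvider import path matches the benchmarking example later in this guide; the SentenceTransformerProvider import and its defaults are assumptions):
from fcc.search.embeddings import MockEmbeddingProvider
from fcc.search.index import SearchIndex
# Deterministic and dependency-free: tests and benchmarks
provider = MockEmbeddingProvider()
# Production-quality vectors (requires sentence-transformers; import path assumed):
# from fcc.search.embeddings import SentenceTransformerProvider
# provider = SentenceTransformerProvider()
index = SearchIndex(provider=provider)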
Batch Embedding¶
Always prefer embed_batch() over multiple embed() calls. The SentenceTransformerProvider processes batches in a single GPU/CPU pass:
# Slow: N separate calls
for text in texts:
provider.embed(text)
# Fast: 1 batched call
provider.embed_batch(texts)
Backend Selection¶
| Backend | Best For | Memory | Search Speed |
|---|---|---|---|
| InMemoryBackend | < 10,000 docs | O(n) vectors in Python dicts | O(n) per query |
| NumpyBackend | 10,000 -- 1,000,000 docs | Compact numpy array | O(n) with vectorized ops |
For datasets larger than 1 million documents, consider an external vector database and implement a custom SearchBackend.
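A minimal skeleton for that case; the method names and signatures below are illustrative assumptions, not the project's actual SearchBackend interface:
class ExternalVectorBackend:  # subclass or duck-type the project's SearchBackend
    # Delegates storage and similarity search to an external vector database client
    def __init__(self, client):
        self.client = client  # e.g. a vendor SDK client
    def add(self, doc_id, vector, metadata=None):
        self.client.upsert(doc_id, vector, metadata or {})
    def search(self, query_vector, k=5):
        # Expected to return (doc_id, score) pairs
        return self.client.query(query_vector, top_k=k)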
Embedding Cache¶
SearchIndex automatically caches embeddings by content hash. Re-indexing unchanged documents reuses cached vectors:
# Adding the same text again reuses the cached embedding
index.add_document("doc1", "Same text", {"v": 2}) # No re-embedding
Knowledge Graph Query Optimization¶
Adjacency-Based Traversal¶
KnowledgeGraph uses adjacency lists for O(1) neighbor lookup. Prefer neighbors() over filtering all_edges():
# Fast: O(degree) lookup
neighbors = graph.neighbors("RC")
# Slow: O(E) full scan
edges = [e for e in graph.all_edges() if e.source_id == "RC"]
Type-Filtered Queries¶
Use nodes_by_type() and edges_by_type() for filtered access. These scan the full collections but are efficient for small-to-medium graphs.
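For example (the node and edge type names below are illustrative; use whatever types your graph defines):
# Type names here are placeholders for whatever your graph uses
persona_nodes = graph.nodes_by_type("persona")
category_edges = graph.edges_by_type("belongs_to")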
Subgraph Extraction¶
Extract minimal subgraphs before running expensive operations (serialization, analysis):
# Extract only the nodes you need
relevant_ids = {"RC", "SA", "cat_core"}
sub = graph.subgraph(relevant_ids)
ttl = serialize_turtle(sub) # Much faster than serializing full graph
Serialization Performance¶
| Format | Relative Speed | Output Size |
|---|---|---|
| N-Triples | Fastest | Largest |
| Turtle | Fast | Medium |
| JSON-LD | Medium | Medium |
| SKOS | Slowest (extra logic) | Medium |
For large graphs (10,000+ nodes), prefer N-Triples for bulk export.
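A sketch of a bulk export, assuming an N-Triples counterpart to serialize_turtle() (the exact function name may differ in the serialization module):
# serialize_ntriples is assumed to mirror serialize_turtle; check the module for the exact name
nt = serialize_ntriples(graph)
with open("graph_export.nt", "w", encoding="utf-8") as f:
    f.write(nt)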
RAG Pipeline Tuning¶
Chunk Size vs Quality Trade-offs¶
| Chunk Size | Retrieval Precision | Context Window Usage | Best For |
|---|---|---|---|
| 128 chars | High precision | Low | Short Q&A |
| 512 chars | Balanced | Medium | General use |
| 1024 chars | More context | High | Deep analysis |
| 2048 chars | Broad context | Very high | Summary tasks |
Strategy Selection by Content¶
| Content Type | Strategy | Rationale |
|---|---|---|
| Persona YAML | YAML_BLOCK | Each top-level key is a natural unit |
| Python source | CODE_FUNCTION | Functions/classes are self-contained |
| Documentation | PARAGRAPH | Paragraphs preserve semantic units |
| Large manuals | FIXED_SIZE with overlap | Guaranteed coverage |
| Hierarchical docs | PARENT_CHILD | Preserves section context |
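A sketch of pairing chunk size with strategy per content type (the parameter names are illustrative, not the pipeline's actual configuration API):
# Illustrative settings only -- adapt the keys to your pipeline's configuration surface
doc_chunking = {"strategy": "PARAGRAPH", "chunk_size": 512}
manual_chunking = {"strategy": "FIXED_SIZE", "chunk_size": 1024, "overlap": 128}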
Retrieval Depth¶
Increasing k improves recall but adds noise. Start with k=5 and increase only if answers miss relevant context:
result = pipeline.query("What is governance?", k=5) # Default
result = pipeline.query("What is governance?", k=10) # More context
Memory Management¶
Large Persona Registries¶
PersonaRegistry loads all personas into memory. For registries with 100+ personas, memory usage is typically under 50 MB. If memory is constrained:
- Load only the categories you need using by_category() (see the sketch below)
- Avoid keeping multiple registry copies
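A minimal sketch, assuming by_category() accepts a category name and returns the matching personas (registry construction details may differ):
# Load the registry once and keep only the category of interest
registry = PersonaRegistry()  # construction may take a path or config in your setup
core_personas = registry.by_category("core")  # category name is illustrative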
SearchIndex Persistence¶
Save and load indexes instead of re-embedding on each startup:
# First run: build and save
index = SearchIndex()
# ... add documents ...
index.save("persona_index.json")
# Subsequent runs: load from disk
index = SearchIndex.load("persona_index.json")
Profiling with cProfile¶
Use Python's built-in profiler to identify bottlenecks:
import cProfile
import pstats
profiler = cProfile.Profile()
profiler.enable()
# Code to profile
result = pipeline.query("What is the FCC workflow?", k=10)
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats("cumulative")
stats.print_stats(20)
Common Hotspots¶
| Hotspot | Cause | Mitigation |
|---|---|---|
| embed() calls | Embedding computation | Use batch embedding, caching |
| cosine_similarity() | O(n) backend search | Switch to NumpyBackend |
| serialize_*() | Graph serialization | Extract subgraph first |
| API calls | LLM inference latency | Use mock mode, reduce k |
Observability Overhead¶
The observability layer (tracing, metrics) adds minimal overhead when using console exporters. For production:
- Disable span export if not needed: do not call instrument_simulation_engine()
- Use JSON file exporters for async, non-blocking output
- OpenTelemetry integration adds approximately 1-2% overhead per instrumented function
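To keep tracing lightweight in production, decorate only the hot paths you actually need to observe: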
from fcc.observability.tracing import FccTracer
# Lightweight: only trace when needed
tracer = FccTracer(service_name="fcc-production")
@tracer.traced
def critical_function():
pass
Benchmarking Methodology¶
When benchmarking FCC components:
- Warm up -- Run the operation once before timing to populate caches
- Multiple iterations -- Run at least 10 iterations and report median
- Isolate variables -- Test one component at a time
- Use mock mode -- Remove API latency from non-API benchmarks
- Report environment -- Python version, CPU, RAM, OS
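For example, timing search latency against a mock-embedded index with timeit: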
import timeit
setup = """
from fcc.search.index import SearchIndex
from fcc.search.embeddings import MockEmbeddingProvider
index = SearchIndex(provider=MockEmbeddingProvider())
for i in range(1000):
index.add_document(f"doc_{i}", f"Document text number {i}")
"""
result = timeit.timeit("index.search('research', k=10)", setup=setup, number=100)
print(f"Average search time: {result / 100:.4f}s")
See Also¶
- Testing Guide -- Running and writing tests
- Architecture -- System architecture overview
- Integration Patterns Guide -- External integrations