Collaboration Engine¶
The FCC collaboration engine enables human-in-the-loop workflows with structured session management, turn-taking, quality scoring, approval gates, progress tracking, auditable shared context, and full session recording with replay. It is located in src/fcc/collaboration/.
stateDiagram-v2
[*] --> CREATED : create_session()
CREATED --> ACTIVE : start_session()
ACTIVE --> ACTIVE : take_turn()
ACTIVE --> PAUSED : pause_session()
PAUSED --> ACTIVE : resume_session()
ACTIVE --> COMPLETED : complete_session()
CREATED --> ABORTED : abort_session()
ACTIVE --> ABORTED : abort_session()
PAUSED --> ABORTED : abort_session()
COMPLETED --> [*]
ABORTED --> [*]
Architecture Overview¶
graph TD
CE[CollaborationEngine] --> MS[_MutableSession]
CE --> SE[ScoringEngine]
CE --> EB[EventBus]
MS --> CS[CollaborationSession]
MS --> SC[SharedContext]
CE --> PT[ProgressTracker]
CE --> SR[SessionRecorder]
Data Models¶
The collaboration system uses 11 frozen dataclasses defined in fcc.collaboration.models:
SessionStatus¶
| Status | Description |
|---|---|
CREATED |
Session exists but has not started |
ACTIVE |
Session is in progress |
PAUSED |
Session is temporarily paused |
COMPLETED |
Session finished successfully |
ABORTED |
Session was cancelled |
TurnType¶
| Type | Description |
|---|---|
HUMAN |
Turn taken by a human participant |
AGENT |
Turn taken by an AI persona |
SYSTEM |
System-generated turn (e.g. gate evaluation) |
ApprovalDecision¶
| Decision | Description |
|---|---|
APPROVED |
Deliverable meets quality threshold |
REJECTED |
Deliverable is below acceptable quality |
NEEDS_REVISION |
Deliverable is close but needs improvement |
DEFERRED |
Decision postponed |
ApprovalGate¶
A checkpoint attached to a workflow node for quality approval:
| Field | Type | Description |
|---|---|---|
gate_id |
str |
Unique gate identifier |
workflow_node_id |
str |
Workflow node this gate is attached to |
required_score |
float |
Minimum score (1-5) to pass (default 3.0) |
requires_human |
bool |
Whether human approval is needed (default True) |
rubric |
tuple[str, ...] |
Evaluation criteria descriptions |
QualityScore¶
Records a quality evaluation of a deliverable:
| Field | Type | Description |
|---|---|---|
score_id |
str |
Unique score identifier |
deliverable_id |
str |
What was scored |
scorer |
str |
Who assigned the score (human or persona ID) |
score |
float |
Overall score (1-5) |
rubric_scores |
dict[str, float] |
Per-rubric-item scores |
justification |
str |
Text explanation |
timestamp |
str |
When the score was assigned |
CollaborationSession¶
The top-level session object (frozen snapshot):
| Field | Type | Description |
|---|---|---|
session_id |
str |
Unique session identifier |
workflow_id |
str |
Associated workflow |
status |
SessionStatus |
Current session status |
participants |
tuple[str, ...] |
Participant identifiers |
turns |
tuple[SessionTurn, ...] |
Ordered turn history |
gates |
tuple[ApprovalGate, ...] |
Approval gates |
handoff_protocol |
HandoffProtocol |
Turn transition rules |
shared_context |
dict[str, Any] |
Current shared context snapshot |
CollaborationEngine¶
The CollaborationEngine manages the full lifecycle of human-agent collaboration sessions.
Initialization¶
from fcc.collaboration.engine import CollaborationEngine
from fcc.collaboration.scoring import ScoringEngine
from fcc.messaging.bus import EventBus
engine = CollaborationEngine(
event_bus=EventBus(), # Optional; publishes collaboration events
scoring_engine=ScoringEngine(), # Optional; default ScoringEngine created
)
Creating a Session¶
from fcc.collaboration.models import ApprovalGate, HandoffProtocol
session = engine.create_session(
workflow_id="base_sequence",
participants=["human-reviewer", "RC", "BC", "DE"],
gates=[
ApprovalGate(
gate_id="research-review",
workflow_node_id="n2",
required_score=3.5,
requires_human=True,
rubric=("Completeness", "Accuracy", "Source quality"),
),
],
handoff_protocol=HandoffProtocol(),
)
Session Lifecycle¶
# Start the session (CREATED -> ACTIVE)
session = engine.start_session(session.session_id)
# Take a turn
session = engine.take_turn(
session_id=session.session_id,
participant="RC",
turn_type=TurnType.AGENT,
content="Research findings for the capability matrix...",
metadata={"node_id": "n1", "action": "scaffold"},
)
# Pause and resume
session = engine.pause_session(session.session_id)
session = engine.resume_session(session.session_id)
# Complete the session
session = engine.complete_session(session.session_id)
Querying Sessions¶
# Get current session state
session = engine.get_session(session_id)
# List all sessions
all_sessions = engine.list_sessions()
# Filter by status
active = engine.list_sessions(status=SessionStatus.ACTIVE)
Event Publication¶
When an EventBus is provided, the engine publishes events at lifecycle transitions:
| Event Type | Trigger |
|---|---|
collaboration.session.created |
create_session() |
collaboration.session.started |
start_session() |
collaboration.session.completed |
complete_session() |
collaboration.turn.taken |
take_turn() |
ScoringEngine¶
The ScoringEngine evaluates deliverable quality against rubrics and approval gates.
Scoring a Deliverable¶
from fcc.collaboration.scoring import ScoringEngine
scoring = ScoringEngine()
quality_score = scoring.score_deliverable(
deliverable_id="research-matrix-v1",
scorer="human-reviewer",
score=4.2,
rubric_scores={"Completeness": 4.5, "Accuracy": 4.0, "Sources": 4.1},
justification="Good coverage with minor gaps in section 3.",
)
Evaluating at a Gate¶
from fcc.collaboration.models import ApprovalGate
gate = ApprovalGate(
gate_id="research-review",
workflow_node_id="n2",
required_score=3.5,
)
decision, score = scoring.evaluate_at_gate(
gate=gate,
deliverable_id="research-matrix-v1",
scorer="human-reviewer",
score=4.2,
persona_id="RC",
)
# decision = ApprovalDecision.APPROVED (4.2 >= 3.5)
Decision logic:
score >= required_score->APPROVEDscore >= required_score - 1.0->NEEDS_REVISIONscore < required_score - 1.0->REJECTED
Capability Rating¶
Computes an aggregate capability rating from the persona's scoring history.
SharedContext¶
The SharedContext provides an auditable key-value workspace shared across all turns in a session:
from fcc.collaboration.context import SharedContext
ctx = SharedContext()
# Set a value (records change in history)
ctx.set("research_status", "complete", actor="RC")
# Get a value
status = ctx.get("research_status") # "complete"
status = ctx.get("missing_key", default="unknown") # "unknown"
# Delete a value
removed = ctx.delete("research_status", actor="system") # True
# List all keys
keys = ctx.keys()
# Export current state
data = ctx.to_dict()
# Access change history
history = ctx.history # List of {action, key, old_value, new_value, actor, timestamp}
Every set and delete operation is recorded with a timestamp and actor for full auditability.
ProgressTracker¶
The ProgressTracker monitors completion progress for entities (sessions, workflows, deliverables):
from fcc.collaboration.progress import ProgressTracker
tracker = ProgressTracker()
# Register an entity for tracking
state = tracker.register("session-1", entity_type="session", total_steps=10)
# state.completed_steps == 0, state.status == "in_progress"
# Advance progress
state = tracker.advance("session-1", steps=3)
# state.completed_steps == 3, state.percentage == 30.0
# Mark as completed
state = tracker.complete("session-1")
# state.completed_steps == 10, state.status == "completed"
# Check progress
state = tracker.get("session-1")
print(state.percentage) # 100.0
SessionRecorder¶
The SessionRecorder provides persistence and replay for collaboration sessions:
Saving and Loading¶
from fcc.collaboration.recording import SessionRecorder
# Save session to JSON file
SessionRecorder.save_json(session, "output/session.json")
# Load session from JSON file
restored = SessionRecorder.load_json("output/session.json")
Replaying Sessions¶
Sessions can be replayed through an EventBus, re-emitting all turn and lifecycle events:
from fcc.messaging.bus import EventBus
bus = EventBus()
bus.subscribe(my_handler)
total_deliveries = SessionRecorder.replay_session(session, bus)
The replay emits:
- A
COLLABORATION_SESSION_CREATEDevent - A
COLLABORATION_TURN_TAKENevent for each turn in sequence
This enables testing, auditing, and debugging of collaboration workflows against recorded sessions.