Collaboration Engine

The FCC collaboration engine enables human-in-the-loop workflows with structured session management, turn-taking, quality scoring, approval gates, progress tracking, auditable shared context, and full session recording with replay. It is located in src/fcc/collaboration/.

```mermaid
stateDiagram-v2
    [*] --> CREATED : create_session()
    CREATED --> ACTIVE : start_session()
    ACTIVE --> ACTIVE : take_turn()
    ACTIVE --> PAUSED : pause_session()
    PAUSED --> ACTIVE : resume_session()
    ACTIVE --> COMPLETED : complete_session()
    CREATED --> ABORTED : abort_session()
    ACTIVE --> ABORTED : abort_session()
    PAUSED --> ABORTED : abort_session()
    COMPLETED --> [*]
    ABORTED --> [*]
```
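The transitions in the diagram above can be captured as a plain transition table. This is an illustrative, library-independent sketch (the real checks live inside CollaborationEngine):

```python
# Allowed session-status transitions, mirroring the state diagram above.
# Illustrative sketch only; not the engine's actual implementation.
ALLOWED_TRANSITIONS = {
    ("CREATED", "ACTIVE"),    # start_session()
    ("ACTIVE", "ACTIVE"),     # take_turn() keeps the session active
    ("ACTIVE", "PAUSED"),     # pause_session()
    ("PAUSED", "ACTIVE"),     # resume_session()
    ("ACTIVE", "COMPLETED"),  # complete_session()
    ("CREATED", "ABORTED"),   # abort_session() from any non-terminal state
    ("ACTIVE", "ABORTED"),
    ("PAUSED", "ABORTED"),
}

def can_transition(current: str, target: str) -> bool:
    """Return True if a session may move from `current` to `target`."""
    return (current, target) in ALLOWED_TRANSITIONS
```

Note that COMPLETED and ABORTED are terminal: no transition leaves them.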

Architecture Overview

```mermaid
graph TD
    CE[CollaborationEngine] --> MS[_MutableSession]
    CE --> SE[ScoringEngine]
    CE --> EB[EventBus]
    MS --> CS[CollaborationSession]
    MS --> SC[SharedContext]
    CE --> PT[ProgressTracker]
    CE --> SR[SessionRecorder]
```

Data Models

The collaboration system uses 11 immutable model types (status/type enums and frozen dataclasses) defined in fcc.collaboration.models:

SessionStatus

| Status | Description |
| --- | --- |
| CREATED | Session exists but has not started |
| ACTIVE | Session is in progress |
| PAUSED | Session is temporarily paused |
| COMPLETED | Session finished successfully |
| ABORTED | Session was cancelled |

TurnType

| Type | Description |
| --- | --- |
| HUMAN | Turn taken by a human participant |
| AGENT | Turn taken by an AI persona |
| SYSTEM | System-generated turn (e.g. gate evaluation) |

ApprovalDecision

| Decision | Description |
| --- | --- |
| APPROVED | Deliverable meets the quality threshold |
| REJECTED | Deliverable is below acceptable quality |
| NEEDS_REVISION | Deliverable is close but needs improvement |
| DEFERRED | Decision postponed |

ApprovalGate

A checkpoint attached to a workflow node for quality approval:

| Field | Type | Description |
| --- | --- | --- |
| gate_id | str | Unique gate identifier |
| workflow_node_id | str | Workflow node this gate is attached to |
| required_score | float | Minimum score (1-5) to pass (default 3.0) |
| requires_human | bool | Whether human approval is needed (default True) |
| rubric | tuple[str, ...] | Evaluation criteria descriptions |

QualityScore

Records a quality evaluation of a deliverable:

| Field | Type | Description |
| --- | --- | --- |
| score_id | str | Unique score identifier |
| deliverable_id | str | What was scored |
| scorer | str | Who assigned the score (human or persona ID) |
| score | float | Overall score (1-5) |
| rubric_scores | dict[str, float] | Per-rubric-item scores |
| justification | str | Text explanation |
| timestamp | str | When the score was assigned |

CollaborationSession

The top-level session object (frozen snapshot):

| Field | Type | Description |
| --- | --- | --- |
| session_id | str | Unique session identifier |
| workflow_id | str | Associated workflow |
| status | SessionStatus | Current session status |
| participants | tuple[str, ...] | Participant identifiers |
| turns | tuple[SessionTurn, ...] | Ordered turn history |
| gates | tuple[ApprovalGate, ...] | Approval gates |
| handoff_protocol | HandoffProtocol | Turn transition rules |
| shared_context | dict[str, Any] | Current shared context snapshot |

CollaborationEngine

The CollaborationEngine manages the full lifecycle of human-agent collaboration sessions.

Initialization

```python
from fcc.collaboration.engine import CollaborationEngine
from fcc.collaboration.scoring import ScoringEngine
from fcc.messaging.bus import EventBus

engine = CollaborationEngine(
    event_bus=EventBus(),            # Optional; publishes collaboration events
    scoring_engine=ScoringEngine(),  # Optional; default ScoringEngine created
)
```

Creating a Session

```python
from fcc.collaboration.models import ApprovalGate, HandoffProtocol

session = engine.create_session(
    workflow_id="base_sequence",
    participants=["human-reviewer", "RC", "BC", "DE"],
    gates=[
        ApprovalGate(
            gate_id="research-review",
            workflow_node_id="n2",
            required_score=3.5,
            requires_human=True,
            rubric=("Completeness", "Accuracy", "Source quality"),
        ),
    ],
    handoff_protocol=HandoffProtocol(),
)
```

Session Lifecycle

```python
from fcc.collaboration.models import TurnType

# Start the session (CREATED -> ACTIVE)
session = engine.start_session(session.session_id)

# Take a turn
session = engine.take_turn(
    session_id=session.session_id,
    participant="RC",
    turn_type=TurnType.AGENT,
    content="Research findings for the capability matrix...",
    metadata={"node_id": "n1", "action": "scaffold"},
)

# Pause and resume
session = engine.pause_session(session.session_id)
session = engine.resume_session(session.session_id)

# Complete the session
session = engine.complete_session(session.session_id)
```

Querying Sessions

```python
from fcc.collaboration.models import SessionStatus

# Get current session state
session = engine.get_session(session_id)

# List all sessions
all_sessions = engine.list_sessions()

# Filter by status
active = engine.list_sessions(status=SessionStatus.ACTIVE)
```

Event Publication

When an EventBus is provided, the engine publishes events at lifecycle transitions:

| Event Type | Trigger |
| --- | --- |
| collaboration.session.created | create_session() |
| collaboration.session.started | start_session() |
| collaboration.session.completed | complete_session() |
| collaboration.turn.taken | take_turn() |
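A consumer might react to these events by filtering on the dotted type strings from the table above. The sketch below is a stand-alone illustration; it assumes the handler is given the event type as a string, which may differ from the real EventBus payload shape:

```python
from collections import Counter

def make_collaboration_tally():
    """Return (handler, counts), where the handler tallies collaboration.* events.

    Assumes event types are the dotted strings shown in the table above;
    the actual event object passed by EventBus is not specified here.
    """
    counts: Counter = Counter()

    def handler(event_type: str) -> None:
        # Only count events in the collaboration namespace.
        if event_type.startswith("collaboration."):
            counts[event_type] += 1

    return handler, counts
```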

ScoringEngine

The ScoringEngine evaluates deliverable quality against rubrics and approval gates.

Scoring a Deliverable

```python
from fcc.collaboration.scoring import ScoringEngine

scoring = ScoringEngine()

quality_score = scoring.score_deliverable(
    deliverable_id="research-matrix-v1",
    scorer="human-reviewer",
    score=4.2,
    rubric_scores={"Completeness": 4.5, "Accuracy": 4.0, "Sources": 4.1},
    justification="Good coverage with minor gaps in section 3.",
)
```

Evaluating at a Gate

```python
from fcc.collaboration.models import ApprovalGate

gate = ApprovalGate(
    gate_id="research-review",
    workflow_node_id="n2",
    required_score=3.5,
)

decision, score = scoring.evaluate_at_gate(
    gate=gate,
    deliverable_id="research-matrix-v1",
    scorer="human-reviewer",
    score=4.2,
    persona_id="RC",
)
# decision = ApprovalDecision.APPROVED (4.2 >= 3.5)
```

Decision logic:

- score >= required_score -> APPROVED
- score >= required_score - 1.0 -> NEEDS_REVISION
- score < required_score - 1.0 -> REJECTED
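The thresholds above can be expressed as a small stand-alone function. This is a sketch of the documented decision logic, not the engine's actual code:

```python
def gate_decision(score: float, required_score: float) -> str:
    """Illustrative sketch of the gate decision thresholds described above."""
    if score >= required_score:
        return "APPROVED"
    if score >= required_score - 1.0:
        # Within one point of the bar: close, but needs another pass.
        return "NEEDS_REVISION"
    return "REJECTED"
```

For the example gate (required_score=3.5), a 4.2 is approved, a 2.8 needs revision, and a 2.4 is rejected.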

Capability Rating

```python
rating = scoring.compute_capability_rating(persona_id="RC")
```

Computes an aggregate capability rating from the persona's scoring history.
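The aggregation method is not specified here; one plausible sketch is a simple mean over the persona's recorded scores. The function below is purely illustrative, and the real ScoringEngine may weight its history differently:

```python
def capability_rating(score_history: list) -> float:
    """Hypothetical aggregate: the mean of a persona's past scores (1-5 scale).

    Assumption for illustration only; the actual aggregation used by
    ScoringEngine.compute_capability_rating is not documented above.
    """
    if not score_history:
        return 0.0  # assumed default when there is no scoring history
    return sum(score_history) / len(score_history)
```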

SharedContext

The SharedContext provides an auditable key-value workspace shared across all turns in a session:

```python
from fcc.collaboration.context import SharedContext

ctx = SharedContext()

# Set a value (records change in history)
ctx.set("research_status", "complete", actor="RC")

# Get a value
status = ctx.get("research_status")  # "complete"
status = ctx.get("missing_key", default="unknown")  # "unknown"

# Delete a value
removed = ctx.delete("research_status", actor="system")  # True

# List all keys
keys = ctx.keys()

# Export current state
data = ctx.to_dict()

# Access change history
history = ctx.history  # List of {action, key, old_value, new_value, actor, timestamp}
```

Every set and delete operation is recorded with a timestamp and actor for full auditability.
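The audit mechanics can be illustrated with a minimal stand-alone version. History entries follow the field names shown above; everything else (class name, defaults) is invented for this sketch and is not the real SharedContext:

```python
from datetime import datetime, timezone
from typing import Any

class MiniAuditedContext:
    """Tiny illustration of an auditable key-value store."""

    def __init__(self) -> None:
        self._data: dict = {}
        self.history: list = []

    def _record(self, action: str, key: str, old: Any, new: Any, actor: str) -> None:
        # Every mutation appends an immutable-style audit entry.
        self.history.append({
            "action": action,
            "key": key,
            "old_value": old,
            "new_value": new,
            "actor": actor,
            "timestamp": datetime.now(timezone.utc).isoformat(),
        })

    def set(self, key: str, value: Any, actor: str) -> None:
        self._record("set", key, self._data.get(key), value, actor)
        self._data[key] = value

    def delete(self, key: str, actor: str) -> bool:
        if key not in self._data:
            return False
        self._record("delete", key, self._data.pop(key), None, actor)
        return True
```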

ProgressTracker

The ProgressTracker monitors completion progress for entities (sessions, workflows, deliverables):

```python
from fcc.collaboration.progress import ProgressTracker

tracker = ProgressTracker()

# Register an entity for tracking
state = tracker.register("session-1", entity_type="session", total_steps=10)
# state.completed_steps == 0, state.status == "in_progress"

# Advance progress
state = tracker.advance("session-1", steps=3)
# state.completed_steps == 3, state.percentage == 30.0

# Mark as completed
state = tracker.complete("session-1")
# state.completed_steps == 10, state.status == "completed"

# Check progress
state = tracker.get("session-1")
print(state.percentage)  # 100.0
```

SessionRecorder

The SessionRecorder provides persistence and replay for collaboration sessions:

Saving and Loading

```python
from fcc.collaboration.recording import SessionRecorder

# Save session to JSON file
SessionRecorder.save_json(session, "output/session.json")

# Load session from JSON file
restored = SessionRecorder.load_json("output/session.json")
```

Replaying Sessions

Sessions can be replayed through an EventBus, re-emitting all turn and lifecycle events:

```python
from fcc.messaging.bus import EventBus

bus = EventBus()
bus.subscribe(my_handler)

total_deliveries = SessionRecorder.replay_session(session, bus)
```

The replay emits:

  1. A COLLABORATION_SESSION_CREATED event
  2. A COLLABORATION_TURN_TAKEN event for each turn in sequence

This enables testing, auditing, and debugging of collaboration workflows against recorded sessions.
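Given that event sequence, the number of events a replay emits is deterministic. A small sketch, assuming exactly one session-created event plus one event per recorded turn (subscriber delivery counts would then be this total multiplied by the number of subscribers, if every subscriber receives every event):

```python
def expected_replay_events(turn_count: int) -> int:
    """Events emitted by a replay: 1 session-created event + 1 per turn.

    Illustrative arithmetic based on the replay sequence described above;
    the real total_deliveries returned by replay_session may also account
    for the number of subscribers on the bus.
    """
    return 1 + turn_count
```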