Skip to content

Traces Module

The traces module provides full interaction-level recording for the learning system. Every agent interaction can produce a Trace capturing the sequence of steps (route, retrieve, generate, tool_call, respond) with timing, inputs, outputs, and outcomes. The TraceCollector wraps any agent to record traces automatically, while TraceAnalyzer provides aggregated statistics.

TraceStore

TraceStore

TraceStore(db_path: str | Path)

Append-only SQLite store for interaction traces.

Source code in src/openjarvis/traces/store.py
def __init__(self, db_path: str | Path) -> None:
    """Open (or create) the SQLite database at *db_path* and ensure the schema exists."""
    self._db_path = str(db_path)
    connection = sqlite3.connect(self._db_path)
    # WAL lets readers proceed concurrently with a writer.
    connection.execute("PRAGMA journal_mode=WAL")
    # Idempotent schema setup for the traces and steps tables.
    for ddl in (_CREATE_TRACES, _CREATE_STEPS):
        connection.execute(ddl)
    connection.commit()
    self._conn = connection

Functions

save

save(trace: Trace) -> None

Persist a complete trace with all its steps.

Source code in src/openjarvis/traces/store.py
def save(self, trace: Trace) -> None:
    """Persist a complete trace with all its steps.

    The trace row and every step row are written before a single
    ``commit()``, so the whole trace becomes visible atomically.
    """
    self._conn.execute(
        _INSERT_TRACE,
        (
            trace.trace_id,
            trace.query,
            trace.agent,
            trace.model,
            trace.engine,
            trace.result,
            trace.outcome,
            trace.feedback,
            trace.started_at,
            trace.ended_at,
            trace.total_tokens,
            trace.total_latency_seconds,
            json.dumps(trace.metadata),
        ),
    )
    # Batch the step inserts: executemany avoids one execute() round-trip
    # per step and is a harmless no-op when the steps list is empty.
    step_rows = [
        (
            trace.trace_id,
            idx,  # step ordering is preserved via the positional index
            # StepType enums are stored by their string value; plain
            # strings pass through unchanged.
            step.step_type.value
            if isinstance(step.step_type, StepType)
            else step.step_type,
            step.timestamp,
            step.duration_seconds,
            json.dumps(step.input),
            json.dumps(step.output),
            json.dumps(step.metadata),
        )
        for idx, step in enumerate(trace.steps)
    ]
    self._conn.executemany(_INSERT_STEP, step_rows)
    self._conn.commit()

get

get(trace_id: str) -> Optional[Trace]

Retrieve a trace by id, or None if not found.

Source code in src/openjarvis/traces/store.py
def get(self, trace_id: str) -> Optional[Trace]:
    """Fetch a single trace by its id, returning ``None`` when no row matches."""
    cursor = self._conn.execute(
        "SELECT * FROM traces WHERE trace_id = ?", (trace_id,)
    )
    row = cursor.fetchone()
    return self._row_to_trace(row) if row is not None else None

list_traces

list_traces(*, agent: Optional[str] = None, model: Optional[str] = None, outcome: Optional[str] = None, since: Optional[float] = None, until: Optional[float] = None, limit: int = 100) -> List[Trace]

Query traces with optional filters.

Source code in src/openjarvis/traces/store.py
def list_traces(
    self,
    *,
    agent: Optional[str] = None,
    model: Optional[str] = None,
    outcome: Optional[str] = None,
    since: Optional[float] = None,
    until: Optional[float] = None,
    limit: int = 100,
) -> List[Trace]:
    """Query traces with optional filters, newest first, capped at *limit* rows."""
    # Each (clause, value) pair participates only when its value was given;
    # the order here fixes both the WHERE clause and the parameter order.
    candidate_filters = [
        ("agent = ?", agent),
        ("model = ?", model),
        ("outcome = ?", outcome),
        ("started_at >= ?", since),
        ("started_at <= ?", until),
    ]
    active = [(clause, value) for clause, value in candidate_filters if value is not None]
    where = " AND ".join(clause for clause, _ in active) or "1=1"
    params: List[Any] = [value for _, value in active]
    params.append(limit)
    sql = f"SELECT * FROM traces WHERE {where} ORDER BY started_at DESC LIMIT ?"
    rows = self._conn.execute(sql, params).fetchall()
    return [self._row_to_trace(r) for r in rows]

count

count() -> int

Return the total number of stored traces.

Source code in src/openjarvis/traces/store.py
def count(self) -> int:
    """Return how many traces are currently stored."""
    result = self._conn.execute("SELECT COUNT(*) FROM traces").fetchone()
    if not result:
        return 0
    return result[0]

subscribe_to_bus

subscribe_to_bus(bus: EventBus) -> None

Subscribe to TRACE_COMPLETE events on bus.

Source code in src/openjarvis/traces/store.py
def subscribe_to_bus(self, bus: EventBus) -> None:
    """Register this store's handler for ``TRACE_COMPLETE`` events on *bus*."""
    handler = self._on_event
    bus.subscribe(EventType.TRACE_COMPLETE, handler)

close

close() -> None

Close the underlying SQLite connection.

Source code in src/openjarvis/traces/store.py
def close(self) -> None:
    """Release the store's SQLite connection; the store is unusable afterwards."""
    connection = self._conn
    connection.close()

TraceCollector

TraceCollector

TraceCollector(agent: BaseAgent, *, store: Optional[TraceStore] = None, bus: Optional[EventBus] = None)

Wraps a BaseAgent and records a Trace for every run().

The collector subscribes to the EventBus to capture inference, tool, and memory events emitted during agent execution, converting them into TraceStep objects. When the agent finishes, the complete Trace is persisted to the TraceStore and published on the bus.

Usage::

agent = OrchestratorAgent(engine, model, tools=tools, bus=bus)
collector = TraceCollector(agent, store=trace_store, bus=bus)
result = collector.run("What is 2+2?")
# Trace is automatically saved to trace_store
Source code in src/openjarvis/traces/collector.py
def __init__(
    self,
    agent: BaseAgent,
    *,
    store: Optional[TraceStore] = None,
    bus: Optional[EventBus] = None,
) -> None:
    """Wrap *agent*; finished traces go to *store* and are published on *bus* when given."""
    self._agent = agent
    self._store = store  # optional destination for completed traces
    self._bus = bus  # optional event bus; also used to publish TRACE_COMPLETE
    # Per-run scratch state, reset at the start of every run().
    self._current_steps: list[TraceStep] = []
    self._current_model: str = ""
    self._current_engine: str = ""

Attributes

last_trace property

last_trace: Optional[Trace]

Return the trace from the most recent run(), if available.

Functions

run

run(input: str, context: Optional[AgentContext] = None, **kwargs: Any) -> AgentResult

Execute the wrapped agent and record a trace.

Source code in src/openjarvis/traces/collector.py
def run(
    self,
    input: str,
    context: Optional[AgentContext] = None,
    **kwargs: Any,
) -> AgentResult:
    """Run the wrapped agent, assembling and persisting a Trace as a side effect."""
    # Fresh collection state for this run.
    self._current_steps = []
    self._current_model = ""
    self._current_engine = ""

    # Listen for events emitted while the agent executes.
    unsubscribers = self._subscribe()

    started_at = time.time()
    try:
        result = self._agent.run(input, context=context, **kwargs)
    finally:
        # Always detach listeners, even when the agent raises.
        self._unsubscribe(unsubscribers)

    ended_at = time.time()

    # Terminal step capturing the agent's final answer.
    final_step = TraceStep(
        step_type=StepType.RESPOND,
        timestamp=ended_at,
        duration_seconds=0.0,
        output={"content": result.content, "turns": result.turns},
    )
    self._current_steps.append(final_step)

    trace = Trace(
        query=input,
        agent=getattr(self._agent, "agent_id", "unknown"),
        model=self._current_model,
        engine=self._current_engine,
        steps=list(self._current_steps),
        result=result.content,
        started_at=started_at,
        ended_at=ended_at,
    )
    # Totals are derived from the collected steps.
    trace.total_latency_seconds += sum(s.duration_seconds for s in trace.steps)
    trace.total_tokens += sum(s.output.get("tokens", 0) for s in trace.steps)

    if self._store is not None:
        self._store.save(trace)

    if self._bus is not None:
        self._bus.publish(EventType.TRACE_COMPLETE, {"trace": trace})

    return result

TraceAnalyzer

TraceAnalyzer

TraceAnalyzer(store: TraceStore)

Read-only query layer over a TraceStore.

Computes aggregated statistics from stored traces, providing the inputs that the learning system needs to update routing policies.

Source code in src/openjarvis/traces/analyzer.py
def __init__(self, store: TraceStore) -> None:
    """Bind this analyzer to *store*; data is read from it per query method call."""
    self._store = store

Functions

summary

summary(*, since: Optional[float] = None, until: Optional[float] = None) -> TraceSummary

Compute an overall summary of all traces in the time range.

Source code in src/openjarvis/traces/analyzer.py
def summary(
    self,
    *,
    since: Optional[float] = None,
    until: Optional[float] = None,
) -> TraceSummary:
    """Aggregate every trace in the time window into one TraceSummary."""
    traces = self._store.list_traces(since=since, until=until, limit=10_000)
    if not traces:
        return TraceSummary()

    # Single pass over all steps: total count plus per-type distribution.
    step_counts: Dict[str, int] = {}
    n_steps = 0
    for trace in traces:
        n_steps += len(trace.steps)
        for step in trace.steps:
            label = _step_type_str(step)
            step_counts[label] = step_counts.get(label, 0) + 1

    # Success rate is computed only over traces that were evaluated.
    scored = [t for t in traces if t.outcome is not None]
    wins = sum(1 for t in scored if t.outcome == "success")

    return TraceSummary(
        total_traces=len(traces),
        total_steps=n_steps,
        avg_steps_per_trace=n_steps / len(traces),
        avg_latency=_avg([t.total_latency_seconds for t in traces]),
        avg_tokens=_avg([float(t.total_tokens) for t in traces]),
        success_rate=wins / len(scored) if scored else 0.0,
        step_type_distribution=step_counts,
    )

per_route_stats

per_route_stats(*, since: Optional[float] = None, until: Optional[float] = None) -> List[RouteStats]

Compute stats grouped by (model, agent) routing decisions.

Source code in src/openjarvis/traces/analyzer.py
def per_route_stats(
    self,
    *,
    since: Optional[float] = None,
    until: Optional[float] = None,
) -> List[RouteStats]:
    """Aggregate traces into statistics per (model, agent) routing decision."""
    traces = self._store.list_traces(since=since, until=until, limit=10_000)

    # Bucket traces by their routing key.
    by_route: Dict[tuple, list[Trace]] = {}
    for trace in traces:
        by_route.setdefault((trace.model, trace.agent), []).append(trace)

    stats: List[RouteStats] = []
    for (model, agent), group in sorted(by_route.items()):
        # Success rate only counts traces that received an outcome.
        scored = [t for t in group if t.outcome is not None]
        wins = sum(1 for t in scored if t.outcome == "success")
        ratings = [t.feedback for t in group if t.feedback is not None]
        stats.append(
            RouteStats(
                model=model,
                agent=agent,
                count=len(group),
                avg_latency=_avg([t.total_latency_seconds for t in group]),
                avg_tokens=_avg([float(t.total_tokens) for t in group]),
                success_rate=wins / len(scored) if scored else 0.0,
                avg_feedback=_avg(ratings) if ratings else None,
            )
        )
    return stats

per_tool_stats

per_tool_stats(*, since: Optional[float] = None, until: Optional[float] = None) -> List[ToolStats]

Compute stats grouped by tool name.

Source code in src/openjarvis/traces/analyzer.py
def per_tool_stats(
    self,
    *,
    since: Optional[float] = None,
    until: Optional[float] = None,
) -> List[ToolStats]:
    """Aggregate tool_call steps across traces, grouped by tool name."""
    traces = self._store.list_traces(since=since, until=until, limit=10_000)

    buckets: Dict[str, Dict[str, Any]] = {}
    for trace in traces:
        for step in trace.steps:
            if _step_type_str(step) != "tool_call":
                continue
            tool = step.input.get("tool", "unknown")
            bucket = buckets.setdefault(
                tool, {"count": 0, "latencies": [], "successes": 0}
            )
            bucket["count"] += 1
            bucket["latencies"].append(step.duration_seconds)
            # Output's "success" flag is truthy when the call worked.
            if step.output.get("success"):
                bucket["successes"] += 1

    stats: List[ToolStats] = []
    for tool in sorted(buckets):
        data = buckets[tool]
        rate = data["successes"] / data["count"] if data["count"] else 0.0
        stats.append(
            ToolStats(
                tool_name=tool,
                call_count=data["count"],
                avg_latency=_avg(data["latencies"]),
                success_rate=rate,
            )
        )
    return stats

traces_for_query_type

traces_for_query_type(*, has_code: bool = False, min_length: Optional[int] = None, max_length: Optional[int] = None, since: Optional[float] = None, until: Optional[float] = None) -> List[Trace]

Retrieve traces matching query characteristics.

Useful for the learning system to find traces similar to a new query and learn which routing decisions worked best.

Source code in src/openjarvis/traces/analyzer.py
def traces_for_query_type(
    self,
    *,
    has_code: bool = False,
    min_length: Optional[int] = None,
    max_length: Optional[int] = None,
    since: Optional[float] = None,
    until: Optional[float] = None,
) -> List[Trace]:
    """Retrieve traces whose queries match the given characteristics.

    Useful for the learning system to find traces similar to a new
    query and learn which routing decisions worked best.
    """

    def _matches(query: str) -> bool:
        # Each constraint applies only when requested.
        if has_code and not _looks_like_code(query):
            return False
        if min_length is not None and len(query) < min_length:
            return False
        if max_length is not None and len(query) > max_length:
            return False
        return True

    traces = self._store.list_traces(since=since, until=until, limit=10_000)
    return [t for t in traces if _matches(t.query)]

export_traces

export_traces(*, since: Optional[float] = None, until: Optional[float] = None, limit: int = 1000) -> List[Dict[str, Any]]

Export traces as plain dicts (for JSON serialization).

Source code in src/openjarvis/traces/analyzer.py
def export_traces(
    self,
    *,
    since: Optional[float] = None,
    until: Optional[float] = None,
    limit: int = 1000,
) -> List[Dict[str, Any]]:
    """Serialize up to *limit* stored traces into JSON-friendly dictionaries."""
    records = self._store.list_traces(since=since, until=until, limit=limit)
    return list(map(_trace_to_dict, records))

RouteStats

RouteStats dataclass

RouteStats(model: str, agent: str, count: int = 0, avg_latency: float = 0.0, avg_tokens: float = 0.0, success_rate: float = 0.0, avg_feedback: Optional[float] = None)

Aggregated statistics for a specific routing decision (model+agent).

ToolStats

ToolStats dataclass

ToolStats(tool_name: str, call_count: int = 0, avg_latency: float = 0.0, success_rate: float = 0.0)

Aggregated statistics for a specific tool.

TraceSummary

TraceSummary dataclass

TraceSummary(total_traces: int = 0, total_steps: int = 0, avg_steps_per_trace: float = 0.0, avg_latency: float = 0.0, avg_tokens: float = 0.0, success_rate: float = 0.0, step_type_distribution: Dict[str, int] = dict())

Overall summary statistics across all traces.