Skip to content

Telemetry Module

The telemetry module provides append-only recording and read-only aggregation of inference metrics. Every engine call records timing, token counts, energy usage, and cost to SQLite via the event bus. The TelemetryAggregator provides per-model and per-engine statistics with time-range filtering.

TelemetryStore

TelemetryStore

TelemetryStore(db_path: str | Path)

Append-only SQLite store for inference telemetry records.

Source code in src/openjarvis/telemetry/store.py
def __init__(self, db_path: str | Path) -> None:
    """Open the telemetry database at *db_path* and prepare its schema.

    The connection is opened with ``check_same_thread=False``. The
    ``_CREATE_TABLE`` statement is executed and committed before
    ``_migrate_schema()`` is called.
    """
    self._db_path = str(db_path)
    connection = sqlite3.connect(self._db_path, check_same_thread=False)
    self._conn = connection
    # Create the table and commit before the migration step runs.
    connection.execute(_CREATE_TABLE)
    connection.commit()
    self._migrate_schema()

Functions

record

record(rec: TelemetryRecord) -> None

Persist a single telemetry record.

Source code in src/openjarvis/telemetry/store.py
def record(self, rec: TelemetryRecord) -> None:
    """Insert *rec* as a single row and commit immediately.

    The fields are bound positionally to the ``_INSERT`` statement in the
    order listed below; ``rec.metadata`` is stored as a JSON string.
    """
    # Positional order must match the placeholders of ``_INSERT``.
    row = (
        rec.timestamp,
        rec.model_id,
        rec.engine,
        rec.agent,
        rec.prompt_tokens,
        rec.completion_tokens,
        rec.total_tokens,
        rec.latency_seconds,
        rec.ttft,
        rec.cost_usd,
        rec.energy_joules,
        rec.power_watts,
        rec.gpu_utilization_pct,
        rec.gpu_memory_used_gb,
        rec.gpu_temperature_c,
        rec.throughput_tok_per_sec,
        rec.prefill_latency_seconds,
        rec.decode_latency_seconds,
        json.dumps(rec.metadata),
    )
    connection = self._conn
    connection.execute(_INSERT, row)
    connection.commit()

subscribe_to_bus

subscribe_to_bus(bus: EventBus) -> None

Subscribe to TELEMETRY_RECORD events on bus.

Source code in src/openjarvis/telemetry/store.py
def subscribe_to_bus(self, bus: EventBus) -> None:
    """Register this store's ``_on_event`` handler on *bus*.

    The handler is subscribed to ``EventType.TELEMETRY_RECORD`` events.
    """
    handler = self._on_event
    bus.subscribe(EventType.TELEMETRY_RECORD, handler)

close

close() -> None

Close the underlying SQLite connection.

Source code in src/openjarvis/telemetry/store.py
def close(self) -> None:
    """Release the SQLite connection held by this object."""
    connection = self._conn
    connection.close()

TelemetryAggregator

TelemetryAggregator

TelemetryAggregator(db_path: str | Path)

Read-only query layer over the telemetry SQLite database.

Source code in src/openjarvis/telemetry/aggregator.py
def __init__(self, db_path: str | Path) -> None:
    """Connect to the telemetry database at *db_path*.

    The connection's ``row_factory`` is set to ``sqlite3.Row``.
    """
    self._db_path = str(db_path)
    connection = sqlite3.connect(self._db_path)
    connection.row_factory = sqlite3.Row
    self._conn = connection

ModelStats

ModelStats dataclass

ModelStats(model_id: str = '', call_count: int = 0, total_tokens: int = 0, prompt_tokens: int = 0, completion_tokens: int = 0, total_latency: float = 0.0, avg_latency: float = 0.0, total_cost: float = 0.0, avg_ttft: float = 0.0, total_energy_joules: float = 0.0, avg_gpu_utilization_pct: float = 0.0, avg_throughput_tok_per_sec: float = 0.0)

Aggregated statistics for a single model.

EngineStats

EngineStats dataclass

EngineStats(engine: str = '', call_count: int = 0, total_tokens: int = 0, total_latency: float = 0.0, avg_latency: float = 0.0, total_cost: float = 0.0, avg_ttft: float = 0.0, total_energy_joules: float = 0.0, avg_gpu_utilization_pct: float = 0.0, avg_throughput_tok_per_sec: float = 0.0)

Aggregated statistics for a single engine backend.

AggregatedStats

AggregatedStats dataclass

AggregatedStats(total_calls: int = 0, total_tokens: int = 0, total_cost: float = 0.0, total_latency: float = 0.0, per_model: List[ModelStats] = list(), per_engine: List[EngineStats] = list())

Top-level summary combining per-model and per-engine stats.


Instrumented Wrapper

instrumented_generate

instrumented_generate

instrumented_generate(engine: InferenceEngine, messages: Sequence[Message], *, model: str, bus: EventBus, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> Dict[str, Any]

Call engine.generate() and publish telemetry events on bus.

Returns the raw result dict from the engine.

Source code in src/openjarvis/telemetry/wrapper.py
def instrumented_generate(
    engine: InferenceEngine,
    messages: Sequence[Message],
    *,
    model: str,
    bus: EventBus,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Call ``engine.generate()`` and publish telemetry events on *bus*.

    Publishes ``INFERENCE_START`` before the call, then ``INFERENCE_END``
    and ``TELEMETRY_RECORD`` (carrying a ``TelemetryRecord``) after it
    returns. If ``engine.generate()`` raises, the exception propagates and
    no end/record events are published.

    Args:
        engine: Engine whose ``generate()`` is invoked; its ``engine_id`` is
            included in every published event.
        messages: Messages forwarded unchanged to ``engine.generate()``.
        model: Model identifier forwarded to the engine and recorded.
        bus: Event bus on which the events are published.
        temperature: Forwarded to ``engine.generate()``.
        max_tokens: Forwarded to ``engine.generate()``.
        **kwargs: Extra keyword arguments forwarded to ``engine.generate()``.

    Returns:
        The raw result dict from the engine.
    """
    engine_id = engine.engine_id
    bus.publish(EventType.INFERENCE_START, {"model": model, "engine": engine_id})

    # Wall-clock time is kept for the record's timestamp, but the duration is
    # measured with a monotonic clock so that system clock adjustments during
    # the call cannot produce a negative or skewed latency.
    started_at = time.time()
    tick = time.perf_counter()
    result = engine.generate(
        messages, model=model, temperature=temperature, max_tokens=max_tokens, **kwargs
    )
    latency = time.perf_counter() - tick

    # Tolerate both a missing "usage" key and an explicit ``"usage": None``.
    usage = result.get("usage") or {}
    rec = TelemetryRecord(
        timestamp=started_at,
        model_id=model,
        engine=engine_id,
        prompt_tokens=usage.get("prompt_tokens", 0),
        completion_tokens=usage.get("completion_tokens", 0),
        total_tokens=usage.get("total_tokens", 0),
        latency_seconds=latency,
        cost_usd=result.get("cost_usd", 0.0),
    )

    bus.publish(
        EventType.INFERENCE_END,
        {"model": model, "engine": engine_id, "latency": latency},
    )
    bus.publish(EventType.TELEMETRY_RECORD, {"record": rec})

    return result