Skip to content

Core Module

The core module contains the foundational abstractions shared across all OpenJarvis pillars: the decorator-based registry system for runtime discovery, canonical data types, configuration loading with hardware detection, and the pub/sub event bus for inter-pillar communication.

Registry System

The registry pattern is the primary extension mechanism. Each pillar has its own typed registry subclass. New implementations are added by decorating a class with @XRegistry.register("name").

RegistryBase

RegistryBase

Bases: Generic[T]

Generic registry base class with class-specific entry isolation.

Functions

register classmethod

register(key: str) -> Callable[[T], T]

Decorator that registers entry under key.

Source code in src/openjarvis/core/registry.py
@classmethod
def register(cls, key: str) -> Callable[[T], T]:
    """Decorator that registers *entry* under *key*."""

    def decorator(entry: T) -> T:
        entries = cls._entries()
        if key in entries:
            raise ValueError(f"{cls.__name__} already has an entry for '{key}'")
        entries[key] = entry
        return entry

    return decorator

register_value classmethod

register_value(key: str, value: T) -> T

Imperatively register a value under key.

Source code in src/openjarvis/core/registry.py
@classmethod
def register_value(cls, key: str, value: T) -> T:
    """Imperatively register a *value* under *key*."""
    entries = cls._entries()
    if key in entries:
        raise ValueError(f"{cls.__name__} already has an entry for '{key}'")
    entries[key] = value
    return value

get classmethod

get(key: str) -> T

Retrieve the entry for key, raising KeyError if missing.

Source code in src/openjarvis/core/registry.py
@classmethod
def get(cls, key: str) -> T:
    """Retrieve the entry for *key*, raising ``KeyError`` if missing."""
    try:
        return cls._entries()[key]
    except KeyError as exc:
        raise KeyError(
            f"{cls.__name__} does not have an entry for '{key}'"
        ) from exc

create classmethod

create(key: str, *args: Any, **kwargs: Any) -> Any

Look up key and instantiate it with the given arguments.

Source code in src/openjarvis/core/registry.py
@classmethod
def create(cls, key: str, *args: Any, **kwargs: Any) -> Any:
    """Look up *key* and instantiate it with the given arguments."""
    entry = cls.get(key)
    if not callable(entry):
        raise TypeError(
            f"{cls.__name__} entry '{key}' is not callable"
            " and cannot be instantiated"
        )
    return entry(*args, **kwargs)

items classmethod

items() -> Tuple[Tuple[str, T], ...]

Return all (key, entry) pairs as a tuple.

Source code in src/openjarvis/core/registry.py
@classmethod
def items(cls) -> Tuple[Tuple[str, T], ...]:
    """Return all ``(key, entry)`` pairs as a tuple."""
    return tuple(cls._entries().items())

keys classmethod

keys() -> Tuple[str, ...]

Return all registered keys as a tuple.

Source code in src/openjarvis/core/registry.py
@classmethod
def keys(cls) -> Tuple[str, ...]:
    """Return all registered keys as a tuple."""
    return tuple(cls._entries().keys())

contains classmethod

contains(key: str) -> bool

Check whether key is registered.

Source code in src/openjarvis/core/registry.py
@classmethod
def contains(cls, key: str) -> bool:
    """Check whether *key* is registered."""
    return key in cls._entries()

clear classmethod

clear() -> None

Remove all entries (useful in tests).

Source code in src/openjarvis/core/registry.py
@classmethod
def clear(cls) -> None:
    """Remove all entries (useful in tests)."""
    cls._entries().clear()

ModelRegistry

ModelRegistry

Bases: RegistryBase[Any]

Registry for ModelSpec objects.

EngineRegistry

EngineRegistry

Bases: RegistryBase[Type['InferenceEngine']]

Registry for inference engine backends.

MemoryRegistry

MemoryRegistry

Bases: RegistryBase[Type['MemoryBackend']]

Registry for memory / retrieval backends.

AgentRegistry

AgentRegistry

Bases: RegistryBase[Type['BaseAgent']]

Registry for agent implementations.

ToolRegistry

ToolRegistry

Bases: RegistryBase[Any]

Registry for tool specifications.

RouterPolicyRegistry

RouterPolicyRegistry

Bases: RegistryBase[Any]

Registry for router policy implementations.

BenchmarkRegistry

BenchmarkRegistry

Bases: RegistryBase[Any]

Registry for benchmark implementations.

ChannelRegistry

ChannelRegistry

Bases: RegistryBase[Any]

Registry for channel implementations.

LearningRegistry

LearningRegistry

Bases: RegistryBase[Any]

Registry for learning policies.


Types

Canonical data types shared across all OpenJarvis pillars, including message structures, model specifications, telemetry records, and trace objects.

Role

Role

Bases: str, Enum

Chat message roles (OpenAI-compatible).

Quantization

Quantization

Bases: str, Enum

Model quantization formats.

StepType

StepType

Bases: str, Enum

Types of steps within an agent trace.

ToolCall

ToolCall dataclass

ToolCall(id: str, name: str, arguments: str)

A single tool invocation request embedded in an assistant message.

Message

Message dataclass

Message(role: Role, content: str = '', name: Optional[str] = None, tool_calls: Optional[List[ToolCall]] = None, tool_call_id: Optional[str] = None, metadata: Dict[str, Any] = dict())

A single chat message (OpenAI-compatible structure).

Conversation

Conversation dataclass

Conversation(messages: List[Message] = list(), max_messages: Optional[int] = None)

Ordered list of messages with an optional sliding-window cap.

Functions

add

add(message: Message) -> None

Append a message, trimming oldest if max_messages is set.

Source code in src/openjarvis/core/types.py
def add(self, message: Message) -> None:
    """Append a message, trimming oldest if *max_messages* is set."""
    self.messages.append(message)
    if self.max_messages is not None and len(self.messages) > self.max_messages:
        self.messages = self.messages[-self.max_messages :]

window

window(n: int) -> List[Message]

Return the last n messages.

Source code in src/openjarvis/core/types.py
def window(self, n: int) -> List[Message]:
    """Return the last *n* messages."""
    return self.messages[-n:]

ModelSpec

ModelSpec dataclass

ModelSpec(model_id: str, name: str, parameter_count_b: float, context_length: int, active_parameter_count_b: Optional[float] = None, quantization: Quantization = NONE, min_vram_gb: float = 0.0, supported_engines: Sequence[str] = (), provider: str = '', requires_api_key: bool = False, metadata: Dict[str, Any] = dict())

Metadata describing a language model.

ToolResult

ToolResult dataclass

ToolResult(tool_name: str, content: str, success: bool = True, usage: Dict[str, Any] = dict(), cost_usd: float = 0.0, latency_seconds: float = 0.0, metadata: Dict[str, Any] = dict())

Result returned by a tool invocation.

TelemetryRecord

TelemetryRecord dataclass

TelemetryRecord(timestamp: float, model_id: str, prompt_tokens: int = 0, completion_tokens: int = 0, total_tokens: int = 0, latency_seconds: float = 0.0, ttft: float = 0.0, cost_usd: float = 0.0, energy_joules: float = 0.0, power_watts: float = 0.0, gpu_utilization_pct: float = 0.0, gpu_memory_used_gb: float = 0.0, gpu_temperature_c: float = 0.0, throughput_tok_per_sec: float = 0.0, prefill_latency_seconds: float = 0.0, decode_latency_seconds: float = 0.0, engine: str = '', agent: str = '', metadata: Dict[str, Any] = dict())

Single telemetry observation recorded after an inference call.

TraceStep

TraceStep dataclass

TraceStep(step_type: StepType, timestamp: float, duration_seconds: float = 0.0, input: Dict[str, Any] = dict(), output: Dict[str, Any] = dict(), metadata: Dict[str, Any] = dict())

A single step within an agent trace.

Each step records what the agent did (route, retrieve, generate, tool_call, respond), its inputs and outputs, and timing.

Trace

Trace dataclass

Trace(trace_id: str = _trace_id(), query: str = '', agent: str = '', model: str = '', engine: str = '', steps: List[TraceStep] = list(), result: str = '', outcome: Optional[str] = None, feedback: Optional[float] = None, started_at: float = 0.0, ended_at: float = 0.0, total_tokens: int = 0, total_latency_seconds: float = 0.0, metadata: Dict[str, Any] = dict())

Complete trace of an agent handling a query.

A trace captures the full sequence of steps an agent took to handle a query — which model was selected, what memory was retrieved, which tools were called, and the final response. Traces are the primary input to the learning system: by analyzing which decisions led to good outcomes, the system can improve routing, tool selection, and memory strategies.

Functions

add_step

add_step(step: TraceStep) -> None

Append a step and update running totals.

Source code in src/openjarvis/core/types.py
def add_step(self, step: TraceStep) -> None:
    """Append a step and update running totals."""
    self.steps.append(step)
    self.total_latency_seconds += step.duration_seconds
    self.total_tokens += step.output.get("tokens", 0)

RoutingContext

RoutingContext dataclass

RoutingContext(query: str = '', query_length: int = 0, has_code: bool = False, has_math: bool = False, language: str = 'en', urgency: float = 0.5, metadata: Dict[str, Any] = dict())

Context describing a query for model routing decisions.


Configuration

Configuration loading, hardware detection, and engine recommendation. User configuration lives at ~/.openjarvis/config.toml. The load_config() function detects hardware, fills sensible defaults, then overlays any user overrides from the TOML file.

JarvisConfig

JarvisConfig dataclass

JarvisConfig(hardware: HardwareInfo = HardwareInfo(), engine: EngineConfig = EngineConfig(), intelligence: IntelligenceConfig = IntelligenceConfig(), learning: LearningConfig = LearningConfig(), tools: ToolsConfig = ToolsConfig(), agent: AgentConfig = AgentConfig(), server: ServerConfig = ServerConfig(), telemetry: TelemetryConfig = TelemetryConfig(), traces: TracesConfig = TracesConfig(), channel: ChannelConfig = ChannelConfig(), security: SecurityConfig = SecurityConfig(), sandbox: SandboxConfig = SandboxConfig(), scheduler: SchedulerConfig = SchedulerConfig())

Top-level configuration for OpenJarvis.

Attributes

memory property writable

memory: StorageConfig

Backward-compatible accessor — canonical location is tools.storage.

EngineConfig

EngineConfig dataclass

EngineConfig(default: str = 'ollama', ollama: OllamaEngineConfig = OllamaEngineConfig(), vllm: VLLMEngineConfig = VLLMEngineConfig(), sglang: SGLangEngineConfig = SGLangEngineConfig(), llamacpp: LlamaCppEngineConfig = LlamaCppEngineConfig())

Inference engine settings with nested per-engine configs.

Attributes

ollama_host property writable

ollama_host: str

Deprecated: use engine.ollama.host.

vllm_host property writable

vllm_host: str

Deprecated: use engine.vllm.host.

llamacpp_host property writable

llamacpp_host: str

Deprecated: use engine.llamacpp.host.

llamacpp_path property writable

llamacpp_path: str

Deprecated: use engine.llamacpp.binary_path.

sglang_host property writable

sglang_host: str

Deprecated: use engine.sglang.host.

IntelligenceConfig

IntelligenceConfig dataclass

IntelligenceConfig(default_model: str = '', fallback_model: str = '', model_path: str = '', checkpoint_path: str = '', quantization: str = 'none', preferred_engine: str = '', provider: str = '', temperature: float = 0.7, max_tokens: int = 1024, top_p: float = 0.9, top_k: int = 40, repetition_penalty: float = 1.0, stop_sequences: str = '')

The model — identity, paths, quantization, and generation defaults.

LearningConfig

LearningConfig dataclass

LearningConfig(enabled: bool = False, update_interval: int = 100, auto_update: bool = False, routing: RoutingLearningConfig = RoutingLearningConfig(), intelligence: IntelligenceLearningConfig = IntelligenceLearningConfig(), agent: AgentLearningConfig = AgentLearningConfig(), metrics: MetricsConfig = MetricsConfig())

Learning system settings with per-pillar sub-policies.

Attributes

default_policy property writable

default_policy: str

Deprecated: use learning.routing.policy.

intelligence_policy property writable

intelligence_policy: str

Deprecated: use learning.intelligence.policy.

agent_policy property writable

agent_policy: str

Deprecated: use learning.agent.policy.

reward_weights property writable

reward_weights: str

Deprecated: use learning.metrics.*.

MemoryConfig

MemoryConfig module-attribute

MemoryConfig = StorageConfig

AgentConfig

AgentConfig dataclass

AgentConfig(default_agent: str = 'simple', max_turns: int = 10, tools: str = '', objective: str = '', system_prompt: str = '', system_prompt_path: str = '', context_from_memory: bool = True)

Agent harness settings — orchestration, tools, system prompt.

Attributes

default_tools property writable

default_tools: str

Deprecated: use agent.tools.

ServerConfig

ServerConfig dataclass

ServerConfig(host: str = '0.0.0.0', port: int = 8000, agent: str = 'orchestrator', model: str = '', workers: int = 1)

API server settings.

TelemetryConfig

TelemetryConfig dataclass

TelemetryConfig(enabled: bool = True, db_path: str = str(DEFAULT_CONFIG_DIR / 'telemetry.db'), gpu_metrics: bool = False, gpu_poll_interval_ms: int = 50)

Telemetry persistence settings.

TracesConfig

TracesConfig dataclass

TracesConfig(enabled: bool = False, db_path: str = str(DEFAULT_CONFIG_DIR / 'traces.db'))

Trace system settings.

StorageConfig

StorageConfig dataclass

StorageConfig(default_backend: str = 'sqlite', db_path: str = str(DEFAULT_CONFIG_DIR / 'memory.db'), context_top_k: int = 5, context_min_score: float = 0.1, context_max_tokens: int = 2048, chunk_size: int = 512, chunk_overlap: int = 64)

Storage (memory) backend settings.

MCPConfig

MCPConfig dataclass

MCPConfig(enabled: bool = True, servers: str = '')

MCP (Model Context Protocol) settings.

ToolsConfig

ToolsConfig dataclass

ToolsConfig(storage: StorageConfig = StorageConfig(), mcp: MCPConfig = MCPConfig(), enabled: str = '')

Tools pillar settings — wraps storage and MCP configuration.

HardwareInfo

HardwareInfo dataclass

HardwareInfo(platform: str = '', cpu_brand: str = '', cpu_count: int = 0, ram_gb: float = 0.0, gpu: Optional[GpuInfo] = None)

Detected system hardware.

GpuInfo

GpuInfo dataclass

GpuInfo(vendor: str = '', name: str = '', vram_gb: float = 0.0, compute_capability: str = '', count: int = 0)

Detected GPU metadata.

detect_hardware

detect_hardware

detect_hardware() -> HardwareInfo

Auto-detect hardware capabilities with graceful fallbacks.

Source code in src/openjarvis/core/config.py
def detect_hardware() -> HardwareInfo:
    """Auto-detect hardware capabilities with graceful fallbacks."""
    gpu = _detect_nvidia_gpu() or _detect_amd_gpu() or _detect_apple_gpu()
    return HardwareInfo(
        platform=platform.system().lower(),
        cpu_brand=_detect_cpu_brand(),
        cpu_count=os.cpu_count() or 1,
        ram_gb=_total_ram_gb(),
        gpu=gpu,
    )

load_config

load_config

load_config(path: Optional[Path] = None) -> JarvisConfig

Detect hardware, build defaults, overlay TOML overrides.

PARAMETER DESCRIPTION
path

Explicit config file. Falls back to ~/.openjarvis/config.toml.

TYPE: Optional[Path] DEFAULT: None

Source code in src/openjarvis/core/config.py
def load_config(path: Optional[Path] = None) -> JarvisConfig:
    """Detect hardware, build defaults, overlay TOML overrides.

    Parameters
    ----------
    path:
        Explicit config file.  Falls back to ``~/.openjarvis/config.toml``.
    """
    hw = detect_hardware()
    cfg = JarvisConfig(hardware=hw)
    cfg.engine.default = recommend_engine(hw)

    config_path = path or DEFAULT_CONFIG_PATH
    if config_path.exists():
        with open(config_path, "rb") as fh:
            data = tomllib.load(fh)

        # Run backward-compat migrations before applying
        _migrate_toml_data(data, cfg)

        # All top-level sections — recursive _apply_toml_section handles
        # nested sub-configs (engine.ollama, learning.routing, channel.*, etc.)
        top_sections = (
            "engine", "intelligence", "learning", "agent",
            "server", "telemetry", "traces", "security",
            "channel", "tools", "sandbox", "scheduler",
        )
        for section_name in top_sections:
            if section_name in data:
                _apply_toml_section(
                    getattr(cfg, section_name), data[section_name],
                )

        # Memory: accept [memory] (old) → maps to tools.storage
        if "memory" in data:
            _apply_toml_section(cfg.tools.storage, data["memory"])

    return cfg

recommend_engine

recommend_engine

recommend_engine(hw: HardwareInfo) -> str

Suggest the best inference engine for the detected hardware.

Source code in src/openjarvis/core/config.py
def recommend_engine(hw: HardwareInfo) -> str:
    """Suggest the best inference engine for the detected hardware."""
    gpu = hw.gpu
    if gpu is None:
        return "llamacpp"
    if gpu.vendor == "apple":
        return "ollama"
    if gpu.vendor == "nvidia":
        # Datacenter cards (A100, H100, L40, etc.) → vllm; consumer → ollama
        datacenter_keywords = ("A100", "H100", "H200", "L40", "A10", "A30")
        if any(kw in gpu.name for kw in datacenter_keywords):
            return "vllm"
        return "ollama"
    if gpu.vendor == "amd":
        return "vllm"
    return "llamacpp"

Event Bus

Thread-safe pub/sub event bus for inter-pillar telemetry. Any pillar can emit events (e.g. INFERENCE_END) and any other pillar can react without direct coupling.

EventType

EventType

Bases: str, Enum

Supported event categories.

Event

Event dataclass

Event(event_type: EventType, timestamp: float, data: Dict[str, Any] = dict())

A single event published on the bus.

EventBus

EventBus

EventBus(*, record_history: bool = False)

Thread-safe publish/subscribe event bus.

Subscribers are called synchronously in registration order within the publishing thread. An optional record_history flag retains all published events for later inspection (useful in tests/telemetry).

Source code in src/openjarvis/core/events.py
def __init__(self, *, record_history: bool = False) -> None:
    self._subscribers: Dict[EventType, List[Subscriber]] = {}
    self._lock = threading.Lock()
    self._record_history = record_history
    self._history: List[Event] = []

Attributes

history property

history: List[Event]

Return a copy of all recorded events (empty if recording is off).

Functions

subscribe

subscribe(event_type: EventType, callback: Subscriber) -> None

Register callback to be called whenever event_type is published.

Source code in src/openjarvis/core/events.py
def subscribe(self, event_type: EventType, callback: Subscriber) -> None:
    """Register *callback* to be called whenever *event_type* is published."""
    with self._lock:
        self._subscribers.setdefault(event_type, []).append(callback)

unsubscribe

unsubscribe(event_type: EventType, callback: Subscriber) -> None

Remove callback from listeners for event_type.

Source code in src/openjarvis/core/events.py
def unsubscribe(self, event_type: EventType, callback: Subscriber) -> None:
    """Remove *callback* from listeners for *event_type*."""
    with self._lock:
        listeners = self._subscribers.get(event_type, [])
        try:
            listeners.remove(callback)
        except ValueError:
            pass

publish

publish(event_type: EventType, data: Optional[Dict[str, Any]] = None) -> Event

Create and dispatch an event to all subscribers.

Returns the published Event instance.

Source code in src/openjarvis/core/events.py
def publish(
    self,
    event_type: EventType,
    data: Optional[Dict[str, Any]] = None,
) -> Event:
    """Create and dispatch an event to all subscribers.

    Returns the published ``Event`` instance.
    """
    event = Event(event_type=event_type, timestamp=time.time(), data=data or {})

    with self._lock:
        if self._record_history:
            self._history.append(event)
        listeners = list(self._subscribers.get(event_type, []))

    for callback in listeners:
        callback(event)

    return event

clear_history

clear_history() -> None

Discard all recorded events.

Source code in src/openjarvis/core/events.py
def clear_history(self) -> None:
    """Discard all recorded events."""
    with self._lock:
        self._history.clear()

get_event_bus

get_event_bus

get_event_bus(*, record_history: bool = False) -> EventBus

Return the module-level EventBus singleton, creating it if needed.

Source code in src/openjarvis/core/events.py
def get_event_bus(*, record_history: bool = False) -> EventBus:
    """Return the module-level ``EventBus`` singleton, creating it if needed."""
    global _bus
    with _bus_lock:
        if _bus is None:
            _bus = EventBus(record_history=record_history)
        return _bus

reset_event_bus

reset_event_bus

reset_event_bus() -> None

Replace the singleton with a fresh instance (for tests).

Source code in src/openjarvis/core/events.py
def reset_event_bus() -> None:
    """Replace the singleton with a fresh instance (for tests)."""
    global _bus
    with _bus_lock:
        _bus = None