Skip to content

Memory Module

The memory module implements persistent searchable storage for document retrieval. All backends implement the MemoryBackend ABC with store(), retrieve(), delete(), and clear() methods. The module also includes a document ingestion pipeline (chunking, file reading) and context injection for augmenting prompts with retrieved knowledge.

Canonical import location

Memory backends have moved to openjarvis.tools.storage.*. The old openjarvis.memory.* imports still work via backward-compatibility shims.

Abstract Base Class

MemoryBackend

MemoryBackend

Bases: ABC

Base class for all memory / retrieval backends.

Subclasses must be registered via @MemoryRegistry.register("name") to become discoverable.

Functions

store abstractmethod

store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str

Persist content and return a unique document id.

Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Persist *content* and return a unique document id.

    Args:
        content: The document text to store.
        source: Optional originating filename or identifier.
        metadata: Optional arbitrary key/value pairs stored alongside
            the document.

    Returns:
        A unique document id that can later be passed to ``delete()``.
    """

retrieve abstractmethod

retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]

Search for query and return the top-k results.

Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Search for *query* and return the top-k results.

    Args:
        query: Free-text search query.
        top_k: Maximum number of results to return.
        **kwargs: Backend-specific options; implementations may ignore
            options they do not understand.

    Returns:
        Up to *top_k* :class:`RetrievalResult` objects, best match first.
    """

delete abstractmethod

delete(doc_id: str) -> bool

Delete a document by id. Return True if it existed.

Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def delete(self, doc_id: str) -> bool:
    """Delete a document by id. Return ``True`` if it existed.

    Args:
        doc_id: The id previously returned by ``store()``.

    Returns:
        ``True`` when a document was removed, ``False`` for unknown ids.
    """

clear abstractmethod

clear() -> None

Remove all stored documents.

Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def clear(self) -> None:
    """Remove all stored documents.

    After this call the backend behaves as if freshly constructed.
    """

RetrievalResult

RetrievalResult dataclass

RetrievalResult(content: str, score: float = 0.0, source: str = '', metadata: Dict[str, Any] = dict())

A single result returned by a memory backend query.


Backend Implementations

SQLiteMemory

SQLiteMemory

SQLiteMemory(db_path: str | Path = '')

Bases: MemoryBackend

Full-text search memory backend using SQLite FTS5.

Uses the built-in sqlite3 module — no extra dependencies.

Source code in src/openjarvis/tools/storage/sqlite.py
def __init__(self, db_path: str | Path = "") -> None:
    """Open (or create) the SQLite database and prepare the schema.

    Falls back to ``<config dir>/memory.db`` when *db_path* is empty.
    Raises ``RuntimeError`` when the FTS5 extension is unavailable.
    """
    if db_path:
        resolved = str(db_path)
    else:
        from openjarvis.core.config import DEFAULT_CONFIG_DIR
        resolved = str(DEFAULT_CONFIG_DIR / "memory.db")

    self._db_path = resolved
    conn = sqlite3.connect(resolved)
    conn.row_factory = sqlite3.Row
    self._conn = conn

    # Full-text search is mandatory for this backend; fail fast.
    if not _check_fts5(conn):
        raise RuntimeError(
            "SQLite FTS5 extension is not available. "
            "Upgrade your Python or install a SQLite build "
            "with FTS5 enabled."
        )

    self._create_tables()

Functions

store

store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str

Persist content and return a unique document id.

Source code in src/openjarvis/tools/storage/sqlite.py
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Insert *content* into the documents table and return its new id."""
    import time

    new_id = uuid.uuid4().hex
    serialized_meta = json.dumps(metadata or {})
    row = (new_id, content, source, serialized_meta, time.time())

    self._conn.execute(
        "INSERT INTO documents (id, content, source, metadata, created_at)"
        " VALUES (?, ?, ?, ?, ?)",
        row,
    )
    self._conn.commit()

    # Notify observers (logging / metrics) about the write.
    get_event_bus().publish(EventType.MEMORY_STORE, {
        "backend": self.backend_id,
        "doc_id": new_id,
        "source": source,
    })
    return new_id

retrieve

retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]

Search via FTS5 MATCH with BM25 ranking.

Source code in src/openjarvis/tools/storage/sqlite.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Search via FTS5 MATCH with BM25 ranking.

    Args:
        query: Free-text query; blank queries return no results.
        top_k: Maximum number of results to return.
        **kwargs: Accepted for interface compatibility; unused here.

    Returns:
        Up to *top_k* results, best match first.  An empty list is
        returned for blank queries, queries that escape to nothing,
        or FTS syntax errors.
    """
    if not query.strip():
        return []

    # Escape FTS5 special characters so user text cannot break the
    # MATCH expression syntax.
    safe_query = self._escape_fts_query(query)
    if not safe_query:
        return []

    try:
        # Join the FTS hits back to the full rows via rowid.
        # FTS5 orders `rank` ascending = most relevant first.
        # NOTE(review): assumes documents_fts is kept in sync with
        # documents by _create_tables (triggers or manual sync) —
        # confirm in sqlite.py.
        rows = self._conn.execute(
            "SELECT d.id, d.content, d.source, d.metadata,"
            "       rank AS score"
            "  FROM documents_fts f"
            "  JOIN documents d ON d.rowid = f.rowid"
            " WHERE documents_fts MATCH ?"
            " ORDER BY rank"
            " LIMIT ?",
            (safe_query, top_k),
        ).fetchall()
    except sqlite3.OperationalError:
        # A malformed MATCH expression is treated as "no results"
        # rather than propagated to the caller.
        return []

    results = []
    for row in rows:
        # FTS5 rank is negative (more negative = better match)
        # Convert to positive score for consistency
        score = -float(row["score"]) if row["score"] else 0.0
        results.append(RetrievalResult(
            content=row["content"],
            score=score,
            source=row["source"],
            metadata=json.loads(row["metadata"]),
        ))

    # Publish telemetry for observers (logging / metrics).
    bus = get_event_bus()
    bus.publish(EventType.MEMORY_RETRIEVE, {
        "backend": self.backend_id,
        "query": query,
        "num_results": len(results),
    })
    return results

delete

delete(doc_id: str) -> bool

Delete a document by id. Return True if it existed.

Source code in src/openjarvis/tools/storage/sqlite.py
def delete(self, doc_id: str) -> bool:
    """Remove the row whose id equals *doc_id*.

    Returns ``True`` when a row was actually deleted.
    """
    affected = self._conn.execute(
        "DELETE FROM documents WHERE id = ?", (doc_id,)
    ).rowcount
    self._conn.commit()
    return affected > 0

clear

clear() -> None

Remove all stored documents.

Source code in src/openjarvis/tools/storage/sqlite.py
def clear(self) -> None:
    """Delete every row from the documents table and commit."""
    conn = self._conn
    conn.execute("DELETE FROM documents")
    conn.commit()

count

count() -> int

Return the number of stored documents.

Source code in src/openjarvis/tools/storage/sqlite.py
def count(self) -> int:
    """Return how many documents are currently stored."""
    result = self._conn.execute(
        "SELECT COUNT(*) FROM documents"
    ).fetchone()
    return 0 if result is None else result[0]

close

close() -> None

Close the database connection.

Source code in src/openjarvis/tools/storage/sqlite.py
def close(self) -> None:
    """Release the underlying SQLite connection.

    The backend is unusable afterwards; create a new instance to reopen.
    """
    self._conn.close()

FAISSMemory

FAISSMemory

FAISSMemory(*, embedder: Embedder | None = None)

Bases: MemoryBackend

Dense retrieval backend powered by FAISS.

Stores document embeddings in a faiss.IndexFlatIP index (inner-product, which equals cosine similarity when vectors are L2-normalised before insertion/search).

Source code in src/openjarvis/tools/storage/faiss_backend.py
def __init__(
    self,
    *,
    embedder: Embedder | None = None,
) -> None:
    """Create an empty inner-product index sized to the embedder's dim."""
    self._embedder = (
        embedder if embedder is not None else SentenceTransformerEmbedder()
    )
    self._index = faiss.IndexFlatIP(self._embedder.dim())
    # doc id -> (content, source, metadata)
    self._documents: Dict[
        str, Tuple[str, str, Dict[str, Any]]
    ] = {}
    # Position in the FAISS index -> doc id.
    self._id_map: List[str] = []
    # Soft-deleted ids; their vectors remain in the index.
    self._deleted: Set[str] = set()

Functions

store

store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str

Embed and store content, returning a unique doc id.

Source code in src/openjarvis/tools/storage/faiss_backend.py
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Embed *content*, append it to the index, and return its doc id."""
    new_id = uuid.uuid4().hex

    # Normalise so inner product == cosine similarity.
    embedding = self._embedder.embed([content])
    faiss.normalize_L2(embedding)
    self._index.add(embedding)

    meta = metadata if metadata is not None else {}
    self._documents[new_id] = (content, source, meta)
    self._id_map.append(new_id)

    get_event_bus().publish(
        EventType.MEMORY_STORE,
        {
            "backend": self.backend_id,
            "doc_id": new_id,
            "source": source,
        },
    )
    return new_id

retrieve

retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]

Embed query and return the top-k most similar docs.

Source code in src/openjarvis/tools/storage/faiss_backend.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Embed *query* and return the top-k most similar docs."""
    if not query.strip() or self._index.ntotal == 0:
        bus = get_event_bus()
        bus.publish(
            EventType.MEMORY_RETRIEVE,
            {
                "backend": self.backend_id,
                "query": query,
                "num_results": 0,
            },
        )
        return []

    vec = self._embedder.embed([query])
    faiss.normalize_L2(vec)

    # Request more results to compensate for deleted docs
    k = min(
        top_k + len(self._deleted),
        self._index.ntotal,
    )
    scores, indices = self._index.search(vec, k)

    results: List[RetrievalResult] = []
    for score, idx in zip(
        scores[0].tolist(), indices[0].tolist()
    ):
        if idx < 0:
            continue
        doc_id = self._id_map[idx]
        if doc_id in self._deleted:
            continue
        content, source, meta = self._documents[doc_id]
        results.append(
            RetrievalResult(
                content=content,
                score=float(score),
                source=source,
                metadata=dict(meta),
            )
        )
        if len(results) >= top_k:
            break

    bus = get_event_bus()
    bus.publish(
        EventType.MEMORY_RETRIEVE,
        {
            "backend": self.backend_id,
            "query": query,
            "num_results": len(results),
        },
    )
    return results

delete

delete(doc_id: str) -> bool

Soft-delete doc_id. Return True if it existed.

Source code in src/openjarvis/tools/storage/faiss_backend.py
def delete(self, doc_id: str) -> bool:
    """Soft-delete *doc_id* without touching the FAISS index.

    Returns ``False`` for unknown or already-deleted ids.
    """
    unknown = doc_id not in self._documents
    if unknown or doc_id in self._deleted:
        return False
    self._deleted.add(doc_id)
    return True

clear

clear() -> None

Reset the index and all internal storage.

Source code in src/openjarvis/tools/storage/faiss_backend.py
def clear(self) -> None:
    """Reset the FAISS index and drop all bookkeeping state."""
    self._index.reset()
    for container in (self._documents, self._id_map, self._deleted):
        container.clear()

ColBERTMemory

ColBERTMemory

ColBERTMemory(*, checkpoint: str = 'colbert-ir/colbertv2.0', device: str = 'cpu')

Bases: MemoryBackend

In-memory ColBERTv2 late interaction retrieval backend.

Encodes queries and documents into token-level embeddings using a ColBERT checkpoint, then scores via MaxSim (for each query token, take the maximum cosine similarity across all document tokens and sum the results).

The checkpoint is lazily loaded on first use to avoid heavy model loading during import or instantiation.

Source code in src/openjarvis/tools/storage/colbert_backend.py
def __init__(
    self,
    *,
    checkpoint: str = "colbert-ir/colbertv2.0",
    device: str = "cpu",
) -> None:
    """Record configuration; the checkpoint itself is loaded lazily."""
    self._checkpoint_name = checkpoint
    self._device = device

    # doc id -> (content, source, metadata)
    self._documents: Dict[
        str, Tuple[str, str, Dict[str, Any]]
    ] = {}
    # doc id -> per-token embedding tensor
    self._embeddings: Dict[str, Any] = {}

    # Lazy-load state for the heavy ColBERT checkpoint.
    self._checkpoint_obj: Any = None
    self._checkpoint_loaded: bool = False

Functions

store

store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str

Persist content and return a unique document id.

Source code in src/openjarvis/tools/storage/colbert_backend.py
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Encode *content* into token embeddings and index it."""
    new_id = uuid.uuid4().hex
    self._documents[new_id] = (content, source, metadata or {})
    # Token-level embeddings are computed eagerly at store time.
    self._embeddings[new_id] = self._encode(content)

    get_event_bus().publish(EventType.MEMORY_STORE, {
        "backend": self.backend_id,
        "doc_id": new_id,
        "source": source,
    })
    return new_id

retrieve

retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]

Search for query and return the top-k results.

Source code in src/openjarvis/tools/storage/colbert_backend.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Rank every stored document against *query* via MaxSim."""
    if not query.strip() or not self._documents:
        bus = get_event_bus()
        bus.publish(EventType.MEMORY_RETRIEVE, {
            "backend": self.backend_id,
            "query": query,
            "num_results": 0,
        })
        return []

    encoded_query = self._encode(query)

    # Score everything, then keep the best top_k (stable sort keeps
    # insertion order for equal scores).
    ranking = sorted(
        (
            (doc_id, self._maxsim(encoded_query, doc_embs))
            for doc_id, doc_embs in self._embeddings.items()
        ),
        key=lambda item: item[1],
        reverse=True,
    )

    hits: List[RetrievalResult] = []
    for doc_id, score in ranking[:top_k]:
        content, source, metadata = self._documents[doc_id]
        hits.append(RetrievalResult(
            content=content,
            score=score,
            source=source,
            metadata=dict(metadata),
        ))

    bus = get_event_bus()
    bus.publish(EventType.MEMORY_RETRIEVE, {
        "backend": self.backend_id,
        "query": query,
        "num_results": len(hits),
    })
    return hits

delete

delete(doc_id: str) -> bool

Delete a document by id. Return True if it existed.

Source code in src/openjarvis/tools/storage/colbert_backend.py
def delete(self, doc_id: str) -> bool:
    """Remove a document and its embeddings. ``True`` if it existed."""
    if doc_id not in self._documents:
        return False
    self._documents.pop(doc_id)
    self._embeddings.pop(doc_id)
    return True

clear

clear() -> None

Remove all stored documents.

Source code in src/openjarvis/tools/storage/colbert_backend.py
def clear(self) -> None:
    """Drop all documents and their token embeddings."""
    for mapping in (self._documents, self._embeddings):
        mapping.clear()

count

count() -> int

Return the number of stored documents.

Source code in src/openjarvis/tools/storage/colbert_backend.py
def count(self) -> int:
    """Number of documents currently held in memory."""
    return len(self._documents)

BM25Memory

BM25Memory

BM25Memory()

Bases: MemoryBackend

In-memory BM25 (Okapi) retrieval backend.

Uses the rank_bm25 library to score documents against a query using the classic BM25 probabilistic ranking function. All data lives in memory — there is no persistence across restarts.

Source code in src/openjarvis/tools/storage/bm25.py
def __init__(self) -> None:
    """Start with an empty corpus and no BM25 index."""
    # doc id -> (content, source, metadata)
    self._documents: Dict[
        str, Tuple[str, str, Dict[str, Any]]
    ] = {}
    # Tokenised documents, kept parallel to _doc_ids.
    self._corpus: List[List[str]] = []
    self._doc_ids: List[str] = []
    # Built by _rebuild_index(); None while the corpus is empty.
    self._bm25: Optional[BM25Okapi] = None

Functions

store

store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str

Persist content and return a unique document id.

Source code in src/openjarvis/tools/storage/bm25.py
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Add *content* to the corpus and rebuild the BM25 index."""
    new_id = uuid.uuid4().hex
    self._documents[new_id] = (content, source, metadata or {})
    # The whole index is rebuilt on every insert.
    self._rebuild_index()

    get_event_bus().publish(EventType.MEMORY_STORE, {
        "backend": self.backend_id,
        "doc_id": new_id,
        "source": source,
    })
    return new_id

retrieve

retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]

Search for query and return the top-k results.

Source code in src/openjarvis/tools/storage/bm25.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Score the corpus with BM25 and return the best *top_k* docs."""
    if self._bm25 is None or not query.strip():
        bus = get_event_bus()
        bus.publish(EventType.MEMORY_RETRIEVE, {
            "backend": self.backend_id,
            "query": query,
            "num_results": 0,
        })
        return []

    query_tokens = _tokenize(query)
    query_vocab = set(query_tokens)
    bm25_scores = self._bm25.get_scores(query_tokens)

    # Rank document indices by score, best first (stable sort keeps
    # corpus order for ties).
    order = sorted(
        range(len(bm25_scores)),
        key=lambda i: bm25_scores[i],
        reverse=True,
    )

    hits: List[RetrievalResult] = []
    for i in order[:top_k]:
        # Documents sharing no tokens with the query are skipped.
        # Token overlap is checked rather than score > 0 because
        # BM25Okapi assigns IDF = 0 to terms appearing in exactly
        # half the corpus, so a genuine match can still score zero.
        if query_vocab.isdisjoint(self._corpus[i]):
            continue
        content, source, metadata = self._documents[self._doc_ids[i]]
        hits.append(RetrievalResult(
            content=content,
            score=float(bm25_scores[i]),
            source=source,
            metadata=dict(metadata),
        ))

    bus = get_event_bus()
    bus.publish(EventType.MEMORY_RETRIEVE, {
        "backend": self.backend_id,
        "query": query,
        "num_results": len(hits),
    })
    return hits

delete

delete(doc_id: str) -> bool

Delete a document by id. Return True if it existed.

Source code in src/openjarvis/tools/storage/bm25.py
def delete(self, doc_id: str) -> bool:
    """Drop *doc_id* and rebuild the index. ``True`` if it existed."""
    if doc_id in self._documents:
        del self._documents[doc_id]
        self._rebuild_index()
        return True
    return False

clear

clear() -> None

Remove all stored documents.

Source code in src/openjarvis/tools/storage/bm25.py
def clear(self) -> None:
    """Forget all documents and discard the BM25 index."""
    self._documents.clear()
    del self._corpus[:]
    del self._doc_ids[:]
    self._bm25 = None

count

count() -> int

Return the number of stored documents.

Source code in src/openjarvis/tools/storage/bm25.py
def count(self) -> int:
    """How many documents the corpus currently contains."""
    return len(self._documents)

HybridMemory

HybridMemory

HybridMemory(*, sparse: MemoryBackend, dense: MemoryBackend, k: int = 60, sparse_weight: float = 1.0, dense_weight: float = 1.0)

Bases: MemoryBackend

Fuses a sparse and a dense retriever via RRF.

Stores documents in both sub-backends and merges retrieval results using Reciprocal Rank Fusion.

Source code in src/openjarvis/tools/storage/hybrid.py
def __init__(
    self,
    *,
    sparse: MemoryBackend,
    dense: MemoryBackend,
    k: int = 60,
    sparse_weight: float = 1.0,
    dense_weight: float = 1.0,
) -> None:
    """Wrap a sparse and a dense backend for RRF fusion."""
    self._sparse = sparse
    self._dense = dense
    self._k = k
    # Fusion weights, in [sparse, dense] order.
    self._weights = [sparse_weight, dense_weight]
    # sparse doc id -> matching dense doc id (needed by delete()).
    self._id_map: Dict[str, str] = {}

Functions

store

store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str

Store in both sub-backends with the same doc id.

Source code in src/openjarvis/tools/storage/hybrid.py
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Write the document to both backends; the sparse id is canonical."""
    primary_id = self._sparse.store(
        content, source=source, metadata=metadata,
    )
    secondary_id = self._dense.store(
        content, source=source, metadata=metadata,
    )
    # Remember the dense twin so delete() can remove both copies.
    self._id_map[primary_id] = secondary_id

    get_event_bus().publish(EventType.MEMORY_STORE, {
        "backend": self.backend_id,
        "doc_id": primary_id,
        "source": source,
    })
    return primary_id

retrieve

retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]

Retrieve from both backends and fuse with RRF.

Source code in src/openjarvis/tools/storage/hybrid.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Query both backends, fuse the rankings with RRF, return top-k."""
    # Over-fetch from each backend so fusion has enough candidates.
    candidate_k = 3 * top_k

    fused = reciprocal_rank_fusion(
        [
            self._sparse.retrieve(query, top_k=candidate_k),
            self._dense.retrieve(query, top_k=candidate_k),
        ],
        k=self._k,
        weights=self._weights,
    )

    get_event_bus().publish(EventType.MEMORY_RETRIEVE, {
        "backend": self.backend_id,
        "query": query,
        "num_results": min(len(fused), top_k),
    })

    return fused[:top_k]

delete

delete(doc_id: str) -> bool

Delete from both sub-backends.

Source code in src/openjarvis/tools/storage/hybrid.py
def delete(self, doc_id: str) -> bool:
    """Remove the document from both backends; ``True`` if either had it."""
    removed_sparse = self._sparse.delete(doc_id)
    # The dense backend uses its own id, looked up via the id map.
    twin = self._id_map.pop(doc_id, None)
    removed_dense = twin is not None and self._dense.delete(twin)
    return removed_sparse or removed_dense

clear

clear() -> None

Clear both sub-backends.

Source code in src/openjarvis/tools/storage/hybrid.py
def clear(self) -> None:
    """Wipe both sub-backends and the sparse-to-dense id mapping."""
    self._sparse.clear()
    self._dense.clear()
    self._id_map.clear()

reciprocal_rank_fusion

reciprocal_rank_fusion

reciprocal_rank_fusion(ranked_lists: List[List[RetrievalResult]], *, k: int = 60, weights: Optional[List[float]] = None) -> List[RetrievalResult]

Fuse multiple ranked result lists using RRF.

RRF_score(d) = sum(weight_i / (k + rank_i(d)))

PARAMETER DESCRIPTION
ranked_lists

Each inner list is a ranked sequence of results (best first).

TYPE: List[List[RetrievalResult]]

k

RRF constant (default 60).

TYPE: int DEFAULT: 60

weights

Per-list weight (defaults to equal weighting).

TYPE: Optional[List[float]] DEFAULT: None

RETURNS DESCRIPTION
Merged list sorted by fused score, descending.
Source code in src/openjarvis/tools/storage/hybrid.py
def reciprocal_rank_fusion(
    ranked_lists: List[List[RetrievalResult]],
    *,
    k: int = 60,
    weights: Optional[List[float]] = None,
) -> List[RetrievalResult]:
    """Fuse multiple ranked result lists using RRF.

    ``RRF_score(d) = sum(weight_i / (k + rank_i(d)))``

    Parameters
    ----------
    ranked_lists:
        Each inner list is a ranked sequence of results (best first).
    k:
        RRF constant (default 60).
    weights:
        Per-list weight (defaults to equal weighting).

    Returns
    -------
    Merged list sorted by fused score, descending.  Duplicate results
    (same ``content``) are merged; the occurrence with the highest
    original backend score supplies ``source`` and ``metadata``.
    """
    if weights is None:
        weights = [1.0] * len(ranked_lists)

    # Map content -> fused score, and content -> representative result.
    scores: Dict[str, float] = {}
    best_result: Dict[str, RetrievalResult] = {}

    for weight, results in zip(weights, ranked_lists):
        for rank, result in enumerate(results):
            key = result.content
            # enumerate() is 0-based; RRF uses 1-based ranks.
            rrf = weight / (k + rank + 1)
            scores[key] = scores.get(key, 0.0) + rrf

            # Keep the result with the highest original score as the
            # representative for source/metadata.  (Previously the
            # first occurrence was kept, contradicting this intent.)
            current = best_result.get(key)
            if current is None or result.score > current.score:
                best_result[key] = result

    # Build fused results, best fused score first.
    fused = []
    for content_key, fused_score in sorted(
        scores.items(), key=lambda x: x[1], reverse=True
    ):
        original = best_result[content_key]
        fused.append(RetrievalResult(
            content=original.content,
            score=fused_score,
            source=original.source,
            # Copy so callers cannot mutate the stored metadata dict
            # (matches the dict(...) copies made by the backends).
            metadata=dict(original.metadata),
        ))

    return fused

Document Chunking

Splits text into fixed-size chunks with configurable overlap, respecting paragraph boundaries.

ChunkConfig

ChunkConfig dataclass

ChunkConfig(chunk_size: int = 512, chunk_overlap: int = 64, min_chunk_size: int = 50)

Parameters controlling the chunking strategy.

Chunk

Chunk dataclass

Chunk(content: str, source: str = '', offset: int = 0, index: int = 0, metadata: Dict[str, Any] = dict())

A single chunk produced by the chunking pipeline.

chunk_text

chunk_text

chunk_text(text: str, *, source: str = '', config: Optional[ChunkConfig] = None) -> List[Chunk]

Split text into chunks respecting paragraph boundaries.

PARAMETER DESCRIPTION
text

The full document text.

TYPE: str

source

Originating filename or identifier.

TYPE: str DEFAULT: ''

config

Chunking parameters (uses defaults if None).

TYPE: Optional[ChunkConfig] DEFAULT: None

RETURNS DESCRIPTION
List of :class:`Chunk` objects, in order.
Source code in src/openjarvis/tools/storage/chunking.py
def chunk_text(
    text: str,
    *,
    source: str = "",
    config: Optional[ChunkConfig] = None,
) -> List[Chunk]:
    """Split *text* into chunks respecting paragraph boundaries.

    Sizes are measured in whitespace-separated tokens; offsets are
    token positions within *text*.  Chunks whose ``_count_tokens``
    measure falls below ``min_chunk_size`` are silently dropped,
    including the final remainder.

    Parameters
    ----------
    text:
        The full document text.
    source:
        Originating filename or identifier.
    config:
        Chunking parameters (uses defaults if ``None``).

    Returns
    -------
    List of :class:`Chunk` objects, in order.
    """
    if not text or not text.strip():
        return []

    cfg = config or ChunkConfig()

    # Split into paragraphs (double newline)
    paragraphs = [p for p in text.split("\n\n") if p.strip()]

    chunks: List[Chunk] = []
    current_tokens: List[str] = []
    current_offset = 0
    chunk_start_offset = 0

    for para in paragraphs:
        para_tokens = para.split()

        # If adding this paragraph would exceed chunk_size and we already
        # have content, flush the current chunk first.
        if (
            current_tokens
            and len(current_tokens) + len(para_tokens) > cfg.chunk_size
        ):
            chunk_content = " ".join(current_tokens)
            # NOTE(review): assumes _count_tokens approximates the
            # whitespace token count used elsewhere here — confirm.
            if _count_tokens(chunk_content) >= cfg.min_chunk_size:
                chunks.append(Chunk(
                    content=chunk_content,
                    source=source,
                    offset=chunk_start_offset,
                    index=len(chunks),
                ))

            # Keep the overlap tail for the next chunk
            if cfg.chunk_overlap > 0 and len(current_tokens) > cfg.chunk_overlap:
                overlap = current_tokens[-cfg.chunk_overlap:]
                current_tokens = list(overlap)
            else:
                current_tokens = []
            # NOTE(review): the next chunk starts with the carried
            # overlap tokens, which precede current_offset, so its
            # recorded offset is slightly late — confirm whether any
            # caller consumes offsets precisely.
            chunk_start_offset = current_offset

        # If a single paragraph exceeds chunk_size, split it directly
        if len(para_tokens) > cfg.chunk_size:
            # Flush anything accumulated first
            if current_tokens:
                chunk_content = " ".join(current_tokens)
                if _count_tokens(chunk_content) >= cfg.min_chunk_size:
                    chunks.append(Chunk(
                        content=chunk_content,
                        source=source,
                        offset=chunk_start_offset,
                        index=len(chunks),
                    ))
                current_tokens = []

            # Split the oversized paragraph into fixed windows
            idx = 0
            while idx < len(para_tokens):
                window = para_tokens[idx:idx + cfg.chunk_size]
                chunk_content = " ".join(window)
                if _count_tokens(chunk_content) >= cfg.min_chunk_size:
                    chunks.append(Chunk(
                        content=chunk_content,
                        source=source,
                        offset=current_offset + idx,
                        index=len(chunks),
                    ))
                # Windows advance by chunk_size - chunk_overlap so
                # consecutive windows overlap; step is clamped to 1
                # to guarantee forward progress.
                step = max(1, cfg.chunk_size - cfg.chunk_overlap)
                idx += step

            current_offset += len(para_tokens)
            chunk_start_offset = current_offset
            continue

        current_tokens.extend(para_tokens)
        current_offset += len(para_tokens)

    # Flush remaining tokens
    if current_tokens:
        chunk_content = " ".join(current_tokens)
        if _count_tokens(chunk_content) >= cfg.min_chunk_size:
            chunks.append(Chunk(
                content=chunk_content,
                source=source,
                offset=chunk_start_offset,
                index=len(chunks),
            ))

    return chunks

Document Ingestion

File reading, type detection, and directory walking for the ingestion pipeline.

DocumentMeta

DocumentMeta dataclass

DocumentMeta(path: str, file_type: str, size_bytes: int, line_count: int)

Metadata about an ingested document.

detect_file_type

detect_file_type

detect_file_type(path: Path) -> str

Map a file extension to one of: text, markdown, pdf, code.

Source code in src/openjarvis/tools/storage/ingest.py
def detect_file_type(path: Path) -> str:
    """Classify *path* by extension as markdown, pdf, code, or text."""
    extension = path.suffix.lower()
    if extension == ".pdf":
        return "pdf"
    if extension in {".md", ".markdown", ".mdx"}:
        return "markdown"
    if extension in _CODE_EXTS:
        return "code"
    return "text"

read_document

read_document

read_document(path: Path) -> Tuple[str, DocumentMeta]

Read a file and return (text, metadata).

RAISES DESCRIPTION
ImportError

If the file is a PDF and pdfplumber is not installed.

FileNotFoundError

If path does not exist.

Source code in src/openjarvis/tools/storage/ingest.py
def read_document(path: Path) -> Tuple[str, DocumentMeta]:
    """Read a file and return ``(text, metadata)``.

    Raises
    ------
    ImportError
        If the file is a PDF and ``pdfplumber`` is not installed.
    FileNotFoundError
        If *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")

    file_type = detect_file_type(path)

    if file_type != "pdf":
        text = _read_text(path)
    else:
        # Fail early with an actionable message when the optional
        # PDF dependency is missing.
        try:
            import pdfplumber  # noqa: F401
        except ImportError:
            raise ImportError(
                "PDF support requires pdfplumber. "
                "Install it with: pip install openjarvis[memory-pdf]"
            ) from None
        text = _read_pdf(path)

    lines = text.count("\n") + 1 if text else 0
    return text, DocumentMeta(
        path=str(path),
        file_type=file_type,
        size_bytes=path.stat().st_size,
        line_count=lines,
    )

ingest_path

ingest_path

ingest_path(path: Path, *, config: Optional[ChunkConfig] = None) -> List[Chunk]

Ingest a file or directory into chunks.

If path is a file, reads and chunks it. If path is a directory, recursively walks it (skipping hidden and common non-content directories) and chunks each file.

Source code in src/openjarvis/tools/storage/ingest.py
def ingest_path(
    path: Path,
    *,
    config: Optional[ChunkConfig] = None,
) -> List[Chunk]:
    """Ingest a file or directory into chunks.

    If *path* is a file, reads and chunks it.
    If *path* is a directory, recursively walks it (skipping hidden and
    common non-content directories) and chunks each file.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Path not found: {path}")

    if path.is_file():
        text, _meta = read_document(path)
        return chunk_text(text, source=str(path), config=config)

    # Hoisted out of the walk loop: previously the import statement and
    # the binary-extension set literal were re-evaluated for every file.
    from openjarvis.security.file_policy import is_sensitive_file

    binary_exts = {
        ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".ico",
        ".mp3", ".mp4", ".wav", ".avi", ".mov",
        ".zip", ".tar", ".gz", ".bz2", ".7z",
        ".exe", ".dll", ".so", ".dylib", ".o",
        ".pyc", ".pyo", ".class", ".wasm",
    }

    # Directory: recursive walk
    all_chunks: List[Chunk] = []
    for child in sorted(path.rglob("*")):
        # Skip directories themselves — rglob yields files too
        if child.is_dir():
            continue

        # Skip files under any directory flagged by _should_skip_dir
        rel = child.relative_to(path)
        if any(_should_skip_dir(part) for part in rel.parts[:-1]):
            continue

        # Skip hidden files
        if child.name.startswith("."):
            continue

        # Skip sensitive files (secrets, credentials, keys)
        if is_sensitive_file(child):
            continue

        # Skip binary-looking files
        if child.suffix.lower() in binary_exts:
            continue

        try:
            text, _meta = read_document(child)
            all_chunks.extend(
                chunk_text(text, source=str(child), config=config)
            )
        except (ImportError, OSError):
            # Skip files we can't read (e.g. PDF without pdfplumber)
            continue

    return all_chunks

Context Injection

Retrieves relevant memory and injects it into prompts as system messages with source attribution.

ContextConfig

ContextConfig dataclass

ContextConfig(enabled: bool = True, top_k: int = 5, min_score: float = 0.1, max_context_tokens: int = 2048)

Controls how retrieved context is injected into prompts.

inject_context

inject_context

inject_context(query: str, messages: List[Message], backend: MemoryBackend, *, config: Optional[ContextConfig] = None) -> List[Message]

Retrieve relevant context and prepend it to messages.

Returns a new list — the original list is not mutated. If no results pass the score threshold, returns the original messages unchanged.

PARAMETER DESCRIPTION
query

The user query to search for.

TYPE: str

messages

The existing message list.

TYPE: List[Message]

backend

The memory backend to search.

TYPE: MemoryBackend

config

Context injection settings (uses defaults if None).

TYPE: Optional[ContextConfig] DEFAULT: None

Source code in src/openjarvis/tools/storage/context.py
def inject_context(
    query: str,
    messages: List[Message],
    backend: MemoryBackend,
    *,
    config: Optional[ContextConfig] = None,
) -> List[Message]:
    """Search *backend* for *query* and prepend the hits to *messages*.

    A **new** list is returned whenever context is injected; the input
    list is never mutated.  When injection is disabled, nothing passes
    the score threshold, or the token budget admits no result, the
    original *messages* is returned unchanged.

    Parameters
    ----------
    query:
        The user query to search for.
    messages:
        The existing message list.
    backend:
        The memory backend to search.
    config:
        Context injection settings (uses defaults if ``None``).
    """
    settings = config or ContextConfig()
    if not settings.enabled:
        return messages

    # Retrieve, then drop anything below the minimum score.
    hits = [
        hit
        for hit in backend.retrieve(query, top_k=settings.top_k)
        if hit.score >= settings.min_score
    ]
    if not hits:
        return messages

    # Take results greedily in rank order until the token budget
    # would be exceeded; stop at the first result that doesn't fit.
    kept: List[RetrievalResult] = []
    budget_used = 0
    for hit in hits:
        cost = _count_tokens(hit.content)
        if budget_used + cost > settings.max_context_tokens:
            break
        kept.append(hit)
        budget_used += cost

    if not kept:
        return messages

    get_event_bus().publish(EventType.MEMORY_RETRIEVE, {
        "context_injection": True,
        "query": query,
        "num_results": len(kept),
        "total_tokens": budget_used,
    })

    return [build_context_message(kept)] + list(messages)

format_context

format_context

format_context(results: List[RetrievalResult]) -> str

Format retrieval results into a context block.

Each result is prefixed with its source attribution.

Source code in src/openjarvis/tools/storage/context.py
def format_context(results: List[RetrievalResult]) -> str:
    """Render *results* as a double-newline-separated context block.

    Each result with a non-empty source is prefixed with its
    ``[Source: ...]`` attribution; sourceless results appear bare.
    Returns the empty string for an empty result list.
    """
    blocks = []
    for result in results:
        if result.source:
            blocks.append(f"[Source: {result.source}] {result.content}")
        else:
            blocks.append(result.content)
    return "\n\n".join(blocks)

build_context_message

build_context_message

build_context_message(results: List[RetrievalResult]) -> Message

Create a system message with formatted context.

Source code in src/openjarvis/tools/storage/context.py
def build_context_message(
    results: List[RetrievalResult],
) -> Message:
    """Create a system message with formatted context."""
    preamble = (
        "The following context was retrieved from the knowledge"
        " base. Use it to inform your response, citing sources"
        " where applicable:\n\n"
    )
    return Message(
        role=Role.SYSTEM,
        content=preamble + format_context(results),
    )

Embeddings

Abstraction layer for text embedding models used by dense retrieval backends.

Embedder

Embedder

Bases: ABC

Base class for text embedding models.

Subclasses must implement embed() and dim().

Functions

embed abstractmethod

embed(texts: list[str]) -> Any

Embed texts and return a numpy array of shape (n, dim).

Source code in src/openjarvis/tools/storage/embeddings.py
@abstractmethod
def embed(self, texts: list[str]) -> Any:
    """Return embeddings for *texts* as a numpy array of shape (n, dim)."""

dim abstractmethod

dim() -> int

Return the dimensionality of the embedding vectors.

Source code in src/openjarvis/tools/storage/embeddings.py
@abstractmethod
def dim(self) -> int:
    """Return the dimensionality of vectors produced by ``embed``."""

SentenceTransformerEmbedder

SentenceTransformerEmbedder

SentenceTransformerEmbedder(model_name: str = 'all-MiniLM-L6-v2')

Bases: Embedder

Embedder backed by sentence-transformers.

PARAMETER DESCRIPTION
model_name

HuggingFace model identifier. Defaults to the lightweight all-MiniLM-L6-v2 (384-dim, ~22 MB).

TYPE: str DEFAULT: 'all-MiniLM-L6-v2'

Source code in src/openjarvis/tools/storage/embeddings.py
def __init__(
    self, model_name: str = "all-MiniLM-L6-v2"
) -> None:
    """Load the sentence-transformers model named *model_name*.

    Raises
    ------
    ImportError
        If the optional ``sentence-transformers`` package is missing.
    """
    # Imported lazily so the module is importable without the heavy
    # optional dependency.
    try:
        from sentence_transformers import SentenceTransformer
    except ImportError as exc:
        raise ImportError(
            "sentence-transformers is required for "
            "SentenceTransformerEmbedder. Install it with: "
            "pip install sentence-transformers"
        ) from exc

    model = SentenceTransformer(model_name)
    self._model = model
    # Cache the output dimensionality once; dim() just reads it back.
    self._dim: int = model.get_sentence_embedding_dimension()

Functions

embed

embed(texts: list[str]) -> Any

Return a numpy array of shape (len(texts), dim).

Source code in src/openjarvis/tools/storage/embeddings.py
def embed(self, texts: list[str]) -> Any:
    """Encode *texts*; the result is a numpy array of shape ``(len(texts), dim)``."""
    return self._model.encode(texts, convert_to_numpy=True)

dim

dim() -> int

Return the embedding dimensionality.

Source code in src/openjarvis/tools/storage/embeddings.py
def dim(self) -> int:
    """Dimensionality of the vectors this embedder produces."""
    return self._dim