Memory Module
The memory module implements persistent searchable storage for document
retrieval. All backends implement the MemoryBackend ABC with store(),
retrieve(), delete(), and clear() methods. The module also includes
a document ingestion pipeline (chunking, file reading) and context injection
for augmenting prompts with retrieved knowledge.
Canonical import location
Memory backends have moved to openjarvis.tools.storage.*. The old
openjarvis.memory.* imports still work via backward-compatibility shims.
Abstract Base Class
MemoryBackend
Bases: ABC
Base class for all memory / retrieval backends.
Subclasses must be registered via
@MemoryRegistry.register("name") to become discoverable.
store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str
Persist content and return a unique document id.
Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Persist *content* and return a unique document id.

    Parameters
    ----------
    content:
        The document text to store.
    source:
        Optional originating filename or identifier, kept verbatim.
    metadata:
        Optional mapping stored alongside the document.

    Returns
    -------
    A backend-generated unique id usable with :meth:`delete`.
    """
retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]
Search for query and return the top-k results.
Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Search for *query* and return the top-k results.

    Parameters
    ----------
    query:
        Free-text search query.
    top_k:
        Maximum number of results to return.
    **kwargs:
        Backend-specific search options.

    Returns
    -------
    Results ordered best-first; may be shorter than *top_k*.
    """
delete(doc_id: str) -> bool
Delete a document by id. Return True if it existed.
Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def delete(self, doc_id: str) -> bool:
    """Delete a document by id. Return ``True`` if it existed."""
clear() -> None
Remove all stored documents.
Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def clear(self) -> None:
    """Remove all stored documents, leaving the backend empty."""
RetrievalResult
RetrievalResult(content: str, score: float = 0.0, source: str = '', metadata: Dict[str, Any] = dict())
A single result returned by a memory backend query.
Backend Implementations
SQLiteMemory
SQLiteMemory(db_path: str | Path = '')
Bases: MemoryBackend
Full-text search memory backend using SQLite FTS5.
Uses the built-in sqlite3 module — no extra dependencies.
Source code in src/openjarvis/tools/storage/sqlite.py
def __init__(self, db_path: str | Path = "") -> None:
    """Open (or create) the database and prepare the schema.

    An empty *db_path* falls back to ``<config dir>/memory.db``.
    Raises ``RuntimeError`` when the linked SQLite lacks FTS5.
    """
    if db_path:
        resolved = str(db_path)
    else:
        from openjarvis.core.config import DEFAULT_CONFIG_DIR
        resolved = str(DEFAULT_CONFIG_DIR / "memory.db")
    self._db_path = resolved
    self._conn = sqlite3.connect(resolved)
    # Row objects allow column access by name in retrieve().
    self._conn.row_factory = sqlite3.Row
    if not _check_fts5(self._conn):
        msg = (
            "SQLite FTS5 extension is not available. "
            "Upgrade your Python or install a SQLite build "
            "with FTS5 enabled."
        )
        raise RuntimeError(msg)
    self._create_tables()
store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str
Persist content and return a unique document id.
Source code in src/openjarvis/tools/storage/sqlite.py
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Insert *content* as a new document row and return its id."""
    import time

    new_id = uuid.uuid4().hex
    row = (
        new_id,
        content,
        source,
        json.dumps(metadata or {}),
        time.time(),
    )
    self._conn.execute(
        "INSERT INTO documents (id, content, source, metadata, created_at)"
        " VALUES (?, ?, ?, ?, ?)",
        row,
    )
    self._conn.commit()
    get_event_bus().publish(EventType.MEMORY_STORE, {
        "backend": self.backend_id,
        "doc_id": new_id,
        "source": source,
    })
    return new_id
retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]
Search via FTS5 MATCH with BM25 ranking.
Source code in src/openjarvis/tools/storage/sqlite.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Full-text search via FTS5 MATCH, ranked by BM25."""
    if not query.strip():
        return []
    # Escape FTS5 special characters before matching.
    safe_query = self._escape_fts_query(query)
    if not safe_query:
        return []
    sql = (
        "SELECT d.id, d.content, d.source, d.metadata,"
        " rank AS score"
        " FROM documents_fts f"
        " JOIN documents d ON d.rowid = f.rowid"
        " WHERE documents_fts MATCH ?"
        " ORDER BY rank"
        " LIMIT ?"
    )
    try:
        rows = self._conn.execute(sql, (safe_query, top_k)).fetchall()
    except sqlite3.OperationalError:
        # A query FTS5 still rejects after escaping yields no hits.
        return []
    results = [
        RetrievalResult(
            content=row["content"],
            # FTS5 rank is negative (more negative = better match);
            # flip the sign so larger scores mean better matches.
            score=-float(row["score"]) if row["score"] else 0.0,
            source=row["source"],
            metadata=json.loads(row["metadata"]),
        )
        for row in rows
    ]
    get_event_bus().publish(EventType.MEMORY_RETRIEVE, {
        "backend": self.backend_id,
        "query": query,
        "num_results": len(results),
    })
    return results
delete(doc_id: str) -> bool
Delete a document by id. Return True if it existed.
Source code in src/openjarvis/tools/storage/sqlite.py
def delete(self, doc_id: str) -> bool:
    """Remove the row with *doc_id*; True when a row was deleted."""
    removed = self._conn.execute(
        "DELETE FROM documents WHERE id = ?", (doc_id,)
    ).rowcount
    self._conn.commit()
    return removed > 0
clear() -> None
Remove all stored documents.
Source code in src/openjarvis/tools/storage/sqlite.py
def clear(self) -> None:
    """Drop every document row and commit the change."""
    self._conn.execute("DELETE FROM documents")
    self._conn.commit()
count() -> int
Return the number of stored documents.
Source code in src/openjarvis/tools/storage/sqlite.py
def count(self) -> int:
    """Number of stored documents (0 for an empty table)."""
    row = self._conn.execute(
        "SELECT COUNT(*) FROM documents"
    ).fetchone()
    return 0 if row is None else row[0]
close() -> None
Close the database connection.
Source code in src/openjarvis/tools/storage/sqlite.py
def close(self) -> None:
    """Release the underlying SQLite connection."""
    self._conn.close()
FAISSMemory
FAISSMemory(*, embedder: Embedder | None = None)
Bases: MemoryBackend
Dense retrieval backend powered by FAISS.
Stores document embeddings in a faiss.IndexFlatIP index
(inner-product, which equals cosine similarity when vectors
are L2-normalised before insertion/search).
Source code in src/openjarvis/tools/storage/faiss_backend.py
def __init__(
    self,
    *,
    embedder: Embedder | None = None,
) -> None:
    """Create an empty flat inner-product index.

    Falls back to :class:`SentenceTransformerEmbedder` when no
    *embedder* is given.
    """
    if embedder is None:
        embedder = SentenceTransformerEmbedder()
    self._embedder = embedder
    self._index = faiss.IndexFlatIP(embedder.dim())
    # doc_id -> (content, source, metadata)
    self._documents: Dict[str, Tuple[str, str, Dict[str, Any]]] = {}
    # Insertion order; position in this list == FAISS row index.
    self._id_map: List[str] = []
    # Soft-deleted ids; delete() never touches the index itself.
    self._deleted: Set[str] = set()
store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str
Embed and store content, returning a unique doc id.
Source code in src/openjarvis/tools/storage/faiss_backend.py
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Embed *content*, append it to the index, and return its id."""
    new_id = uuid.uuid4().hex
    vec = self._embedder.embed([content])
    faiss.normalize_L2(vec)  # unit vectors => inner product == cosine
    self._index.add(vec)
    self._documents[new_id] = (
        content,
        source,
        metadata if metadata is not None else {},
    )
    self._id_map.append(new_id)
    get_event_bus().publish(EventType.MEMORY_STORE, {
        "backend": self.backend_id,
        "doc_id": new_id,
        "source": source,
    })
    return new_id
retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]
Embed query and return the top-k most similar docs.
Source code in src/openjarvis/tools/storage/faiss_backend.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Embed *query* and return the top-k most similar docs.

    Soft-deleted documents are filtered out after the FAISS search,
    so the index is over-queried by ``len(self._deleted)`` to keep
    *top_k* live results reachable.
    """
    if not query.strip() or self._index.ntotal == 0:
        # Blank query or empty index: still publish a zero-result
        # event so observers see the retrieval attempt.
        bus = get_event_bus()
        bus.publish(
            EventType.MEMORY_RETRIEVE,
            {
                "backend": self.backend_id,
                "query": query,
                "num_results": 0,
            },
        )
        return []
    vec = self._embedder.embed([query])
    faiss.normalize_L2(vec)  # unit-norm => inner product == cosine
    # Request more results to compensate for deleted docs
    k = min(
        top_k + len(self._deleted),
        self._index.ntotal,
    )
    scores, indices = self._index.search(vec, k)
    results: List[RetrievalResult] = []
    for score, idx in zip(
        scores[0].tolist(), indices[0].tolist()
    ):
        if idx < 0:
            # Negative index: fewer than k vectors were found; skip.
            continue
        doc_id = self._id_map[idx]  # FAISS row position -> doc id
        if doc_id in self._deleted:
            continue
        content, source, meta = self._documents[doc_id]
        results.append(
            RetrievalResult(
                content=content,
                score=float(score),
                source=source,
                metadata=dict(meta),  # copy so callers can't mutate store
            )
        )
        if len(results) >= top_k:
            break
    bus = get_event_bus()
    bus.publish(
        EventType.MEMORY_RETRIEVE,
        {
            "backend": self.backend_id,
            "query": query,
            "num_results": len(results),
        },
    )
    return results
delete(doc_id: str) -> bool
Soft-delete doc_id. Return True if it existed.
Source code in src/openjarvis/tools/storage/faiss_backend.py
def delete(self, doc_id: str) -> bool:
    """Soft-delete *doc_id*; True only when it was present and live."""
    known = doc_id in self._documents
    already_gone = doc_id in self._deleted
    if not known or already_gone:
        return False
    self._deleted.add(doc_id)
    return True
clear() -> None
Reset the index and all internal storage.
Source code in src/openjarvis/tools/storage/faiss_backend.py
def clear(self) -> None:
    """Drop the vector index and all bookkeeping state."""
    self._deleted.clear()
    self._id_map.clear()
    self._documents.clear()
    self._index.reset()
ColBERTMemory
ColBERTMemory(*, checkpoint: str = 'colbert-ir/colbertv2.0', device: str = 'cpu')
Bases: MemoryBackend
In-memory ColBERTv2 late interaction retrieval backend.
Encodes queries and documents into token-level embeddings using a
ColBERT checkpoint, then scores via MaxSim (for each query token,
take the maximum cosine similarity across all document tokens and
sum the results).
The checkpoint is lazily loaded on first use to avoid heavy model
loading during import or instantiation.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def __init__(
    self,
    *,
    checkpoint: str = "colbert-ir/colbertv2.0",
    device: str = "cpu",
) -> None:
    """Record settings only; the checkpoint itself loads lazily."""
    self._checkpoint_name = checkpoint
    self._device = device
    self._checkpoint_loaded: bool = False
    self._checkpoint_obj: Any = None
    # doc_id -> (content, source, metadata)
    self._documents: Dict[str, Tuple[str, str, Dict[str, Any]]] = {}
    # doc_id -> token-level embedding tensor
    self._embeddings: Dict[str, Any] = {}
store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str
Persist content and return a unique document id.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Encode *content* into token embeddings and store both."""
    new_id = uuid.uuid4().hex
    self._documents[new_id] = (content, source, metadata or {})
    self._embeddings[new_id] = self._encode(content)
    get_event_bus().publish(EventType.MEMORY_STORE, {
        "backend": self.backend_id,
        "doc_id": new_id,
        "source": source,
    })
    return new_id
retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]
Search for query and return the top-k results.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Rank every stored document against *query* via MaxSim."""
    bus = get_event_bus()
    if not query.strip() or not self._documents:
        bus.publish(EventType.MEMORY_RETRIEVE, {
            "backend": self.backend_id,
            "query": query,
            "num_results": 0,
        })
        return []
    query_embs = self._encode(query)
    # Score all documents, then sort best-first (stable sort keeps
    # insertion order among ties).
    ranked = sorted(
        (
            (doc_id, self._maxsim(query_embs, doc_embs))
            for doc_id, doc_embs in self._embeddings.items()
        ),
        key=lambda pair: pair[1],
        reverse=True,
    )
    results: List[RetrievalResult] = []
    for doc_id, score in ranked[:top_k]:
        content, source, metadata = self._documents[doc_id]
        results.append(RetrievalResult(
            content=content,
            score=score,
            source=source,
            metadata=dict(metadata),
        ))
    bus.publish(EventType.MEMORY_RETRIEVE, {
        "backend": self.backend_id,
        "query": query,
        "num_results": len(results),
    })
    return results
delete(doc_id: str) -> bool
Delete a document by id. Return True if it existed.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def delete(self, doc_id: str) -> bool:
    """Forget *doc_id* and its embeddings; True when it existed."""
    try:
        del self._documents[doc_id]
    except KeyError:
        return False
    del self._embeddings[doc_id]
    return True
clear() -> None
Remove all stored documents.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def clear(self) -> None:
    """Discard every document and its cached embeddings."""
    self._embeddings.clear()
    self._documents.clear()
count() -> int
Return the number of stored documents.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def count(self) -> int:
    """Number of documents currently stored."""
    return len(self._documents)
BM25Memory
Bases: MemoryBackend
In-memory BM25 (Okapi) retrieval backend.
Uses the rank_bm25 library to score documents against a query
using the classic BM25 probabilistic ranking function. All data
lives in memory — there is no persistence across restarts.
Source code in src/openjarvis/tools/storage/bm25.py
def __init__(self) -> None:
    """Start with an empty corpus and no BM25 index built yet."""
    # doc_id -> (content, source, metadata)
    self._documents: Dict[str, Tuple[str, str, Dict[str, Any]]] = {}
    # Tokenised documents, kept parallel to _doc_ids.
    self._corpus: List[List[str]] = []
    self._doc_ids: List[str] = []
    # Rebuilt on every store/delete; None while the corpus is empty.
    self._bm25: Optional[BM25Okapi] = None
store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str
Persist content and return a unique document id.
Source code in src/openjarvis/tools/storage/bm25.py
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Add *content* to the corpus and rebuild the BM25 index."""
    new_id = uuid.uuid4().hex
    self._documents[new_id] = (content, source, metadata or {})
    self._rebuild_index()
    get_event_bus().publish(EventType.MEMORY_STORE, {
        "backend": self.backend_id,
        "doc_id": new_id,
        "source": source,
    })
    return new_id
retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]
Search for query and return the top-k results.
Source code in src/openjarvis/tools/storage/bm25.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Search for *query* and return the top-k results.

    Scores all documents with BM25, sorts descending, and keeps only
    documents that share at least one token with the query. Returns
    ``[]`` for blank queries or before any document is stored
    (``self._bm25`` is ``None`` until the first index build).
    """
    if not query.strip() or self._bm25 is None:
        bus = get_event_bus()
        bus.publish(EventType.MEMORY_RETRIEVE, {
            "backend": self.backend_id,
            "query": query,
            "num_results": 0,
        })
        return []
    tokenized_query = _tokenize(query)
    query_set = set(tokenized_query)
    scores = self._bm25.get_scores(tokenized_query)
    # Pair (index, score), sort descending by score
    scored = sorted(
        enumerate(scores),
        key=lambda pair: pair[1],
        reverse=True,
    )
    results: List[RetrievalResult] = []
    for idx, score in scored[:top_k]:
        # Skip documents that share no tokens with the query.
        # We check token overlap rather than score > 0 because
        # BM25Okapi can assign IDF = 0 for terms appearing in
        # exactly half the corpus, producing a zero score even
        # when the document genuinely matches.
        if not query_set.intersection(self._corpus[idx]):
            continue
        doc_id = self._doc_ids[idx]  # corpus position -> doc id
        content, source, metadata = self._documents[doc_id]
        results.append(RetrievalResult(
            content=content,
            score=float(score),
            source=source,
            metadata=dict(metadata),  # copy: protect stored metadata
        ))
    bus = get_event_bus()
    bus.publish(EventType.MEMORY_RETRIEVE, {
        "backend": self.backend_id,
        "query": query,
        "num_results": len(results),
    })
    return results
delete(doc_id: str) -> bool
Delete a document by id. Return True if it existed.
Source code in src/openjarvis/tools/storage/bm25.py
def delete(self, doc_id: str) -> bool:
    """Remove *doc_id* from the corpus; True when it existed."""
    try:
        del self._documents[doc_id]
    except KeyError:
        return False
    self._rebuild_index()
    return True
clear() -> None
Remove all stored documents.
Source code in src/openjarvis/tools/storage/bm25.py
def clear(self) -> None:
    """Reset to the freshly-constructed empty state."""
    self._bm25 = None
    self._doc_ids.clear()
    self._corpus.clear()
    self._documents.clear()
count() -> int
Return the number of stored documents.
Source code in src/openjarvis/tools/storage/bm25.py
def count(self) -> int:
    """Number of documents currently stored."""
    return len(self._documents)
HybridMemory
Bases: MemoryBackend
Fuses a sparse and a dense retriever via RRF.
Stores documents in both sub-backends and merges retrieval
results using Reciprocal Rank Fusion.
Source code in src/openjarvis/tools/storage/hybrid.py
def __init__(
    self,
    *,
    sparse: MemoryBackend,
    dense: MemoryBackend,
    k: int = 60,
    sparse_weight: float = 1.0,
    dense_weight: float = 1.0,
) -> None:
    """Wrap a sparse and a dense backend behind one interface.

    *k* is the RRF constant; the two weights bias the fusion
    towards one retriever or the other.
    """
    self._sparse = sparse
    self._dense = dense
    self._k = k
    self._weights = [sparse_weight, dense_weight]
    # sparse doc id -> dense doc id, so delete() can reach both.
    self._id_map: Dict[str, str] = {}
store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str
Store in both sub-backends with the same doc id.
Source code in src/openjarvis/tools/storage/hybrid.py
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Store in both sub-backends; the sparse id is the public one."""
    sparse_id = self._sparse.store(
        content, source=source, metadata=metadata,
    )
    # The dense backend mints its own id; remember the pairing so
    # delete() can later remove the document from both sides.
    self._id_map[sparse_id] = self._dense.store(
        content, source=source, metadata=metadata,
    )
    get_event_bus().publish(EventType.MEMORY_STORE, {
        "backend": self.backend_id,
        "doc_id": sparse_id,
        "source": source,
    })
    return sparse_id
retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]
Retrieve from both backends and fuse with RRF.
Source code in src/openjarvis/tools/storage/hybrid.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Query both sub-backends and merge the rankings with RRF."""
    fetch_k = top_k * 3  # over-fetch so fusion sees more candidates
    fused = reciprocal_rank_fusion(
        [
            self._sparse.retrieve(query, top_k=fetch_k),
            self._dense.retrieve(query, top_k=fetch_k),
        ],
        k=self._k,
        weights=self._weights,
    )
    get_event_bus().publish(EventType.MEMORY_RETRIEVE, {
        "backend": self.backend_id,
        "query": query,
        "num_results": min(len(fused), top_k),
    })
    return fused[:top_k]
delete(doc_id: str) -> bool
Delete from both sub-backends.
Source code in src/openjarvis/tools/storage/hybrid.py
def delete(self, doc_id: str) -> bool:
    """Delete from both sub-backends; True if either side had it."""
    removed_sparse = self._sparse.delete(doc_id)
    removed_dense = False
    paired = self._id_map.pop(doc_id, None)
    if paired is not None:
        removed_dense = self._dense.delete(paired)
    return removed_sparse or removed_dense
clear() -> None
Clear both sub-backends.
Source code in src/openjarvis/tools/storage/hybrid.py
def clear(self) -> None:
    """Empty both sub-backends and drop the id pairing map."""
    self._sparse.clear()
    self._dense.clear()
    self._id_map.clear()
reciprocal_rank_fusion
Fuse multiple ranked result lists using RRF.
RRF_score(d) = sum(weight_i / (k + rank_i(d)))
| PARAMETER |
DESCRIPTION |
ranked_lists
|
Each inner list is a ranked sequence of results (best first).
TYPE:
List[List[RetrievalResult]]
|
k
|
RRF constant (default 60).
TYPE:
int
DEFAULT:
60
|
weights
|
Per-list weight (defaults to equal weighting).
TYPE:
Optional[List[float]]
DEFAULT:
None
|
| RETURNS |
DESCRIPTION |
Merged list sorted by fused score, descending.
|
|
Source code in src/openjarvis/tools/storage/hybrid.py
def reciprocal_rank_fusion(
    ranked_lists: List[List[RetrievalResult]],
    *,
    k: int = 60,
    weights: Optional[List[float]] = None,
) -> List[RetrievalResult]:
    """Fuse multiple ranked result lists using RRF.

    ``RRF_score(d) = sum(weight_i / (k + rank_i(d)))``

    Parameters
    ----------
    ranked_lists:
        Each inner list is a ranked sequence of results (best first).
    k:
        RRF constant (default 60).
    weights:
        Per-list weight (defaults to equal weighting).

    Returns
    -------
    Merged list sorted by fused score, descending.
    """
    if weights is None:
        weights = [1.0] * len(ranked_lists)
    # Documents are identified by their content text; identical content
    # returned by several retrievers accumulates a single fused score.
    scores: Dict[str, float] = {}
    best_result: Dict[str, RetrievalResult] = {}
    for weight, results in zip(weights, ranked_lists):
        for rank, result in enumerate(results):
            key = result.content
            # rank is 0-based; the canonical formula uses 1-based ranks.
            rrf = weight / (k + rank + 1)
            scores[key] = scores.get(key, 0.0) + rrf
            # Keep the result with the highest original score as the
            # representative (bug fix: previously the first-seen result
            # was kept, contradicting this stated intent).
            if (
                key not in best_result
                or result.score > best_result[key].score
            ):
                best_result[key] = result
    # Emit results ordered by fused score, best first.
    fused: List[RetrievalResult] = []
    for content_key, fused_score in sorted(
        scores.items(), key=lambda item: item[1], reverse=True
    ):
        original = best_result[content_key]
        fused.append(RetrievalResult(
            content=original.content,
            score=fused_score,
            source=original.source,
            metadata=original.metadata,
        ))
    return fused
Document Chunking
Splits text into fixed-size chunks with configurable overlap, respecting
paragraph boundaries.
ChunkConfig
ChunkConfig(chunk_size: int = 512, chunk_overlap: int = 64, min_chunk_size: int = 50)
Parameters controlling the chunking strategy.
Chunk
Chunk(content: str, source: str = '', offset: int = 0, index: int = 0, metadata: Dict[str, Any] = dict())
A single chunk produced by the chunking pipeline.
chunk_text
chunk_text
chunk_text(text: str, *, source: str = '', config: Optional[ChunkConfig] = None) -> List[Chunk]
Split text into chunks respecting paragraph boundaries.
| PARAMETER |
DESCRIPTION |
text
|
TYPE:
str
|
source
|
Originating filename or identifier.
TYPE:
str
DEFAULT:
''
|
config
|
Chunking parameters (uses defaults if None).
TYPE:
Optional[ChunkConfig]
DEFAULT:
None
|
| RETURNS |
DESCRIPTION |
List of :class:`Chunk` objects, in order.
|
|
Source code in src/openjarvis/tools/storage/chunking.py
def chunk_text(
    text: str,
    *,
    source: str = "",
    config: Optional[ChunkConfig] = None,
) -> List[Chunk]:
    """Split *text* into chunks respecting paragraph boundaries.

    Paragraphs (separated by blank lines) are packed greedily into
    chunks of at most ``chunk_size`` whitespace tokens, carrying the
    last ``chunk_overlap`` tokens over into the next chunk. A single
    paragraph longer than ``chunk_size`` is split into fixed windows.
    Chunks whose measured size (per ``_count_tokens``) falls below
    ``min_chunk_size`` are silently dropped — including the final tail.

    NOTE: ``Chunk.offset`` is measured in whitespace-token positions
    from the start of *text*, not character positions.

    Parameters
    ----------
    text:
        The full document text.
    source:
        Originating filename or identifier.
    config:
        Chunking parameters (uses defaults if ``None``).

    Returns
    -------
    List of :class:`Chunk` objects, in order.
    """
    if not text or not text.strip():
        return []
    cfg = config or ChunkConfig()
    # Split into paragraphs (double newline)
    paragraphs = [p for p in text.split("\n\n") if p.strip()]
    chunks: List[Chunk] = []
    current_tokens: List[str] = []
    current_offset = 0        # token position reached so far in *text*
    chunk_start_offset = 0    # token position where the pending chunk began
    for para in paragraphs:
        para_tokens = para.split()
        # If adding this paragraph would exceed chunk_size and we already
        # have content, flush the current chunk first.
        if (
            current_tokens
            and len(current_tokens) + len(para_tokens) > cfg.chunk_size
        ):
            chunk_content = " ".join(current_tokens)
            if _count_tokens(chunk_content) >= cfg.min_chunk_size:
                chunks.append(Chunk(
                    content=chunk_content,
                    source=source,
                    offset=chunk_start_offset,
                    index=len(chunks),
                ))
            # Keep the overlap tail for the next chunk
            if cfg.chunk_overlap > 0 and len(current_tokens) > cfg.chunk_overlap:
                overlap = current_tokens[-cfg.chunk_overlap:]
                current_tokens = list(overlap)
            else:
                current_tokens = []
            chunk_start_offset = current_offset
        # If a single paragraph exceeds chunk_size, split it directly
        if len(para_tokens) > cfg.chunk_size:
            # Flush anything accumulated first
            if current_tokens:
                chunk_content = " ".join(current_tokens)
                if _count_tokens(chunk_content) >= cfg.min_chunk_size:
                    chunks.append(Chunk(
                        content=chunk_content,
                        source=source,
                        offset=chunk_start_offset,
                        index=len(chunks),
                    ))
                current_tokens = []
            # Split the oversized paragraph into fixed windows
            idx = 0
            while idx < len(para_tokens):
                window = para_tokens[idx:idx + cfg.chunk_size]
                chunk_content = " ".join(window)
                if _count_tokens(chunk_content) >= cfg.min_chunk_size:
                    chunks.append(Chunk(
                        content=chunk_content,
                        source=source,
                        offset=current_offset + idx,
                        index=len(chunks),
                    ))
                # Windows advance by chunk_size - chunk_overlap so
                # consecutive windows share chunk_overlap tokens;
                # max(1, ...) guards against a non-positive step.
                step = max(1, cfg.chunk_size - cfg.chunk_overlap)
                idx += step
            current_offset += len(para_tokens)
            chunk_start_offset = current_offset
            continue
        current_tokens.extend(para_tokens)
        current_offset += len(para_tokens)
    # Flush remaining tokens
    if current_tokens:
        chunk_content = " ".join(current_tokens)
        if _count_tokens(chunk_content) >= cfg.min_chunk_size:
            chunks.append(Chunk(
                content=chunk_content,
                source=source,
                offset=chunk_start_offset,
                index=len(chunks),
            ))
    return chunks
Document Ingestion
File reading, type detection, and directory walking for the ingestion
pipeline.
DocumentMeta(path: str, file_type: str, size_bytes: int, line_count: int)
Metadata about an ingested document.
detect_file_type
detect_file_type(path: Path) -> str
Map a file extension to one of: text, markdown, pdf, code.
Source code in src/openjarvis/tools/storage/ingest.py
def detect_file_type(path: Path) -> str:
    """Classify *path* by extension: markdown, pdf, code, or text."""
    ext = path.suffix.lower()
    if ext == ".pdf":
        return "pdf"
    if ext in {".md", ".markdown", ".mdx"}:
        return "markdown"
    if ext in _CODE_EXTS:
        return "code"
    return "text"
read_document
Read a file and return (text, metadata).
| RAISES |
DESCRIPTION |
ImportError
|
If the file is a PDF and pdfplumber is not installed.
|
FileNotFoundError
|
|
Source code in src/openjarvis/tools/storage/ingest.py
def read_document(path: Path) -> Tuple[str, DocumentMeta]:
    """Read *path* and return ``(text, metadata)``.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist.
    ImportError
        If the file is a PDF and ``pdfplumber`` is not installed.
    """
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")
    ftype = detect_file_type(path)
    if ftype != "pdf":
        text = _read_text(path)
    else:
        try:
            import pdfplumber  # noqa: F401
        except ImportError:
            raise ImportError(
                "PDF support requires pdfplumber. "
                "Install it with: pip install openjarvis[memory-pdf]"
            ) from None
        text = _read_pdf(path)
    meta = DocumentMeta(
        path=str(path),
        file_type=ftype,
        size_bytes=path.stat().st_size,
        line_count=(text.count("\n") + 1) if text else 0,
    )
    return text, meta
ingest_path
Ingest a file or directory into chunks.
If path is a file, reads and chunks it.
If path is a directory, recursively walks it (skipping hidden and
common non-content directories) and chunks each file.
Source code in src/openjarvis/tools/storage/ingest.py
def ingest_path(
    path: Path,
    *,
    config: Optional[ChunkConfig] = None,
) -> List[Chunk]:
    """Ingest a file or directory into chunks.

    If *path* is a file, reads and chunks it.
    If *path* is a directory, recursively walks it (skipping hidden and
    common non-content directories) and chunks each file.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Path not found: {path}")
    if path.is_file():
        text, _meta = read_document(path)
        return chunk_text(text, source=str(path), config=config)
    # Directory: recursive walk.
    # Hoisted out of the per-file loop (both are loop-invariant):
    # the security-policy import and the binary-extension set were
    # previously re-evaluated on every file.
    from openjarvis.security.file_policy import is_sensitive_file
    binary_exts = {
        ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".ico",
        ".mp3", ".mp4", ".wav", ".avi", ".mov",
        ".zip", ".tar", ".gz", ".bz2", ".7z",
        ".exe", ".dll", ".so", ".dylib", ".o",
        ".pyc", ".pyo", ".class", ".wasm",
    }
    all_chunks: List[Chunk] = []
    for child in sorted(path.rglob("*")):
        # Skip directories themselves — rglob yields files too
        if child.is_dir():
            continue
        # Skip files under ignored directories (VCS, build output, ...)
        rel_parts = child.relative_to(path).parts[:-1]
        if any(_should_skip_dir(part) for part in rel_parts):
            continue
        # Skip hidden files
        if child.name.startswith("."):
            continue
        # Skip sensitive files (secrets, credentials, keys)
        if is_sensitive_file(child):
            continue
        # Skip binary-looking files
        if child.suffix.lower() in binary_exts:
            continue
        try:
            text, _meta = read_document(child)
            chunks = chunk_text(text, source=str(child), config=config)
            all_chunks.extend(chunks)
        except (ImportError, OSError):
            # Best-effort: skip files we can't read
            # (e.g. PDF without pdfplumber)
            continue
    return all_chunks
Context Injection
Retrieves relevant memory and injects it into prompts as system messages
with source attribution.
ContextConfig
ContextConfig
dataclass
ContextConfig(enabled: bool = True, top_k: int = 5, min_score: float = 0.1, max_context_tokens: int = 2048)
Controls how retrieved context is injected into prompts.
inject_context
inject_context
Retrieve relevant context and prepend it to messages.
Returns a new list — the original list is not mutated.
If no results pass the score threshold, returns the original
messages unchanged.
| PARAMETER |
DESCRIPTION |
query
|
The user query to search for.
TYPE:
str
|
messages
|
The existing message list.
TYPE:
List[Message]
|
backend
|
The memory backend to search.
TYPE:
MemoryBackend
|
config
|
Context injection settings (uses defaults if None).
TYPE:
Optional[ContextConfig]
DEFAULT:
None
|
Source code in src/openjarvis/tools/storage/context.py
def inject_context(
    query: str,
    messages: List[Message],
    backend: MemoryBackend,
    *,
    config: Optional[ContextConfig] = None,
) -> List[Message]:
    """Prepend retrieved knowledge to *messages* as a system message.

    Returns a **new** list; *messages* is never mutated. When injection
    is disabled, or nothing relevant passes the score filter and token
    budget, the original list is returned unchanged.

    Parameters
    ----------
    query:
        The user query to search for.
    messages:
        The existing message list.
    backend:
        The memory backend to search.
    config:
        Context injection settings (uses defaults if ``None``).
    """
    cfg = config or ContextConfig()
    if not cfg.enabled:
        return messages
    # Retrieve, keeping only results above the score threshold.
    hits = [
        r
        for r in backend.retrieve(query, top_k=cfg.top_k)
        if r.score >= cfg.min_score
    ]
    if not hits:
        return messages
    # Greedily take results (best first) until the token budget is hit.
    kept: List[RetrievalResult] = []
    used_tokens = 0
    for hit in hits:
        cost = _count_tokens(hit.content)
        if used_tokens + cost > cfg.max_context_tokens:
            break
        kept.append(hit)
        used_tokens += cost
    if not kept:
        return messages
    get_event_bus().publish(EventType.MEMORY_RETRIEVE, {
        "context_injection": True,
        "query": query,
        "num_results": len(kept),
        "total_tokens": used_tokens,
    })
    return [build_context_message(kept)] + list(messages)
format_context
format_context
Format retrieval results into a context block.
Each result is prefixed with its source attribution.
Source code in src/openjarvis/tools/storage/context.py
| def format_context(results: List[RetrievalResult]) -> str:
"""Format retrieval results into a context block.
Each result is prefixed with its source attribution.
"""
if not results:
return ""
lines = []
for r in results:
source_tag = f"[Source: {r.source}]" if r.source else ""
if source_tag:
lines.append(f"{source_tag} {r.content}")
else:
lines.append(r.content)
return "\n\n".join(lines)
|
build_context_message
build_context_message
Create a system message with formatted context.
Source code in src/openjarvis/tools/storage/context.py
def build_context_message(
    results: List[RetrievalResult],
) -> Message:
    """Wrap formatted *results* in a SYSTEM-role message."""
    preamble = (
        "The following context was retrieved from the knowledge"
        " base. Use it to inform your response, citing sources"
        " where applicable:\n\n"
    )
    return Message(
        role=Role.SYSTEM,
        content=preamble + format_context(results),
    )
Embeddings
Abstraction layer for text embedding models used by dense retrieval backends.
Embedder
Bases: ABC
Base class for text embedding models.
Subclasses must implement :meth:embed and :meth:dim.
embed(texts: list[str]) -> Any
Embed texts and return a numpy array of shape (n, dim).
Source code in src/openjarvis/tools/storage/embeddings.py
@abstractmethod
def embed(self, texts: list[str]) -> Any:
    """Embed *texts* and return a numpy array of shape ``(n, dim)``.

    ``n`` is ``len(texts)``; ``dim`` must equal :meth:`dim`.
    """
dim() -> int
Return the dimensionality of the embedding vectors.
Source code in src/openjarvis/tools/storage/embeddings.py
@abstractmethod
def dim(self) -> int:
    """Return the dimensionality of the embedding vectors."""
SentenceTransformerEmbedder(model_name: str = 'all-MiniLM-L6-v2')
Bases: Embedder
Embedder backed by sentence-transformers.
| PARAMETER |
DESCRIPTION |
model_name
|
HuggingFace model identifier. Defaults to the lightweight
all-MiniLM-L6-v2 (384-dim, ~22 MB).
TYPE:
str
DEFAULT:
'all-MiniLM-L6-v2'
|
Source code in src/openjarvis/tools/storage/embeddings.py
def __init__(
    self, model_name: str = "all-MiniLM-L6-v2"
) -> None:
    """Load *model_name* via sentence-transformers.

    Raises ``ImportError`` with install instructions when the
    optional dependency is missing.
    """
    try:
        from sentence_transformers import SentenceTransformer
    except ImportError as exc:
        raise ImportError(
            "sentence-transformers is required for "
            "SentenceTransformerEmbedder. Install it with: "
            "pip install sentence-transformers"
        ) from exc
    self._model = SentenceTransformer(model_name)
    self._dim: int = self._model.get_sentence_embedding_dimension()
embed(texts: list[str]) -> Any
Return a numpy array of shape (len(texts), dim).
Source code in src/openjarvis/tools/storage/embeddings.py
def embed(self, texts: list[str]) -> Any:
    """Encode *texts* into a numpy array of shape ``(len(texts), dim)``."""
    return self._model.encode(texts, convert_to_numpy=True)
dim() -> int
Return the embedding dimensionality.
Source code in src/openjarvis/tools/storage/embeddings.py
def dim(self) -> int:
    """Embedding dimensionality captured at construction time."""
    return self._dim