Skip to content

API Reference: Security

The openjarvis.security package provides text scanning, content guardrails, file path filtering, and audit logging. All public components are documented below.

For usage examples and configuration, see the Security user guide. For the architectural design, see Security architecture.


Types

Core data types shared across the security subsystem.

ThreatLevel

Severity classification for individual scan findings. Ordered from least to most severe: LOW < MEDIUM < HIGH < CRITICAL.

ThreatLevel

Bases: str, Enum

Severity classification for security findings.

RedactionMode

Controls the action taken by GuardrailsEngine when findings are detected.

RedactionMode

Bases: str, Enum

Action mode when findings are detected.

SecurityEventType

Categories of security events recorded by AuditLogger.

SecurityEventType

Bases: str, Enum

Categories of security events.

ScanFinding

A single match produced by a scanner. Includes the pattern name, matched text, position, threat level, and a human-readable description.

ScanFinding dataclass

ScanFinding(pattern_name: str, matched_text: str, threat_level: ThreatLevel, start: int, end: int, description: str = '')

A single finding from a security scanner.

ScanResult

Aggregated result from one or more scanner passes. The clean property returns True when no findings were detected; highest_threat returns the most severe ThreatLevel found.

ScanResult dataclass

ScanResult(findings: List[ScanFinding] = list())

Aggregated result from one or more scanners.

Attributes
clean property
clean: bool

Return True if no findings were detected.

highest_threat property
highest_threat: Optional[ThreatLevel]

Return the highest threat level among findings, or None.

SecurityEvent

A recorded security event, as persisted by AuditLogger.

SecurityEvent dataclass

SecurityEvent(event_type: SecurityEventType, timestamp: float, findings: List[ScanFinding] = list(), content_preview: str = '', action_taken: str = '')

A recorded security event for audit logging.


BaseScanner

BaseScanner is the abstract base class for all scanner implementations. Implement both scan() and redact() to create a custom scanner.

BaseScanner

Bases: ABC

Base class for all security scanners.

Subclasses implement pattern-based scanning for secrets, PII, or other sensitive content.

Functions

scan abstractmethod
scan(text: str) -> ScanResult

Scan text and return findings.

Source code in src/openjarvis/security/_stubs.py
@abstractmethod
def scan(self, text: str) -> ScanResult:
    """Scan *text* and return findings."""
redact abstractmethod
redact(text: str) -> str

Return text with sensitive matches replaced by redaction markers.

Source code in src/openjarvis/security/_stubs.py
@abstractmethod
def redact(self, text: str) -> str:
    """Return *text* with sensitive matches replaced by redaction markers."""

SecretScanner

Detects API keys, tokens, passwords, and credentials in text using regex patterns. See the pattern reference table in the user guide for the full list of patterns and their threat levels.

SecretScanner

Bases: BaseScanner

Detect API keys, tokens, passwords, and other secrets in text.

Functions

scan
scan(text: str) -> ScanResult

Scan text for secret patterns.

Source code in src/openjarvis/security/scanner.py
def scan(self, text: str) -> ScanResult:
    """Scan *text* for secret patterns."""
    findings = []
    for name, (pattern, threat, desc) in self.PATTERNS.items():
        for match in re.finditer(pattern, text):
            findings.append(
                ScanFinding(
                    pattern_name=name,
                    matched_text=match.group(0),
                    threat_level=threat,
                    start=match.start(),
                    end=match.end(),
                    description=desc,
                )
            )
    return ScanResult(findings=findings)
redact
redact(text: str) -> str

Replace secret matches with [REDACTED:{pattern_name}].

Source code in src/openjarvis/security/scanner.py
def redact(self, text: str) -> str:
    """Replace secret matches with ``[REDACTED:{pattern_name}]``."""
    result = text
    for name, (pattern, _threat, _desc) in self.PATTERNS.items():
        result = re.sub(pattern, f"[REDACTED:{name}]", result)
    return result

PIIScanner

Detects personally identifiable information including email addresses, Social Security Numbers, credit card numbers, phone numbers, and public IP addresses.

PIIScanner

Bases: BaseScanner

Detect personally identifiable information in text.

Functions

scan
scan(text: str) -> ScanResult

Scan text for PII patterns.

Source code in src/openjarvis/security/scanner.py
def scan(self, text: str) -> ScanResult:
    """Scan *text* for PII patterns."""
    findings = []
    for name, (pattern, threat, desc) in self.PATTERNS.items():
        for match in re.finditer(pattern, text):
            findings.append(
                ScanFinding(
                    pattern_name=name,
                    matched_text=match.group(0),
                    threat_level=threat,
                    start=match.start(),
                    end=match.end(),
                    description=desc,
                )
            )
    return ScanResult(findings=findings)
redact
redact(text: str) -> str

Replace PII matches with [REDACTED:{pattern_name}].

Source code in src/openjarvis/security/scanner.py
def redact(self, text: str) -> str:
    """Replace PII matches with ``[REDACTED:{pattern_name}]``."""
    result = text
    for name, (pattern, _threat, _desc) in self.PATTERNS.items():
        result = re.sub(pattern, f"[REDACTED:{name}]", result)
    return result

GuardrailsEngine

GuardrailsEngine wraps any InferenceEngine with security scanning on both input and output. It implements the full InferenceEngine interface, so it can be used anywhere an engine is expected.

Registration

GuardrailsEngine is not registered in EngineRegistry. Instantiate it directly by wrapping an existing engine instance.

GuardrailsEngine

GuardrailsEngine(engine: InferenceEngine, *, scanners: Optional[List[BaseScanner]] = None, mode: RedactionMode = WARN, scan_input: bool = True, scan_output: bool = True, bus: Optional[EventBus] = None)

Bases: InferenceEngine

Wraps an existing InferenceEngine with security scanning.

Not registered in EngineRegistry — instantiated dynamically to wrap any engine at runtime.

PARAMETER DESCRIPTION
engine

The wrapped inference engine.

TYPE: InferenceEngine

scanners

List of scanners to run. Defaults to SecretScanner + PIIScanner.

TYPE: Optional[List[BaseScanner]] DEFAULT: None

mode

Action taken on findings: WARN, REDACT, or BLOCK.

TYPE: RedactionMode DEFAULT: WARN

scan_input

Whether to scan input messages.

TYPE: bool DEFAULT: True

scan_output

Whether to scan output content.

TYPE: bool DEFAULT: True

bus

Optional event bus for publishing security events.

TYPE: Optional[EventBus] DEFAULT: None

Source code in src/openjarvis/security/guardrails.py
def __init__(
    self,
    engine: InferenceEngine,
    *,
    scanners: Optional[List[BaseScanner]] = None,
    mode: RedactionMode = RedactionMode.WARN,
    scan_input: bool = True,
    scan_output: bool = True,
    bus: Optional[EventBus] = None,
) -> None:
    self._engine = engine
    self._scanners: List[BaseScanner] = scanners if scanners is not None else [
        SecretScanner(),
        PIIScanner(),
    ]
    self._mode = mode
    self._scan_input = scan_input
    self._scan_output = scan_output
    self._bus = bus

Attributes

engine_id property
engine_id: str

Delegate to the wrapped engine.

Functions

generate
generate(messages: Sequence[Message], *, model: str, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> Dict[str, Any]

Scan input, call wrapped engine, scan output.

Source code in src/openjarvis/security/guardrails.py
def generate(
    self,
    messages: Sequence[Message],
    *,
    model: str,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Scan input, call wrapped engine, scan output."""
    # Scan input messages
    if self._scan_input:
        for msg in messages:
            if msg.content:
                result = self._scan_text(msg.content)
                if not result.clean:
                    msg = Message(
                        role=msg.role,
                        content=self._handle_findings(
                            msg.content, result, "input"
                        ),
                        name=msg.name,
                        tool_calls=msg.tool_calls,
                        tool_call_id=msg.tool_call_id,
                        metadata=msg.metadata,
                    )

    # Call wrapped engine
    response = self._engine.generate(
        messages, model=model, temperature=temperature,
        max_tokens=max_tokens, **kwargs,
    )

    # Scan output
    if self._scan_output:
        content = response.get("content", "")
        if content:
            result = self._scan_text(content)
            if not result.clean:
                response["content"] = self._handle_findings(
                    content, result, "output"
                )

    return response
stream async
stream(messages: Sequence[Message], *, model: str, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> AsyncIterator[str]

Yield tokens in real-time, scan accumulated output post-hoc.

Source code in src/openjarvis/security/guardrails.py
async def stream(
    self,
    messages: Sequence[Message],
    *,
    model: str,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> AsyncIterator[str]:
    """Yield tokens in real-time, scan accumulated output post-hoc."""
    accumulated = []
    async for token in self._engine.stream(
        messages, model=model, temperature=temperature,
        max_tokens=max_tokens, **kwargs,
    ):
        accumulated.append(token)
        yield token

    # Post-hoc scan of accumulated output for logging only
    if self._scan_output:
        full_output = "".join(accumulated)
        if full_output:
            result = self._scan_text(full_output)
            if not result.clean and self._bus:
                finding_dicts = [
                    {
                        "pattern": f.pattern_name,
                        "threat": f.threat_level.value,
                        "description": f.description,
                    }
                    for f in result.findings
                ]
                self._bus.publish(
                    EventType.SECURITY_ALERT,
                    {
                        "direction": "output",
                        "findings": finding_dicts,
                        "mode": "stream_post_hoc",
                    },
                )
list_models
list_models() -> List[str]

Delegate to wrapped engine.

Source code in src/openjarvis/security/guardrails.py
def list_models(self) -> List[str]:
    """Delegate to wrapped engine."""
    return self._engine.list_models()
health
health() -> bool

Delegate to wrapped engine.

Source code in src/openjarvis/security/guardrails.py
def health(self) -> bool:
    """Delegate to wrapped engine."""
    return self._engine.health()

SecurityBlockError

Raised by GuardrailsEngine when mode=RedactionMode.BLOCK and findings are detected during a scan. Catch this exception to handle blocked requests gracefully.

from openjarvis.security.guardrails import GuardrailsEngine, SecurityBlockError
from openjarvis.security.types import RedactionMode

guarded = GuardrailsEngine(engine, mode=RedactionMode.BLOCK)

try:
    response = guarded.generate(messages, model="qwen3:8b")
except SecurityBlockError as exc:
    # exc.args[0] describes the direction and finding count
    print(f"Request blocked: {exc}")

SecurityBlockError

Bases: Exception

Raised when mode is BLOCK and security findings are detected.


File Policy

Functions and constants for filtering sensitive file paths. Used internally by FileReadTool and the memory ingest path.

DEFAULT_SENSITIVE_PATTERNS

The default set of glob patterns used to identify sensitive files. This is a frozenset[str] exported from openjarvis.security.file_policy.

See the sensitive file patterns table in the user guide for the complete list.

DEFAULT_SENSITIVE_PATTERNS module-attribute

DEFAULT_SENSITIVE_PATTERNS: frozenset[str] = frozenset({'.env', '.env.*', '*.env', '.secret', '*.secrets', 'credentials.*', '*.pem', '*.key', '*.p12', '*.pfx', '*.jks', 'id_rsa', 'id_ed25519', '.htpasswd', '.pgpass', '.netrc'})

is_sensitive_file

is_sensitive_file

is_sensitive_file(path: Union[str, Path]) -> bool

Return True if path matches a sensitive file pattern.

Checks both the filename and the full name against DEFAULT_SENSITIVE_PATTERNS using :func:fnmatch.fnmatch.

Source code in src/openjarvis/security/file_policy.py
def is_sensitive_file(path: Union[str, Path]) -> bool:
    """Return ``True`` if *path* matches a sensitive file pattern.

    Checks both the filename and the full name against
    ``DEFAULT_SENSITIVE_PATTERNS`` using :func:`fnmatch.fnmatch`.
    """
    p = Path(path)
    name = p.name
    for pattern in DEFAULT_SENSITIVE_PATTERNS:
        if fnmatch.fnmatch(name, pattern):
            return True
    return False

filter_sensitive_paths

filter_sensitive_paths

filter_sensitive_paths(paths: Iterable[Union[str, Path]]) -> List[Path]

Return only non-sensitive paths from paths.

Source code in src/openjarvis/security/file_policy.py
def filter_sensitive_paths(paths: Iterable[Union[str, Path]]) -> List[Path]:
    """Return only non-sensitive paths from *paths*."""
    return [Path(p) for p in paths if not is_sensitive_file(p)]

AuditLogger

Append-only SQLite-backed storage for security events. Subscribes to SECURITY_SCAN, SECURITY_ALERT, and SECURITY_BLOCK events on the EventBus when a bus is provided.

The default database path is ~/.openjarvis/audit.db, overridable via security.audit_log_path in config.toml.

audit_logger_example.py
from openjarvis.core.events import EventBus
from openjarvis.security.audit import AuditLogger
from openjarvis.security.guardrails import GuardrailsEngine
from openjarvis.security.types import RedactionMode

bus = EventBus()
audit = AuditLogger(bus=bus)

guarded = GuardrailsEngine(engine, mode=RedactionMode.WARN, bus=bus)

# Security events from guarded engine are now persisted automatically
events = audit.query(limit=10)
print(f"Logged {audit.count()} events")
audit.close()

AuditLogger

AuditLogger(db_path: Union[str, Path] = DEFAULT_CONFIG_DIR / 'audit.db', bus: Optional[EventBus] = None)

Append-only SQLite audit log for security events.

PARAMETER DESCRIPTION
db_path

Path to the SQLite database file.

TYPE: Union[str, Path] DEFAULT: DEFAULT_CONFIG_DIR / 'audit.db'

bus

Optional event bus — if provided, subscribes to security events (SECURITY_SCAN, SECURITY_ALERT, SECURITY_BLOCK).

TYPE: Optional[EventBus] DEFAULT: None

Source code in src/openjarvis/security/audit.py
def __init__(
    self,
    db_path: Union[str, Path] = DEFAULT_CONFIG_DIR / "audit.db",
    bus: Optional[EventBus] = None,
) -> None:
    self._db_path = Path(db_path)
    self._db_path.parent.mkdir(parents=True, exist_ok=True)
    self._conn = sqlite3.connect(str(self._db_path))
    self._conn.execute(
        """
        CREATE TABLE IF NOT EXISTS security_events (
            id          INTEGER PRIMARY KEY,
            timestamp   REAL,
            event_type  TEXT,
            findings_json TEXT,
            content_preview TEXT,
            action_taken TEXT
        )
        """
    )
    self._conn.commit()

    if bus is not None:
        bus.subscribe(EventType.SECURITY_SCAN, self._on_event)
        bus.subscribe(EventType.SECURITY_ALERT, self._on_event)
        bus.subscribe(EventType.SECURITY_BLOCK, self._on_event)

Functions

log
log(event: SecurityEvent) -> None

Insert a security event into the audit log.

Source code in src/openjarvis/security/audit.py
def log(self, event: SecurityEvent) -> None:
    """Insert a security event into the audit log."""
    findings_json = json.dumps([
        {
            "pattern_name": f.pattern_name,
            "matched_text": f.matched_text,
            "threat_level": f.threat_level.value,
            "start": f.start,
            "end": f.end,
            "description": f.description,
        }
        for f in event.findings
    ])
    self._conn.execute(
        """
        INSERT INTO security_events
            (timestamp, event_type, findings_json, content_preview, action_taken)
        VALUES (?, ?, ?, ?, ?)
        """,
        (
            event.timestamp,
            event.event_type.value,
            findings_json,
            event.content_preview,
            event.action_taken,
        ),
    )
    self._conn.commit()
query
query(*, event_type: Optional[str] = None, since: Optional[float] = None, limit: int = 100) -> List[SecurityEvent]

Query logged security events with optional filters.

Source code in src/openjarvis/security/audit.py
def query(
    self,
    *,
    event_type: Optional[str] = None,
    since: Optional[float] = None,
    limit: int = 100,
) -> List[SecurityEvent]:
    """Query logged security events with optional filters."""
    sql = (
        "SELECT timestamp, event_type, findings_json,"
        " content_preview, action_taken"
        " FROM security_events WHERE 1=1"
    )
    params: list = []

    if event_type is not None:
        sql += " AND event_type = ?"
        params.append(event_type)
    if since is not None:
        sql += " AND timestamp >= ?"
        params.append(since)

    sql += " ORDER BY timestamp DESC LIMIT ?"
    params.append(limit)

    rows = self._conn.execute(sql, params).fetchall()
    events: List[SecurityEvent] = []
    for row in rows:
        ts, etype, findings_json, preview, action = row
        findings_raw = json.loads(findings_json) if findings_json else []
        findings = [
            ScanFinding(
                pattern_name=f["pattern_name"],
                matched_text=f["matched_text"],
                threat_level=ThreatLevel(f["threat_level"]),
                start=f["start"],
                end=f["end"],
                description=f.get("description", ""),
            )
            for f in findings_raw
        ]
        events.append(
            SecurityEvent(
                event_type=SecurityEventType(etype),
                timestamp=ts,
                findings=findings,
                content_preview=preview or "",
                action_taken=action or "",
            )
        )
    return events
count
count() -> int

Return the total number of logged security events.

Source code in src/openjarvis/security/audit.py
def count(self) -> int:
    """Return the total number of logged security events."""
    row = self._conn.execute(
        "SELECT COUNT(*) FROM security_events"
    ).fetchone()
    return row[0] if row else 0
close
close() -> None

Close the SQLite connection.

Source code in src/openjarvis/security/audit.py
def close(self) -> None:
    """Close the SQLite connection."""
    self._conn.close()