feat: data redaction in log messages and update related tests

This commit is contained in:
Lloyd
2026-06-10 17:05:21 +01:00
parent 4a8876a95e
commit 018f425dc1
4 changed files with 50 additions and 6 deletions
-3
View File
@@ -120,9 +120,6 @@ class ACL:
logger.debug(
f"Admin pwd len={len(admin_pwd) if admin_pwd else 0}, Guest pwd len={len(guest_pwd) if guest_pwd else 0}"
)
logger.debug(
f"Password comparison: '{password}' vs admin='{admin_pwd[:4]}...' ({len(admin_pwd)} chars)"
)
if admin_pwd and password == admin_pwd:
permissions = PERM_ACL_ADMIN
logger.info(f"Admin password validated for '{target_identity_name or 'unknown'}'")
-1
View File
@@ -2521,7 +2521,6 @@ class APIEndpoints:
"timestamp": datetime.now().isoformat(),
"level": "INFO",
"logger": "HTTPServer",
"raw_message": "No logs available",
}
]
)
+27 -2
View File
@@ -2,6 +2,7 @@ import json
import logging
import os
import queue
import re
import secrets
import threading
from collections import deque
@@ -42,6 +43,13 @@ logger = logging.getLogger("HTTPServer")
# In-memory log buffer
class LogBuffer(logging.Handler):
_SECRET_PATTERNS = (
re.compile(
r"(?i)\b(admin_password|guest_password|password|passwd|api[_-]?key|token|jwt_secret)\b(\s*[:=]\s*)(['\"]?)([^,'\"\s]+)(['\"]?)"
),
re.compile(r"(?i)\bBearer\s+[A-Za-z0-9._-]+"),
)
def __init__(self, max_lines=100):
super().__init__()
self.logs = deque(maxlen=max_lines)
@@ -50,17 +58,34 @@ class LogBuffer(logging.Handler):
self._subscribers = []
self.setFormatter(logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s"))
@classmethod
def _sanitize_log_text(cls, text: str) -> str:
if not text:
return ""
sanitized = text
def _replace_secret(match: re.Match) -> str:
key = match.group(1)
sep = match.group(2)
quote_start = match.group(3) or ""
quote_end = match.group(5) or quote_start
return f"{key}{sep}{quote_start}[REDACTED]{quote_end}"
sanitized = cls._SECRET_PATTERNS[0].sub(_replace_secret, sanitized)
sanitized = cls._SECRET_PATTERNS[1].sub("Bearer [REDACTED]", sanitized)
return sanitized
def emit(self, record):
try:
formatted_message = self.format(record)
formatted_message = self._sanitize_log_text(self.format(record))
entry = {
"id": self._next_log_id(),
"message": formatted_message,
"timestamp": datetime.fromtimestamp(record.created).isoformat(),
"level": record.levelname,
"logger": record.name,
"raw_message": record.getMessage(),
"module": record.module,
"pathname": record.pathname,
"line": record.lineno,
+23
View File
@@ -24,6 +24,29 @@ def test_log_buffer_emit_collects_messages():
assert "warn" in buf.logs[-1]["message"]
def test_log_buffer_emit_redacts_sensitive_values():
buf = hs.LogBuffer(max_lines=5)
rec = logging.LogRecord(
"auth",
logging.DEBUG,
__file__,
10,
"auth password=secret123 token=abc123 Authorization: Bearer deadbeef",
(),
None,
)
buf.emit(rec)
assert len(buf.logs) == 1
entry = buf.logs[0]
assert "secret123" not in entry["message"]
assert "abc123" not in entry["message"]
assert "deadbeef" not in entry["message"]
assert "[REDACTED]" in entry["message"]
assert "raw_message" not in entry
def test_doc_endpoint_routes_and_openapi_json_paths(monkeypatch):
api = SimpleNamespace(docs=lambda: "docs-html")
doc = hs.DocEndpoint(api)