"""Tests for API cache layer.""" import json from unittest.mock import MagicMock, patch import pytest from fastapi import FastAPI, Request from pydantic import BaseModel from meshcore_hub.api.cache import cached, sorted_query_string from meshcore_hub.common.redis import NullCache, RedisCacheBackend class _SampleModel(BaseModel): items: list total: int class TestSortedQueryString: def test_empty_query_params(self): scope = {"type": "http", "query_string": b"", "headers": []} request = Request(scope) assert sorted_query_string(request) == "" def test_single_param(self): scope = {"type": "http", "query_string": b"limit=50", "headers": []} request = Request(scope) assert sorted_query_string(request) == "limit=50" def test_multiple_params_sorted(self): scope = { "type": "http", "query_string": b"offset=0&limit=50", "headers": [], } request = Request(scope) result = sorted_query_string(request) assert result == "limit=50&offset=0" def test_url_encoded_special_chars(self): scope = { "type": "http", "query_string": b"search=foo+bar", "headers": [], } request = Request(scope) result = sorted_query_string(request) assert "search=" in result assert "foo" in result class TestNullCache: def test_get_returns_none(self): cache = NullCache() assert cache.get("any_key") is None def test_set_does_not_raise(self): cache = NullCache() cache.set("key", "value", 30) def test_ping_returns_false(self): cache = NullCache() assert cache.ping() is False def test_delete_does_not_raise(self): cache = NullCache() cache.delete("prefix") class TestRedisCacheBackend: def test_get_returns_cached_value(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.get.return_value = b'{"items": []}' backend = RedisCacheBackend(key_prefix="hub") result = backend.get("nodes:limit=50") assert result == '{"items": []}' mock_client.get.assert_called_once_with("hub:nodes:limit=50") def test_get_returns_none_on_miss(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.get.return_value = None backend = RedisCacheBackend(key_prefix="hub") assert backend.get("nodes:limit=50") is None def test_set_stores_with_ttl(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client backend = RedisCacheBackend(key_prefix="hub") backend.set("nodes:limit=50", '{"items": []}', 30) mock_client.setex.assert_called_once_with( "hub:nodes:limit=50", 30, '{"items": []}' ) def test_ping_returns_true(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.ping.return_value = True backend = RedisCacheBackend(key_prefix="hub") assert backend.ping() is True def test_ping_returns_false_on_error(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.ping.side_effect = Exception("connection refused") backend = RedisCacheBackend(key_prefix="hub") assert backend.ping() is False def test_get_returns_none_on_connection_error(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.get.side_effect = Exception("connection error") backend = RedisCacheBackend(key_prefix="hub") assert backend.get("any_key") is None def test_set_logs_warning_on_error(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.setex.side_effect = Exception("timeout") backend = RedisCacheBackend(key_prefix="hub") backend.set("key", "value", 30) # Should not raise def test_key_prefix_prepended(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.get.return_value = None backend = RedisCacheBackend(key_prefix="hub-stg") backend.get("nodes:") mock_client.get.assert_called_once_with("hub-stg:nodes:") def test_get_decodes_str_value(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.get.return_value = '{"already": "str"}' backend = RedisCacheBackend(key_prefix="hub") result = backend.get("key") assert result == '{"already": "str"}' def test_delete_scans_and_deletes_keys(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.scan.return_value = (0, [b"hub:nodes:1", b"hub:nodes:2"]) backend = RedisCacheBackend(key_prefix="hub") backend.delete("nodes") mock_client.scan.assert_called_once_with(0, match="hub:nodes*", count=100) mock_client.delete.assert_called_once_with(b"hub:nodes:1", b"hub:nodes:2") def test_delete_multi_page_scan(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.scan.side_effect = [ (42, [b"hub:nodes:1"]), (0, [b"hub:nodes:2"]), ] backend = RedisCacheBackend(key_prefix="hub") backend.delete("nodes") assert mock_client.scan.call_count == 2 assert mock_client.delete.call_count == 2 def test_delete_handles_exception(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.scan.side_effect = Exception("scan error") backend = RedisCacheBackend(key_prefix="hub") backend.delete("nodes") def test_close_calls_client_close(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client backend = RedisCacheBackend(key_prefix="hub") backend.close() mock_client.close.assert_called_once() def test_close_handles_exception(self): with patch("redis.Redis") as mock_redis_cls: mock_client = MagicMock() mock_redis_cls.return_value = mock_client mock_client.close.side_effect = Exception("close error") backend = RedisCacheBackend(key_prefix="hub") backend.close() class TestCachedDecorator: async def test_cache_hit_returns_cached_data(self): app = FastAPI() mock_cache = MagicMock() mock_cache.get.return_value = json.dumps({"items": [], "total": 0}) app.state.redis_cache = mock_cache app.state.redis_cache_ttl = 30 @cached("nodes") async def handler(request: Request): return {"items": ["should not appear"], "total": 1} scope = { "type": "http", "query_string": b"limit=50", "headers": [], "app": app, } from starlette.datastructures import State request = Request(scope) request._state = State() result = await handler(request=request) assert result == {"items": [], "total": 0} assert request.state.cache_status == "HIT" async def test_cache_miss_calls_handler(self): app = FastAPI() mock_cache = MagicMock() mock_cache.get.return_value = None app.state.redis_cache = mock_cache app.state.redis_cache_ttl = 30 @cached("nodes") async def handler(request: Request): return {"items": ["real"], "total": 1} scope = { "type": "http", "query_string": b"limit=50", "headers": [], "app": app, } from starlette.datastructures import State request = Request(scope) request._state = State() result = await handler(request=request) assert result == {"items": ["real"], "total": 1} assert request.state.cache_status == "MISS" mock_cache.set.assert_called_once() async def test_null_cache_always_calls_handler(self): app = FastAPI() app.state.redis_cache = NullCache() app.state.redis_cache_ttl = 30 call_count = 0 @cached("nodes") async def handler(request: Request): nonlocal call_count call_count += 1 return {"items": [], "total": 0} scope = { "type": "http", "query_string": b"limit=50", "headers": [], "app": app, } from starlette.datastructures import State request = Request(scope) request._state = State() await handler(request=request) await handler(request=request) assert call_count == 2 async def test_redis_error_falls_through(self): app = FastAPI() mock_cache = MagicMock() mock_cache.get.side_effect = Exception("redis down") app.state.redis_cache = mock_cache app.state.redis_cache_ttl = 30 @cached("nodes") async def handler(request: Request): return {"items": ["fallback"], "total": 1} scope = { "type": "http", "query_string": b"", "headers": [], "app": app, } from starlette.datastructures import State request = Request(scope) request._state = State() result = await handler(request=request) assert result == {"items": ["fallback"], "total": 1} async def test_no_request_raises_error(self): @cached("nodes") async def handler(): return {"items": []} with pytest.raises(TypeError, match="No Request"): await handler() async def test_custom_key_builder(self): app = FastAPI() mock_cache = MagicMock() mock_cache.get.return_value = None app.state.redis_cache = mock_cache app.state.redis_cache_ttl = 30 def role_key_builder(request: Request) -> str: role = request.headers.get("x-user-roles", "anonymous") return f"messages:role={role}:" @cached("messages", key_builder=role_key_builder) async def handler(request: Request): return {"items": []} scope = { "type": "http", "query_string": b"", "headers": [(b"x-user-roles", b"admin")], "app": app, } from starlette.datastructures import State request = Request(scope) request._state = State() await handler(request=request) mock_cache.get.assert_called_once_with("messages:role=admin:") async def test_dashboard_ttl_override(self): app = FastAPI() mock_cache = MagicMock() mock_cache.get.return_value = None app.state.redis_cache = mock_cache app.state.redis_cache_ttl = 30 app.state.redis_cache_ttl_dashboard = 60 @cached("dashboard/stats", ttl_setting="redis_cache_ttl_dashboard") async def handler(request: Request): return {"total_nodes": 10} scope = { "type": "http", "query_string": b"", "headers": [], "app": app, } from starlette.datastructures import State request = Request(scope) request._state = State() await handler(request=request) call_args = mock_cache.set.call_args assert call_args[0][2] == 60 # TTL should be 60, not 30 async def test_serializes_pydantic_model_result(self): app = FastAPI() mock_cache = MagicMock() mock_cache.get.return_value = None app.state.redis_cache = mock_cache app.state.redis_cache_ttl = 30 @cached("nodes") async def handler(request: Request): return _SampleModel(items=[1, 2], total=2) scope = { "type": "http", "query_string": b"", "headers": [], "app": app, } from starlette.datastructures import State request = Request(scope) request._state = State() result = await handler(request=request) assert result.items == [1, 2] set_call = mock_cache.set.call_args stored = json.loads(set_call[0][1]) assert stored == {"items": [1, 2], "total": 2} async def test_serializes_dict_result(self): app = FastAPI() mock_cache = MagicMock() mock_cache.get.return_value = None app.state.redis_cache = mock_cache app.state.redis_cache_ttl = 30 @cached("nodes") async def handler(request: Request): return {"key": "value"} scope = { "type": "http", "query_string": b"", "headers": [], "app": app, } from starlette.datastructures import State request = Request(scope) request._state = State() result = await handler(request=request) assert result == {"key": "value"} set_call = mock_cache.set.call_args assert set_call[0][1] == '{"key": "value"}' async def test_serializes_other_result_with_default_str(self): app = FastAPI() mock_cache = MagicMock() mock_cache.get.return_value = None app.state.redis_cache = mock_cache app.state.redis_cache_ttl = 30 @cached("nodes") async def handler(request: Request): return ["a", "b"] scope = { "type": "http", "query_string": b"", "headers": [], "app": app, } from starlette.datastructures import State request = Request(scope) request._state = State() result = await handler(request=request) assert result == ["a", "b"] set_call = mock_cache.set.call_args assert json.loads(set_call[0][1]) == ["a", "b"] async def test_cache_set_error_falls_through(self): app = FastAPI() mock_cache = MagicMock() mock_cache.get.return_value = None mock_cache.set.side_effect = Exception("set error") app.state.redis_cache = mock_cache app.state.redis_cache_ttl = 30 @cached("nodes") async def handler(request: Request): return {"items": ["ok"]} scope = { "type": "http", "query_string": b"", "headers": [], "app": app, } from starlette.datastructures import State request = Request(scope) request._state = State() result = await handler(request=request) assert result == {"items": ["ok"]} async def test_no_cache_on_app_state_calls_handler(self): app = FastAPI() @cached("nodes") async def handler(request: Request): return {"items": ["direct"]} scope = { "type": "http", "query_string": b"", "headers": [], "app": app, } from starlette.datastructures import State request = Request(scope) request._state = State() result = await handler(request=request) assert result == {"items": ["direct"]} class TestLifespanRedis: async def test_lifespan_creates_null_cache_when_disabled(self): import meshcore_hub.api.app as app_module from meshcore_hub.api.app import lifespan app = FastAPI() app.state.database_url = "sqlite:///./test.db" app.state.redis_enabled = False app_module._db_manager = None with patch.object(app_module, "DatabaseManager", return_value=MagicMock()): async with lifespan(app): assert isinstance(app.state.redis_cache, NullCache) async def test_lifespan_creates_redis_cache_when_enabled(self): import meshcore_hub.api.app as app_module from meshcore_hub.api.app import lifespan app = FastAPI() app.state.database_url = "sqlite:///./test.db" app.state.redis_enabled = True app.state.redis_host = "redis-host" app.state.redis_port = 6380 app.state.redis_db = 1 app.state.redis_password = "secret" app.state.redis_key_prefix = "myprefix" app_module._db_manager = None with patch.object(app_module, "DatabaseManager", return_value=MagicMock()): with patch("meshcore_hub.common.redis.RedisCacheBackend") as mock_cls: mock_instance = MagicMock() mock_cls.return_value = mock_instance async with lifespan(app): mock_cls.assert_called_once_with( host="redis-host", port=6380, db=1, password="secret", key_prefix="myprefix", ) assert app.state.redis_cache is mock_instance async def test_lifespan_closes_cache_on_shutdown(self): import meshcore_hub.api.app as app_module from meshcore_hub.api.app import lifespan app = FastAPI() app.state.database_url = "sqlite:///./test.db" app.state.redis_enabled = True mock_cache = MagicMock() mock_db_manager = MagicMock() app_module._db_manager = None with patch.object(app_module, "DatabaseManager", return_value=mock_db_manager): with patch( "meshcore_hub.common.redis.RedisCacheBackend", return_value=mock_cache, ): async with lifespan(app): pass mock_cache.close.assert_called_once() mock_db_manager.dispose.assert_called_once() class TestXCacheMiddleware: def test_adds_x_cache_header_on_hit(self, client_no_auth): mock_cache = MagicMock() mock_cache.get.return_value = json.dumps( {"items": [], "total": 0, "limit": 50, "offset": 0} ) client_no_auth.app.state.redis_cache = mock_cache client_no_auth.app.state.redis_cache_ttl = 30 response = client_no_auth.get("/api/v1/nodes") assert response.headers.get("x-cache") == "HIT" def test_adds_x_cache_header_on_miss(self, client_no_auth): mock_cache = MagicMock() mock_cache.get.return_value = None client_no_auth.app.state.redis_cache = mock_cache client_no_auth.app.state.redis_cache_ttl = 30 response = client_no_auth.get("/api/v1/nodes") assert response.headers.get("x-cache") == "MISS" def test_no_x_cache_header_when_no_cache_status(self, client_no_auth): if hasattr(client_no_auth.app.state, "redis_cache"): del client_no_auth.app.state.redis_cache response = client_no_auth.get("/api/v1/nodes") assert "x-cache" not in response.headers class TestHealthReadyRedis: def test_health_ready_omits_redis_when_disabled(self, client_no_auth): if hasattr(client_no_auth.app.state, "redis_cache"): del client_no_auth.app.state.redis_cache client_no_auth.app.state.redis_enabled = False response = client_no_auth.get("/health/ready") assert response.status_code == 200 data = response.json() assert "redis" not in data def test_health_ready_reports_connected(self, client_no_auth): mock_cache = MagicMock() mock_cache.ping.return_value = True client_no_auth.app.state.redis_cache = mock_cache client_no_auth.app.state.redis_enabled = True response = client_no_auth.get("/health/ready") assert response.status_code == 200 assert response.json()["redis"] == "connected" def test_health_ready_reports_unreachable(self, client_no_auth): mock_cache = MagicMock() mock_cache.ping.return_value = False client_no_auth.app.state.redis_cache = mock_cache client_no_auth.app.state.redis_enabled = True response = client_no_auth.get("/health/ready") assert response.status_code == 200 assert response.json()["redis"] == "unreachable" class TestCliRedis: def test_redis_enabled_shows_banner(self): from click.testing import CliRunner from meshcore_hub.api.cli import api runner = CliRunner() with patch("uvicorn.run"): with patch("meshcore_hub.common.config.get_api_settings") as mock_settings: mock_settings.return_value = MagicMock( data_home="/tmp/test", effective_database_url="sqlite:///test.db", ) result = runner.invoke( api, ["--redis-enabled", "--redis-host", "myredis"], catch_exceptions=False, ) assert "Redis enabled: True" in result.output assert "Redis: myredis:6379/0" in result.output assert "Redis key prefix: hub" in result.output def test_redis_disabled_hides_details(self): from click.testing import CliRunner from meshcore_hub.api.cli import api runner = CliRunner() with patch("uvicorn.run"): with patch("meshcore_hub.common.config.get_api_settings") as mock_settings: mock_settings.return_value = MagicMock( data_home="/tmp/test", effective_database_url="sqlite:///test.db", ) result = runner.invoke(api, catch_exceptions=False) assert "Redis enabled: False" in result.output assert "Redis:" not in result.output.replace("Redis enabled: False", "") def test_redis_params_passed_to_create_app(self): from click.testing import CliRunner from meshcore_hub.api.cli import api runner = CliRunner() with patch("uvicorn.run"): with patch("meshcore_hub.common.config.get_api_settings") as mock_settings: mock_settings.return_value = MagicMock( data_home="/tmp/test", effective_database_url="sqlite:///test.db", ) with patch("meshcore_hub.api.app.create_app") as mock_create_app: mock_create_app.return_value = MagicMock() runner.invoke( api, [ "--redis-enabled", "--redis-host", "rhost", "--redis-port", "6380", "--redis-db", "2", "--redis-password", "pw", "--redis-key-prefix", "pre", "--redis-cache-ttl", "60", "--redis-cache-ttl-dashboard", "120", ], catch_exceptions=False, ) call_kwargs = mock_create_app.call_args[1] assert call_kwargs["redis_enabled"] is True assert call_kwargs["redis_host"] == "rhost" assert call_kwargs["redis_port"] == 6380 assert call_kwargs["redis_db"] == 2 assert call_kwargs["redis_password"] == "pw" assert call_kwargs["redis_key_prefix"] == "pre" assert call_kwargs["redis_cache_ttl"] == 60 assert call_kwargs["redis_cache_ttl_dashboard"] == 120 class TestKeyBuilders: def test_dashboard_stats_key_builder(self): from meshcore_hub.api.routes.dashboard import _dashboard_stats_key_builder with patch( "meshcore_hub.api.routes.dashboard.resolve_user_role", return_value="admin", ): scope = { "type": "http", "query_string": b"days=30", "headers": [], } request = Request(scope) key = _dashboard_stats_key_builder(request) assert key == "dashboard/stats:role=admin:days=30" def test_dashboard_stats_key_builder_anonymous(self): from meshcore_hub.api.routes.dashboard import _dashboard_stats_key_builder with patch( "meshcore_hub.api.routes.dashboard.resolve_user_role", return_value=None, ): scope = { "type": "http", "query_string": b"", "headers": [], } request = Request(scope) key = _dashboard_stats_key_builder(request) assert "role=anonymous" in key def test_dashboard_msg_activity_key_builder(self): from meshcore_hub.api.routes.dashboard import ( _dashboard_msg_activity_key_builder, ) with patch( "meshcore_hub.api.routes.dashboard.resolve_user_role", return_value="member", ): scope = { "type": "http", "query_string": b"days=7", "headers": [], } request = Request(scope) key = _dashboard_msg_activity_key_builder(request) assert key == "dashboard/message-activity:role=member:days=7" def test_channels_key_builder(self): from meshcore_hub.api.routes.channels import _channels_key_builder with patch( "meshcore_hub.api.routes.channels.resolve_user_role", return_value="operator", ): scope = { "type": "http", "query_string": b"", "headers": [], } request = Request(scope) key = _channels_key_builder(request) assert key == "channels:role=operator:" def test_channels_key_builder_anonymous(self): from meshcore_hub.api.routes.channels import _channels_key_builder with patch( "meshcore_hub.api.routes.channels.resolve_user_role", return_value=None, ): scope = { "type": "http", "query_string": b"", "headers": [], } request = Request(scope) key = _channels_key_builder(request) assert "role=anonymous" in key def test_messages_key_builder(self): from meshcore_hub.api.routes.messages import _messages_key_builder with patch( "meshcore_hub.api.routes.messages.resolve_user_role", return_value="admin", ): scope = { "type": "http", "query_string": b"limit=10&offset=0", "headers": [], } request = Request(scope) key = _messages_key_builder(request) assert "role=admin" in key assert "limit=10" in key