Compare commits

19 Commits

Author SHA1 Message Date
JingleManSweep
624fa458ac Merge pull request #66 from ipnet-mesh/chore/fix-sqlite-path-exists
Ensure SQLite database path/subdirectories exist before initialising …
2026-01-15 17:36:58 +00:00
Louis King
309d575fc0 Ensure SQLite database path/subdirectories exist before initialising database 2026-01-15 17:32:56 +00:00
Louis King
f7b4df13a7 Added more test coverage 2026-01-12 21:00:02 +00:00
Louis King
13bae5c8d7 Added more test coverage 2026-01-12 20:34:53 +00:00
Louis King
8a6b4d8e88 Tidying 2026-01-12 20:02:45 +00:00
JingleManSweep
b67e1b5b2b Merge pull request #65 from ipnet-mesh/claude/plan-member-editor-BwkcS
Plan Member Editor for Organization Management
2026-01-12 19:59:32 +00:00
Louis King
d4e3dc0399 Local tweaks 2026-01-12 19:59:14 +00:00
Claude
7f0adfa6a7 Implement Member Editor admin interface
Add a complete CRUD interface for managing network members at /a/members,
following the proven pattern established by the Tag Editor.

Changes:
- Add member routes to admin.py (GET, POST create/update/delete)
- Create admin/members.html template with member table, forms, and modals
- Add Members navigation card to admin index page
- Include proper authentication checks and flash message handling
- Fix mypy type hints for optional form fields

The Member Editor allows admins to:
- View all network members in a sortable table
- Create new members with all fields (member_id, name, callsign, role, contact, description)
- Edit existing members via modal dialog
- Delete members with confirmation
- Client-side validation for member_id format (alphanumeric + underscore)

All backend API infrastructure (models, schemas, routes) was already implemented.
This is purely a web UI layer built on top of the existing /api/v1/members endpoints.
2026-01-12 19:41:56 +00:00
Claude
94b03b49d9 Add comprehensive Member Editor implementation plan
Create detailed plan for building a Member Editor admin interface at /a/members.
The plan follows the proven Tag Editor pattern and includes:

- Complete route structure for CRUD operations
- Full HTML template layout with modals and forms
- JavaScript event handlers for edit/delete actions
- Integration with existing Member API endpoints
- Testing checklist and acceptance criteria

All backend infrastructure (API, models, schemas) already exists.
This is purely a web UI implementation task estimated at 2-3 hours.
2026-01-12 19:33:13 +00:00
Louis King
20d75fe041 Add bulk copy and delete all tags for node replacement workflow
When replacing a node device, users can now:
- Copy All: Copy all tags to a new node (skips existing tags)
- Delete All: Remove all tags from a node after migration

New API endpoints:
- POST /api/v1/nodes/{pk}/tags/copy-to/{dest_pk}
- DELETE /api/v1/nodes/{pk}/tags

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 14:46:51 +00:00
Louis King
307f3935e0 Add access denied page for unauthenticated admin access
When users try to access /a/ without valid OAuth2Proxy headers (e.g.,
GitHub account not in org), they now see a friendly 403 page instead
of a 500 error. Added authentication checks to all admin routes.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-11 13:34:03 +00:00
Louis King
6901bafb02 Tidying Tag Editor layout 2026-01-11 13:13:22 +00:00
JingleManSweep
e595dc2b27 Merge pull request #63 from ipnet-mesh/claude/admin-node-tags-interface-pHbKm
Add admin interface for managing node tags
2026-01-11 12:51:56 +00:00
Louis King
ed2cf09ff3 Improve admin UI and remove unused coordinate tag type
- Replace node type badge with icon in admin tag editor
- Add Edit/Add Tags button on node detail page (when admin enabled and authenticated)
- Remove automatic seed container startup to prevent overwriting user changes
- Remove unused 'coordinate' value type from node tags (only string, number, boolean remain)
2026-01-11 12:49:34 +00:00
Claude
bec736a894 Sort node dropdown alphabetically in admin interface
Nodes in the dropdown are now sorted alphabetically by name,
with unnamed nodes appearing at the end.
2026-01-11 12:01:11 +00:00
Claude
1457360703 Use API_ADMIN_KEY for web service to enable admin operations
The web admin interface needs write permissions to create, update,
move, and delete node tags. Changed to use API_ADMIN_KEY with
fallback to API_READ_KEY if admin key is not configured.
2026-01-11 11:55:15 +00:00
Claude
d8a0f2abb8 Fix security vulnerabilities and add validation
- Fix XSS vulnerability by using data attributes instead of inline
  onclick handlers in node_tags.html template
- Fix URL injection by using urlencode for all redirect URL parameters
- Add validation to reject moves where source and destination nodes
  are the same (returns 400 Bad Request)
- Add error handling for response.json() calls that may fail
- Add missing test coverage for update endpoint error scenarios
2026-01-11 11:51:57 +00:00
Claude
367f838371 Add admin interface for managing node tags
Implement CRUD operations for NodeTags in the admin interface:

- Add NodeTagMove schema for moving tags between nodes
- Add PUT /nodes/{public_key}/tags/{key}/move API endpoint
- Add web routes at /a/node-tags for tag management
- Create admin templates with node selector and tag management UI
- Support editing, adding, moving, and deleting tags via API calls
- Add comprehensive tests for new functionality

The interface allows selecting a node from a dropdown, viewing its
tags, and performing all CRUD operations including moving a tag
to a different node without having to delete and recreate it.
2026-01-11 01:34:07 +00:00
Louis King
741dd3ce84 Initial admin commit 2026-01-11 00:42:57 +00:00
34 changed files with 3575 additions and 36 deletions

View File

@@ -458,6 +458,7 @@ Key variables:
- `MQTT_HOST`, `MQTT_PORT`, `MQTT_PREFIX` - MQTT broker connection
- `MQTT_TLS` - Enable TLS/SSL for MQTT (default: `false`)
- `API_READ_KEY`, `API_ADMIN_KEY` - API authentication keys
- `WEB_ADMIN_ENABLED` - Enable admin interface at /a/ (default: `false`, requires auth proxy)
- `LOG_LEVEL` - Logging verbosity
The database defaults to `sqlite:///{DATA_HOME}/collector/meshcore.db` and does not typically need to be configured.
@@ -486,7 +487,7 @@ The database can be seeded with node tags and network members from YAML files in
- `node_tags.yaml` - Node tag definitions (keyed by public_key)
- `members.yaml` - Network member definitions
Seeding is a separate process from the collector and must be run explicitly:
**Important:** Seeding is NOT automatic and must be run explicitly. This prevents seed files from overwriting user changes made via the admin UI.
```bash
# Native CLI
@@ -496,6 +497,8 @@ meshcore-hub collector seed
docker compose --profile seed up
```
**Note:** Once the admin UI is enabled (`WEB_ADMIN_ENABLED=true`), tags should be managed through the web interface rather than seed files.
### Webhook Configuration
The collector supports forwarding events to external HTTP endpoints:

View File

@@ -376,6 +376,7 @@ The collector automatically cleans up old event data and inactive nodes:
| `WEB_HOST` | `0.0.0.0` | Web server bind address |
| `WEB_PORT` | `8080` | Web server port |
| `API_BASE_URL` | `http://localhost:8000` | API endpoint URL |
| `WEB_ADMIN_ENABLED` | `false` | Enable admin interface at /a/ (requires auth proxy) |
| `NETWORK_NAME` | `MeshCore Network` | Display name for the network |
| `NETWORK_CITY` | *(none)* | City where network is located |
| `NETWORK_COUNTRY` | *(none)* | Country code (ISO 3166-1 alpha-2) |
@@ -470,21 +471,18 @@ Tags are keyed by public key in YAML format:
fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210:
friendly_name: Oakland Repeater
altitude: 150
location:
value: "37.8044,-122.2712"
type: coordinate
```
Tag values can be:
- **YAML primitives** (auto-detected type): strings, numbers, booleans
- **Explicit type** (for special types like coordinate):
- **Explicit type** (when you need to force a specific type):
```yaml
location:
value: "37.7749,-122.4194"
type: coordinate
altitude:
value: "150"
type: number
```
Supported types: `string`, `number`, `boolean`, `coordinate`
Supported types: `string`, `number`, `boolean`
### Import Tags Manually

View File

@@ -139,7 +139,7 @@ services:
- core
restart: unless-stopped
depends_on:
seed:
db-migrate:
condition: service_completed_successfully
volumes:
- ${DATA_HOME:-./data}:/data
@@ -196,7 +196,7 @@ services:
- core
restart: unless-stopped
depends_on:
seed:
db-migrate:
condition: service_completed_successfully
collector:
condition: service_started
@@ -249,9 +249,12 @@ services:
environment:
- LOG_LEVEL=${LOG_LEVEL:-INFO}
- API_BASE_URL=http://api:8000
- API_KEY=${API_READ_KEY:-}
# Use ADMIN key to allow write operations from admin interface
# Falls back to READ key if ADMIN key is not set
- API_KEY=${API_ADMIN_KEY:-${API_READ_KEY:-}}
- WEB_HOST=0.0.0.0
- WEB_PORT=8080
- WEB_ADMIN_ENABLED=${WEB_ADMIN_ENABLED:-false}
- NETWORK_NAME=${NETWORK_NAME:-MeshCore Network}
- NETWORK_CITY=${NETWORK_CITY:-}
- NETWORK_COUNTRY=${NETWORK_COUNTRY:-}
@@ -292,7 +295,10 @@ services:
command: ["db", "upgrade"]
# ==========================================================================
# Seed Data - Import node_tags.json and members.json from SEED_HOME
# Seed Data - Import node_tags.yaml and members.yaml from SEED_HOME
# NOTE: This is NOT run automatically. Use --profile seed to run explicitly.
# Since tags are now managed via the admin UI, automatic seeding would
# overwrite user changes.
# ==========================================================================
seed:
image: ghcr.io/ipnet-mesh/meshcore-hub:${IMAGE_VERSION:-latest}
@@ -301,8 +307,6 @@ services:
dockerfile: Dockerfile
container_name: meshcore-seed
profiles:
- all
- core
- seed
restart: "no"
depends_on:
@@ -319,7 +323,7 @@ services:
- LOG_LEVEL=${LOG_LEVEL:-INFO}
# Explicitly unset to use DATA_HOME-based default path
- DATABASE_URL=
# Imports both node_tags.json and members.json if they exist
# Imports both node_tags.yaml and members.yaml if they exist
command: ["collector", "seed"]
# ==========================================================================

View File

@@ -7,12 +7,12 @@
# elevation: 150 # number
# is_online: true # boolean
#
# - Explicit type (for special types like coordinate):
# location:
# value: "37.7749,-122.4194"
# type: coordinate
# - Explicit type (when you need to force a specific type):
# altitude:
# value: "150"
# type: number
#
# Supported types: string, number, boolean, coordinate
# Supported types: string, number, boolean
0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef:
friendly_name: Gateway Node

View File

@@ -6,7 +6,13 @@ from sqlalchemy import select
from meshcore_hub.api.auth import RequireAdmin, RequireRead
from meshcore_hub.api.dependencies import DbSession
from meshcore_hub.common.models import Node, NodeTag
from meshcore_hub.common.schemas.nodes import NodeTagCreate, NodeTagRead, NodeTagUpdate
from meshcore_hub.common.schemas.nodes import (
NodeTagCreate,
NodeTagMove,
NodeTagRead,
NodeTagsCopyResult,
NodeTagUpdate,
)
router = APIRouter()
@@ -130,6 +136,131 @@ async def update_node_tag(
return NodeTagRead.model_validate(node_tag)
@router.put("/nodes/{public_key}/tags/{key}/move", response_model=NodeTagRead)
async def move_node_tag(
_: RequireAdmin,
session: DbSession,
public_key: str,
key: str,
data: NodeTagMove,
) -> NodeTagRead:
"""Move a node tag to a different node."""
# Check if source and destination are the same
if public_key == data.new_public_key:
raise HTTPException(
status_code=400,
detail="Source and destination nodes are the same",
)
# Find source node
source_query = select(Node).where(Node.public_key == public_key)
source_node = session.execute(source_query).scalar_one_or_none()
if not source_node:
raise HTTPException(status_code=404, detail="Source node not found")
# Find tag
tag_query = select(NodeTag).where(
(NodeTag.node_id == source_node.id) & (NodeTag.key == key)
)
node_tag = session.execute(tag_query).scalar_one_or_none()
if not node_tag:
raise HTTPException(status_code=404, detail="Tag not found")
# Find destination node
dest_query = select(Node).where(Node.public_key == data.new_public_key)
dest_node = session.execute(dest_query).scalar_one_or_none()
if not dest_node:
raise HTTPException(status_code=404, detail="Destination node not found")
# Check if tag already exists on destination node
conflict_query = select(NodeTag).where(
(NodeTag.node_id == dest_node.id) & (NodeTag.key == key)
)
conflict = session.execute(conflict_query).scalar_one_or_none()
if conflict:
raise HTTPException(
status_code=409,
detail=f"Tag '{key}' already exists on destination node",
)
# Move tag to destination node
node_tag.node_id = dest_node.id
session.commit()
session.refresh(node_tag)
return NodeTagRead.model_validate(node_tag)
@router.post(
"/nodes/{public_key}/tags/copy-to/{dest_public_key}",
response_model=NodeTagsCopyResult,
)
async def copy_all_tags(
_: RequireAdmin,
session: DbSession,
public_key: str,
dest_public_key: str,
) -> NodeTagsCopyResult:
"""Copy all tags from one node to another.
Tags that already exist on the destination node are skipped.
"""
# Check if source and destination are the same
if public_key == dest_public_key:
raise HTTPException(
status_code=400,
detail="Source and destination nodes are the same",
)
# Find source node
source_query = select(Node).where(Node.public_key == public_key)
source_node = session.execute(source_query).scalar_one_or_none()
if not source_node:
raise HTTPException(status_code=404, detail="Source node not found")
# Find destination node
dest_query = select(Node).where(Node.public_key == dest_public_key)
dest_node = session.execute(dest_query).scalar_one_or_none()
if not dest_node:
raise HTTPException(status_code=404, detail="Destination node not found")
# Get existing tags on destination node
existing_query = select(NodeTag.key).where(NodeTag.node_id == dest_node.id)
existing_keys = set(session.execute(existing_query).scalars().all())
# Copy tags
copied = 0
skipped_keys = []
for tag in source_node.tags:
if tag.key in existing_keys:
skipped_keys.append(tag.key)
continue
new_tag = NodeTag(
node_id=dest_node.id,
key=tag.key,
value=tag.value,
value_type=tag.value_type,
)
session.add(new_tag)
copied += 1
session.commit()
return NodeTagsCopyResult(
copied=copied,
skipped=len(skipped_keys),
skipped_keys=skipped_keys,
)
@router.delete("/nodes/{public_key}/tags/{key}", status_code=204)
async def delete_node_tag(
_: RequireAdmin,
@@ -156,3 +287,27 @@ async def delete_node_tag(
session.delete(node_tag)
session.commit()
@router.delete("/nodes/{public_key}/tags")
async def delete_all_node_tags(
_: RequireAdmin,
session: DbSession,
public_key: str,
) -> dict:
"""Delete all tags for a node."""
# Find node
node_query = select(Node).where(Node.public_key == public_key)
node = session.execute(node_query).scalar_one_or_none()
if not node:
raise HTTPException(status_code=404, detail="Node not found")
# Count and delete all tags
count = len(node.tags)
for tag in node.tags:
session.delete(tag)
session.commit()
return {"deleted": count}

View File

@@ -433,12 +433,12 @@ def import_tags_cmd(
\b
0123456789abcdef...:
friendly_name: My Node
location:
value: "52.0,1.0"
type: coordinate
altitude:
value: "150"
type: number
active:
value: "true"
type: boolean
Shorthand is also supported (string values with default type):
@@ -447,7 +447,7 @@ def import_tags_cmd(
friendly_name: My Node
role: gateway
Supported types: string, number, boolean, coordinate
Supported types: string, number, boolean
"""
from pathlib import Path

View File

@@ -19,7 +19,7 @@ class TagValue(BaseModel):
"""Schema for a tag value with type."""
value: str | None = None
type: str = Field(default="string", pattern=r"^(string|number|boolean|coordinate)$")
type: str = Field(default="string", pattern=r"^(string|number|boolean)$")
class NodeTags(BaseModel):

View File

@@ -253,6 +253,12 @@ class WebSettings(CommonSettings):
web_host: str = Field(default="0.0.0.0", description="Web server host")
web_port: int = Field(default=8080, description="Web server port")
# Admin interface (disabled by default for security)
web_admin_enabled: bool = Field(
default=False,
description="Enable admin interface at /a/ (requires OAuth2Proxy in front)",
)
# API connection
api_base_url: str = Field(
default="http://localhost:8000",

View File

@@ -98,6 +98,15 @@ class DatabaseManager:
echo: Enable SQL query logging
"""
self.database_url = database_url
# Ensure parent directory exists for SQLite databases
if database_url.startswith("sqlite:///"):
from pathlib import Path
# Extract path from sqlite:///path/to/db.sqlite
db_path = Path(database_url.replace("sqlite:///", ""))
db_path.parent.mkdir(parents=True, exist_ok=True)
self.engine = create_database_engine(database_url, echo=echo)
self.session_factory = create_session_factory(self.engine)

View File

@@ -21,7 +21,7 @@ class NodeTag(Base, UUIDMixin, TimestampMixin):
node_id: Foreign key to nodes table
key: Tag name/key
value: Tag value (stored as text, can be JSON for typed values)
value_type: Type hint (string, number, boolean, coordinate)
value_type: Type hint (string, number, boolean)
created_at: Record creation timestamp
updated_at: Record update timestamp
"""

View File

@@ -19,7 +19,7 @@ class NodeTagCreate(BaseModel):
default=None,
description="Tag value",
)
value_type: Literal["string", "number", "boolean", "coordinate"] = Field(
value_type: Literal["string", "number", "boolean"] = Field(
default="string",
description="Value type hint",
)
@@ -32,12 +32,33 @@ class NodeTagUpdate(BaseModel):
default=None,
description="Tag value",
)
value_type: Optional[Literal["string", "number", "boolean", "coordinate"]] = Field(
value_type: Optional[Literal["string", "number", "boolean"]] = Field(
default=None,
description="Value type hint",
)
class NodeTagMove(BaseModel):
"""Schema for moving a node tag to a different node."""
new_public_key: str = Field(
...,
min_length=64,
max_length=64,
description="Public key of the destination node",
)
class NodeTagsCopyResult(BaseModel):
"""Schema for bulk copy tags result."""
copied: int = Field(..., description="Number of tags copied")
skipped: int = Field(..., description="Number of tags skipped (already exist)")
skipped_keys: list[str] = Field(
default_factory=list, description="Keys of skipped tags"
)
class NodeTagRead(BaseModel):
"""Schema for reading a node tag."""

View File

@@ -50,6 +50,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
def create_app(
api_url: str | None = None,
api_key: str | None = None,
admin_enabled: bool | None = None,
network_name: str | None = None,
network_city: str | None = None,
network_country: str | None = None,
@@ -67,6 +68,7 @@ def create_app(
Args:
api_url: Base URL of the MeshCore Hub API
api_key: API key for authentication
admin_enabled: Enable admin interface at /a/
network_name: Display name for the network
network_city: City where the network is located
network_country: Country where the network is located
@@ -96,6 +98,9 @@ def create_app(
# Store configuration in app state (use args if provided, else settings)
app.state.api_url = api_url or settings.api_base_url
app.state.api_key = api_key or settings.api_key
app.state.admin_enabled = (
admin_enabled if admin_enabled is not None else settings.web_admin_enabled
)
app.state.network_name = network_name or settings.network_name
app.state.network_city = network_city or settings.network_city
app.state.network_country = network_country or settings.network_country
@@ -170,5 +175,6 @@ def get_network_context(request: Request) -> dict:
"network_contact_discord": request.app.state.network_contact_discord,
"network_contact_github": request.app.state.network_contact_github,
"network_welcome_text": request.app.state.network_welcome_text,
"admin_enabled": request.app.state.admin_enabled,
"version": __version__,
}

View File

@@ -9,6 +9,7 @@ from meshcore_hub.web.routes.messages import router as messages_router
from meshcore_hub.web.routes.advertisements import router as advertisements_router
from meshcore_hub.web.routes.map import router as map_router
from meshcore_hub.web.routes.members import router as members_router
from meshcore_hub.web.routes.admin import router as admin_router
# Create main web router
web_router = APIRouter()
@@ -21,5 +22,6 @@ web_router.include_router(messages_router)
web_router.include_router(advertisements_router)
web_router.include_router(map_router)
web_router.include_router(members_router)
web_router.include_router(admin_router)
__all__ = ["web_router"]

View File

@@ -0,0 +1,591 @@
"""Admin page routes."""
import logging
from typing import Any, Optional
from urllib.parse import urlencode
from fastapi import APIRouter, Form, HTTPException, Query, Request
from fastapi.responses import HTMLResponse, RedirectResponse
from httpx import Response
from meshcore_hub.web.app import get_network_context, get_templates
def _build_redirect_url(
public_key: str,
message: Optional[str] = None,
error: Optional[str] = None,
) -> str:
"""Build a properly encoded redirect URL with optional message/error."""
params: dict[str, str] = {"public_key": public_key}
if message:
params["message"] = message
if error:
params["error"] = error
return f"/a/node-tags?{urlencode(params)}"
def _get_error_detail(response: Response) -> str:
"""Safely extract error detail from response JSON."""
try:
data: Any = response.json()
detail: str = data.get("detail", "Unknown error")
return detail
except Exception:
return "Unknown error"
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/a", tags=["admin"])
def _check_admin_enabled(request: Request) -> None:
"""Check if admin interface is enabled, raise 404 if not."""
if not getattr(request.app.state, "admin_enabled", False):
raise HTTPException(status_code=404, detail="Not Found")
def _get_auth_context(request: Request) -> dict:
"""Extract OAuth2Proxy authentication headers."""
return {
"auth_user": request.headers.get("X-Forwarded-User"),
"auth_groups": request.headers.get("X-Forwarded-Groups"),
"auth_email": request.headers.get("X-Forwarded-Email"),
"auth_username": request.headers.get("X-Forwarded-Preferred-Username"),
}
def _is_authenticated(request: Request) -> bool:
"""Check if user is authenticated via OAuth2Proxy headers."""
return bool(
request.headers.get("X-Forwarded-User")
or request.headers.get("X-Forwarded-Email")
)
def _require_auth(request: Request) -> None:
"""Require authentication, raise 403 if not authenticated."""
if not _is_authenticated(request):
raise HTTPException(status_code=403, detail="Access denied")
@router.get("/", response_class=HTMLResponse)
async def admin_home(request: Request) -> HTMLResponse:
"""Render the admin page with OAuth2Proxy user info."""
_check_admin_enabled(request)
templates = get_templates(request)
context = get_network_context(request)
context["request"] = request
context.update(_get_auth_context(request))
# Check if user is authenticated
if not _is_authenticated(request):
return templates.TemplateResponse(
"admin/access_denied.html", context, status_code=403
)
return templates.TemplateResponse("admin/index.html", context)
@router.get("/node-tags", response_class=HTMLResponse)
async def admin_node_tags(
request: Request,
public_key: Optional[str] = Query(None),
message: Optional[str] = Query(None),
error: Optional[str] = Query(None),
) -> HTMLResponse:
"""Admin page for managing node tags."""
_check_admin_enabled(request)
templates = get_templates(request)
context = get_network_context(request)
context["request"] = request
context.update(_get_auth_context(request))
# Check if user is authenticated
if not _is_authenticated(request):
return templates.TemplateResponse(
"admin/access_denied.html", context, status_code=403
)
# Flash messages from redirects
context["message"] = message
context["error"] = error
# Fetch all nodes for dropdown
nodes = []
try:
response = await request.app.state.http_client.get(
"/api/v1/nodes",
params={"limit": 100},
)
if response.status_code == 200:
data = response.json()
nodes = data.get("items", [])
# Sort nodes alphabetically by name (unnamed nodes at the end)
nodes.sort(
key=lambda n: (n.get("name") is None, (n.get("name") or "").lower())
)
except Exception as e:
logger.exception("Failed to fetch nodes: %s", e)
context["error"] = "Failed to fetch nodes"
context["nodes"] = nodes
context["selected_public_key"] = public_key
# Fetch tags for selected node
tags = []
selected_node = None
if public_key:
# Find the selected node in the list
for node in nodes:
if node.get("public_key") == public_key:
selected_node = node
break
try:
response = await request.app.state.http_client.get(
f"/api/v1/nodes/{public_key}/tags",
)
if response.status_code == 200:
tags = response.json()
elif response.status_code == 404:
context["error"] = "Node not found"
except Exception as e:
logger.exception("Failed to fetch tags: %s", e)
context["error"] = "Failed to fetch tags"
context["tags"] = tags
context["selected_node"] = selected_node
return templates.TemplateResponse("admin/node_tags.html", context)
@router.post("/node-tags", response_class=RedirectResponse)
async def admin_create_node_tag(
request: Request,
public_key: str = Form(...),
key: str = Form(...),
value: str = Form(""),
value_type: str = Form("string"),
) -> RedirectResponse:
"""Create a new node tag."""
_check_admin_enabled(request)
_require_auth(request)
try:
response = await request.app.state.http_client.post(
f"/api/v1/nodes/{public_key}/tags",
json={
"key": key,
"value": value or None,
"value_type": value_type,
},
)
if response.status_code == 201:
redirect_url = _build_redirect_url(
public_key, message=f"Tag '{key}' created successfully"
)
elif response.status_code == 409:
redirect_url = _build_redirect_url(
public_key, error=f"Tag '{key}' already exists"
)
elif response.status_code == 404:
redirect_url = _build_redirect_url(public_key, error="Node not found")
else:
redirect_url = _build_redirect_url(
public_key, error=_get_error_detail(response)
)
except Exception as e:
logger.exception("Failed to create tag: %s", e)
redirect_url = _build_redirect_url(public_key, error="Failed to create tag")
return RedirectResponse(url=redirect_url, status_code=303)
@router.post("/node-tags/update", response_class=RedirectResponse)
async def admin_update_node_tag(
request: Request,
public_key: str = Form(...),
key: str = Form(...),
value: str = Form(""),
value_type: str = Form("string"),
) -> RedirectResponse:
"""Update an existing node tag."""
_check_admin_enabled(request)
_require_auth(request)
try:
response = await request.app.state.http_client.put(
f"/api/v1/nodes/{public_key}/tags/{key}",
json={
"value": value or None,
"value_type": value_type,
},
)
if response.status_code == 200:
redirect_url = _build_redirect_url(
public_key, message=f"Tag '{key}' updated successfully"
)
elif response.status_code == 404:
redirect_url = _build_redirect_url(
public_key, error=f"Tag '{key}' not found"
)
else:
redirect_url = _build_redirect_url(
public_key, error=_get_error_detail(response)
)
except Exception as e:
logger.exception("Failed to update tag: %s", e)
redirect_url = _build_redirect_url(public_key, error="Failed to update tag")
return RedirectResponse(url=redirect_url, status_code=303)
@router.post("/node-tags/move", response_class=RedirectResponse)
async def admin_move_node_tag(
request: Request,
public_key: str = Form(...),
key: str = Form(...),
new_public_key: str = Form(...),
) -> RedirectResponse:
"""Move a node tag to a different node."""
_check_admin_enabled(request)
_require_auth(request)
try:
response = await request.app.state.http_client.put(
f"/api/v1/nodes/{public_key}/tags/{key}/move",
json={"new_public_key": new_public_key},
)
if response.status_code == 200:
# Redirect to the destination node after successful move
redirect_url = _build_redirect_url(
new_public_key, message=f"Tag '{key}' moved successfully"
)
elif response.status_code == 404:
# Stay on source node if not found
redirect_url = _build_redirect_url(
public_key, error=_get_error_detail(response)
)
elif response.status_code == 409:
redirect_url = _build_redirect_url(
public_key, error=f"Tag '{key}' already exists on destination node"
)
else:
redirect_url = _build_redirect_url(
public_key, error=_get_error_detail(response)
)
except Exception as e:
logger.exception("Failed to move tag: %s", e)
redirect_url = _build_redirect_url(public_key, error="Failed to move tag")
return RedirectResponse(url=redirect_url, status_code=303)
@router.post("/node-tags/delete", response_class=RedirectResponse)
async def admin_delete_node_tag(
request: Request,
public_key: str = Form(...),
key: str = Form(...),
) -> RedirectResponse:
"""Delete a node tag."""
_check_admin_enabled(request)
_require_auth(request)
try:
response = await request.app.state.http_client.delete(
f"/api/v1/nodes/{public_key}/tags/{key}",
)
if response.status_code == 204:
redirect_url = _build_redirect_url(
public_key, message=f"Tag '{key}' deleted successfully"
)
elif response.status_code == 404:
redirect_url = _build_redirect_url(
public_key, error=f"Tag '{key}' not found"
)
else:
redirect_url = _build_redirect_url(
public_key, error=_get_error_detail(response)
)
except Exception as e:
logger.exception("Failed to delete tag: %s", e)
redirect_url = _build_redirect_url(public_key, error="Failed to delete tag")
return RedirectResponse(url=redirect_url, status_code=303)
@router.post("/node-tags/copy-all", response_class=RedirectResponse)
async def admin_copy_all_tags(
request: Request,
public_key: str = Form(...),
dest_public_key: str = Form(...),
) -> RedirectResponse:
"""Copy all tags from one node to another."""
_check_admin_enabled(request)
_require_auth(request)
try:
response = await request.app.state.http_client.post(
f"/api/v1/nodes/{public_key}/tags/copy-to/{dest_public_key}",
)
if response.status_code == 200:
data = response.json()
copied = data.get("copied", 0)
skipped = data.get("skipped", 0)
if skipped > 0:
message = f"Copied {copied} tag(s), skipped {skipped} existing"
else:
message = f"Copied {copied} tag(s) successfully"
# Redirect to destination node to show copied tags
redirect_url = _build_redirect_url(dest_public_key, message=message)
elif response.status_code == 400:
redirect_url = _build_redirect_url(
public_key, error=_get_error_detail(response)
)
elif response.status_code == 404:
redirect_url = _build_redirect_url(
public_key, error=_get_error_detail(response)
)
else:
redirect_url = _build_redirect_url(
public_key, error=_get_error_detail(response)
)
except Exception as e:
logger.exception("Failed to copy tags: %s", e)
redirect_url = _build_redirect_url(public_key, error="Failed to copy tags")
return RedirectResponse(url=redirect_url, status_code=303)
@router.post("/node-tags/delete-all", response_class=RedirectResponse)
async def admin_delete_all_tags(
request: Request,
public_key: str = Form(...),
) -> RedirectResponse:
"""Delete all tags from a node."""
_check_admin_enabled(request)
_require_auth(request)
try:
response = await request.app.state.http_client.delete(
f"/api/v1/nodes/{public_key}/tags",
)
if response.status_code == 200:
data = response.json()
deleted = data.get("deleted", 0)
message = f"Deleted {deleted} tag(s) successfully"
redirect_url = _build_redirect_url(public_key, message=message)
elif response.status_code == 404:
redirect_url = _build_redirect_url(
public_key, error=_get_error_detail(response)
)
else:
redirect_url = _build_redirect_url(
public_key, error=_get_error_detail(response)
)
except Exception as e:
logger.exception("Failed to delete tags: %s", e)
redirect_url = _build_redirect_url(public_key, error="Failed to delete tags")
return RedirectResponse(url=redirect_url, status_code=303)
def _build_members_redirect_url(
message: Optional[str] = None,
error: Optional[str] = None,
) -> str:
"""Build a properly encoded redirect URL for members page with optional message/error."""
params: dict[str, str] = {}
if message:
params["message"] = message
if error:
params["error"] = error
if params:
return f"/a/members?{urlencode(params)}"
return "/a/members"
@router.get("/members", response_class=HTMLResponse)
async def admin_members(
request: Request,
message: Optional[str] = Query(None),
error: Optional[str] = Query(None),
) -> HTMLResponse:
"""Admin page for managing members."""
_check_admin_enabled(request)
templates = get_templates(request)
context = get_network_context(request)
context["request"] = request
context.update(_get_auth_context(request))
# Check if user is authenticated
if not _is_authenticated(request):
return templates.TemplateResponse(
"admin/access_denied.html", context, status_code=403
)
# Flash messages from redirects
context["message"] = message
context["error"] = error
# Fetch all members
members = []
try:
response = await request.app.state.http_client.get(
"/api/v1/members",
params={"limit": 500},
)
if response.status_code == 200:
data = response.json()
members = data.get("items", [])
# Sort members alphabetically by name
members.sort(key=lambda m: m.get("name", "").lower())
except Exception as e:
logger.exception("Failed to fetch members: %s", e)
context["error"] = "Failed to fetch members"
context["members"] = members
return templates.TemplateResponse("admin/members.html", context)
@router.post("/members", response_class=RedirectResponse)
async def admin_create_member(
request: Request,
name: str = Form(...),
member_id: str = Form(...),
callsign: Optional[str] = Form(None),
role: Optional[str] = Form(None),
description: Optional[str] = Form(None),
contact: Optional[str] = Form(None),
) -> RedirectResponse:
"""Create a new member."""
_check_admin_enabled(request)
_require_auth(request)
try:
# Build request payload
payload = {
"name": name,
"member_id": member_id,
}
if callsign:
payload["callsign"] = callsign
if role:
payload["role"] = role
if description:
payload["description"] = description
if contact:
payload["contact"] = contact
response = await request.app.state.http_client.post(
"/api/v1/members",
json=payload,
)
if response.status_code == 201:
redirect_url = _build_members_redirect_url(
message=f"Member '{name}' created successfully"
)
elif response.status_code == 409:
redirect_url = _build_members_redirect_url(
error=f"Member ID '{member_id}' already exists"
)
else:
redirect_url = _build_members_redirect_url(
error=_get_error_detail(response)
)
except Exception as e:
logger.exception("Failed to create member: %s", e)
redirect_url = _build_members_redirect_url(error="Failed to create member")
return RedirectResponse(url=redirect_url, status_code=303)
@router.post("/members/update", response_class=RedirectResponse)
async def admin_update_member(
request: Request,
id: str = Form(...),
name: Optional[str] = Form(None),
member_id: Optional[str] = Form(None),
callsign: Optional[str] = Form(None),
role: Optional[str] = Form(None),
description: Optional[str] = Form(None),
contact: Optional[str] = Form(None),
) -> RedirectResponse:
"""Update an existing member."""
_check_admin_enabled(request)
_require_auth(request)
try:
# Build update payload (only include non-None fields)
payload: dict[str, str | None] = {}
if name is not None:
payload["name"] = name
if member_id is not None:
payload["member_id"] = member_id
if callsign is not None:
payload["callsign"] = callsign if callsign else None
if role is not None:
payload["role"] = role if role else None
if description is not None:
payload["description"] = description if description else None
if contact is not None:
payload["contact"] = contact if contact else None
response = await request.app.state.http_client.put(
f"/api/v1/members/{id}",
json=payload,
)
if response.status_code == 200:
redirect_url = _build_members_redirect_url(
message="Member updated successfully"
)
elif response.status_code == 404:
redirect_url = _build_members_redirect_url(error="Member not found")
elif response.status_code == 409:
redirect_url = _build_members_redirect_url(
error=f"Member ID '{member_id}' already exists"
)
else:
redirect_url = _build_members_redirect_url(
error=_get_error_detail(response)
)
except Exception as e:
logger.exception("Failed to update member: %s", e)
redirect_url = _build_members_redirect_url(error="Failed to update member")
return RedirectResponse(url=redirect_url, status_code=303)
@router.post("/members/delete", response_class=RedirectResponse)
async def admin_delete_member(
request: Request,
id: str = Form(...),
) -> RedirectResponse:
"""Delete a member."""
_check_admin_enabled(request)
_require_auth(request)
try:
response = await request.app.state.http_client.delete(
f"/api/v1/members/{id}",
)
if response.status_code == 204:
redirect_url = _build_members_redirect_url(
message="Member deleted successfully"
)
elif response.status_code == 404:
redirect_url = _build_members_redirect_url(error="Member not found")
else:
redirect_url = _build_members_redirect_url(
error=_get_error_detail(response)
)
except Exception as e:
logger.exception("Failed to delete member: %s", e)
redirect_url = _build_members_redirect_url(error="Failed to delete member")
return RedirectResponse(url=redirect_url, status_code=303)

View File

@@ -118,12 +118,18 @@ async def node_detail(request: Request, public_key: str) -> HTMLResponse:
logger.warning(f"Failed to fetch node details from API: {e}")
context["api_error"] = str(e)
# Check if admin editing is available
admin_enabled = getattr(request.app.state, "admin_enabled", False)
auth_user = request.headers.get("X-Forwarded-User")
context.update(
{
"node": node,
"advertisements": advertisements,
"telemetry": telemetry,
"public_key": public_key,
"admin_enabled": admin_enabled,
"is_authenticated": bool(auth_user),
}
)

View File

@@ -0,0 +1,20 @@
{% extends "base.html" %}
{% block title %}{{ network_name }} - Access Denied{% endblock %}
{% block content %}
<div class="flex flex-col items-center justify-center min-h-[50vh]">
<div class="text-center">
<svg xmlns="http://www.w3.org/2000/svg" class="h-24 w-24 mx-auto text-error opacity-50 mb-6" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 15v2m-6 4h12a2 2 0 002-2v-6a2 2 0 00-2-2H6a2 2 0 00-2 2v6a2 2 0 002 2zm10-10V7a4 4 0 00-8 0v4h8z" />
</svg>
<h1 class="text-3xl font-bold mb-2">Access Denied</h1>
<p class="text-lg opacity-70 mb-6">You don't have permission to access the admin area.</p>
<p class="text-sm opacity-50 mb-8">Please contact the network administrator if you believe this is an error.</p>
<div class="flex gap-4 justify-center">
<a href="/" class="btn btn-primary">Return Home</a>
<a href="/oauth2/sign_out" class="btn btn-outline">Sign Out</a>
</div>
</div>
</div>
{% endblock %}

View File

@@ -0,0 +1,70 @@
{% extends "base.html" %}
{% block title %}{{ network_name }} - Admin{% endblock %}
{% block content %}
<div class="flex items-center justify-between mb-4">
<div>
<h1 class="text-3xl font-bold">Admin</h1>
<div class="text-sm breadcrumbs">
<ul>
<li><a href="/">Home</a></li>
<li>Admin</li>
</ul>
</div>
</div>
<a href="/oauth2/sign_out" class="btn btn-outline btn-sm">Sign Out</a>
</div>
<!-- Authenticated User Info -->
<div class="flex flex-wrap items-center gap-4 text-sm opacity-70 mb-6">
{% if auth_username or auth_user %}
<span class="flex items-center gap-1.5">
<svg xmlns="http://www.w3.org/2000/svg" class="h-4 w-4" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
d="M16 7a4 4 0 11-8 0 4 4 0 018 0zM12 14a7 7 0 00-7 7h14a7 7 0 00-7-7z" />
</svg>
{{ auth_username or auth_user }}
</span>
{% endif %}
{% if auth_email %}
<span class="flex items-center gap-1.5">
<svg xmlns="http://www.w3.org/2000/svg" class="h-4 w-4" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
d="M3 8l7.89 5.26a2 2 0 002.22 0L21 8M5 19h14a2 2 0 002-2V7a2 2 0 00-2-2H5a2 2 0 00-2 2v10a2 2 0 002 2z" />
</svg>
{{ auth_email }}
</span>
{% endif %}
</div>
<!-- Navigation Cards -->
<div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
<a href="/a/members" class="card bg-base-100 shadow-xl hover:shadow-2xl transition-shadow">
<div class="card-body">
<h2 class="card-title">
<svg xmlns="http://www.w3.org/2000/svg" class="h-6 w-6" fill="none" viewBox="0 0 24 24"
stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
d="M17 20h5v-2a3 3 0 00-5.356-1.857M17 20H7m10 0v-2c0-.656-.126-1.283-.356-1.857M7 20H2v-2a3 3 0 015.356-1.857M7 20v-2c0-.656.126-1.283.356-1.857m0 0a5.002 5.002 0 019.288 0M15 7a3 3 0 11-6 0 3 3 0 016 0zm6 3a2 2 0 11-4 0 2 2 0 014 0zM7 10a2 2 0 11-4 0 2 2 0 014 0z" />
</svg>
Members
</h2>
<p>Manage network members and operators.</p>
</div>
</a>
<a href="/a/node-tags" class="card bg-base-100 shadow-xl hover:shadow-2xl transition-shadow">
<div class="card-body">
<h2 class="card-title">
<svg xmlns="http://www.w3.org/2000/svg" class="h-6 w-6" fill="none" viewBox="0 0 24 24"
stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2"
d="M7 7h.01M7 3h5c.512 0 1.024.195 1.414.586l7 7a2 2 0 010 2.828l-7 7a2 2 0 01-2.828 0l-7-7A2 2 0 013 12V7a4 4 0 014-4z" />
</svg>
Node Tags
</h2>
<p>Manage custom tags and metadata for network nodes.</p>
</div>
</a>
</div>
{% endblock %}

View File

@@ -0,0 +1,282 @@
{% extends "base.html" %}
{% block title %}{{ network_name }} - Members Admin{% endblock %}
{% block content %}
<div class="flex items-center justify-between mb-6">
<div>
<h1 class="text-3xl font-bold">Members</h1>
<div class="text-sm breadcrumbs">
<ul>
<li><a href="/">Home</a></li>
<li><a href="/a/">Admin</a></li>
<li>Members</li>
</ul>
</div>
</div>
<a href="/oauth2/sign_out" class="btn btn-outline btn-sm">Sign Out</a>
</div>
<!-- Flash Messages -->
{% if message %}
<div class="alert alert-success mb-4">
<svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M9 12l2 2 4-4m6 2a9 9 0 11-18 0 9 9 0 0118 0z" />
</svg>
<span>{{ message }}</span>
</div>
{% endif %}
{% if error %}
<div class="alert alert-error mb-4">
<svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10 14l2-2m0 0l2-2m-2 2l-2-2m2 2l2 2m7-2a9 9 0 11-18 0 9 9 0 0118 0z" />
</svg>
<span>{{ error }}</span>
</div>
{% endif %}
<!-- Members Table -->
<div class="card bg-base-100 shadow-xl">
<div class="card-body">
<div class="flex justify-between items-center">
<h2 class="card-title">Network Members ({{ members|length }})</h2>
<button class="btn btn-primary btn-sm" onclick="addModal.showModal()">Add Member</button>
</div>
{% if members %}
<div class="overflow-x-auto">
<table class="table table-zebra">
<thead>
<tr>
<th>Member ID</th>
<th>Name</th>
<th>Callsign</th>
<th>Contact</th>
<th class="w-32">Actions</th>
</tr>
</thead>
<tbody>
{% for member in members %}
<tr data-member-id="{{ member.id }}"
data-member-name="{{ member.name }}"
data-member-member-id="{{ member.member_id }}"
data-member-callsign="{{ member.callsign or '' }}"
data-member-description="{{ member.description or '' }}"
data-member-contact="{{ member.contact or '' }}">
<td class="font-mono font-semibold">{{ member.member_id }}</td>
<td>{{ member.name }}</td>
<td>
{% if member.callsign %}
<span class="badge badge-primary">{{ member.callsign }}</span>
{% else %}
<span class="text-base-content/40">-</span>
{% endif %}
</td>
<td class="max-w-xs truncate" title="{{ member.contact or '' }}">{{ member.contact or '-' }}</td>
<td>
<div class="flex gap-1">
<button class="btn btn-ghost btn-xs btn-edit">
Edit
</button>
<button class="btn btn-ghost btn-xs text-error btn-delete">
Delete
</button>
</div>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<div class="text-center py-8 text-base-content/60">
<p>No members configured yet.</p>
<p class="text-sm mt-2">Click "Add Member" to create the first member.</p>
</div>
{% endif %}
</div>
</div>
<!-- Add Modal -->
<dialog id="addModal" class="modal">
<div class="modal-box w-11/12 max-w-2xl">
<h3 class="font-bold text-lg">Add New Member</h3>
<form method="post" action="/a/members" class="py-4">
<div class="grid grid-cols-1 md:grid-cols-2 gap-4">
<div class="form-control">
<label class="label">
<span class="label-text">Member ID <span class="text-error">*</span></span>
</label>
<input type="text" name="member_id" id="add_member_id" class="input input-bordered"
placeholder="walshie86" required maxlength="50"
pattern="[a-zA-Z0-9_]+"
title="Letters, numbers, and underscores only">
<label class="label">
<span class="label-text-alt">Unique identifier (letters, numbers, underscore)</span>
</label>
</div>
<div class="form-control">
<label class="label">
<span class="label-text">Name <span class="text-error">*</span></span>
</label>
<input type="text" name="name" id="add_name" class="input input-bordered"
placeholder="John Smith" required maxlength="255">
</div>
<div class="form-control">
<label class="label">
<span class="label-text">Callsign</span>
</label>
<input type="text" name="callsign" id="add_callsign" class="input input-bordered"
placeholder="VK4ABC" maxlength="20">
</div>
<div class="form-control">
<label class="label">
<span class="label-text">Contact</span>
</label>
<input type="text" name="contact" id="add_contact" class="input input-bordered"
placeholder="john@example.com or phone number" maxlength="255">
</div>
<div class="form-control md:col-span-2">
<label class="label">
<span class="label-text">Description</span>
</label>
<textarea name="description" id="add_description" rows="3" class="textarea textarea-bordered"
placeholder="Brief description of member's role and responsibilities..."></textarea>
</div>
</div>
<div class="modal-action">
<button type="button" class="btn" onclick="addModal.close()">Cancel</button>
<button type="submit" class="btn btn-primary">Add Member</button>
</div>
</form>
</div>
<form method="dialog" class="modal-backdrop">
<button>close</button>
</form>
</dialog>
<!-- Edit Modal -->
<dialog id="editModal" class="modal">
<div class="modal-box w-11/12 max-w-2xl">
<h3 class="font-bold text-lg">Edit Member</h3>
<form method="post" action="/a/members/update" class="py-4">
<input type="hidden" name="id" id="edit_id">
<div class="grid grid-cols-1 md:grid-cols-2 gap-4">
<div class="form-control">
<label class="label">
<span class="label-text">Member ID <span class="text-error">*</span></span>
</label>
<input type="text" name="member_id" id="edit_member_id" class="input input-bordered"
required maxlength="50" pattern="[a-zA-Z0-9_]+"
title="Letters, numbers, and underscores only">
</div>
<div class="form-control">
<label class="label">
<span class="label-text">Name <span class="text-error">*</span></span>
</label>
<input type="text" name="name" id="edit_name" class="input input-bordered"
required maxlength="255">
</div>
<div class="form-control">
<label class="label">
<span class="label-text">Callsign</span>
</label>
<input type="text" name="callsign" id="edit_callsign" class="input input-bordered"
maxlength="20">
</div>
<div class="form-control">
<label class="label">
<span class="label-text">Contact</span>
</label>
<input type="text" name="contact" id="edit_contact" class="input input-bordered"
maxlength="255">
</div>
<div class="form-control md:col-span-2">
<label class="label">
<span class="label-text">Description</span>
</label>
<textarea name="description" id="edit_description" rows="3"
class="textarea textarea-bordered"></textarea>
</div>
</div>
<div class="modal-action">
<button type="button" class="btn" onclick="editModal.close()">Cancel</button>
<button type="submit" class="btn btn-primary">Save Changes</button>
</div>
</form>
</div>
<form method="dialog" class="modal-backdrop">
<button>close</button>
</form>
</dialog>
<!-- Delete Modal -->
<dialog id="deleteModal" class="modal">
<div class="modal-box">
<h3 class="font-bold text-lg">Delete Member</h3>
<form method="post" action="/a/members/delete" class="py-4">
<input type="hidden" name="id" id="delete_id">
<p class="py-4">Are you sure you want to delete member <strong id="delete_member_name"></strong>?</p>
<div class="alert alert-error mb-4">
<svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z" />
</svg>
<span>This action cannot be undone.</span>
</div>
<div class="modal-action">
<button type="button" class="btn" onclick="deleteModal.close()">Cancel</button>
<button type="submit" class="btn btn-error">Delete</button>
</div>
</form>
</div>
<form method="dialog" class="modal-backdrop">
<button>close</button>
</form>
</dialog>
{% endblock %}
{% block extra_scripts %}
<script>
// Use event delegation to handle button clicks safely
document.addEventListener('DOMContentLoaded', function() {
// Edit button handler
document.querySelectorAll('.btn-edit').forEach(function(btn) {
btn.addEventListener('click', function() {
var row = this.closest('tr');
document.getElementById('edit_id').value = row.dataset.memberId;
document.getElementById('edit_member_id').value = row.dataset.memberMemberId;
document.getElementById('edit_name').value = row.dataset.memberName;
document.getElementById('edit_callsign').value = row.dataset.memberCallsign;
document.getElementById('edit_description').value = row.dataset.memberDescription;
document.getElementById('edit_contact').value = row.dataset.memberContact;
editModal.showModal();
});
});
// Delete button handler
document.querySelectorAll('.btn-delete').forEach(function(btn) {
btn.addEventListener('click', function() {
var row = this.closest('tr');
document.getElementById('delete_id').value = row.dataset.memberId;
document.getElementById('delete_member_name').textContent = row.dataset.memberName;
deleteModal.showModal();
});
});
});
</script>
{% endblock %}

View File

@@ -0,0 +1,434 @@
{% extends "base.html" %}
{% block title %}{{ network_name }} - Node Tags Admin{% endblock %}
{% block content %}
<div class="flex items-center justify-between mb-6">
<div>
<h1 class="text-3xl font-bold">Node Tags</h1>
<div class="text-sm breadcrumbs">
<ul>
<li><a href="/">Home</a></li>
<li><a href="/a/">Admin</a></li>
<li>Node Tags</li>
</ul>
</div>
</div>
<a href="/oauth2/sign_out" class="btn btn-outline btn-sm">Sign Out</a>
</div>
<!-- Flash Messages -->
{% if message %}
<div class="alert alert-success mb-4">
<svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M9 12l2 2 4-4m6 2a9 9 0 11-18 0 9 9 0 0118 0z" />
</svg>
<span>{{ message }}</span>
</div>
{% endif %}
{% if error %}
<div class="alert alert-error mb-4">
<svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10 14l2-2m0 0l2-2m-2 2l-2-2m2 2l2 2m7-2a9 9 0 11-18 0 9 9 0 0118 0z" />
</svg>
<span>{{ error }}</span>
</div>
{% endif %}
<!-- Node Selector -->
<div class="card bg-base-100 shadow-xl mb-6">
<div class="card-body">
<h2 class="card-title">Select Node</h2>
<form method="get" action="/a/node-tags" class="flex gap-4 items-end">
<div class="form-control flex-1">
<label class="label">
<span class="label-text">Node</span>
</label>
<select name="public_key" class="select select-bordered w-full" onchange="this.form.submit()">
<option value="">-- Select a node --</option>
{% for node in nodes %}
<option value="{{ node.public_key }}" {% if node.public_key == selected_public_key %}selected{% endif %}>
{{ node.name or 'Unnamed' }} ({{ node.public_key[:8] }}...{{ node.public_key[-4:] }})
</option>
{% endfor %}
</select>
</div>
<button type="submit" class="btn btn-primary">Load Tags</button>
</form>
</div>
</div>
{% if selected_public_key and selected_node %}
<!-- Selected Node Info -->
<div class="card bg-base-100 shadow-xl mb-6">
<div class="card-body">
<div class="flex justify-between items-start">
<div class="flex items-start gap-3">
<span class="text-2xl" title="{{ selected_node.adv_type or 'Unknown' }}">{% if selected_node.adv_type and selected_node.adv_type|lower == 'chat' %}💬{% elif selected_node.adv_type and selected_node.adv_type|lower == 'repeater' %}📡{% elif selected_node.adv_type and selected_node.adv_type|lower == 'room' %}🪧{% else %}📍{% endif %}</span>
<div>
<h2 class="card-title">{{ selected_node.name or 'Unnamed Node' }}</h2>
<p class="text-sm opacity-70 font-mono">{{ selected_public_key }}</p>
</div>
</div>
<div class="flex gap-2">
{% if tags %}
<button class="btn btn-outline btn-sm" onclick="copyAllModal.showModal()">Copy All</button>
<button class="btn btn-outline btn-error btn-sm" onclick="deleteAllModal.showModal()">Delete All</button>
{% endif %}
<a href="/nodes/{{ selected_public_key }}" class="btn btn-ghost btn-sm">View Node</a>
</div>
</div>
</div>
</div>
<!-- Tags Table -->
<div class="card bg-base-100 shadow-xl mb-6">
<div class="card-body">
<h2 class="card-title">Tags ({{ tags|length }})</h2>
{% if tags %}
<div class="overflow-x-auto">
<table class="table table-zebra">
<thead>
<tr>
<th>Key</th>
<th>Value</th>
<th>Type</th>
<th>Updated</th>
<th class="w-48">Actions</th>
</tr>
</thead>
<tbody>
{% for tag in tags %}
<tr data-tag-key="{{ tag.key }}" data-tag-value="{{ tag.value or '' }}" data-tag-type="{{ tag.value_type }}">
<td class="font-mono font-semibold">{{ tag.key }}</td>
<td class="max-w-xs truncate" title="{{ tag.value or '' }}">{{ tag.value or '-' }}</td>
<td>
<span class="badge badge-ghost badge-sm">{{ tag.value_type }}</span>
</td>
<td class="text-sm opacity-70">{{ tag.updated_at[:10] if tag.updated_at else '-' }}</td>
<td>
<div class="flex gap-1">
<button class="btn btn-ghost btn-xs btn-edit">
Edit
</button>
<button class="btn btn-ghost btn-xs btn-move">
Move
</button>
<button class="btn btn-ghost btn-xs text-error btn-delete">
Delete
</button>
</div>
</td>
</tr>
{% endfor %}
</tbody>
</table>
</div>
{% else %}
<div class="text-center py-8 text-base-content/60">
<p>No tags found for this node.</p>
<p class="text-sm mt-2">Add a new tag below.</p>
</div>
{% endif %}
</div>
</div>
<!-- Add New Tag Form -->
<div class="card bg-base-100 shadow-xl">
<div class="card-body">
<h2 class="card-title">Add New Tag</h2>
<form method="post" action="/a/node-tags" class="grid grid-cols-1 md:grid-cols-4 gap-4">
<input type="hidden" name="public_key" value="{{ selected_public_key }}">
<div class="form-control">
<label class="label">
<span class="label-text">Key</span>
</label>
<input type="text" name="key" class="input input-bordered" placeholder="tag_name" required maxlength="100">
</div>
<div class="form-control">
<label class="label">
<span class="label-text">Value</span>
</label>
<input type="text" name="value" class="input input-bordered" placeholder="tag value">
</div>
<div class="form-control">
<label class="label">
<span class="label-text">Type</span>
</label>
<select name="value_type" class="select select-bordered">
<option value="string">string</option>
<option value="number">number</option>
<option value="boolean">boolean</option>
</select>
</div>
<div class="form-control">
<label class="label">
<span class="label-text">&nbsp;</span>
</label>
<button type="submit" class="btn btn-primary">Add Tag</button>
</div>
</form>
</div>
</div>
<!-- Edit Modal -->
<dialog id="editModal" class="modal">
<div class="modal-box">
<h3 class="font-bold text-lg">Edit Tag</h3>
<form method="post" action="/a/node-tags/update" class="py-4">
<input type="hidden" name="public_key" value="{{ selected_public_key }}">
<input type="hidden" name="key" id="editKey">
<div class="form-control mb-4">
<label class="label">
<span class="label-text">Key</span>
</label>
<input type="text" id="editKeyDisplay" class="input input-bordered" disabled>
</div>
<div class="form-control mb-4">
<label class="label">
<span class="label-text">Value</span>
</label>
<input type="text" name="value" id="editValue" class="input input-bordered">
</div>
<div class="form-control mb-4">
<label class="label">
<span class="label-text">Type</span>
</label>
<select name="value_type" id="editValueType" class="select select-bordered w-full">
<option value="string">string</option>
<option value="number">number</option>
<option value="boolean">boolean</option>
</select>
</div>
<div class="modal-action">
<button type="button" class="btn" onclick="editModal.close()">Cancel</button>
<button type="submit" class="btn btn-primary">Save Changes</button>
</div>
</form>
</div>
<form method="dialog" class="modal-backdrop">
<button>close</button>
</form>
</dialog>
<!-- Move Modal -->
<dialog id="moveModal" class="modal">
<div class="modal-box">
<h3 class="font-bold text-lg">Move Tag to Another Node</h3>
<form method="post" action="/a/node-tags/move" class="py-4">
<input type="hidden" name="public_key" value="{{ selected_public_key }}">
<input type="hidden" name="key" id="moveKey">
<div class="form-control mb-4">
<label class="label">
<span class="label-text">Tag Key</span>
</label>
<input type="text" id="moveKeyDisplay" class="input input-bordered" disabled>
</div>
<div class="form-control mb-4">
<label class="label">
<span class="label-text">Destination Node</span>
</label>
<select name="new_public_key" id="moveDestination" class="select select-bordered w-full" required>
<option value="">-- Select destination node --</option>
{% for node in nodes %}
{% if node.public_key != selected_public_key %}
<option value="{{ node.public_key }}">
{{ node.name or 'Unnamed' }} ({{ node.public_key[:8] }}...{{ node.public_key[-4:] }})
</option>
{% endif %}
{% endfor %}
</select>
</div>
<div class="alert alert-warning mb-4">
<svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z" />
</svg>
<span>This will move the tag from the current node to the destination node.</span>
</div>
<div class="modal-action">
<button type="button" class="btn" onclick="moveModal.close()">Cancel</button>
<button type="submit" class="btn btn-warning">Move Tag</button>
</div>
</form>
</div>
<form method="dialog" class="modal-backdrop">
<button>close</button>
</form>
</dialog>
<!-- Delete Modal -->
<dialog id="deleteModal" class="modal">
<div class="modal-box">
<h3 class="font-bold text-lg">Delete Tag</h3>
<form method="post" action="/a/node-tags/delete" class="py-4">
<input type="hidden" name="public_key" value="{{ selected_public_key }}">
<input type="hidden" name="key" id="deleteKey">
<p class="py-4">Are you sure you want to delete the tag "<span id="deleteKeyDisplay" class="font-mono font-semibold"></span>"?</p>
<div class="alert alert-error mb-4">
<svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z" />
</svg>
<span>This action cannot be undone.</span>
</div>
<div class="modal-action">
<button type="button" class="btn" onclick="deleteModal.close()">Cancel</button>
<button type="submit" class="btn btn-error">Delete</button>
</div>
</form>
</div>
<form method="dialog" class="modal-backdrop">
<button>close</button>
</form>
</dialog>
<!-- Copy All Tags Modal -->
<dialog id="copyAllModal" class="modal">
<div class="modal-box">
<h3 class="font-bold text-lg">Copy All Tags to Another Node</h3>
<form method="post" action="/a/node-tags/copy-all" class="py-4">
<input type="hidden" name="public_key" value="{{ selected_public_key }}">
<p class="mb-4">Copy all {{ tags|length }} tag(s) from <strong>{{ selected_node.name or 'Unnamed' }}</strong> to another node.</p>
<div class="form-control mb-4">
<label class="label">
<span class="label-text">Destination Node</span>
</label>
<select name="dest_public_key" class="select select-bordered w-full" required>
<option value="">-- Select destination node --</option>
{% for node in nodes %}
{% if node.public_key != selected_public_key %}
<option value="{{ node.public_key }}">
{{ node.name or 'Unnamed' }} ({{ node.public_key[:8] }}...{{ node.public_key[-4:] }})
</option>
{% endif %}
{% endfor %}
</select>
</div>
<div class="alert alert-info mb-4">
<svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z" />
</svg>
<span>Tags that already exist on the destination node will be skipped. Original tags remain on this node.</span>
</div>
<div class="modal-action">
<button type="button" class="btn" onclick="copyAllModal.close()">Cancel</button>
<button type="submit" class="btn btn-primary">Copy Tags</button>
</div>
</form>
</div>
<form method="dialog" class="modal-backdrop">
<button>close</button>
</form>
</dialog>
<!-- Delete All Tags Modal -->
<dialog id="deleteAllModal" class="modal">
<div class="modal-box">
<h3 class="font-bold text-lg">Delete All Tags</h3>
<form method="post" action="/a/node-tags/delete-all" class="py-4">
<input type="hidden" name="public_key" value="{{ selected_public_key }}">
<p class="mb-4">Are you sure you want to delete all {{ tags|length }} tag(s) from <strong>{{ selected_node.name or 'Unnamed' }}</strong>?</p>
<div class="alert alert-error mb-4">
<svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z" />
</svg>
<span>This action cannot be undone. All tags will be permanently deleted.</span>
</div>
<div class="modal-action">
<button type="button" class="btn" onclick="deleteAllModal.close()">Cancel</button>
<button type="submit" class="btn btn-error">Delete All Tags</button>
</div>
</form>
</div>
<form method="dialog" class="modal-backdrop">
<button>close</button>
</form>
</dialog>
{% elif selected_public_key and not selected_node %}
<div class="alert alert-warning">
<svg xmlns="http://www.w3.org/2000/svg" class="stroke-current shrink-0 h-6 w-6" fill="none" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z" />
</svg>
<span>Node not found: {{ selected_public_key }}</span>
</div>
{% else %}
<div class="card bg-base-100 shadow-xl">
<div class="card-body text-center py-12">
<svg xmlns="http://www.w3.org/2000/svg" class="h-16 w-16 mx-auto mb-4 opacity-30" fill="none" viewBox="0 0 24 24" stroke="currentColor">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M7 7h.01M7 3h5c.512 0 1.024.195 1.414.586l7 7a2 2 0 010 2.828l-7 7a2 2 0 01-2.828 0l-7-7A2 2 0 013 12V7a4 4 0 014-4z" />
</svg>
<h2 class="text-xl font-semibold mb-2">Select a Node</h2>
<p class="opacity-70">Choose a node from the dropdown above to view and manage its tags.</p>
</div>
</div>
{% endif %}
{% endblock %}
{% block extra_scripts %}
<script>
// Use event delegation to handle button clicks safely
document.addEventListener('DOMContentLoaded', function() {
// Edit button handler
document.querySelectorAll('.btn-edit').forEach(function(btn) {
btn.addEventListener('click', function() {
var row = this.closest('tr');
var key = row.dataset.tagKey;
var value = row.dataset.tagValue;
var valueType = row.dataset.tagType;
document.getElementById('editKey').value = key;
document.getElementById('editKeyDisplay').value = key;
document.getElementById('editValue').value = value;
document.getElementById('editValueType').value = valueType;
editModal.showModal();
});
});
// Move button handler
document.querySelectorAll('.btn-move').forEach(function(btn) {
btn.addEventListener('click', function() {
var row = this.closest('tr');
var key = row.dataset.tagKey;
document.getElementById('moveKey').value = key;
document.getElementById('moveKeyDisplay').value = key;
document.getElementById('moveDestination').selectedIndex = 0;
moveModal.showModal();
});
});
// Delete button handler
document.querySelectorAll('.btn-delete').forEach(function(btn) {
btn.addEventListener('click', function() {
var row = this.closest('tr');
var key = row.dataset.tagKey;
document.getElementById('deleteKey').value = key;
document.getElementById('deleteKeyDisplay').textContent = key;
deleteModal.showModal();
});
});
});
</script>
{% endblock %}

View File

@@ -114,7 +114,7 @@
<a href="{{ network_contact_github }}" target="_blank" rel="noopener noreferrer" class="link link-hover">GitHub</a>
{% endif %}
</p>
<p class="text-xs opacity-50 mt-2">Powered by <a href="https://github.com/ipnet-mesh/meshcore-hub" target="_blank" rel="noopener noreferrer" class="link link-hover">MeshCore Hub</a> {{ version }}</p>
<p class="text-xs opacity-50 mt-2">{% if admin_enabled %}<a href="/a/" class="link link-hover">Admin</a> | {% endif %}Powered by <a href="https://github.com/ipnet-mesh/meshcore-hub" target="_blank" rel="noopener noreferrer" class="link link-hover">MeshCore Hub</a> {{ version }}</p>
</aside>
</footer>

View File

@@ -97,9 +97,10 @@
<div class="grid grid-cols-1 {% if ns_map.lat and ns_map.lon %}lg:grid-cols-2{% endif %} gap-6 mt-6">
<!-- Tags -->
{% if node.tags %}
{% if node.tags or (admin_enabled and is_authenticated) %}
<div>
<h3 class="font-semibold opacity-70 mb-2">Tags</h3>
{% if node.tags %}
<div class="overflow-x-auto">
<table class="table table-compact w-full">
<thead>
@@ -120,6 +121,14 @@
</tbody>
</table>
</div>
{% else %}
<p class="text-sm opacity-70 mb-2">No tags defined.</p>
{% endif %}
{% if admin_enabled and is_authenticated %}
<div class="mt-3">
<a href="/a/node-tags?public_key={{ node.public_key }}" class="btn btn-sm btn-outline">{% if node.tags %}Edit Tags{% else %}Add Tags{% endif %}</a>
</div>
{% endif %}
</div>
{% endif %}

View File

@@ -20,6 +20,7 @@ from meshcore_hub.common.database import DatabaseManager
from meshcore_hub.common.models import (
Advertisement,
Base,
Member,
Message,
Node,
NodeTag,
@@ -264,3 +265,147 @@ def sample_trace_path(api_db_session):
api_db_session.commit()
api_db_session.refresh(trace)
return trace
@pytest.fixture
def sample_member(api_db_session):
"""Create a sample member in the database."""
member = Member(
member_id="alice",
name="Alice Smith",
callsign="W1ABC",
role="Admin",
description="Network administrator",
contact="alice@example.com",
)
api_db_session.add(member)
api_db_session.commit()
api_db_session.refresh(member)
return member
@pytest.fixture
def receiver_node(api_db_session):
"""Create a receiver node in the database."""
node = Node(
public_key="receiver123receiver123receiver12",
name="Receiver Node",
adv_type="REPEATER",
first_seen=datetime.now(timezone.utc),
last_seen=datetime.now(timezone.utc),
)
api_db_session.add(node)
api_db_session.commit()
api_db_session.refresh(node)
return node
@pytest.fixture
def sample_message_with_receiver(api_db_session, receiver_node):
"""Create a message with a receiver node."""
message = Message(
message_type="channel",
channel_idx=1,
pubkey_prefix="xyz789",
text="Channel message with receiver",
received_at=datetime.now(timezone.utc),
receiver_node_id=receiver_node.id,
)
api_db_session.add(message)
api_db_session.commit()
api_db_session.refresh(message)
return message
@pytest.fixture
def sample_advertisement_with_receiver(api_db_session, sample_node, receiver_node):
"""Create an advertisement with source and receiver nodes."""
advert = Advertisement(
public_key=sample_node.public_key,
name="SourceNode",
adv_type="REPEATER",
received_at=datetime.now(timezone.utc),
node_id=sample_node.id,
receiver_node_id=receiver_node.id,
)
api_db_session.add(advert)
api_db_session.commit()
api_db_session.refresh(advert)
return advert
@pytest.fixture
def sample_telemetry_with_receiver(api_db_session, receiver_node):
"""Create a telemetry record with a receiver node."""
telemetry = Telemetry(
node_public_key="xyz789xyz789xyz789xyz789xyz789xy",
parsed_data={"battery_level": 50.0},
received_at=datetime.now(timezone.utc),
receiver_node_id=receiver_node.id,
)
api_db_session.add(telemetry)
api_db_session.commit()
api_db_session.refresh(telemetry)
return telemetry
@pytest.fixture
def sample_trace_path_with_receiver(api_db_session, receiver_node):
"""Create a trace path with a receiver node."""
trace = TracePath(
initiator_tag=99999,
path_hashes=["aaa111", "bbb222"],
hop_count=2,
received_at=datetime.now(timezone.utc),
receiver_node_id=receiver_node.id,
)
api_db_session.add(trace)
api_db_session.commit()
api_db_session.refresh(trace)
return trace
@pytest.fixture
def sample_node_with_name_tag(api_db_session):
"""Create a node with a name tag for search testing."""
node = Node(
public_key="searchable123searchable123searc",
name="Original Name",
adv_type="CLIENT",
first_seen=datetime.now(timezone.utc),
)
api_db_session.add(node)
api_db_session.commit()
tag = NodeTag(
node_id=node.id,
key="name",
value="Friendly Search Name",
)
api_db_session.add(tag)
api_db_session.commit()
api_db_session.refresh(node)
return node
@pytest.fixture
def sample_node_with_member_tag(api_db_session):
"""Create a node with a member_id tag for filter testing."""
node = Node(
public_key="member123member123member123membe",
name="Member Node",
adv_type="CHAT",
first_seen=datetime.now(timezone.utc),
)
api_db_session.add(node)
api_db_session.commit()
tag = NodeTag(
node_id=node.id,
key="member_id",
value="alice",
)
api_db_session.add(tag)
api_db_session.commit()
api_db_session.refresh(node)
return node

View File

@@ -1,5 +1,7 @@
"""Tests for advertisement API routes."""
from datetime import datetime, timedelta, timezone
class TestListAdvertisements:
"""Tests for GET /advertisements endpoint."""
@@ -55,3 +57,120 @@ class TestGetAdvertisement:
"""Test getting a non-existent advertisement."""
response = client_no_auth.get("/api/v1/advertisements/nonexistent-id")
assert response.status_code == 404
class TestListAdvertisementsFilters:
"""Tests for advertisement list query filters."""
def test_filter_by_search_public_key(self, client_no_auth, sample_advertisement):
"""Test filtering advertisements by public key search."""
# Partial public key match
response = client_no_auth.get("/api/v1/advertisements?search=abc123")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
# No match
response = client_no_auth.get("/api/v1/advertisements?search=zzz999")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0
def test_filter_by_search_name(self, client_no_auth, sample_advertisement):
"""Test filtering advertisements by name search."""
response = client_no_auth.get("/api/v1/advertisements?search=TestNode")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
def test_filter_by_received_by(
self,
client_no_auth,
sample_advertisement,
sample_advertisement_with_receiver,
receiver_node,
):
"""Test filtering advertisements by receiver node."""
response = client_no_auth.get(
f"/api/v1/advertisements?received_by={receiver_node.public_key}"
)
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
def test_filter_by_member_id(
self, client_no_auth, api_db_session, sample_node_with_member_tag
):
"""Test filtering advertisements by member_id tag."""
from meshcore_hub.common.models import Advertisement
# Create an advertisement for the node with member tag
advert = Advertisement(
public_key=sample_node_with_member_tag.public_key,
name="Member Node Ad",
adv_type="CHAT",
received_at=datetime.now(timezone.utc),
node_id=sample_node_with_member_tag.id,
)
api_db_session.add(advert)
api_db_session.commit()
# Filter by member_id
response = client_no_auth.get("/api/v1/advertisements?member_id=alice")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
# No match
response = client_no_auth.get("/api/v1/advertisements?member_id=unknown")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0
def test_filter_by_since(self, client_no_auth, api_db_session):
"""Test filtering advertisements by since timestamp."""
from meshcore_hub.common.models import Advertisement
now = datetime.now(timezone.utc)
old_time = now - timedelta(days=7)
# Create an old advertisement
old_advert = Advertisement(
public_key="old123old123old123old123old123ol",
name="Old Advertisement",
adv_type="CLIENT",
received_at=old_time,
)
api_db_session.add(old_advert)
api_db_session.commit()
# Filter since yesterday - should not include old advertisement
since = (now - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S")
response = client_no_auth.get(f"/api/v1/advertisements?since={since}")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0
def test_filter_by_until(self, client_no_auth, api_db_session):
"""Test filtering advertisements by until timestamp."""
from meshcore_hub.common.models import Advertisement
now = datetime.now(timezone.utc)
old_time = now - timedelta(days=7)
# Create an old advertisement
old_advert = Advertisement(
public_key="until123until123until123until12",
name="Old Advertisement Until",
adv_type="CLIENT",
received_at=old_time,
)
api_db_session.add(old_advert)
api_db_session.commit()
# Filter until 5 days ago - should include old advertisement
until = (now - timedelta(days=5)).strftime("%Y-%m-%dT%H:%M:%S")
response = client_no_auth.get(f"/api/v1/advertisements?until={until}")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1

View File

@@ -0,0 +1,285 @@
"""Tests for member API routes."""
class TestListMembers:
"""Tests for GET /members endpoint."""
def test_list_members_empty(self, client_no_auth):
"""Test listing members when database is empty."""
response = client_no_auth.get("/api/v1/members")
assert response.status_code == 200
data = response.json()
assert data["items"] == []
assert data["total"] == 0
def test_list_members_with_data(self, client_no_auth, sample_member):
"""Test listing members with data in database."""
response = client_no_auth.get("/api/v1/members")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
assert data["total"] == 1
assert data["items"][0]["member_id"] == sample_member.member_id
assert data["items"][0]["name"] == sample_member.name
assert data["items"][0]["callsign"] == sample_member.callsign
def test_list_members_pagination(self, client_no_auth, sample_member):
"""Test member list pagination parameters."""
response = client_no_auth.get("/api/v1/members?limit=25&offset=10")
assert response.status_code == 200
data = response.json()
assert data["limit"] == 25
assert data["offset"] == 10
def test_list_members_requires_read_auth(self, client_with_auth):
"""Test listing members requires read auth when configured."""
# Without auth header
response = client_with_auth.get("/api/v1/members")
assert response.status_code == 401
# With read key
response = client_with_auth.get(
"/api/v1/members",
headers={"Authorization": "Bearer test-read-key"},
)
assert response.status_code == 200
class TestGetMember:
"""Tests for GET /members/{member_id} endpoint."""
def test_get_member_success(self, client_no_auth, sample_member):
"""Test getting a specific member."""
response = client_no_auth.get(f"/api/v1/members/{sample_member.id}")
assert response.status_code == 200
data = response.json()
assert data["member_id"] == sample_member.member_id
assert data["name"] == sample_member.name
assert data["callsign"] == sample_member.callsign
assert data["role"] == sample_member.role
assert data["description"] == sample_member.description
assert data["contact"] == sample_member.contact
def test_get_member_not_found(self, client_no_auth):
"""Test getting a non-existent member."""
response = client_no_auth.get("/api/v1/members/nonexistent-id")
assert response.status_code == 404
assert "not found" in response.json()["detail"].lower()
def test_get_member_requires_read_auth(self, client_with_auth, sample_member):
"""Test getting a member requires read auth when configured."""
# Without auth header
response = client_with_auth.get(f"/api/v1/members/{sample_member.id}")
assert response.status_code == 401
# With read key
response = client_with_auth.get(
f"/api/v1/members/{sample_member.id}",
headers={"Authorization": "Bearer test-read-key"},
)
assert response.status_code == 200
class TestCreateMember:
"""Tests for POST /members endpoint."""
def test_create_member_success(self, client_no_auth):
"""Test creating a new member."""
response = client_no_auth.post(
"/api/v1/members",
json={
"member_id": "bob",
"name": "Bob Jones",
"callsign": "W2XYZ",
"role": "Member",
"description": "Regular member",
"contact": "bob@example.com",
},
)
assert response.status_code == 201
data = response.json()
assert data["member_id"] == "bob"
assert data["name"] == "Bob Jones"
assert data["callsign"] == "W2XYZ"
assert data["role"] == "Member"
assert "id" in data
assert "created_at" in data
def test_create_member_minimal(self, client_no_auth):
"""Test creating a member with only required fields."""
response = client_no_auth.post(
"/api/v1/members",
json={
"member_id": "charlie",
"name": "Charlie Brown",
},
)
assert response.status_code == 201
data = response.json()
assert data["member_id"] == "charlie"
assert data["name"] == "Charlie Brown"
assert data["callsign"] is None
assert data["role"] is None
def test_create_member_duplicate_member_id(self, client_no_auth, sample_member):
"""Test creating a member with duplicate member_id fails."""
response = client_no_auth.post(
"/api/v1/members",
json={
"member_id": sample_member.member_id, # "alice" already exists
"name": "Another Alice",
},
)
assert response.status_code == 400
assert "already exists" in response.json()["detail"].lower()
def test_create_member_requires_admin_auth(self, client_with_auth):
"""Test creating a member requires admin auth."""
# Without auth
response = client_with_auth.post(
"/api/v1/members",
json={"member_id": "test", "name": "Test User"},
)
assert response.status_code == 401
# With read key (not admin)
response = client_with_auth.post(
"/api/v1/members",
json={"member_id": "test", "name": "Test User"},
headers={"Authorization": "Bearer test-read-key"},
)
assert response.status_code == 403
# With admin key
response = client_with_auth.post(
"/api/v1/members",
json={"member_id": "test", "name": "Test User"},
headers={"Authorization": "Bearer test-admin-key"},
)
assert response.status_code == 201
class TestUpdateMember:
"""Tests for PUT /members/{member_id} endpoint."""
def test_update_member_success(self, client_no_auth, sample_member):
"""Test updating a member."""
response = client_no_auth.put(
f"/api/v1/members/{sample_member.id}",
json={
"name": "Alice Johnson",
"role": "Super Admin",
},
)
assert response.status_code == 200
data = response.json()
assert data["name"] == "Alice Johnson"
assert data["role"] == "Super Admin"
# Unchanged fields should remain
assert data["member_id"] == sample_member.member_id
assert data["callsign"] == sample_member.callsign
def test_update_member_change_member_id(self, client_no_auth, sample_member):
"""Test updating member_id."""
response = client_no_auth.put(
f"/api/v1/members/{sample_member.id}",
json={"member_id": "alice2"},
)
assert response.status_code == 200
data = response.json()
assert data["member_id"] == "alice2"
def test_update_member_member_id_collision(
self, client_no_auth, api_db_session, sample_member
):
"""Test updating member_id to one that already exists fails."""
from meshcore_hub.common.models import Member
# Create another member
other_member = Member(
member_id="bob",
name="Bob",
)
api_db_session.add(other_member)
api_db_session.commit()
# Try to change alice's member_id to "bob"
response = client_no_auth.put(
f"/api/v1/members/{sample_member.id}",
json={"member_id": "bob"},
)
assert response.status_code == 400
assert "already exists" in response.json()["detail"].lower()
def test_update_member_not_found(self, client_no_auth):
"""Test updating a non-existent member."""
response = client_no_auth.put(
"/api/v1/members/nonexistent-id",
json={"name": "New Name"},
)
assert response.status_code == 404
assert "not found" in response.json()["detail"].lower()
def test_update_member_requires_admin_auth(self, client_with_auth, sample_member):
"""Test updating a member requires admin auth."""
# Without auth
response = client_with_auth.put(
f"/api/v1/members/{sample_member.id}",
json={"name": "New Name"},
)
assert response.status_code == 401
# With read key (not admin)
response = client_with_auth.put(
f"/api/v1/members/{sample_member.id}",
json={"name": "New Name"},
headers={"Authorization": "Bearer test-read-key"},
)
assert response.status_code == 403
# With admin key
response = client_with_auth.put(
f"/api/v1/members/{sample_member.id}",
json={"name": "New Name"},
headers={"Authorization": "Bearer test-admin-key"},
)
assert response.status_code == 200
class TestDeleteMember:
"""Tests for DELETE /members/{member_id} endpoint."""
def test_delete_member_success(self, client_no_auth, sample_member):
"""Test deleting a member."""
response = client_no_auth.delete(f"/api/v1/members/{sample_member.id}")
assert response.status_code == 204
# Verify it's deleted
response = client_no_auth.get(f"/api/v1/members/{sample_member.id}")
assert response.status_code == 404
def test_delete_member_not_found(self, client_no_auth):
"""Test deleting a non-existent member."""
response = client_no_auth.delete("/api/v1/members/nonexistent-id")
assert response.status_code == 404
assert "not found" in response.json()["detail"].lower()
def test_delete_member_requires_admin_auth(self, client_with_auth, sample_member):
"""Test deleting a member requires admin auth."""
# Without auth
response = client_with_auth.delete(f"/api/v1/members/{sample_member.id}")
assert response.status_code == 401
# With read key (not admin)
response = client_with_auth.delete(
f"/api/v1/members/{sample_member.id}",
headers={"Authorization": "Bearer test-read-key"},
)
assert response.status_code == 403
# With admin key
response = client_with_auth.delete(
f"/api/v1/members/{sample_member.id}",
headers={"Authorization": "Bearer test-admin-key"},
)
assert response.status_code == 204

View File

@@ -1,5 +1,7 @@
"""Tests for message API routes."""
from datetime import datetime, timezone
class TestListMessages:
"""Tests for GET /messages endpoint."""
@@ -57,3 +59,127 @@ class TestGetMessage:
"""Test getting a non-existent message."""
response = client_no_auth.get("/api/v1/messages/nonexistent-id")
assert response.status_code == 404
class TestListMessagesFilters:
"""Tests for message list query filters."""
def test_filter_by_pubkey_prefix(self, client_no_auth, sample_message):
"""Test filtering messages by pubkey_prefix."""
# Match
response = client_no_auth.get("/api/v1/messages?pubkey_prefix=abc123")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
# No match
response = client_no_auth.get("/api/v1/messages?pubkey_prefix=xyz999")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0
def test_filter_by_channel_idx(
self, client_no_auth, sample_message, sample_message_with_receiver
):
"""Test filtering messages by channel_idx."""
# Channel 1 should match sample_message_with_receiver
response = client_no_auth.get("/api/v1/messages?channel_idx=1")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
assert data["items"][0]["channel_idx"] == 1
# Channel 0 should return no results
response = client_no_auth.get("/api/v1/messages?channel_idx=0")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0
def test_filter_by_received_by(
self,
client_no_auth,
sample_message,
sample_message_with_receiver,
receiver_node,
):
"""Test filtering messages by receiver node."""
response = client_no_auth.get(
f"/api/v1/messages?received_by={receiver_node.public_key}"
)
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
assert data["items"][0]["text"] == sample_message_with_receiver.text
def test_filter_by_since(self, client_no_auth, api_db_session):
"""Test filtering messages by since timestamp."""
from datetime import timedelta
from meshcore_hub.common.models import Message
now = datetime.now(timezone.utc)
old_time = now - timedelta(days=7)
# Create an old message
old_msg = Message(
message_type="direct",
pubkey_prefix="old123",
text="Old message",
received_at=old_time,
)
api_db_session.add(old_msg)
api_db_session.commit()
# Filter since yesterday - should not include old message
since = (now - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S")
response = client_no_auth.get(f"/api/v1/messages?since={since}")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0
def test_filter_by_until(self, client_no_auth, api_db_session):
"""Test filtering messages by until timestamp."""
from datetime import timedelta
from meshcore_hub.common.models import Message
now = datetime.now(timezone.utc)
old_time = now - timedelta(days=7)
# Create an old message
old_msg = Message(
message_type="direct",
pubkey_prefix="old456",
text="Old message for until",
received_at=old_time,
)
api_db_session.add(old_msg)
api_db_session.commit()
# Filter until 5 days ago - should include old message
until = (now - timedelta(days=5)).strftime("%Y-%m-%dT%H:%M:%S")
response = client_no_auth.get(f"/api/v1/messages?until={until}")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
assert data["items"][0]["text"] == "Old message for until"
def test_filter_by_search(self, client_no_auth, sample_message):
"""Test filtering messages by text search."""
# Match
response = client_no_auth.get("/api/v1/messages?search=Hello")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
# Case insensitive match
response = client_no_auth.get("/api/v1/messages?search=hello")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
# No match
response = client_no_auth.get("/api/v1/messages?search=nonexistent")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0

View File

@@ -57,6 +57,66 @@ class TestListNodes:
assert response.status_code == 200
class TestListNodesFilters:
"""Tests for node list query filters."""
def test_filter_by_search_public_key(self, client_no_auth, sample_node):
"""Test filtering nodes by public key search."""
# Partial public key match
response = client_no_auth.get("/api/v1/nodes?search=abc123")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
# No match
response = client_no_auth.get("/api/v1/nodes?search=zzz999")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0
def test_filter_by_search_node_name(self, client_no_auth, sample_node):
"""Test filtering nodes by node name search."""
response = client_no_auth.get("/api/v1/nodes?search=Test%20Node")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
def test_filter_by_search_name_tag(self, client_no_auth, sample_node_with_name_tag):
"""Test filtering nodes by name tag search."""
response = client_no_auth.get("/api/v1/nodes?search=Friendly%20Search")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
def test_filter_by_adv_type(self, client_no_auth, sample_node):
"""Test filtering nodes by advertisement type."""
# Match REPEATER
response = client_no_auth.get("/api/v1/nodes?adv_type=REPEATER")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
# No match
response = client_no_auth.get("/api/v1/nodes?adv_type=CLIENT")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0
def test_filter_by_member_id(self, client_no_auth, sample_node_with_member_tag):
"""Test filtering nodes by member_id tag."""
# Match alice
response = client_no_auth.get("/api/v1/nodes?member_id=alice")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
# No match
response = client_no_auth.get("/api/v1/nodes?member_id=unknown")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0
class TestGetNode:
"""Tests for GET /nodes/{public_key} endpoint."""
@@ -158,3 +218,166 @@ class TestNodeTags:
headers={"Authorization": "Bearer test-admin-key"},
)
assert response.status_code == 201 # Created
class TestMoveNodeTag:
"""Tests for PUT /nodes/{public_key}/tags/{key}/move endpoint."""
# 64-character public key for testing
DEST_PUBLIC_KEY = "xyz789xyz789xyz789xyz789xyz789xyabc123abc123abc123abc123abc123ab"
def test_move_node_tag_success(
self, client_no_auth, api_db_session, sample_node, sample_node_tag
):
"""Test successfully moving a tag to another node."""
from meshcore_hub.common.models import Node
from datetime import datetime, timezone
# Create a second node with 64-char public key
second_node = Node(
public_key=self.DEST_PUBLIC_KEY,
name="Second Node",
adv_type="CHAT",
first_seen=datetime.now(timezone.utc),
)
api_db_session.add(second_node)
api_db_session.commit()
response = client_no_auth.put(
f"/api/v1/nodes/{sample_node.public_key}/tags/{sample_node_tag.key}/move",
json={"new_public_key": second_node.public_key},
)
assert response.status_code == 200
data = response.json()
assert data["key"] == sample_node_tag.key
assert data["value"] == sample_node_tag.value
# Verify tag is no longer on original node
response = client_no_auth.get(
f"/api/v1/nodes/{sample_node.public_key}/tags/{sample_node_tag.key}"
)
assert response.status_code == 404
# Verify tag is now on new node
response = client_no_auth.get(
f"/api/v1/nodes/{second_node.public_key}/tags/{sample_node_tag.key}"
)
assert response.status_code == 200
def test_move_node_tag_source_not_found(self, client_no_auth):
"""Test moving a tag from a non-existent node."""
response = client_no_auth.put(
"/api/v1/nodes/nonexistent123/tags/somekey/move",
json={"new_public_key": self.DEST_PUBLIC_KEY},
)
assert response.status_code == 404
assert "Source node not found" in response.json()["detail"]
def test_move_node_tag_tag_not_found(self, client_no_auth, sample_node):
"""Test moving a non-existent tag."""
response = client_no_auth.put(
f"/api/v1/nodes/{sample_node.public_key}/tags/nonexistent/move",
json={"new_public_key": self.DEST_PUBLIC_KEY},
)
assert response.status_code == 404
assert "Tag not found" in response.json()["detail"]
def test_move_node_tag_dest_not_found(
self, client_no_auth, sample_node, sample_node_tag
):
"""Test moving a tag to a non-existent destination node."""
# 64-character nonexistent public key
nonexistent_key = (
"1111111111111111111111111111111122222222222222222222222222222222"
)
response = client_no_auth.put(
f"/api/v1/nodes/{sample_node.public_key}/tags/{sample_node_tag.key}/move",
json={"new_public_key": nonexistent_key},
)
assert response.status_code == 404
assert "Destination node not found" in response.json()["detail"]
def test_move_node_tag_conflict(
self, client_no_auth, api_db_session, sample_node, sample_node_tag
):
"""Test moving a tag when destination already has that key."""
from meshcore_hub.common.models import Node, NodeTag
from datetime import datetime, timezone
# Create second node with same tag key
second_node = Node(
public_key=self.DEST_PUBLIC_KEY,
name="Second Node",
adv_type="CHAT",
first_seen=datetime.now(timezone.utc),
)
api_db_session.add(second_node)
api_db_session.commit()
# Add the same tag key to second node
existing_tag = NodeTag(
node_id=second_node.id,
key=sample_node_tag.key, # Same key
value="different value",
)
api_db_session.add(existing_tag)
api_db_session.commit()
response = client_no_auth.put(
f"/api/v1/nodes/{sample_node.public_key}/tags/{sample_node_tag.key}/move",
json={"new_public_key": second_node.public_key},
)
assert response.status_code == 409
assert "already exists on destination" in response.json()["detail"]
def test_move_node_tag_requires_admin(
self, client_with_auth, sample_node, sample_node_tag
):
"""Test that move operation requires admin auth."""
# Without auth
response = client_with_auth.put(
f"/api/v1/nodes/{sample_node.public_key}/tags/{sample_node_tag.key}/move",
json={"new_public_key": self.DEST_PUBLIC_KEY},
)
assert response.status_code == 401
# With read key (not admin)
response = client_with_auth.put(
f"/api/v1/nodes/{sample_node.public_key}/tags/{sample_node_tag.key}/move",
json={"new_public_key": self.DEST_PUBLIC_KEY},
headers={"Authorization": "Bearer test-read-key"},
)
assert response.status_code == 403
def test_move_node_tag_same_node(self, client_no_auth, api_db_session):
"""Test moving a tag to the same node returns 400."""
from datetime import datetime, timezone
from meshcore_hub.common.models import Node, NodeTag
# Create node with 64-char public key
full_key = "abc123def456abc123def456abc123deabc123def456abc123def456abc123de"
node = Node(
public_key=full_key,
name="Test Node 64",
adv_type="REPEATER",
first_seen=datetime.now(timezone.utc),
)
api_db_session.add(node)
api_db_session.commit()
# Create tag
tag = NodeTag(
node_id=node.id,
key="test_tag",
value="test_value",
)
api_db_session.add(tag)
api_db_session.commit()
response = client_no_auth.put(
f"/api/v1/nodes/{full_key}/tags/test_tag/move",
json={"new_public_key": full_key},
)
assert response.status_code == 400
assert "same" in response.json()["detail"].lower()

View File

@@ -1,5 +1,7 @@
"""Tests for telemetry API routes."""
from datetime import datetime, timedelta, timezone
class TestListTelemetry:
"""Tests for GET /telemetry endpoint."""
@@ -51,3 +53,68 @@ class TestGetTelemetry:
"""Test getting a non-existent telemetry record."""
response = client_no_auth.get("/api/v1/telemetry/nonexistent-id")
assert response.status_code == 404
class TestListTelemetryFilters:
"""Tests for telemetry list query filters."""
def test_filter_by_received_by(
self,
client_no_auth,
sample_telemetry,
sample_telemetry_with_receiver,
receiver_node,
):
"""Test filtering telemetry by receiver node."""
response = client_no_auth.get(
f"/api/v1/telemetry?received_by={receiver_node.public_key}"
)
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
def test_filter_by_since(self, client_no_auth, api_db_session):
"""Test filtering telemetry by since timestamp."""
from meshcore_hub.common.models import Telemetry
now = datetime.now(timezone.utc)
old_time = now - timedelta(days=7)
# Create old telemetry
old_telemetry = Telemetry(
node_public_key="old123old123old123old123old123ol",
parsed_data={"battery_level": 10.0},
received_at=old_time,
)
api_db_session.add(old_telemetry)
api_db_session.commit()
# Filter since yesterday - should not include old telemetry
since = (now - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S")
response = client_no_auth.get(f"/api/v1/telemetry?since={since}")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0
def test_filter_by_until(self, client_no_auth, api_db_session):
"""Test filtering telemetry by until timestamp."""
from meshcore_hub.common.models import Telemetry
now = datetime.now(timezone.utc)
old_time = now - timedelta(days=7)
# Create old telemetry
old_telemetry = Telemetry(
node_public_key="until123until123until123until12",
parsed_data={"battery_level": 20.0},
received_at=old_time,
)
api_db_session.add(old_telemetry)
api_db_session.commit()
# Filter until 5 days ago - should include old telemetry
until = (now - timedelta(days=5)).strftime("%Y-%m-%dT%H:%M:%S")
response = client_no_auth.get(f"/api/v1/telemetry?until={until}")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1

View File

@@ -1,5 +1,7 @@
"""Tests for trace path API routes."""
from datetime import datetime, timedelta, timezone
class TestListTracePaths:
"""Tests for GET /trace-paths endpoint."""
@@ -37,3 +39,70 @@ class TestGetTracePath:
"""Test getting a non-existent trace path."""
response = client_no_auth.get("/api/v1/trace-paths/nonexistent-id")
assert response.status_code == 404
class TestListTracePathsFilters:
"""Tests for trace path list query filters."""
def test_filter_by_received_by(
self,
client_no_auth,
sample_trace_path,
sample_trace_path_with_receiver,
receiver_node,
):
"""Test filtering trace paths by receiver node."""
response = client_no_auth.get(
f"/api/v1/trace-paths?received_by={receiver_node.public_key}"
)
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
def test_filter_by_since(self, client_no_auth, api_db_session):
"""Test filtering trace paths by since timestamp."""
from meshcore_hub.common.models import TracePath
now = datetime.now(timezone.utc)
old_time = now - timedelta(days=7)
# Create old trace path
old_trace = TracePath(
initiator_tag=11111,
path_hashes=["old1", "old2"],
hop_count=2,
received_at=old_time,
)
api_db_session.add(old_trace)
api_db_session.commit()
# Filter since yesterday - should not include old trace path
since = (now - timedelta(days=1)).strftime("%Y-%m-%dT%H:%M:%S")
response = client_no_auth.get(f"/api/v1/trace-paths?since={since}")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 0
def test_filter_by_until(self, client_no_auth, api_db_session):
"""Test filtering trace paths by until timestamp."""
from meshcore_hub.common.models import TracePath
now = datetime.now(timezone.utc)
old_time = now - timedelta(days=7)
# Create old trace path
old_trace = TracePath(
initiator_tag=22222,
path_hashes=["until1", "until2"],
hop_count=2,
received_at=old_time,
)
api_db_session.add(old_trace)
api_db_session.commit()
# Filter until 5 days ago - should include old trace path
until = (now - timedelta(days=5)).strftime("%Y-%m-%dT%H:%M:%S")
response = client_no_auth.get(f"/api/v1/trace-paths?until={until}")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1

View File

@@ -99,7 +99,7 @@ class TestLoadTagsFile:
"""Test loading file with full format (value and type)."""
data = {
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef": {
"location": {"value": "52.0,1.0", "type": "coordinate"},
"is_active": {"value": "true", "type": "boolean"},
"altitude": {"value": "150", "type": "number"},
}
}
@@ -110,7 +110,7 @@ class TestLoadTagsFile:
result = load_tags_file(f.name)
key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
assert result[key]["location"]["type"] == "coordinate"
assert result[key]["is_active"]["type"] == "boolean"
assert result[key]["altitude"]["type"] == "number"
Path(f.name).unlink()

View File

@@ -55,14 +55,14 @@ class TestNodeModel:
def test_node_tags_relationship(self, db_session) -> None:
"""Test node-tag relationship."""
node = Node(public_key="b" * 64, name="Tagged Node")
tag = NodeTag(key="location", value="51.5,-0.1", value_type="coordinate")
tag = NodeTag(key="altitude", value="150", value_type="number")
node.tags.append(tag)
db_session.add(node)
db_session.commit()
assert len(node.tags) == 1
assert node.tags[0].key == "location"
assert node.tags[0].key == "altitude"
class TestMessageModel:

View File

@@ -255,6 +255,18 @@ class MockHttpClient:
key = f"POST:{path}"
return self._create_response(key)
async def put(
self, path: str, json: dict | None = None, params: dict | None = None
) -> Response:
"""Mock PUT request."""
key = f"PUT:{path}"
return self._create_response(key)
async def delete(self, path: str, params: dict | None = None) -> Response:
"""Mock DELETE request."""
key = f"DELETE:{path}"
return self._create_response(key)
async def aclose(self) -> None:
"""Mock close method."""
pass

View File

@@ -0,0 +1,426 @@
"""Tests for admin web routes."""
from typing import Any
import pytest
from fastapi.testclient import TestClient
from meshcore_hub.web.app import create_app
from .conftest import MockHttpClient
@pytest.fixture
def mock_http_client_admin() -> MockHttpClient:
"""Create a mock HTTP client for admin tests."""
client = MockHttpClient()
# Mock the nodes API response for admin dropdown
client.set_response(
"GET",
"/api/v1/nodes",
200,
{
"items": [
{
"public_key": "abc123def456abc123def456abc123de",
"name": "Node One",
"adv_type": "REPEATER",
"first_seen": "2024-01-01T00:00:00Z",
"last_seen": "2024-01-01T12:00:00Z",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T00:00:00Z",
"tags": [],
},
{
"public_key": "xyz789xyz789xyz789xyz789xyz789xy",
"name": "Node Two",
"adv_type": "CHAT",
"first_seen": "2024-01-01T00:00:00Z",
"last_seen": "2024-01-01T11:00:00Z",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T00:00:00Z",
"tags": [],
},
],
"total": 2,
"limit": 100,
"offset": 0,
},
)
# Mock node tags response
client.set_response(
"GET",
"/api/v1/nodes/abc123def456abc123def456abc123de/tags",
200,
[
{
"key": "environment",
"value": "production",
"value_type": "string",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T00:00:00Z",
},
{
"key": "location",
"value": "building-a",
"value_type": "string",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T00:00:00Z",
},
],
)
# Mock create tag response
client.set_response(
"POST",
"/api/v1/nodes/abc123def456abc123def456abc123de/tags",
201,
{
"key": "new_tag",
"value": "new_value",
"value_type": "string",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T00:00:00Z",
},
)
# Mock update tag response
client.set_response(
"PUT",
"/api/v1/nodes/abc123def456abc123def456abc123de/tags/environment",
200,
{
"key": "environment",
"value": "staging",
"value_type": "string",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T12:00:00Z",
},
)
# Mock move tag response
client.set_response(
"PUT",
"/api/v1/nodes/abc123def456abc123def456abc123de/tags/environment/move",
200,
{
"key": "environment",
"value": "production",
"value_type": "string",
"created_at": "2024-01-01T00:00:00Z",
"updated_at": "2024-01-01T12:00:00Z",
},
)
# Mock delete tag response
client.set_response(
"DELETE",
"/api/v1/nodes/abc123def456abc123def456abc123de/tags/environment",
204,
None,
)
return client
@pytest.fixture
def admin_app(mock_http_client_admin: MockHttpClient) -> Any:
"""Create a web app with admin enabled."""
app = create_app(
api_url="http://localhost:8000",
api_key="test-api-key",
network_name="Test Network",
network_city="Test City",
network_country="Test Country",
network_radio_config="Test Radio Config",
network_contact_email="test@example.com",
admin_enabled=True,
)
app.state.http_client = mock_http_client_admin
return app
@pytest.fixture
def admin_app_disabled(mock_http_client_admin: MockHttpClient) -> Any:
"""Create a web app with admin disabled."""
app = create_app(
api_url="http://localhost:8000",
api_key="test-api-key",
network_name="Test Network",
network_city="Test City",
network_country="Test Country",
network_radio_config="Test Radio Config",
network_contact_email="test@example.com",
admin_enabled=False,
)
app.state.http_client = mock_http_client_admin
return app
@pytest.fixture
def auth_headers() -> dict:
"""Authentication headers for admin requests."""
return {
"X-Forwarded-User": "test-user-id",
"X-Forwarded-Email": "test@example.com",
"X-Forwarded-Preferred-Username": "testuser",
}
@pytest.fixture
def admin_client(admin_app: Any, mock_http_client_admin: MockHttpClient) -> TestClient:
"""Create a test client with admin enabled."""
admin_app.state.http_client = mock_http_client_admin
return TestClient(admin_app, raise_server_exceptions=True)
@pytest.fixture
def admin_client_disabled(
admin_app_disabled: Any, mock_http_client_admin: MockHttpClient
) -> TestClient:
"""Create a test client with admin disabled."""
admin_app_disabled.state.http_client = mock_http_client_admin
return TestClient(admin_app_disabled, raise_server_exceptions=True)
class TestAdminHome:
"""Tests for admin home page."""
def test_admin_home_enabled(self, admin_client, auth_headers):
"""Test admin home page when enabled."""
response = admin_client.get("/a/", headers=auth_headers)
assert response.status_code == 200
assert "Admin" in response.text
assert "Node Tags" in response.text
def test_admin_home_disabled(self, admin_client_disabled, auth_headers):
"""Test admin home page when disabled."""
response = admin_client_disabled.get("/a/", headers=auth_headers)
assert response.status_code == 404
def test_admin_home_unauthenticated(self, admin_client):
"""Test admin home page without authentication."""
response = admin_client.get("/a/")
assert response.status_code == 403
assert "Access Denied" in response.text
class TestAdminNodeTags:
"""Tests for admin node tags page."""
def test_node_tags_page_no_selection(self, admin_client, auth_headers):
"""Test node tags page without selecting a node."""
response = admin_client.get("/a/node-tags", headers=auth_headers)
assert response.status_code == 200
assert "Node Tags" in response.text
assert "Select a Node" in response.text
# Should show node dropdown
assert "Node One" in response.text
assert "Node Two" in response.text
def test_node_tags_page_with_selection(self, admin_client, auth_headers):
"""Test node tags page with a node selected."""
response = admin_client.get(
"/a/node-tags?public_key=abc123def456abc123def456abc123de",
headers=auth_headers,
)
assert response.status_code == 200
assert "Node Tags" in response.text
# Should show the selected node's tags
assert "environment" in response.text
assert "production" in response.text
assert "location" in response.text
assert "building-a" in response.text
def test_node_tags_page_disabled(self, admin_client_disabled, auth_headers):
"""Test node tags page when admin is disabled."""
response = admin_client_disabled.get("/a/node-tags", headers=auth_headers)
assert response.status_code == 404
def test_node_tags_page_with_message(self, admin_client, auth_headers):
"""Test node tags page displays success message."""
response = admin_client.get(
"/a/node-tags?public_key=abc123def456abc123def456abc123de"
"&message=Tag%20created%20successfully",
headers=auth_headers,
)
assert response.status_code == 200
assert "Tag created successfully" in response.text
def test_node_tags_page_with_error(self, admin_client, auth_headers):
"""Test node tags page displays error message."""
response = admin_client.get(
"/a/node-tags?public_key=abc123def456abc123def456abc123de"
"&error=Tag%20already%20exists",
headers=auth_headers,
)
assert response.status_code == 200
assert "Tag already exists" in response.text
def test_node_tags_page_unauthenticated(self, admin_client):
"""Test node tags page without authentication."""
response = admin_client.get("/a/node-tags")
assert response.status_code == 403
assert "Access Denied" in response.text
class TestAdminCreateTag:
"""Tests for creating node tags."""
def test_create_tag_success(self, admin_client, auth_headers):
"""Test creating a new tag."""
response = admin_client.post(
"/a/node-tags",
data={
"public_key": "abc123def456abc123def456abc123de",
"key": "new_tag",
"value": "new_value",
"value_type": "string",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 303
assert "message=" in response.headers["location"]
assert "created" in response.headers["location"]
def test_create_tag_disabled(self, admin_client_disabled, auth_headers):
"""Test creating tag when admin is disabled."""
response = admin_client_disabled.post(
"/a/node-tags",
data={
"public_key": "abc123def456abc123def456abc123de",
"key": "new_tag",
"value": "new_value",
"value_type": "string",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 404
def test_create_tag_unauthenticated(self, admin_client):
"""Test creating tag without authentication."""
response = admin_client.post(
"/a/node-tags",
data={
"public_key": "abc123def456abc123def456abc123de",
"key": "new_tag",
"value": "new_value",
"value_type": "string",
},
follow_redirects=False,
)
assert response.status_code == 403
class TestAdminUpdateTag:
"""Tests for updating node tags."""
def test_update_tag_success(self, admin_client, auth_headers):
"""Test updating a tag."""
response = admin_client.post(
"/a/node-tags/update",
data={
"public_key": "abc123def456abc123def456abc123de",
"key": "environment",
"value": "staging",
"value_type": "string",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 303
assert "message=" in response.headers["location"]
assert "updated" in response.headers["location"]
def test_update_tag_not_found(
self, admin_app, mock_http_client_admin: MockHttpClient, auth_headers
):
"""Test updating a non-existent tag returns error."""
# Set up 404 response for this specific tag
mock_http_client_admin.set_response(
"PUT",
"/api/v1/nodes/abc123def456abc123def456abc123de/tags/nonexistent",
404,
{"detail": "Tag not found"},
)
admin_app.state.http_client = mock_http_client_admin
client = TestClient(admin_app, raise_server_exceptions=True)
response = client.post(
"/a/node-tags/update",
data={
"public_key": "abc123def456abc123def456abc123de",
"key": "nonexistent",
"value": "value",
"value_type": "string",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 303
assert "error=" in response.headers["location"]
assert "not+found" in response.headers["location"].lower()
def test_update_tag_disabled(self, admin_client_disabled, auth_headers):
"""Test updating tag when admin is disabled."""
response = admin_client_disabled.post(
"/a/node-tags/update",
data={
"public_key": "abc123def456abc123def456abc123de",
"key": "environment",
"value": "staging",
"value_type": "string",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 404
class TestAdminMoveTag:
"""Tests for moving node tags."""
def test_move_tag_success(self, admin_client, auth_headers):
"""Test moving a tag to another node."""
response = admin_client.post(
"/a/node-tags/move",
data={
"public_key": "abc123def456abc123def456abc123de",
"key": "environment",
"new_public_key": "xyz789xyz789xyz789xyz789xyz789xy",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 303
# Should redirect to destination node
assert "xyz789xyz789xyz789xyz789xyz789xy" in response.headers["location"]
assert "message=" in response.headers["location"]
assert "moved" in response.headers["location"]
class TestAdminDeleteTag:
"""Tests for deleting node tags."""
def test_delete_tag_success(self, admin_client, auth_headers):
"""Test deleting a tag."""
response = admin_client.post(
"/a/node-tags/delete",
data={
"public_key": "abc123def456abc123def456abc123de",
"key": "environment",
},
headers=auth_headers,
follow_redirects=False,
)
assert response.status_code == 303
assert "message=" in response.headers["location"]
assert "deleted" in response.headers["location"]

View File

@@ -0,0 +1,364 @@
"""Tests for the advertisements page route."""
from typing import Any
from fastapi.testclient import TestClient
from tests.test_web.conftest import MockHttpClient
class TestAdvertisementsPage:
"""Tests for the advertisements page."""
def test_advertisements_returns_200(self, client: TestClient) -> None:
"""Test that advertisements page returns 200 status code."""
response = client.get("/advertisements")
assert response.status_code == 200
def test_advertisements_returns_html(self, client: TestClient) -> None:
"""Test that advertisements page returns HTML content."""
response = client.get("/advertisements")
assert "text/html" in response.headers["content-type"]
def test_advertisements_contains_network_name(self, client: TestClient) -> None:
"""Test that advertisements page contains the network name."""
response = client.get("/advertisements")
assert "Test Network" in response.text
def test_advertisements_displays_advertisement_list(
self, client: TestClient, mock_http_client: MockHttpClient
) -> None:
"""Test that advertisements page displays advertisements from API."""
response = client.get("/advertisements")
assert response.status_code == 200
# Check for advertisement data from mock
assert "Node One" in response.text
def test_advertisements_displays_adv_type(
self, client: TestClient, mock_http_client: MockHttpClient
) -> None:
"""Test that advertisements page displays advertisement types."""
response = client.get("/advertisements")
# Should show adv type from mock data
assert "REPEATER" in response.text
class TestAdvertisementsPageFilters:
"""Tests for advertisements page filtering."""
def test_advertisements_with_search(self, client: TestClient) -> None:
"""Test advertisements page with search parameter."""
response = client.get("/advertisements?search=node")
assert response.status_code == 200
def test_advertisements_with_member_filter(self, client: TestClient) -> None:
"""Test advertisements page with member_id filter."""
response = client.get("/advertisements?member_id=alice")
assert response.status_code == 200
def test_advertisements_with_public_key_filter(self, client: TestClient) -> None:
"""Test advertisements page with public_key filter."""
response = client.get(
"/advertisements?public_key=abc123def456abc123def456abc123de"
)
assert response.status_code == 200
def test_advertisements_with_pagination(self, client: TestClient) -> None:
"""Test advertisements page with pagination parameters."""
response = client.get("/advertisements?page=1&limit=25")
assert response.status_code == 200
def test_advertisements_page_2(self, client: TestClient) -> None:
"""Test advertisements page 2."""
response = client.get("/advertisements?page=2")
assert response.status_code == 200
def test_advertisements_with_all_filters(self, client: TestClient) -> None:
"""Test advertisements page with multiple filters."""
response = client.get(
"/advertisements?search=test&member_id=alice&page=1&limit=10"
)
assert response.status_code == 200
class TestAdvertisementsPageDropdowns:
"""Tests for advertisements page dropdown data."""
def test_advertisements_loads_members_for_dropdown(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that advertisements page loads members for filter dropdown."""
# Set up members response
mock_http_client.set_response(
"GET",
"/api/v1/members",
200,
{
"items": [
{"id": "m1", "member_id": "alice", "name": "Alice"},
{"id": "m2", "member_id": "bob", "name": "Bob"},
],
"total": 2,
},
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/advertisements")
assert response.status_code == 200
# Members should be available for dropdown
assert "Alice" in response.text or "alice" in response.text
def test_advertisements_loads_nodes_for_dropdown(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that advertisements page loads nodes for filter dropdown."""
# Set up nodes response with tags
mock_http_client.set_response(
"GET",
"/api/v1/nodes",
200,
{
"items": [
{
"id": "n1",
"public_key": "abc123",
"name": "Node Alpha",
"tags": [{"key": "name", "value": "Custom Name"}],
},
{
"id": "n2",
"public_key": "def456",
"name": "Node Beta",
"tags": [],
},
],
"total": 2,
},
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/advertisements")
assert response.status_code == 200
class TestAdvertisementsNodeSorting:
"""Tests for node sorting in advertisements dropdown."""
def test_nodes_sorted_by_display_name(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that nodes are sorted alphabetically by display name."""
# Set up nodes with tags - "Zebra" should come after "Alpha"
mock_http_client.set_response(
"GET",
"/api/v1/nodes",
200,
{
"items": [
{
"id": "n1",
"public_key": "abc123",
"name": "Zebra Node",
"tags": [],
},
{
"id": "n2",
"public_key": "def456",
"name": "Alpha Node",
"tags": [],
},
],
"total": 2,
},
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/advertisements")
assert response.status_code == 200
# Both nodes should appear
text = response.text
assert "Alpha Node" in text or "alpha" in text.lower()
assert "Zebra Node" in text or "zebra" in text.lower()
def test_nodes_sorted_by_tag_name_when_present(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that nodes use tag name for sorting when available."""
mock_http_client.set_response(
"GET",
"/api/v1/nodes",
200,
{
"items": [
{
"id": "n1",
"public_key": "abc123",
"name": "Zebra",
"tags": [{"key": "name", "value": "Alpha Custom"}],
},
],
"total": 1,
},
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/advertisements")
assert response.status_code == 200
def test_nodes_fallback_to_public_key_when_no_name(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that nodes fall back to public_key when no name."""
mock_http_client.set_response(
"GET",
"/api/v1/nodes",
200,
{
"items": [
{
"id": "n1",
"public_key": "abc123def456",
"name": None,
"tags": [],
},
],
"total": 1,
},
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/advertisements")
assert response.status_code == 200
class TestAdvertisementsPageAPIErrors:
"""Tests for advertisements page handling API errors."""
def test_advertisements_handles_api_error(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that advertisements page handles API errors gracefully."""
mock_http_client.set_response(
"GET", "/api/v1/advertisements", status_code=500, json_data=None
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/advertisements")
# Should still return 200 (page renders with empty list)
assert response.status_code == 200
def test_advertisements_handles_api_not_found(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that advertisements page handles API 404 gracefully."""
mock_http_client.set_response(
"GET",
"/api/v1/advertisements",
status_code=404,
json_data={"detail": "Not found"},
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/advertisements")
# Should still return 200 (page renders with empty list)
assert response.status_code == 200
def test_advertisements_handles_members_api_error(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that page handles members API error gracefully."""
mock_http_client.set_response(
"GET", "/api/v1/members", status_code=500, json_data=None
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/advertisements")
# Should still return 200 (page renders without member dropdown)
assert response.status_code == 200
def test_advertisements_handles_nodes_api_error(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that page handles nodes API error gracefully."""
mock_http_client.set_response(
"GET", "/api/v1/nodes", status_code=500, json_data=None
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/advertisements")
# Should still return 200 (page renders without node dropdown)
assert response.status_code == 200
def test_advertisements_handles_empty_response(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that page handles empty advertisements list."""
mock_http_client.set_response(
"GET",
"/api/v1/advertisements",
200,
{"items": [], "total": 0},
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/advertisements")
assert response.status_code == 200
class TestAdvertisementsPagination:
"""Tests for advertisements pagination calculations."""
def test_pagination_calculates_total_pages(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that pagination correctly calculates total pages."""
mock_http_client.set_response(
"GET",
"/api/v1/advertisements",
200,
{"items": [], "total": 150},
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
# With limit=50 and total=150, should have 3 pages
response = client.get("/advertisements?limit=50")
assert response.status_code == 200
def test_pagination_with_zero_total(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test pagination with zero results shows at least 1 page."""
mock_http_client.set_response(
"GET",
"/api/v1/advertisements",
200,
{"items": [], "total": 0},
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/advertisements")
assert response.status_code == 200

View File

@@ -0,0 +1,87 @@
"""Tests for the health check endpoints."""
from typing import Any
from fastapi.testclient import TestClient
from meshcore_hub import __version__
from tests.test_web.conftest import MockHttpClient
class TestHealthEndpoint:
"""Tests for the /health endpoint."""
def test_health_returns_200(self, client: TestClient) -> None:
"""Test that health endpoint returns 200 status code."""
response = client.get("/health")
assert response.status_code == 200
def test_health_returns_json(self, client: TestClient) -> None:
"""Test that health endpoint returns JSON content."""
response = client.get("/health")
assert "application/json" in response.headers["content-type"]
def test_health_returns_healthy_status(self, client: TestClient) -> None:
"""Test that health endpoint returns healthy status."""
response = client.get("/health")
data = response.json()
assert data["status"] == "healthy"
def test_health_returns_version(self, client: TestClient) -> None:
"""Test that health endpoint returns version."""
response = client.get("/health")
data = response.json()
assert data["version"] == __version__
class TestHealthReadyEndpoint:
"""Tests for the /health/ready endpoint."""
def test_health_ready_returns_200(self, client: TestClient) -> None:
"""Test that health/ready endpoint returns 200 status code."""
response = client.get("/health/ready")
assert response.status_code == 200
def test_health_ready_returns_json(self, client: TestClient) -> None:
"""Test that health/ready endpoint returns JSON content."""
response = client.get("/health/ready")
assert "application/json" in response.headers["content-type"]
def test_health_ready_returns_ready_status(self, client: TestClient) -> None:
"""Test that health/ready returns ready status when API is connected."""
response = client.get("/health/ready")
data = response.json()
assert data["status"] == "ready"
assert data["api"] == "connected"
def test_health_ready_with_api_error(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that health/ready handles API errors gracefully."""
mock_http_client.set_response("GET", "/health", status_code=500, json_data=None)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/health/ready")
assert response.status_code == 200
data = response.json()
assert data["status"] == "not_ready"
assert "status 500" in data["api"]
def test_health_ready_with_api_404(
self, web_app: Any, mock_http_client: MockHttpClient
) -> None:
"""Test that health/ready handles API 404 response."""
mock_http_client.set_response(
"GET", "/health", status_code=404, json_data={"detail": "Not found"}
)
web_app.state.http_client = mock_http_client
client = TestClient(web_app, raise_server_exceptions=True)
response = client.get("/health/ready")
assert response.status_code == 200
data = response.json()
assert data["status"] == "not_ready"
assert "status 404" in data["api"]