Rename channel visibility 'public' to 'community'

- Rename ChannelVisibility.PUBLIC to ChannelVisibility.COMMUNITY
- Update stored value from 'public' to 'community' across model, schema, API, CLI, and frontend
- Add Alembic migration to update existing database rows
- Consolidate upgrade docs: merge v0.11.0, v0.12.0, v0.13.0 into single v0.11.0 section
- Add i18n visibility level translation keys (en, nl)
- Update section headings on channels page to use t() for i18n
- Keep visibility badges lowercase per UI design
This commit is contained in:
Louis King
2026-06-04 14:07:12 +01:00
parent 1491c49ef7
commit f8c2a7bb40
24 changed files with 1545 additions and 86 deletions
-10
View File
@@ -194,16 +194,6 @@ PACKETCAPTURE_EXIT_ON_RECONNECT_FAIL=true
# =============================================================================
# The collector subscribes to MQTT events and stores them in the database
# LetsMesh decoder support
# The native Python decoder is always enabled.
# Optional: channel secret keys (comma or space separated) used to decrypt GroupText
# -------------------
# Channel Settings
# -------------------
# Channel keys are now managed via the database (channels table).
# Use `meshcore-hub collector channel add --name X --key HEX` or seed via channels.yaml.
# See docs/seeding.md for the channels.yaml format.
#
# Refresh interval for reloading channel keys from the database (seconds).
# CHANNEL_REFRESH_INTERVAL_SECONDS=300
+1 -1
View File
@@ -154,7 +154,7 @@ Group/broadcast messages on specific channels.
**Field Descriptions**:
- `channel_idx`: Channel number (0-255) when available
- `channel_name`: Channel display label (e.g., `"Public"`, `"#test"`) when available
- `channel_name`: Channel display label (e.g., `"Public"`, `"Community"`, `"#test"`) when available
- `pubkey_prefix`: First 12 characters of the source public key prefix, used for message identification when available
- `path_len`: Number of hops message traveled
- `txt_type`: Message type indicator (0=plain, 2=signed, etc.)
@@ -0,0 +1,29 @@
"""rename channel visibility public to community
Revision ID: 20260604_1200
Revises: 82dff87d6576
Create Date: 2026-06-04 12:00:00.000000+00:00
"""
from typing import Sequence, Union
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "20260604_1200"
down_revision: Union[str, None] = "82dff87d6576"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
op.execute(
"UPDATE channels SET visibility = 'community' WHERE visibility = 'public'"
)
def downgrade() -> None:
op.execute(
"UPDATE channels SET visibility = 'public' WHERE visibility = 'community'"
)
+4
View File
@@ -390,6 +390,10 @@ Channel management and filter UI:
| `name_label` | Channel Name | Form label |
| `key_label` | Channel Key (hex) | Form label |
| `visibility_label` | Visibility | Form label |
| `visibility_community` | Community | Community visibility section heading |
| `visibility_member` | Member | Member visibility section heading |
| `visibility_operator` | Operator | Operator visibility section heading |
| `visibility_admin` | Admin | Admin visibility section heading |
| `enabled_label` | Enabled | Form label |
| `channel_hash_label` | Hash | Column header |
| `disabled` | Disabled | Disabled channel badge |
+1 -1
View File
@@ -86,7 +86,7 @@ MyChannel:
Key rules:
- Keys must be uppercase hex, 32 characters (AES-128) or 64 characters (AES-256)
- Seeded channels always have `visibility: public` — to set member/operator/admin visibility, use the CLI or API
- Seeded channels always have `visibility: community` — to set member/operator/admin visibility, use the CLI or API
- The `Public` and `test` built-in keys are always loaded into the decoder regardless of database contents
- Test channel messages are only stored when a `test` channel row exists in the database with `enabled: true`
+13 -34
View File
@@ -2,7 +2,15 @@
This guide covers upgrading from a previous MeshCore Hub release to the current version. Check the relevant version section below before upgrading.
## v0.13.0
## v0.11.0
### Channel Visibility Rename: "public" → "community"
The channel visibility level `"public"` has been renamed to `"community"` to avoid confusion with MeshCore's concept of public channels. All MeshCore channels are private (encrypted) in protocol terms, so "community" better reflects the access level.
The Alembic migration automatically updates existing `visibility='public'` rows to `visibility='community'`. No manual database changes are required.
API consumers that filter channels by `visibility=public` must update to `visibility=community`.
### Database-Backed Channel Keys
@@ -16,7 +24,7 @@ Channel decryption keys are now managed via the `channels` database table instea
| `name` | `VARCHAR(100), UNIQUE` | Channel display name |
| `key_hex` | `VARCHAR(64), UNIQUE` | Uppercase hex key (32 or 64 chars) |
| `channel_hash` | `VARCHAR(2)` | First byte of SHA-256 of key |
| `visibility` | `VARCHAR(20)` | `public`, `member`, `operator`, or `admin` |
| `visibility` | `VARCHAR(20)` | `community`, `member`, `operator`, or `admin` |
| `enabled` | `BOOLEAN` | Whether the channel is active |
| `created_at`, `updated_at` | `DATETIME` | Timestamps |
@@ -30,7 +38,7 @@ Channel decryption keys are now managed via the `channels` database table instea
**Migration steps:**
1. Run `meshcore-hub db upgrade` to create the `channels` table
1. Run `meshcore-hub db upgrade` to create the `channels` table and update visibility values
2. Convert any `COLLECTOR_CHANNEL_KEYS` values to either:
- A `channels.yaml` seed file in `SEED_HOME` (see `docs/seeding.md`)
- Database rows via CLI: `meshcore-hub collector channel add --name X --key HEX`
@@ -39,38 +47,9 @@ Channel decryption keys are now managed via the `channels` database table instea
**Test channel behavior change:** Test channel messages (channel_idx 217) are now discarded by default unless a `test` channel row exists in the database with `enabled=true`. Previously this was controlled by `COLLECTOR_INCLUDE_TEST_CHANNEL`.
## v0.12.0
### Advertisement Route Type & Deduplication
### Advertisement Route Type & Deduplication Improvements
This release adds route type tracking and improves advertisement deduplication to better distinguish between flood and zero-hop (local) advertisements.
**New database columns on `advertisements` table:**
| Column | Type | Description |
|--------|------|-------------|
| `route_type` | `VARCHAR(20), nullable` | Route type: `flood`, `transport_flood`, `direct`, `transport_direct` |
| `advert_timestamp` | `DATETIME, nullable` | Node's own Unix timestamp from the advert payload |
Both columns are nullable — existing records will have `NULL` values. The Alembic migration adds these columns automatically.
**Default API filter change:**
`GET /api/v1/advertisements` now defaults to `route_type=flood,transport_flood`, showing only flood advertisements. Existing records with `route_type=NULL` are included in all default queries to avoid hiding historical data. Pass `route_type=all` to see all types.
**Dashboard metrics now flood-only:**
All dashboard advertisement counts (`total_advertisements`, `advertisements_24h`, `advertisements_7d`, `recent_advertisements`, and `/activity`) now count only flood/transport_flood adverts plus NULL (historical records).
**Deduplication bucket increased from 120s to 300s:**
Both `compute_advertisement_hash()` and `compute_telemetry_hash()` now use a 5-minute (300-second) deduplication bucket instead of the previous 2-minute (120-second) bucket. This reduces duplicate records when multiple observers report the same event within a 5-minute window.
**Advertisement deduplication now uses node timestamp:**
When available, the node's own `advert_timestamp` is used for deduplication bucketing instead of `received_at`. This means the same flood advertisement observed by multiple receivers will correctly deduplicate even if received several minutes apart. Node timestamps that deviate by more than 4 hours from `received_at` are rejected for bucketing (the raw value is still stored).
## v0.11.0
Advertisement route type tracking and improved deduplication are included. New `route_type` and `advert_timestamp` columns are added to the `advertisements` table automatically by the migration. The API defaults to showing flood advertisements only. Deduplication uses a 5-minute bucket with node timestamps when available.
### Async SQLite Foreign Key Fix
+2 -2
View File
@@ -10,7 +10,7 @@ from sqlalchemy import select
from meshcore_hub.api.dependencies import DbSession
from meshcore_hub.common.models.channel import Channel
VISIBILITY_LEVELS = {"public": 0, "member": 1, "operator": 2, "admin": 3}
VISIBILITY_LEVELS = {"community": 0, "member": 1, "operator": 2, "admin": 3}
def resolve_user_role(request: Request) -> str | None:
@@ -34,7 +34,7 @@ def resolve_user_role(request: Request) -> str | None:
def get_max_visibility_level(role: str | None) -> int:
"""Get the maximum visibility level for a given role.
Returns 0 for anonymous users (public only).
Returns 0 for anonymous users (community only).
"""
if role is None:
return 0
+4 -4
View File
@@ -348,9 +348,9 @@ def channel_list_cmd(ctx: click.Context) -> None:
)
@click.option(
"--visibility",
type=click.Choice(["public", "member", "operator", "admin"]),
default="public",
help="Channel visibility level (default: public)",
type=click.Choice(["community", "member", "operator", "admin"]),
default="community",
help="Channel visibility level (default: community)",
)
@click.pass_context
def channel_add_cmd(
@@ -633,7 +633,7 @@ def _import_channels(
name=name,
key_hex=key_hex,
channel_hash=Channel.compute_channel_hash(key_hex),
visibility="public",
visibility="community",
enabled=enabled,
)
session.add(channel)
+27 -12
View File
@@ -1,11 +1,11 @@
"""Database connection and session management."""
from contextlib import asynccontextmanager, contextmanager
from typing import AsyncGenerator, Generator
from typing import Any, AsyncGenerator, Generator
from sqlalchemy import create_engine, event
from sqlalchemy.engine import Engine
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy.orm import Session, sessionmaker
from meshcore_hub.common.models.base import Base
@@ -88,6 +88,8 @@ class DatabaseManager:
"""Database connection manager.
Manages database engine and session creation for a component.
The async engine is created lazily on first async session access
to avoid leaking connections when only sync operations are needed.
"""
def __init__(self, database_url: str, echo: bool = False):
@@ -98,6 +100,7 @@ class DatabaseManager:
echo: Enable SQL query logging
"""
self.database_url = database_url
self._echo = echo
# Ensure parent directory exists for SQLite databases
if database_url.startswith("sqlite:///"):
@@ -110,14 +113,24 @@ class DatabaseManager:
self.engine = create_database_engine(database_url, echo=echo)
self.session_factory = create_session_factory(self.engine)
# Create async engine for async operations
async_url = database_url.replace("sqlite://", "sqlite+aiosqlite://")
self.async_engine = create_async_engine(async_url, echo=echo)
# Lazy-initialized async engine (created on first async_session call)
self._async_engine: AsyncEngine | None = None
self._async_session_factory: Any = None
def _ensure_async_engine(self) -> None:
"""Create the async engine and session factory on first use."""
if self._async_engine is not None:
return
from sqlalchemy.ext.asyncio import async_sessionmaker
async_url = self.database_url.replace("sqlite://", "sqlite+aiosqlite://")
self._async_engine = create_async_engine(async_url, echo=self._echo)
# Enable foreign keys for async SQLite engine
if database_url.startswith("sqlite"):
if self.database_url.startswith("sqlite"):
@event.listens_for(self.async_engine.sync_engine, "connect")
@event.listens_for(self._async_engine.sync_engine, "connect")
def set_sqlite_pragma_async(
dbapi_connection: object, connection_record: object
) -> None:
@@ -125,10 +138,8 @@ class DatabaseManager:
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
from sqlalchemy.ext.asyncio import async_sessionmaker
self.async_session_factory = async_sessionmaker(
self.async_engine,
self._async_session_factory = async_sessionmaker(
self._async_engine,
class_=AsyncSession,
expire_on_commit=False,
)
@@ -183,12 +194,16 @@ class DatabaseManager:
result = await session.execute(select(Node))
await session.commit()
"""
async with self.async_session_factory() as session:
self._ensure_async_engine()
assert self._async_session_factory is not None
async with self._async_session_factory() as session:
yield session
def dispose(self) -> None:
"""Dispose of the database engine and connection pool."""
self.engine.dispose()
if self._async_engine is not None:
self._async_engine.sync_engine.dispose()
# Global database manager instance (initialized at runtime)
+3 -3
View File
@@ -12,7 +12,7 @@ from meshcore_hub.common.models.base import Base, TimestampMixin, UUIDMixin
class ChannelVisibility(str, Enum):
"""Channel visibility/permission levels."""
PUBLIC = "public"
COMMUNITY = "community"
MEMBER = "member"
OPERATOR = "operator"
ADMIN = "admin"
@@ -26,7 +26,7 @@ class Channel(Base, UUIDMixin, TimestampMixin):
name: Channel display name (unique, non-empty)
key_hex: Secret key as uppercase hex (supports AES-128 and AES-256)
channel_hash: First byte of SHA-256 of key_hex (2-char uppercase hex)
visibility: Permission level (public, member, operator, admin)
visibility: Permission level (community, member, operator, admin)
enabled: Whether the channel is active
created_at: Record creation timestamp
updated_at: Record update timestamp
@@ -40,7 +40,7 @@ class Channel(Base, UUIDMixin, TimestampMixin):
key_hex: Mapped[str] = mapped_column(String(64), unique=True, nullable=False)
channel_hash: Mapped[str] = mapped_column(String(2), nullable=False)
visibility: Mapped[str] = mapped_column(
String(20), default=ChannelVisibility.PUBLIC.value, nullable=False
String(20), default=ChannelVisibility.COMMUNITY.value, nullable=False
)
enabled: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
+3 -3
View File
@@ -22,8 +22,8 @@ class ChannelCreate(BaseModel):
max_length=64,
description="Channel secret key as uppercase hex (32 or 64 chars)",
)
visibility: Literal["public", "member", "operator", "admin"] = Field(
default="public",
visibility: Literal["community", "member", "operator", "admin"] = Field(
default="community",
description="Channel visibility/permission level",
)
enabled: bool = Field(
@@ -52,7 +52,7 @@ class ChannelUpdate(BaseModel):
max_length=64,
description="Channel secret key as uppercase hex",
)
visibility: Optional[Literal["public", "member", "operator", "admin"]] = Field(
visibility: Optional[Literal["community", "member", "operator", "admin"]] = Field(
default=None,
description="Channel visibility/permission level",
)
+13 -9
View File
@@ -180,16 +180,20 @@ def _build_channel_labels() -> dict[str, str]:
from meshcore_hub.common.database import DatabaseManager
db = DatabaseManager(settings.effective_database_url)
from meshcore_hub.common.models.channel import Channel
try:
from meshcore_hub.common.models.channel import Channel
with db.session_scope() as session:
channels = session.query(Channel).filter(Channel.enabled.is_(True)).all()
db_decoder = LetsMeshPacketDecoder(
channel_keys=[f"{ch.name}={ch.key_hex}" for ch in channels]
)
db_labels = db_decoder.channel_labels_by_index()
labels.update(db_labels)
db.dispose()
with db.session_scope() as session:
channels = (
session.query(Channel).filter(Channel.enabled.is_(True)).all()
)
db_decoder = LetsMeshPacketDecoder(
channel_keys=[f"{ch.name}={ch.key_hex}" for ch in channels]
)
db_labels = db_decoder.channel_labels_by_index()
labels.update(db_labels)
finally:
db.dispose()
except Exception as e:
logger.warning("Failed to load channel labels from database: %s", e)
@@ -2,7 +2,7 @@ import { apiGet, apiPost, apiPut, apiDelete } from '../api.js';
import { html, litRender, nothing, t, errorAlert, getConfig, hasRole } from '../components.js';
import { iconChannel, iconPlus, iconEdit, iconTrash, iconLock } from '../icons.js';
const VISIBILITY_ORDER = ['public', 'member', 'operator', 'admin'];
const VISIBILITY_ORDER = ['community', 'member', 'operator', 'admin'];
function renderVisibilityBadge(visibility, oidcEnabled) {
if (!oidcEnabled) return nothing;
@@ -83,7 +83,7 @@ function renderChannelModal({ channel, isEdit, onSave, onCancel }) {
pattern="[0-9A-Fa-f]{32,64}" />` : nothing}
<label class="label-text text-right">${t('channels.visibility_label')}</label>
<select id="channel-modal-visibility" class="select select-bordered select-sm">
<option value="public" .selected=${channel?.visibility === 'public' || !channel}>public</option>
<option value="community" .selected=${channel?.visibility === 'community' || !channel}>community</option>
<option value="member" .selected=${channel?.visibility === 'member'}>member</option>
<option value="operator" .selected=${channel?.visibility === 'operator'}>operator</option>
<option value="admin" .selected=${channel?.visibility === 'admin'}>admin</option>
@@ -151,7 +151,7 @@ export async function render(container, params, router) {
groups.set(vis, []);
}
for (const ch of channelsList) {
const vis = ch.visibility || 'public';
const vis = ch.visibility || 'community';
if (!groups.has(vis)) groups.set(vis, []);
groups.get(vis).push(ch);
}
@@ -169,7 +169,7 @@ export async function render(container, params, router) {
const group = groups.get(vis);
if (!group || group.length === 0) continue;
groupedSections.push(html`
<h2 class="text-lg font-semibold mt-6 mb-3 opacity-70">${vis.charAt(0).toUpperCase() + vis.slice(1)}</h2>
<h2 class="text-lg font-semibold mt-6 mb-3 opacity-70">${t(`channels.visibility_${vis}`)}</h2>
<div class="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 gap-4">
${group.map(ch => renderChannelCard(ch, cardOpts))}
</div>
@@ -220,7 +220,7 @@ export async function render(container, params, router) {
}
function handleAdd() {
modalState = { type: 'add', channel: { visibility: 'public', enabled: true } };
modalState = { type: 'add', channel: { visibility: 'community', enabled: true } };
renderPage(channels);
}
@@ -233,6 +233,10 @@
"name_label": "Channel Name",
"key_label": "Channel Key (hex)",
"visibility_label": "Visibility",
"visibility_community": "Community",
"visibility_member": "Member",
"visibility_operator": "Operator",
"visibility_admin": "Admin",
"enabled_label": "Enabled",
"channel_hash_label": "Hash",
"disabled": "Disabled",
@@ -170,6 +170,12 @@
"empty_state_description": "Nog geen leden.",
"empty_description": "Leden verschijnen hier zodra gebruikers inloggen en knooppunten adopteren."
},
"channels": {
"visibility_community": "Community",
"visibility_member": "Lid",
"visibility_operator": "Operator",
"visibility_admin": "Beheerder"
},
"not_found": {
"description": "De pagina die u zoekt bestaat niet of is verplaatst."
},
+51
View File
@@ -21,6 +21,7 @@ from meshcore_hub.common.database import DatabaseManager
from meshcore_hub.common.models import (
Advertisement,
Base,
Channel,
Message,
Node,
NodeTag,
@@ -445,3 +446,53 @@ def sample_adopted_node(api_db_session, sample_user_profile, sample_node):
api_db_session.commit()
api_db_session.refresh(association)
return association
@pytest.fixture
def sample_channel(api_db_session):
"""Create a sample community channel in the database."""
channel = Channel(
name="TestChannel",
key_hex="AABBCCDDEEFF00112233445566778899",
channel_hash=Channel.compute_channel_hash("AABBCCDDEEFF00112233445566778899"),
visibility="community",
enabled=True,
)
api_db_session.add(channel)
api_db_session.commit()
api_db_session.refresh(channel)
return channel
@pytest.fixture
def sample_member_channel(api_db_session):
"""Create a sample member-only channel in the database."""
key = "11223344556677889900AABBCCDDEEFF"
channel = Channel(
name="MemberChannel",
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility="member",
enabled=True,
)
api_db_session.add(channel)
api_db_session.commit()
api_db_session.refresh(channel)
return channel
@pytest.fixture
def sample_admin_channel(api_db_session):
"""Create a sample admin-only channel in the database."""
key = "FFEEDDCCBBAA99887766554433221100"
channel = Channel(
name="AdminChannel",
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility="admin",
enabled=True,
)
api_db_session.add(channel)
api_db_session.commit()
api_db_session.refresh(channel)
return channel
+312
View File
@@ -0,0 +1,312 @@
"""Tests for channel_visibility helpers."""
from unittest.mock import MagicMock
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from meshcore_hub.api.channel_visibility import (
get_all_known_channel_indices,
get_max_visibility_level,
get_visible_channel_indices,
resolve_user_role,
)
from meshcore_hub.common.models import Base
from meshcore_hub.common.models.channel import Channel
@pytest.fixture
def db_session():
"""Create an in-memory SQLite database session."""
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
Base.metadata.drop_all(engine)
engine.dispose()
def _make_request(
headers: dict | None = None, app_state: dict | None = None
) -> MagicMock:
"""Create a mock FastAPI Request."""
from types import SimpleNamespace
request = MagicMock()
request.headers = headers or {}
state = SimpleNamespace(**(app_state or {}))
request.app.state = state
return request
class TestResolveUserRole:
"""Tests for resolve_user_role()."""
def test_no_header_returns_none(self) -> None:
"""No X-User-Roles header returns None."""
request = _make_request(headers={})
assert resolve_user_role(request) is None
def test_empty_header_returns_none(self) -> None:
"""Empty X-User-Roles header returns None."""
request = _make_request(headers={"x-user-roles": ""})
assert resolve_user_role(request) is None
def test_admin_role(self) -> None:
"""Admin role is resolved correctly."""
request = _make_request(headers={"x-user-roles": "admin"})
assert resolve_user_role(request) == "admin"
def test_operator_role(self) -> None:
"""Operator role is resolved correctly."""
request = _make_request(headers={"x-user-roles": "operator"})
assert resolve_user_role(request) == "operator"
def test_member_role(self) -> None:
"""Member role is resolved correctly."""
request = _make_request(headers={"x-user-roles": "member"})
assert resolve_user_role(request) == "member"
def test_admin_takes_precedence_over_member(self) -> None:
"""Admin takes precedence when multiple roles present."""
request = _make_request(headers={"x-user-roles": "member,admin"})
assert resolve_user_role(request) == "admin"
def test_operator_takes_precedence_over_member(self) -> None:
"""Operator takes precedence over member."""
request = _make_request(headers={"x-user-roles": "member,operator"})
assert resolve_user_role(request) == "operator"
def test_admin_takes_precedence_over_all(self) -> None:
"""Admin takes precedence over operator and member."""
request = _make_request(headers={"x-user-roles": "member,operator,admin"})
assert resolve_user_role(request) == "admin"
def test_unknown_role_returns_none(self) -> None:
"""Unknown role returns None."""
request = _make_request(headers={"x-user-roles": "viewer"})
assert resolve_user_role(request) is None
def test_custom_role_names(self) -> None:
"""Custom OIDC role names from app.state are recognized."""
request = _make_request(
headers={"x-user-roles": "superadmin,moderator"},
app_state={
"oidc_role_admin": "superadmin",
"oidc_role_operator": "moderator",
"oidc_role_member": "user",
},
)
assert resolve_user_role(request) == "admin"
def test_custom_member_role_name(self) -> None:
"""Custom member role name is recognized."""
request = _make_request(
headers={"x-user-roles": "user"},
app_state={
"oidc_role_member": "user",
},
)
assert resolve_user_role(request) == "member"
def test_whitespace_in_header(self) -> None:
"""Whitespace around role names is handled."""
request = _make_request(headers={"x-user-roles": " admin , member "})
assert resolve_user_role(request) == "admin"
class TestGetMaxVisibilityLevel:
"""Tests for get_max_visibility_level()."""
def test_none_returns_zero(self) -> None:
"""Anonymous users get level 0 (community only)."""
assert get_max_visibility_level(None) == 0
def test_community_returns_zero(self) -> None:
assert get_max_visibility_level("community") == 0
def test_member_returns_one(self) -> None:
assert get_max_visibility_level("member") == 1
def test_operator_returns_two(self) -> None:
assert get_max_visibility_level("operator") == 2
def test_admin_returns_three(self) -> None:
assert get_max_visibility_level("admin") == 3
def test_unknown_returns_zero(self) -> None:
assert get_max_visibility_level("unknown") == 0
class TestGetVisibleChannelIndices:
"""Tests for get_visible_channel_indices()."""
def test_always_includes_idx_17(self, db_session) -> None:
"""Built-in Public channel (idx 17) is always visible."""
indices = get_visible_channel_indices(db_session, 0)
assert 17 in indices
def test_community_channels_visible_at_level_0(self, db_session) -> None:
"""Community channels are visible at level 0."""
key = "AABBCCDDEEFF00112233445566778899"
ch = Channel(
name="Community",
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility="community",
)
db_session.add(ch)
db_session.commit()
indices = get_visible_channel_indices(db_session, 0)
expected_idx = int(ch.channel_hash, 16)
assert expected_idx in indices
assert 17 in indices
def test_member_channels_hidden_at_level_0(self, db_session) -> None:
"""Member channels are hidden at level 0."""
key = "11223344556677889900AABBCCDDEEFF"
ch = Channel(
name="MembersOnly",
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility="member",
)
db_session.add(ch)
db_session.commit()
indices = get_visible_channel_indices(db_session, 0)
ch_idx = int(ch.channel_hash, 16)
assert ch_idx not in indices
def test_member_channels_visible_at_level_1(self, db_session) -> None:
"""Member channels are visible at level 1."""
key = "11223344556677889900AABBCCDDEEFF"
ch = Channel(
name="MemberCh",
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility="member",
)
db_session.add(ch)
db_session.commit()
indices = get_visible_channel_indices(db_session, 1)
ch_idx = int(ch.channel_hash, 16)
assert ch_idx in indices
def test_admin_channels_visible_at_level_3(self, db_session) -> None:
"""Admin channels are visible at level 3."""
key = "FFEEDDCCBBAA99887766554433221100"
ch = Channel(
name="AdminCh",
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility="admin",
)
db_session.add(ch)
db_session.commit()
indices = get_visible_channel_indices(db_session, 3)
ch_idx = int(ch.channel_hash, 16)
assert ch_idx in indices
def test_admin_channels_hidden_at_level_1(self, db_session) -> None:
"""Admin channels are hidden at level 1."""
key = "FFEEDDCCBBAA99887766554433221100"
ch = Channel(
name="AdminCh",
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility="admin",
)
db_session.add(ch)
db_session.commit()
indices = get_visible_channel_indices(db_session, 1)
ch_idx = int(ch.channel_hash, 16)
assert ch_idx not in indices
def test_mixed_visibility_channels(self, db_session) -> None:
"""Multiple channels with different visibility levels."""
pub_key = "AABBCCDDEEFF00112233445566778899"
mem_key = "11223344556677889900AABBCCDDEEFF"
adm_key = "FFEEDDCCBBAA99887766554433221100"
for name, key, vis in [
("Community", pub_key, "community"),
("Member", mem_key, "member"),
("Admin", adm_key, "admin"),
]:
db_session.add(
Channel(
name=name,
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility=vis,
)
)
db_session.commit()
level_0 = get_visible_channel_indices(db_session, 0)
level_1 = get_visible_channel_indices(db_session, 1)
level_3 = get_visible_channel_indices(db_session, 3)
pub_idx = int(Channel.compute_channel_hash(pub_key), 16)
mem_idx = int(Channel.compute_channel_hash(mem_key), 16)
adm_idx = int(Channel.compute_channel_hash(adm_key), 16)
assert pub_idx in level_0
assert mem_idx not in level_0
assert adm_idx not in level_0
assert pub_idx in level_1
assert mem_idx in level_1
assert adm_idx not in level_1
assert pub_idx in level_3
assert mem_idx in level_3
assert adm_idx in level_3
assert 17 in level_0
assert 17 in level_1
assert 17 in level_3
class TestGetAllKnownChannelIndices:
"""Tests for get_all_known_channel_indices()."""
def test_empty_db(self, db_session) -> None:
"""Empty DB returns empty set."""
indices = get_all_known_channel_indices(db_session)
assert indices == set()
def test_returns_all_indices(self, db_session) -> None:
"""Returns all channel indices from DB."""
key1 = "AABBCCDDEEFF00112233445566778899"
key2 = "11223344556677889900AABBCCDDEEFF"
for name, key in [("Ch1", key1), ("Ch2", key2)]:
db_session.add(
Channel(
name=name,
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
)
)
db_session.commit()
indices = get_all_known_channel_indices(db_session)
idx1 = int(Channel.compute_channel_hash(key1), 16)
idx2 = int(Channel.compute_channel_hash(key2), 16)
assert indices == {idx1, idx2}
def test_does_not_include_builtin_17(self, db_session) -> None:
"""Does not include the built-in Public channel (17) unless in DB."""
indices = get_all_known_channel_indices(db_session)
assert 17 not in indices
+323
View File
@@ -0,0 +1,323 @@
"""Tests for channel API routes."""
from meshcore_hub.common.models import Channel
VALID_KEY_32 = "A" * 32
VALID_KEY_64 = "B" * 64
ALT_KEY_32 = "C" * 32
class TestListChannels:
"""Tests for GET /channels endpoint."""
def test_list_channels_empty(self, client_no_auth):
"""Test listing channels when database is empty."""
response = client_no_auth.get("/api/v1/channels")
assert response.status_code == 200
data = response.json()
assert data["items"] == []
assert data["total"] == 0
def test_list_channels_with_data(self, client_no_auth, sample_channel):
"""Test listing channels with data in database."""
response = client_no_auth.get("/api/v1/channels")
assert response.status_code == 200
data = response.json()
assert len(data["items"]) == 1
assert data["total"] == 1
assert data["items"][0]["name"] == "TestChannel"
assert data["items"][0]["key_hex"] is not None
assert data["items"][0]["masked_key"] is not None
def test_list_channels_anonymous_only_community(
self, client_no_auth, api_db_session
):
"""Anonymous users only see community channels."""
pub_key = "AABBCCDDEEFF00112233445566778899"
mem_key = "11223344556677889900AABBCCDDEEFF"
for name, key, vis in [
("Community", pub_key, "community"),
("Secret", mem_key, "member"),
]:
ch = Channel(
name=name,
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility=vis,
enabled=True,
)
api_db_session.add(ch)
api_db_session.commit()
response = client_no_auth.get("/api/v1/channels")
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
assert data["items"][0]["name"] == "Community"
def test_list_channels_admin_sees_all(self, client_no_auth, api_db_session):
"""Admin role header allows seeing all channels."""
pub_key = "AABBCCDDEEFF00112233445566778899"
mem_key = "11223344556677889900AABBCCDDEEFF"
adm_key = "FFEEDDCCBBAA99887766554433221100"
for name, key, vis in [
("Community", pub_key, "community"),
("Member", mem_key, "member"),
("Admin", adm_key, "admin"),
]:
ch = Channel(
name=name,
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility=vis,
enabled=True,
)
api_db_session.add(ch)
api_db_session.commit()
response = client_no_auth.get(
"/api/v1/channels",
headers={"X-User-Roles": "admin"},
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 3
def test_list_channels_member_sees_community_and_member(
self, client_no_auth, api_db_session
):
"""Member role sees community and member channels, not admin."""
pub_key = "AABBCCDDEEFF00112233445566778899"
mem_key = "11223344556677889900AABBCCDDEEFF"
adm_key = "FFEEDDCCBBAA99887766554433221100"
for name, key, vis in [
("Community", pub_key, "community"),
("MemberCh", mem_key, "member"),
("AdminCh", adm_key, "admin"),
]:
ch = Channel(
name=name,
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
visibility=vis,
enabled=True,
)
api_db_session.add(ch)
api_db_session.commit()
response = client_no_auth.get(
"/api/v1/channels",
headers={"X-User-Roles": "member"},
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 2
names = {item["name"] for item in data["items"]}
assert names == {"Community", "MemberCh"}
class TestCreateChannel:
"""Tests for POST /channels endpoint."""
def test_create_channel_success(self, client_no_auth):
"""Test creating a channel successfully."""
response = client_no_auth.post(
"/api/v1/channels",
json={
"name": "NewChannel",
"key_hex": VALID_KEY_32,
"visibility": "community",
"enabled": True,
},
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "NewChannel"
assert data["visibility"] == "community"
assert data["enabled"] is True
assert data["key_hex"] == VALID_KEY_32
assert data["masked_key"] == f"{VALID_KEY_32[:4]}...{VALID_KEY_32[-4:]}"
assert data["channel_hash"] == Channel.compute_channel_hash(VALID_KEY_32)
assert data["id"] is not None
assert data["created_at"] is not None
def test_create_channel_duplicate_name(self, client_no_auth, sample_channel):
"""Test creating channel with duplicate name returns 409."""
response = client_no_auth.post(
"/api/v1/channels",
json={
"name": "TestChannel",
"key_hex": ALT_KEY_32,
},
)
assert response.status_code == 409
assert "already exists" in response.json()["detail"]
def test_create_channel_duplicate_key(self, client_no_auth, sample_channel):
"""Test creating channel with duplicate key returns 409."""
response = client_no_auth.post(
"/api/v1/channels",
json={
"name": "DifferentName",
"key_hex": sample_channel.key_hex,
},
)
assert response.status_code == 409
assert "Key already in use" in response.json()["detail"]
def test_create_channel_invalid_key(self, client_no_auth):
"""Test creating channel with invalid key returns 422."""
response = client_no_auth.post(
"/api/v1/channels",
json={
"name": "BadKey",
"key_hex": "NOT-HEX",
},
)
assert response.status_code == 422
def test_create_channel_aes256_key(self, client_no_auth):
"""Test creating channel with AES-256 key (64 hex chars)."""
response = client_no_auth.post(
"/api/v1/channels",
json={
"name": "AES256",
"key_hex": VALID_KEY_64,
},
)
assert response.status_code == 201
assert response.json()["key_hex"] == VALID_KEY_64
def test_create_channel_with_auth(self, client_with_auth):
"""Test creating channel requires admin key."""
response = client_with_auth.post(
"/api/v1/channels",
json={
"name": "AuthChannel",
"key_hex": VALID_KEY_32,
},
)
assert response.status_code == 401
response = client_with_auth.post(
"/api/v1/channels",
headers={"Authorization": "Bearer test-admin-key"},
json={
"name": "AuthChannel",
"key_hex": VALID_KEY_32,
},
)
assert response.status_code == 201
class TestUpdateChannel:
"""Tests for PUT /channels/{channel_id} endpoint."""
def test_update_channel_visibility(self, client_no_auth, sample_channel):
"""Test updating channel visibility."""
response = client_no_auth.put(
f"/api/v1/channels/{sample_channel.id}",
json={"visibility": "member"},
)
assert response.status_code == 200
assert response.json()["visibility"] == "member"
def test_update_channel_key(self, client_no_auth, sample_channel):
"""Test updating channel key regenerates hash."""
response = client_no_auth.put(
f"/api/v1/channels/{sample_channel.id}",
json={"key_hex": ALT_KEY_32},
)
assert response.status_code == 200
data = response.json()
assert data["key_hex"] == ALT_KEY_32
assert data["channel_hash"] == Channel.compute_channel_hash(ALT_KEY_32)
def test_update_channel_enabled(self, client_no_auth, sample_channel):
"""Test disabling a channel."""
response = client_no_auth.put(
f"/api/v1/channels/{sample_channel.id}",
json={"enabled": False},
)
assert response.status_code == 200
assert response.json()["enabled"] is False
def test_update_channel_not_found(self, client_no_auth):
"""Test updating non-existent channel returns 404."""
response = client_no_auth.put(
"/api/v1/channels/nonexistent-id",
json={"visibility": "admin"},
)
assert response.status_code == 404
def test_update_channel_duplicate_key(self, client_no_auth, api_db_session):
"""Test updating key to one already in use returns 409."""
key1 = "AABBCCDDEEFF00112233445566778899"
key2 = "11223344556677889900AABBCCDDEEFF"
ch1 = Channel(
name="Ch1",
key_hex=key1,
channel_hash=Channel.compute_channel_hash(key1),
)
ch2 = Channel(
name="Ch2",
key_hex=key2,
channel_hash=Channel.compute_channel_hash(key2),
)
api_db_session.add_all([ch1, ch2])
api_db_session.commit()
response = client_no_auth.put(
f"/api/v1/channels/{ch1.id}",
json={"key_hex": key2},
)
assert response.status_code == 409
def test_update_channel_same_key_allowed(self, client_no_auth, sample_channel):
"""Test updating channel with its own key is allowed."""
response = client_no_auth.put(
f"/api/v1/channels/{sample_channel.id}",
json={"key_hex": sample_channel.key_hex},
)
assert response.status_code == 200
class TestDeleteChannel:
"""Tests for DELETE /channels/{channel_id} endpoint."""
def test_delete_channel_success(self, client_no_auth, sample_channel):
"""Test deleting a channel."""
response = client_no_auth.delete(f"/api/v1/channels/{sample_channel.id}")
assert response.status_code == 204
response = client_no_auth.get("/api/v1/channels")
assert response.status_code == 200
assert response.json()["total"] == 0
def test_delete_channel_not_found(self, client_no_auth):
"""Test deleting non-existent channel returns 404."""
response = client_no_auth.delete("/api/v1/channels/nonexistent-id")
assert response.status_code == 404
def test_delete_channel_with_auth(self, client_with_auth, api_db_session):
"""Test deleting channel requires admin key."""
key = "AABBCCDDEEFF00112233445566778899"
ch = Channel(
name="ToDelete",
key_hex=key,
channel_hash=Channel.compute_channel_hash(key),
)
api_db_session.add(ch)
api_db_session.commit()
response = client_with_auth.delete(f"/api/v1/channels/{ch.id}")
assert response.status_code == 401
response = client_with_auth.delete(
f"/api/v1/channels/{ch.id}",
headers={"Authorization": "Bearer test-admin-key"},
)
assert response.status_code == 204
+137 -1
View File
@@ -5,7 +5,7 @@ from unittest.mock import patch
import pytest
from meshcore_hub.common.models import Advertisement, Message, Node
from meshcore_hub.common.models import Advertisement, Message, Node, Channel
from meshcore_hub.common.models import UserProfile
@@ -439,3 +439,139 @@ class TestDashboardFloodOnlyFilter:
data = response.json()
total_count = sum(point["count"] for point in data["data"])
assert total_count == 1
class TestDashboardChannelVisibility:
"""Tests for channel visibility filtering on dashboard stats."""
@pytest.fixture
def channels_with_messages(self, api_db_session):
"""Create public and admin channels with messages."""
pub_key = "AABBCCDDEEFF00112233445566778899"
adm_key = "FFEEDDCCBBAA99887766554433221100"
pub_idx = int(Channel.compute_channel_hash(pub_key), 16)
adm_idx = int(Channel.compute_channel_hash(adm_key), 16)
pub_ch = Channel(
name="CommunityCh",
key_hex=pub_key,
channel_hash=Channel.compute_channel_hash(pub_key),
visibility="community",
enabled=True,
)
adm_ch = Channel(
name="AdminCh",
key_hex=adm_key,
channel_hash=Channel.compute_channel_hash(adm_key),
visibility="admin",
enabled=True,
)
api_db_session.add_all([pub_ch, adm_ch])
pub_msg = Message(
message_type="channel",
channel_idx=pub_idx,
text="Public message",
received_at=datetime.now(timezone.utc),
)
adm_msg = Message(
message_type="channel",
channel_idx=adm_idx,
text="Admin message",
received_at=datetime.now(timezone.utc),
)
direct_msg = Message(
message_type="direct",
pubkey_prefix="abc123",
text="Direct message",
received_at=datetime.now(timezone.utc),
)
api_db_session.add_all([pub_msg, adm_msg, direct_msg])
api_db_session.commit()
return pub_idx, adm_idx
def test_anonymous_sees_only_community_messages(
self, client_no_auth, channels_with_messages
):
"""Anonymous users only see community and direct messages in stats."""
response = client_no_auth.get("/api/v1/dashboard/stats")
assert response.status_code == 200
data = response.json()
assert data["total_messages"] == 2
def test_admin_sees_all_messages(self, client_no_auth, channels_with_messages):
"""Admin users see all messages in stats."""
response = client_no_auth.get(
"/api/v1/dashboard/stats",
headers={"X-User-Roles": "admin"},
)
assert response.status_code == 200
data = response.json()
assert data["total_messages"] == 3
def test_channel_message_counts_filtered(
self, client_no_auth, channels_with_messages
):
"""Channel message counts exclude hidden channels."""
pub_idx, adm_idx = channels_with_messages
response = client_no_auth.get("/api/v1/dashboard/stats")
assert response.status_code == 200
data = response.json()
assert str(pub_idx) in data["channel_message_counts"]
assert str(adm_idx) not in data["channel_message_counts"]
def test_admin_channel_message_counts_all(
self, client_no_auth, channels_with_messages
):
"""Admin users see all channel message counts."""
pub_idx, adm_idx = channels_with_messages
response = client_no_auth.get(
"/api/v1/dashboard/stats",
headers={"X-User-Roles": "admin"},
)
assert response.status_code == 200
data = response.json()
assert str(pub_idx) in data["channel_message_counts"]
assert str(adm_idx) in data["channel_message_counts"]
def test_message_activity_respects_visibility(self, client_no_auth, api_db_session):
"""Message activity endpoint filters by channel visibility."""
adm_key = "FFEEDDCCBBAA99887766554433221100"
adm_idx = int(Channel.compute_channel_hash(adm_key), 16)
adm_ch = Channel(
name="AdminCh",
key_hex=adm_key,
channel_hash=Channel.compute_channel_hash(adm_key),
visibility="admin",
enabled=True,
)
api_db_session.add(adm_ch)
yesterday = datetime.now(timezone.utc) - timedelta(days=1)
adm_msg = Message(
message_type="channel",
channel_idx=adm_idx,
text="Admin msg",
received_at=yesterday,
)
api_db_session.add(adm_msg)
api_db_session.commit()
response_anon = client_no_auth.get("/api/v1/dashboard/message-activity")
assert response_anon.status_code == 200
anon_data = response_anon.json()
anon_total = sum(p["count"] for p in anon_data["data"])
assert anon_total == 0
response_admin = client_no_auth.get(
"/api/v1/dashboard/message-activity",
headers={"X-User-Roles": "admin"},
)
assert response_admin.status_code == 200
admin_data = response_admin.json()
admin_total = sum(p["count"] for p in admin_data["data"])
assert admin_total >= 1
+125 -1
View File
@@ -2,7 +2,9 @@
from datetime import datetime, timedelta, timezone
from meshcore_hub.common.models import EventObserver, Message, Node, NodeTag
import pytest
from meshcore_hub.common.models import EventObserver, Message, Node, NodeTag, Channel
class TestListMessages:
@@ -465,3 +467,125 @@ class TestMessageSort:
assert response.status_code == 200
items = response.json()["items"]
assert items[0]["text"] == "New"
class TestMessageChannelVisibility:
"""Tests for channel visibility filtering on messages."""
@pytest.fixture
def messages_with_visibility(self, api_db_session):
"""Create messages on public and admin channels."""
pub_key = "AABBCCDDEEFF00112233445566778899"
adm_key = "FFEEDDCCBBAA99887766554433221100"
pub_idx = int(Channel.compute_channel_hash(pub_key), 16)
adm_idx = int(Channel.compute_channel_hash(adm_key), 16)
pub_ch = Channel(
name="CommunityCh",
key_hex=pub_key,
channel_hash=Channel.compute_channel_hash(pub_key),
visibility="community",
enabled=True,
)
adm_ch = Channel(
name="AdminCh",
key_hex=adm_key,
channel_hash=Channel.compute_channel_hash(adm_key),
visibility="admin",
enabled=True,
)
api_db_session.add_all([pub_ch, adm_ch])
pub_msg = Message(
message_type="channel",
channel_idx=pub_idx,
text="Community channel message",
received_at=datetime.now(timezone.utc),
)
adm_msg = Message(
message_type="channel",
channel_idx=adm_idx,
text="Admin channel message",
received_at=datetime.now(timezone.utc),
)
direct_msg = Message(
message_type="direct",
pubkey_prefix="abc123",
text="Direct message",
received_at=datetime.now(timezone.utc),
)
api_db_session.add_all([pub_msg, adm_msg, direct_msg])
api_db_session.commit()
return pub_msg, adm_msg, direct_msg
def test_anonymous_sees_only_community_channel_messages(
self, client_no_auth, messages_with_visibility
):
"""Anonymous users see community channel and direct messages only."""
response = client_no_auth.get("/api/v1/messages")
assert response.status_code == 200
data = response.json()
assert data["total"] == 2
texts = {item["text"] for item in data["items"]}
assert "Community channel message" in texts
assert "Direct message" in texts
assert "Admin channel message" not in texts
def test_admin_sees_all_channel_messages(
self, client_no_auth, messages_with_visibility
):
"""Admin users see all channel messages."""
response = client_no_auth.get(
"/api/v1/messages",
headers={"X-User-Roles": "admin"},
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 3
texts = {item["text"] for item in data["items"]}
assert "Community channel message" in texts
assert "Admin channel message" in texts
assert "Direct message" in texts
def test_get_message_hidden_channel_returns_404(
self, client_no_auth, messages_with_visibility
):
"""Getting a message on a hidden channel returns 404."""
pub_msg, adm_msg, direct_msg = messages_with_visibility
response = client_no_auth.get(f"/api/v1/messages/{adm_msg.id}")
assert response.status_code == 404
def test_get_message_hidden_channel_visible_to_admin(
self, client_no_auth, messages_with_visibility
):
"""Admin can get a message on an admin channel."""
pub_msg, adm_msg, direct_msg = messages_with_visibility
response = client_no_auth.get(
f"/api/v1/messages/{adm_msg.id}",
headers={"X-User-Roles": "admin"},
)
assert response.status_code == 200
assert response.json()["text"] == "Admin channel message"
def test_get_message_community_channel_visible(
self, client_no_auth, messages_with_visibility
):
"""Anonymous can get a message on a community channel."""
pub_msg, adm_msg, direct_msg = messages_with_visibility
response = client_no_auth.get(f"/api/v1/messages/{pub_msg.id}")
assert response.status_code == 200
assert response.json()["text"] == "Community channel message"
def test_direct_messages_always_visible(
self, client_no_auth, messages_with_visibility
):
"""Direct messages are always visible regardless of channel visibility."""
pub_msg, adm_msg, direct_msg = messages_with_visibility
response = client_no_auth.get(f"/api/v1/messages/{direct_msg.id}")
assert response.status_code == 200
assert response.json()["text"] == "Direct message"
+201
View File
@@ -984,3 +984,204 @@ class TestCreateSubscriber:
assert subscriber is not None
MockMQTT.assert_called_once()
class TestChannelKeyRefresh:
"""Tests for channel key loading and refresh from database."""
@pytest.fixture
def mock_mqtt_client(self):
"""Create a mock MQTT client."""
client = MagicMock()
client.topic_builder = MagicMock()
client.topic_builder.prefix = "meshcore"
client.topic_builder.parse_letsmesh_upload_topic.return_value = (
"a" * 64,
"status",
)
return client
def test_load_channel_keys_from_db(self, mock_mqtt_client, db_manager):
"""Test loading channel keys from database."""
from meshcore_hub.common.models.channel import Channel
with db_manager.session_scope() as session:
ch = Channel(
name="TestCh",
key_hex="AABBCCDDEEFF00112233445566778899",
channel_hash=Channel.compute_channel_hash(
"AABBCCDDEEFF00112233445566778899"
),
visibility="community",
enabled=True,
)
session.add(ch)
subscriber = Subscriber(mock_mqtt_client, db_manager)
assert len(subscriber._db_channel_keys) == 1
assert "TestCh=AABBCCDDEEFF00112233445566778899" in subscriber._db_channel_keys
def test_load_channel_keys_detects_test_channel(self, mock_mqtt_client, db_manager):
"""Test that test channel is detected by name."""
from meshcore_hub.common.models.channel import Channel
with db_manager.session_scope() as session:
ch = Channel(
name="Test",
key_hex="AABBCCDDEEFF00112233445566778899",
channel_hash=Channel.compute_channel_hash(
"AABBCCDDEEFF00112233445566778899"
),
visibility="community",
enabled=True,
)
session.add(ch)
subscriber = Subscriber(mock_mqtt_client, db_manager)
assert subscriber._include_test_channel is True
def test_load_channel_keys_only_enabled(self, mock_mqtt_client, db_manager):
"""Test that only enabled channels are loaded."""
from meshcore_hub.common.models.channel import Channel
with db_manager.session_scope() as session:
ch1 = Channel(
name="Enabled",
key_hex="AABBCCDDEEFF00112233445566778899",
channel_hash=Channel.compute_channel_hash(
"AABBCCDDEEFF00112233445566778899"
),
enabled=True,
)
ch2 = Channel(
name="Disabled",
key_hex="11223344556677889900AABBCCDDEEFF",
channel_hash=Channel.compute_channel_hash(
"11223344556677889900AABBCCDDEEFF"
),
enabled=False,
)
session.add_all([ch1, ch2])
subscriber = Subscriber(mock_mqtt_client, db_manager)
assert len(subscriber._db_channel_keys) == 1
assert "Enabled=" in subscriber._db_channel_keys[0]
def test_load_channel_keys_handles_db_error(self, mock_mqtt_client, db_manager):
"""Test graceful handling of database errors during key loading."""
broken_db = MagicMock()
broken_db.session_scope.side_effect = Exception("DB connection failed")
subscriber = Subscriber(mock_mqtt_client, broken_db)
assert subscriber._db_channel_keys == []
assert subscriber._include_test_channel is False
def test_refresh_channel_keys_from_db(self, mock_mqtt_client, db_manager):
"""Test refreshing channel keys reloads the decoder."""
from meshcore_hub.common.models.channel import Channel
subscriber = Subscriber(mock_mqtt_client, db_manager)
assert len(subscriber._db_channel_keys) == 0
with db_manager.session_scope() as session:
ch = Channel(
name="NewCh",
key_hex="CCDDEEFF00112233445566778899AABB",
channel_hash=Channel.compute_channel_hash(
"CCDDEEFF00112233445566778899AABB"
),
visibility="community",
enabled=True,
)
session.add(ch)
with patch.object(subscriber._letsmesh_decoder, "reload_keys") as mock_reload:
subscriber._refresh_channel_keys_from_db()
assert len(subscriber._db_channel_keys) == 1
mock_reload.assert_called_once_with(subscriber._db_channel_keys)
def test_refresh_handles_db_error(self, mock_mqtt_client, db_manager):
"""Test refresh handles database errors gracefully."""
broken_db = MagicMock()
def broken_scope():
raise Exception("DB error")
broken_db.session_scope.side_effect = broken_scope
subscriber = Subscriber(broken_db, broken_db)
with patch.object(subscriber._letsmesh_decoder, "reload_keys"):
subscriber._refresh_channel_keys_from_db()
assert subscriber._db_channel_keys == []
def test_channel_refresh_scheduler_starts(self, mock_mqtt_client, db_manager):
"""Test channel refresh scheduler starts a daemon thread."""
subscriber = Subscriber(
mock_mqtt_client, db_manager, channel_refresh_interval_seconds=300
)
subscriber._running = True
subscriber._start_channel_refresh_scheduler()
assert subscriber._channel_refresh_thread is not None
assert subscriber._channel_refresh_thread.daemon is True
subscriber._running = False
subscriber._channel_refresh_thread.join(timeout=2.0)
def test_channel_refresh_scheduler_disabled(self, mock_mqtt_client, db_manager):
"""Test channel refresh scheduler is disabled when interval is 0."""
subscriber = Subscriber(
mock_mqtt_client, db_manager, channel_refresh_interval_seconds=0
)
subscriber._running = True
subscriber._start_channel_refresh_scheduler()
assert subscriber._channel_refresh_thread is None
subscriber._running = False
def test_channel_refresh_scheduler_stop(self, mock_mqtt_client, db_manager):
"""Test stopping the channel refresh scheduler."""
subscriber = Subscriber(
mock_mqtt_client, db_manager, channel_refresh_interval_seconds=300
)
subscriber._running = True
subscriber._start_channel_refresh_scheduler()
subscriber._running = False
subscriber._stop_channel_refresh_scheduler()
assert subscriber._channel_refresh_thread is not None
assert not subscriber._channel_refresh_thread.is_alive()
def test_load_channel_keys_empty_db(self, mock_mqtt_client, db_manager):
"""Test loading channel keys from empty database."""
subscriber = Subscriber(mock_mqtt_client, db_manager)
assert subscriber._db_channel_keys == []
assert subscriber._include_test_channel is False
def test_decoder_initialized_with_db_keys(self, mock_mqtt_client, db_manager):
"""Test decoder is initialized with database channel keys."""
from meshcore_hub.common.models.channel import Channel
key_hex = "DDEEFF00112233445566778899AABBCC"
with db_manager.session_scope() as session:
ch = Channel(
name="DecCh",
key_hex=key_hex,
channel_hash=Channel.compute_channel_hash(key_hex),
visibility="community",
enabled=True,
)
session.add(ch)
subscriber = Subscriber(mock_mqtt_client, db_manager)
assert key_hex in subscriber._letsmesh_decoder._channel_keys
+241
View File
@@ -0,0 +1,241 @@
"""Tests for Channel model and channel Pydantic schemas."""
import hashlib
import pytest
from pydantic import ValidationError
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from meshcore_hub.common.models import Base, Channel, ChannelVisibility
from meshcore_hub.common.schemas.channels import ChannelCreate, ChannelUpdate
@pytest.fixture
def db_session():
"""Create an in-memory SQLite database session."""
engine = create_engine(
"sqlite:///:memory:",
connect_args={"check_same_thread": False},
)
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()
yield session
session.close()
Base.metadata.drop_all(engine)
engine.dispose()
class TestChannelModel:
"""Tests for Channel SQLAlchemy model."""
def test_create_channel(self, db_session) -> None:
"""Test creating a channel in the database."""
key_hex = "AABBCCDDEEFF00112233445566778899"
channel = Channel(
name="TestCh",
key_hex=key_hex,
channel_hash=Channel.compute_channel_hash(key_hex),
visibility="community",
enabled=True,
)
db_session.add(channel)
db_session.commit()
assert channel.id is not None
assert channel.name == "TestCh"
assert channel.key_hex == key_hex
assert channel.visibility == "community"
assert channel.enabled is True
def test_channel_repr(self, db_session) -> None:
"""Test channel string representation."""
channel = Channel(
name="MyChannel",
key_hex="AA" * 16,
channel_hash="AB",
visibility="member",
)
assert repr(channel) == "<Channel(name=MyChannel, hash=AB, visibility=member)>"
def test_compute_channel_hash_aes128(self) -> None:
"""Test channel hash computation for AES-128 key (32 hex chars)."""
key_hex = "8B3387E9C5CDEA6AC9E5EDBAA115CD72"
expected = hashlib.sha256(bytes.fromhex(key_hex)).digest()[:1].hex().upper()
result = Channel.compute_channel_hash(key_hex)
assert result == expected
assert len(result) == 2
def test_compute_channel_hash_aes256(self) -> None:
"""Test channel hash computation for AES-256 key (64 hex chars)."""
key_hex = "A" * 64
expected = hashlib.sha256(bytes.fromhex(key_hex)).digest()[:1].hex().upper()
result = Channel.compute_channel_hash(key_hex)
assert result == expected
assert len(result) == 2
def test_masked_key_normal(self) -> None:
"""Test masked key shows first/last 4 chars for long keys."""
channel = Channel(
name="Ch",
key_hex="AABBCCDDEEFF00112233445566778899",
channel_hash="AB",
)
assert channel.masked_key == "AABB...8899"
def test_masked_key_short(self) -> None:
"""Test masked key returns full key when <= 8 chars."""
channel = Channel(
name="Ch",
key_hex="AABBCCDD",
channel_hash="AB",
)
assert channel.masked_key == "AABBCCDD"
def test_channel_visibility_enum(self) -> None:
"""Test ChannelVisibility enum values."""
assert ChannelVisibility.COMMUNITY.value == "community"
assert ChannelVisibility.MEMBER.value == "member"
assert ChannelVisibility.OPERATOR.value == "operator"
assert ChannelVisibility.ADMIN.value == "admin"
def test_channel_unique_name_constraint(self, db_session) -> None:
"""Test that duplicate channel names raise an error."""
key1 = "AABBCCDDEEFF00112233445566778899"
key2 = "11223344556677889900AABBCCDDEEFF"
ch1 = Channel(
name="Unique",
key_hex=key1,
channel_hash=Channel.compute_channel_hash(key1),
)
ch2 = Channel(
name="Unique",
key_hex=key2,
channel_hash=Channel.compute_channel_hash(key2),
)
db_session.add(ch1)
db_session.commit()
db_session.add(ch2)
with pytest.raises(Exception, match=""):
db_session.commit()
def test_channel_default_values(self, db_session) -> None:
"""Test channel default visibility and enabled."""
key_hex = "AABBCCDDEEFF00112233445566778899"
channel = Channel(
name="Defaults",
key_hex=key_hex,
channel_hash=Channel.compute_channel_hash(key_hex),
)
db_session.add(channel)
db_session.commit()
assert channel.visibility == "community"
assert channel.enabled is True
class TestChannelCreateSchema:
"""Tests for ChannelCreate Pydantic schema."""
def test_valid_32_char_key(self) -> None:
"""Test valid AES-128 key (32 hex chars)."""
schema = ChannelCreate(
name="Test",
key_hex="aabbccddeeff00112233445566778899",
)
assert schema.key_hex == "AABBCCDDEEFF00112233445566778899"
def test_valid_64_char_key(self) -> None:
"""Test valid AES-256 key (64 hex chars)."""
key = "A" * 64
schema = ChannelCreate(name="Test", key_hex=key)
assert schema.key_hex == key
def test_key_hex_uppercases(self) -> None:
"""Test that key_hex is normalized to uppercase."""
schema = ChannelCreate(
name="Test",
key_hex="aabbccddeeff00112233445566778899",
)
assert schema.key_hex == "AABBCCDDEEFF00112233445566778899"
def test_key_hex_strips_whitespace(self) -> None:
"""Test that key_hex strips whitespace."""
schema = ChannelCreate(
name="Test",
key_hex=" AABBCCDDEEFF00112233445566778899 ",
)
assert schema.key_hex == "AABBCCDDEEFF00112233445566778899"
def test_invalid_key_non_hex(self) -> None:
"""Test that non-hex characters are rejected."""
with pytest.raises(ValidationError, match="hexadecimal"):
ChannelCreate(name="Test", key_hex="G" * 32)
def test_invalid_key_wrong_length(self) -> None:
"""Test that wrong-length keys are rejected."""
with pytest.raises(ValidationError):
ChannelCreate(name="Test", key_hex="A" * 16)
def test_invalid_key_too_long(self) -> None:
"""Test that keys longer than 64 chars are rejected."""
with pytest.raises(ValidationError):
ChannelCreate(name="Test", key_hex="A" * 128)
def test_name_required(self) -> None:
"""Test that name is required."""
with pytest.raises(ValidationError):
ChannelCreate(key_hex="A" * 32) # type: ignore[call-arg]
def test_name_too_long(self) -> None:
"""Test that name max length is 100."""
with pytest.raises(ValidationError):
ChannelCreate(name="X" * 101, key_hex="A" * 32)
def test_default_visibility(self) -> None:
"""Test default visibility is community."""
schema = ChannelCreate(name="Test", key_hex="A" * 32)
assert schema.visibility == "community"
def test_default_enabled(self) -> None:
"""Test default enabled is True."""
schema = ChannelCreate(name="Test", key_hex="A" * 32)
assert schema.enabled is True
class TestChannelUpdateSchema:
"""Tests for ChannelUpdate Pydantic schema."""
def test_key_hex_none_passthrough(self) -> None:
"""Test that None key_hex is allowed."""
schema = ChannelUpdate()
assert schema.key_hex is None
def test_valid_key_hex(self) -> None:
"""Test valid key_hex update."""
schema = ChannelUpdate(key_hex="A" * 32)
assert schema.key_hex == "A" * 32
def test_key_hex_uppercases(self) -> None:
"""Test that key_hex is normalized to uppercase."""
schema = ChannelUpdate(key_hex="a" * 32)
assert schema.key_hex == "A" * 32
def test_invalid_key_non_hex(self) -> None:
"""Test that non-hex characters are rejected."""
with pytest.raises(ValidationError, match="hexadecimal"):
ChannelUpdate(key_hex="Z" * 32)
def test_invalid_key_wrong_length(self) -> None:
"""Test that wrong-length keys are rejected."""
with pytest.raises(ValidationError):
ChannelUpdate(key_hex="A" * 48)
def test_all_fields_optional(self) -> None:
"""Test that all fields are optional."""
schema = ChannelUpdate()
assert schema.key_hex is None
assert schema.visibility is None
assert schema.enabled is None
+33
View File
@@ -74,6 +74,19 @@ class TestCollectorSettings:
assert settings.channel_refresh_interval_seconds == 60
def test_channels_file_path(self) -> None:
"""channels_file property resolves to seed_home/channels.yaml."""
settings = CollectorSettings(_env_file=None, seed_home="/seed/data")
assert settings.channels_file == "/seed/data/channels.yaml"
def test_channels_file_default(self) -> None:
"""channels_file uses default seed_home."""
settings = CollectorSettings(_env_file=None)
assert settings.channels_file.endswith("channels.yaml")
assert "seed" in settings.channels_file
class TestAPISettings:
"""Tests for APISettings."""
@@ -109,3 +122,23 @@ class TestWebSettings:
settings = WebSettings(_env_file=None)
assert settings.network_announcement is None
def test_feature_channels_default_true(self) -> None:
"""Test that feature_channels defaults to True."""
settings = WebSettings(_env_file=None)
assert settings.feature_channels is True
def test_feature_channels_override(self) -> None:
"""Test that feature_channels can be disabled."""
settings = WebSettings(_env_file=None, feature_channels=False)
assert settings.feature_channels is False
def test_features_dict_includes_channels(self) -> None:
"""Test that features dict includes channels key."""
settings = WebSettings(_env_file=None)
features = settings.features
assert "channels" in features
assert features["channels"] is True
+7
View File
@@ -157,3 +157,10 @@ class TestEnJsonCompleteness:
"""Channel optgroup labels exist and resolve correctly."""
assert t("channels.optgroup_standard") == "Standard"
assert t("channels.optgroup_custom") == "Custom"
def test_channels_visibility_keys(self):
"""Channel visibility level labels exist and resolve correctly."""
assert t("channels.visibility_community") == "Community"
assert t("channels.visibility_member") == "Member"
assert t("channels.visibility_operator") == "Operator"
assert t("channels.visibility_admin") == "Admin"