mirror of
https://github.com/ipnet-mesh/meshcore-hub.git
synced 2026-03-28 17:42:56 +01:00
Refactor tags.json format to use public_key as object key
Change tag import format from flat list with repeated public_keys to an
object keyed by public_key with nested tags. This makes the JSON more
intuitive and reduces redundancy.
New format supports both shorthand (string values) and full format
(with value and type):
{
"0123456789abcdef...": {
"friendly_name": "My Node",
"location": {"value": "52.0,1.0", "type": "coordinate"}
}
}
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -1,28 +1,16 @@
|
||||
{
|
||||
"tags": [
|
||||
{
|
||||
"public_key": "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
"key": "location",
|
||||
"value": "San Francisco, CA",
|
||||
"value_type": "string"
|
||||
},
|
||||
{
|
||||
"public_key": "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
"key": "role",
|
||||
"value": "gateway",
|
||||
"value_type": "string"
|
||||
},
|
||||
{
|
||||
"public_key": "fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210",
|
||||
"key": "location",
|
||||
"value": "Oakland, CA",
|
||||
"value_type": "string"
|
||||
},
|
||||
{
|
||||
"public_key": "fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210",
|
||||
"key": "altitude",
|
||||
"value": "150",
|
||||
"value_type": "number"
|
||||
}
|
||||
]
|
||||
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef": {
|
||||
"friendly_name": "Gateway Node",
|
||||
"location": {"value": "37.7749,-122.4194", "type": "coordinate"},
|
||||
"lat": {"value": "37.7749", "type": "number"},
|
||||
"lon": {"value": "-122.4194", "type": "number"},
|
||||
"role": "gateway"
|
||||
},
|
||||
"fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210": {
|
||||
"friendly_name": "Oakland Repeater",
|
||||
"location": {"value": "37.8044,-122.2712", "type": "coordinate"},
|
||||
"lat": {"value": "37.8044", "type": "number"},
|
||||
"lon": {"value": "-122.2712", "type": "number"},
|
||||
"altitude": {"value": "150", "type": "number"}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -230,20 +230,26 @@ def import_tags_cmd(
|
||||
FILE is the path to the JSON file containing tags.
|
||||
If not provided, defaults to {DATA_HOME}/collector/tags.json.
|
||||
|
||||
Expected JSON format:
|
||||
Expected JSON format (keyed by public_key):
|
||||
\b
|
||||
{
|
||||
"tags": [
|
||||
{
|
||||
"public_key": "64-char-hex-string",
|
||||
"key": "tag-name",
|
||||
"value": "tag-value",
|
||||
"value_type": "string"
|
||||
}
|
||||
]
|
||||
"0123456789abcdef...": {
|
||||
"friendly_name": "My Node",
|
||||
"location": {"value": "52.0,1.0", "type": "coordinate"},
|
||||
"altitude": {"value": "150", "type": "number"}
|
||||
}
|
||||
}
|
||||
|
||||
Supported value_type: string, number, boolean, coordinate
|
||||
Shorthand is also supported (string values with default type):
|
||||
\b
|
||||
{
|
||||
"0123456789abcdef...": {
|
||||
"friendly_name": "My Node",
|
||||
"role": "gateway"
|
||||
}
|
||||
}
|
||||
|
||||
Supported types: string, number, boolean, coordinate
|
||||
"""
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
@@ -6,7 +6,7 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field, field_validator
|
||||
from pydantic import BaseModel, Field, model_validator
|
||||
from sqlalchemy import select
|
||||
|
||||
from meshcore_hub.common.database import DatabaseManager
|
||||
@@ -15,44 +15,83 @@ from meshcore_hub.common.models import Node, NodeTag
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TagEntry(BaseModel):
|
||||
"""Schema for a tag entry in the import file."""
|
||||
class TagValue(BaseModel):
|
||||
"""Schema for a tag value with type."""
|
||||
|
||||
public_key: str = Field(..., min_length=64, max_length=64)
|
||||
key: str = Field(..., min_length=1, max_length=100)
|
||||
value: str | None = None
|
||||
value_type: str = Field(
|
||||
default="string", pattern=r"^(string|number|boolean|coordinate)$"
|
||||
)
|
||||
type: str = Field(default="string", pattern=r"^(string|number|boolean|coordinate)$")
|
||||
|
||||
@field_validator("public_key")
|
||||
|
||||
class NodeTags(BaseModel):
|
||||
"""Schema for tags associated with a node.
|
||||
|
||||
Each key in the model is a tag name, value is TagValue.
|
||||
"""
|
||||
|
||||
model_config = {"extra": "allow"}
|
||||
|
||||
@model_validator(mode="before")
|
||||
@classmethod
|
||||
def validate_public_key(cls, v: str) -> str:
|
||||
"""Validate that public_key is a valid hex string."""
|
||||
if not all(c in "0123456789abcdefABCDEF" for c in v):
|
||||
raise ValueError("public_key must be a valid hex string")
|
||||
return v.lower()
|
||||
def validate_tags(cls, data: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Validate that all values are valid tag entries."""
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError("Node tags must be a dictionary")
|
||||
|
||||
validated = {}
|
||||
for key, value in data.items():
|
||||
if isinstance(value, dict):
|
||||
# Full format: {"value": "...", "type": "..."}
|
||||
validated[key] = value
|
||||
elif isinstance(value, str):
|
||||
# Shorthand: just a string value
|
||||
validated[key] = {"value": value, "type": "string"}
|
||||
elif value is None:
|
||||
validated[key] = {"value": None, "type": "string"}
|
||||
else:
|
||||
# Convert other types to string
|
||||
validated[key] = {"value": str(value), "type": "string"}
|
||||
|
||||
return validated
|
||||
|
||||
|
||||
class TagsFile(BaseModel):
|
||||
"""Schema for the tags JSON file."""
|
||||
|
||||
tags: list[TagEntry]
|
||||
def validate_public_key(public_key: str) -> str:
|
||||
"""Validate that public_key is a valid 64-char hex string."""
|
||||
if len(public_key) != 64:
|
||||
raise ValueError(f"public_key must be 64 characters, got {len(public_key)}")
|
||||
if not all(c in "0123456789abcdefABCDEF" for c in public_key):
|
||||
raise ValueError("public_key must be a valid hex string")
|
||||
return public_key.lower()
|
||||
|
||||
|
||||
def load_tags_file(file_path: str | Path) -> TagsFile:
|
||||
def load_tags_file(file_path: str | Path) -> dict[str, dict[str, Any]]:
|
||||
"""Load and validate tags from a JSON file.
|
||||
|
||||
New format - dictionary keyed by public_key:
|
||||
{
|
||||
"0123456789abcdef...": {
|
||||
"friendly_name": "My Node",
|
||||
"location": {"value": "52.0,1.0", "type": "coordinate"},
|
||||
"altitude": {"value": "150", "type": "number"}
|
||||
}
|
||||
}
|
||||
|
||||
Shorthand is allowed - string values are auto-converted:
|
||||
{
|
||||
"0123456789abcdef...": {
|
||||
"friendly_name": "My Node"
|
||||
}
|
||||
}
|
||||
|
||||
Args:
|
||||
file_path: Path to the tags JSON file
|
||||
|
||||
Returns:
|
||||
Validated TagsFile instance
|
||||
Dictionary mapping public_key to tag dictionary
|
||||
|
||||
Raises:
|
||||
FileNotFoundError: If file does not exist
|
||||
json.JSONDecodeError: If file is not valid JSON
|
||||
pydantic.ValidationError: If file content is invalid
|
||||
ValueError: If file content is invalid
|
||||
"""
|
||||
path = Path(file_path)
|
||||
if not path.exists():
|
||||
@@ -61,7 +100,39 @@ def load_tags_file(file_path: str | Path) -> TagsFile:
|
||||
with open(path, "r") as f:
|
||||
data = json.load(f)
|
||||
|
||||
return TagsFile.model_validate(data)
|
||||
if not isinstance(data, dict):
|
||||
raise ValueError("Tags file must contain a JSON object")
|
||||
|
||||
# Validate each entry
|
||||
validated: dict[str, dict[str, Any]] = {}
|
||||
for public_key, tags in data.items():
|
||||
# Validate public key
|
||||
validated_key = validate_public_key(public_key)
|
||||
|
||||
# Validate tags
|
||||
if not isinstance(tags, dict):
|
||||
raise ValueError(f"Tags for {public_key[:12]}... must be a dictionary")
|
||||
|
||||
validated_tags: dict[str, Any] = {}
|
||||
for tag_key, tag_value in tags.items():
|
||||
if isinstance(tag_value, dict):
|
||||
# Full format with value and type
|
||||
validated_tags[tag_key] = {
|
||||
"value": tag_value.get("value"),
|
||||
"type": tag_value.get("type", "string"),
|
||||
}
|
||||
elif isinstance(tag_value, str):
|
||||
# Shorthand: just a string value
|
||||
validated_tags[tag_key] = {"value": tag_value, "type": "string"}
|
||||
elif tag_value is None:
|
||||
validated_tags[tag_key] = {"value": None, "type": "string"}
|
||||
else:
|
||||
# Convert other types to string
|
||||
validated_tags[tag_key] = {"value": str(tag_value), "type": "string"}
|
||||
|
||||
validated[validated_key] = validated_tags
|
||||
|
||||
return validated
|
||||
|
||||
|
||||
def import_tags(
|
||||
@@ -99,81 +170,93 @@ def import_tags(
|
||||
|
||||
# Load and validate file
|
||||
try:
|
||||
tags_file = load_tags_file(file_path)
|
||||
tags_data = load_tags_file(file_path)
|
||||
except Exception as e:
|
||||
stats["errors"].append(f"Failed to load tags file: {e}")
|
||||
return stats
|
||||
|
||||
stats["total"] = len(tags_file.tags)
|
||||
# Count total tags
|
||||
for tags in tags_data.values():
|
||||
stats["total"] += len(tags)
|
||||
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
with db.session_scope() as session:
|
||||
# Cache nodes by public_key to reduce queries
|
||||
node_cache: dict[str, Node] = {}
|
||||
|
||||
for tag_entry in tags_file.tags:
|
||||
for public_key, tags in tags_data.items():
|
||||
try:
|
||||
# Get or create node
|
||||
node = node_cache.get(tag_entry.public_key)
|
||||
node = node_cache.get(public_key)
|
||||
if node is None:
|
||||
query = select(Node).where(Node.public_key == tag_entry.public_key)
|
||||
query = select(Node).where(Node.public_key == public_key)
|
||||
node = session.execute(query).scalar_one_or_none()
|
||||
|
||||
if node is None:
|
||||
if create_nodes:
|
||||
node = Node(
|
||||
public_key=tag_entry.public_key,
|
||||
public_key=public_key,
|
||||
first_seen=now,
|
||||
last_seen=now,
|
||||
)
|
||||
session.add(node)
|
||||
session.flush()
|
||||
stats["nodes_created"] += 1
|
||||
logger.debug(
|
||||
f"Created node for {tag_entry.public_key[:12]}..."
|
||||
)
|
||||
logger.debug(f"Created node for {public_key[:12]}...")
|
||||
else:
|
||||
stats["skipped"] += 1
|
||||
stats["skipped"] += len(tags)
|
||||
logger.debug(
|
||||
f"Skipped tag for unknown node {tag_entry.public_key[:12]}..."
|
||||
f"Skipped {len(tags)} tags for unknown node {public_key[:12]}..."
|
||||
)
|
||||
continue
|
||||
|
||||
node_cache[tag_entry.public_key] = node
|
||||
node_cache[public_key] = node
|
||||
|
||||
# Find or create tag
|
||||
tag_query = select(NodeTag).where(
|
||||
NodeTag.node_id == node.id,
|
||||
NodeTag.key == tag_entry.key,
|
||||
)
|
||||
existing_tag = session.execute(tag_query).scalar_one_or_none()
|
||||
# Process each tag
|
||||
for tag_key, tag_data in tags.items():
|
||||
try:
|
||||
tag_value = tag_data.get("value")
|
||||
tag_type = tag_data.get("type", "string")
|
||||
|
||||
if existing_tag:
|
||||
# Update existing tag
|
||||
existing_tag.value = tag_entry.value
|
||||
existing_tag.value_type = tag_entry.value_type
|
||||
stats["updated"] += 1
|
||||
logger.debug(
|
||||
f"Updated tag {tag_entry.key}={tag_entry.value} "
|
||||
f"for {tag_entry.public_key[:12]}..."
|
||||
)
|
||||
else:
|
||||
# Create new tag
|
||||
new_tag = NodeTag(
|
||||
node_id=node.id,
|
||||
key=tag_entry.key,
|
||||
value=tag_entry.value,
|
||||
value_type=tag_entry.value_type,
|
||||
)
|
||||
session.add(new_tag)
|
||||
stats["created"] += 1
|
||||
logger.debug(
|
||||
f"Created tag {tag_entry.key}={tag_entry.value} "
|
||||
f"for {tag_entry.public_key[:12]}..."
|
||||
)
|
||||
# Find or create tag
|
||||
tag_query = select(NodeTag).where(
|
||||
NodeTag.node_id == node.id,
|
||||
NodeTag.key == tag_key,
|
||||
)
|
||||
existing_tag = session.execute(tag_query).scalar_one_or_none()
|
||||
|
||||
if existing_tag:
|
||||
# Update existing tag
|
||||
existing_tag.value = tag_value
|
||||
existing_tag.value_type = tag_type
|
||||
stats["updated"] += 1
|
||||
logger.debug(
|
||||
f"Updated tag {tag_key}={tag_value} "
|
||||
f"for {public_key[:12]}..."
|
||||
)
|
||||
else:
|
||||
# Create new tag
|
||||
new_tag = NodeTag(
|
||||
node_id=node.id,
|
||||
key=tag_key,
|
||||
value=tag_value,
|
||||
value_type=tag_type,
|
||||
)
|
||||
session.add(new_tag)
|
||||
stats["created"] += 1
|
||||
logger.debug(
|
||||
f"Created tag {tag_key}={tag_value} "
|
||||
f"for {public_key[:12]}..."
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Error processing tag {tag_key} for {public_key[:12]}...: {e}"
|
||||
stats["errors"].append(error_msg)
|
||||
logger.error(error_msg)
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Error processing tag {tag_entry.key} for {tag_entry.public_key[:12]}...: {e}"
|
||||
error_msg = f"Error processing node {public_key[:12]}...: {e}"
|
||||
stats["errors"].append(error_msg)
|
||||
logger.error(error_msg)
|
||||
|
||||
|
||||
@@ -5,134 +5,61 @@ import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
from pydantic import ValidationError
|
||||
from sqlalchemy import select
|
||||
|
||||
from meshcore_hub.collector.tag_import import (
|
||||
TagEntry,
|
||||
TagsFile,
|
||||
import_tags,
|
||||
load_tags_file,
|
||||
validate_public_key,
|
||||
)
|
||||
from meshcore_hub.common.database import DatabaseManager
|
||||
from meshcore_hub.common.models import Node, NodeTag
|
||||
|
||||
|
||||
class TestTagEntry:
|
||||
"""Tests for TagEntry model."""
|
||||
class TestValidatePublicKey:
|
||||
"""Tests for validate_public_key function."""
|
||||
|
||||
def test_valid_tag_entry(self):
|
||||
"""Test creating a valid tag entry."""
|
||||
entry = TagEntry(
|
||||
public_key="0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
key="location",
|
||||
value="San Francisco",
|
||||
value_type="string",
|
||||
def test_valid_public_key(self):
|
||||
"""Test valid public key is returned lowercase."""
|
||||
result = validate_public_key(
|
||||
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
)
|
||||
assert (
|
||||
entry.public_key
|
||||
== "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
result == "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
)
|
||||
assert entry.key == "location"
|
||||
assert entry.value == "San Francisco"
|
||||
assert entry.value_type == "string"
|
||||
|
||||
def test_public_key_lowercase(self):
|
||||
"""Test that public key is normalized to lowercase."""
|
||||
entry = TagEntry(
|
||||
public_key="0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF",
|
||||
key="test",
|
||||
result = validate_public_key(
|
||||
"0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF"
|
||||
)
|
||||
assert (
|
||||
entry.public_key
|
||||
== "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
result == "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
)
|
||||
|
||||
def test_default_value_type(self):
|
||||
"""Test default value_type is string."""
|
||||
entry = TagEntry(
|
||||
public_key="0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
key="test",
|
||||
)
|
||||
assert entry.value_type == "string"
|
||||
|
||||
def test_invalid_public_key_length(self):
|
||||
"""Test that short public key is rejected."""
|
||||
with pytest.raises(ValidationError):
|
||||
TagEntry(
|
||||
public_key="0123456789abcdef",
|
||||
key="test",
|
||||
)
|
||||
with pytest.raises(ValueError, match="must be 64 characters"):
|
||||
validate_public_key("0123456789abcdef")
|
||||
|
||||
def test_invalid_public_key_chars(self):
|
||||
"""Test that non-hex public key is rejected."""
|
||||
with pytest.raises(ValidationError):
|
||||
TagEntry(
|
||||
public_key="zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz",
|
||||
key="test",
|
||||
with pytest.raises(ValueError, match="valid hex string"):
|
||||
validate_public_key(
|
||||
"zzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzzz"
|
||||
)
|
||||
|
||||
def test_invalid_value_type(self):
|
||||
"""Test that invalid value_type is rejected."""
|
||||
with pytest.raises(ValidationError):
|
||||
TagEntry(
|
||||
public_key="0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
key="test",
|
||||
value_type="invalid",
|
||||
)
|
||||
|
||||
def test_valid_value_types(self):
|
||||
"""Test all valid value types."""
|
||||
for vt in ["string", "number", "boolean", "coordinate"]:
|
||||
entry = TagEntry(
|
||||
public_key="0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
key="test",
|
||||
value_type=vt,
|
||||
)
|
||||
assert entry.value_type == vt
|
||||
|
||||
|
||||
class TestTagsFile:
|
||||
"""Tests for TagsFile model."""
|
||||
|
||||
def test_valid_tags_file(self):
|
||||
"""Test creating a valid tags file."""
|
||||
tags_file = TagsFile(
|
||||
tags=[
|
||||
TagEntry(
|
||||
public_key="0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
key="location",
|
||||
value="SF",
|
||||
),
|
||||
TagEntry(
|
||||
public_key="fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210",
|
||||
key="role",
|
||||
value="gateway",
|
||||
),
|
||||
]
|
||||
)
|
||||
assert len(tags_file.tags) == 2
|
||||
|
||||
def test_empty_tags(self):
|
||||
"""Test tags file with empty tags list."""
|
||||
tags_file = TagsFile(tags=[])
|
||||
assert len(tags_file.tags) == 0
|
||||
|
||||
|
||||
class TestLoadTagsFile:
|
||||
"""Tests for load_tags_file function."""
|
||||
|
||||
def test_load_valid_file(self):
|
||||
"""Test loading a valid tags file."""
|
||||
"""Test loading a valid tags file with new format."""
|
||||
data = {
|
||||
"tags": [
|
||||
{
|
||||
"public_key": "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
"key": "location",
|
||||
"value": "San Francisco",
|
||||
"value_type": "string",
|
||||
}
|
||||
]
|
||||
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef": {
|
||||
"location": "San Francisco",
|
||||
"role": {"value": "gateway", "type": "string"},
|
||||
}
|
||||
}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
@@ -140,8 +67,51 @@ class TestLoadTagsFile:
|
||||
f.flush()
|
||||
|
||||
result = load_tags_file(f.name)
|
||||
assert len(result.tags) == 1
|
||||
assert result.tags[0].key == "location"
|
||||
assert len(result) == 1
|
||||
key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
assert key in result
|
||||
assert result[key]["location"]["value"] == "San Francisco"
|
||||
assert result[key]["location"]["type"] == "string"
|
||||
assert result[key]["role"]["value"] == "gateway"
|
||||
|
||||
Path(f.name).unlink()
|
||||
|
||||
def test_load_shorthand_format(self):
|
||||
"""Test loading file with shorthand string values."""
|
||||
data = {
|
||||
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef": {
|
||||
"friendly_name": "My Node",
|
||||
}
|
||||
}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
json.dump(data, f)
|
||||
f.flush()
|
||||
|
||||
result = load_tags_file(f.name)
|
||||
key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
assert result[key]["friendly_name"]["value"] == "My Node"
|
||||
assert result[key]["friendly_name"]["type"] == "string"
|
||||
|
||||
Path(f.name).unlink()
|
||||
|
||||
def test_load_full_format(self):
|
||||
"""Test loading file with full format (value and type)."""
|
||||
data = {
|
||||
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef": {
|
||||
"location": {"value": "52.0,1.0", "type": "coordinate"},
|
||||
"altitude": {"value": "150", "type": "number"},
|
||||
}
|
||||
}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
json.dump(data, f)
|
||||
f.flush()
|
||||
|
||||
result = load_tags_file(f.name)
|
||||
key = "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
|
||||
assert result[key]["location"]["type"] == "coordinate"
|
||||
assert result[key]["altitude"]["type"] == "number"
|
||||
|
||||
Path(f.name).unlink()
|
||||
|
||||
@@ -161,19 +131,45 @@ class TestLoadTagsFile:
|
||||
|
||||
Path(f.name).unlink()
|
||||
|
||||
def test_invalid_schema(self):
|
||||
"""Test loading file with invalid schema."""
|
||||
data: dict[str, list[str]] = {"not_tags": []}
|
||||
def test_invalid_schema_not_dict(self):
|
||||
"""Test loading file with invalid schema (not a dict)."""
|
||||
data = [{"public_key": "abc"}]
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
json.dump(data, f)
|
||||
f.flush()
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
with pytest.raises(ValueError, match="must contain a JSON object"):
|
||||
load_tags_file(f.name)
|
||||
|
||||
Path(f.name).unlink()
|
||||
|
||||
def test_invalid_public_key(self):
|
||||
"""Test loading file with invalid public key."""
|
||||
data = {"invalid_key": {"tag": "value"}}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
json.dump(data, f)
|
||||
f.flush()
|
||||
|
||||
with pytest.raises(ValueError, match="must be 64 characters"):
|
||||
load_tags_file(f.name)
|
||||
|
||||
Path(f.name).unlink()
|
||||
|
||||
def test_load_empty_file(self):
|
||||
"""Test loading empty tags file."""
|
||||
data: dict[str, dict[str, str]] = {}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
json.dump(data, f)
|
||||
f.flush()
|
||||
|
||||
result = load_tags_file(f.name)
|
||||
assert len(result) == 0
|
||||
|
||||
Path(f.name).unlink()
|
||||
|
||||
|
||||
class TestImportTags:
|
||||
"""Tests for import_tags function."""
|
||||
@@ -188,28 +184,15 @@ class TestImportTags:
|
||||
|
||||
@pytest.fixture
|
||||
def sample_tags_file(self):
|
||||
"""Create a sample tags file."""
|
||||
"""Create a sample tags file with new format."""
|
||||
data = {
|
||||
"tags": [
|
||||
{
|
||||
"public_key": "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
"key": "location",
|
||||
"value": "San Francisco",
|
||||
"value_type": "string",
|
||||
},
|
||||
{
|
||||
"public_key": "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
"key": "role",
|
||||
"value": "gateway",
|
||||
"value_type": "string",
|
||||
},
|
||||
{
|
||||
"public_key": "fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210",
|
||||
"key": "altitude",
|
||||
"value": "100",
|
||||
"value_type": "number",
|
||||
},
|
||||
]
|
||||
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef": {
|
||||
"location": "San Francisco",
|
||||
"role": {"value": "gateway", "type": "string"},
|
||||
},
|
||||
"fedcba9876543210fedcba9876543210fedcba9876543210fedcba9876543210": {
|
||||
"altitude": {"value": "100", "type": "number"},
|
||||
},
|
||||
}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
@@ -294,8 +277,8 @@ class TestImportTags:
|
||||
assert "Failed to load" in stats["errors"][0]
|
||||
|
||||
def test_import_empty_file(self, db_manager):
|
||||
"""Test import with empty tags list."""
|
||||
data: dict[str, list[str]] = {"tags": []}
|
||||
"""Test import with empty tags object."""
|
||||
data: dict[str, dict[str, str]] = {}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
json.dump(data, f)
|
||||
@@ -311,20 +294,10 @@ class TestImportTags:
|
||||
def test_import_preserves_value_type(self, db_manager):
|
||||
"""Test that import preserves value_type correctly."""
|
||||
data = {
|
||||
"tags": [
|
||||
{
|
||||
"public_key": "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
"key": "count",
|
||||
"value": "42",
|
||||
"value_type": "number",
|
||||
},
|
||||
{
|
||||
"public_key": "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef",
|
||||
"key": "active",
|
||||
"value": "true",
|
||||
"value_type": "boolean",
|
||||
},
|
||||
]
|
||||
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef": {
|
||||
"count": {"value": "42", "type": "number"},
|
||||
"active": {"value": "true", "type": "boolean"},
|
||||
}
|
||||
}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
@@ -341,3 +314,51 @@ class TestImportTags:
|
||||
assert tag_dict["active"].value_type == "boolean"
|
||||
|
||||
Path(f.name).unlink()
|
||||
|
||||
def test_import_null_value(self, db_manager):
|
||||
"""Test that null values are handled correctly."""
|
||||
data = {
|
||||
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef": {
|
||||
"empty_tag": None,
|
||||
}
|
||||
}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
json.dump(data, f)
|
||||
f.flush()
|
||||
|
||||
stats = import_tags(f.name, db_manager, create_nodes=True)
|
||||
|
||||
assert stats["created"] == 1
|
||||
|
||||
with db_manager.session_scope() as session:
|
||||
tag = session.execute(select(NodeTag)).scalar_one()
|
||||
assert tag.key == "empty_tag"
|
||||
assert tag.value is None
|
||||
assert tag.value_type == "string"
|
||||
|
||||
Path(f.name).unlink()
|
||||
|
||||
def test_import_numeric_value_converted(self, db_manager):
|
||||
"""Test that numeric values are converted to strings."""
|
||||
data = {
|
||||
"0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef": {
|
||||
"num_val": 42,
|
||||
}
|
||||
}
|
||||
|
||||
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
|
||||
json.dump(data, f)
|
||||
f.flush()
|
||||
|
||||
stats = import_tags(f.name, db_manager, create_nodes=True)
|
||||
|
||||
assert stats["created"] == 1
|
||||
|
||||
with db_manager.session_scope() as session:
|
||||
tag = session.execute(select(NodeTag)).scalar_one()
|
||||
assert tag.key == "num_val"
|
||||
assert tag.value == "42"
|
||||
assert tag.value_type == "string"
|
||||
|
||||
Path(f.name).unlink()
|
||||
|
||||
Reference in New Issue
Block a user