feat: implement transport key generation and update API endpoints for optional transport key handling

This commit is contained in:
Lloyd
2025-11-16 21:37:17 +00:00
parent da4006d230
commit deaed4b376
3 changed files with 63 additions and 5 deletions
+36 -1
View File
@@ -2,6 +2,8 @@ import json
import logging
import sqlite3
import time
import secrets
import base64
from pathlib import Path
from typing import Optional, Dict, Any, List
@@ -664,8 +666,41 @@ class SQLiteHandler:
logger.error(f"Failed to get adverts by contact_type '{contact_type}': {e}")
return []
def create_transport_key(self, name: str, flood_policy: str, transport_key: str, parent_id: Optional[int] = None, last_used: Optional[float] = None) -> Optional[int]:
def generate_transport_key(self, key_length_bytes: int = 32) -> str:
"""
Generate a cryptographically secure transport key.
Args:
key_length_bytes: Length of the key in bytes (default: 32 bytes = 256 bits)
Returns:
A base64-encoded secure random key
"""
try:
# Generate cryptographically secure random bytes
random_bytes = secrets.token_bytes(key_length_bytes)
# Encode to base64 for safe storage and transmission
key = base64.b64encode(random_bytes).decode('utf-8')
logger.debug(f"Generated transport key with {key_length_bytes} bytes ({len(key)} base64 chars)")
return key
except Exception as e:
logger.error(f"Failed to generate transport key: {e}")
# Fallback to a simpler method if crypto fails
import random
import string
fallback_key = ''.join(random.choices(string.ascii_letters + string.digits, k=key_length_bytes))
logger.warning(f"Using fallback key generation method")
return base64.b64encode(fallback_key.encode()).decode('utf-8')
def create_transport_key(self, name: str, flood_policy: str, transport_key: Optional[str] = None, parent_id: Optional[int] = None, last_used: Optional[float] = None) -> Optional[int]:
try:
# Generate key if not provided
if transport_key is None:
transport_key = self.generate_transport_key()
current_time = time.time()
with sqlite3.connect(self.sqlite_path) as conn:
cursor = conn.execute("""
@@ -93,7 +93,7 @@ class StorageCollector:
def close(self):
self.mqtt_handler.close()
def create_transport_key(self, name: str, flood_policy: str, transport_key: str, parent_id: Optional[int] = None, last_used: Optional[float] = None) -> Optional[int]:
def create_transport_key(self, name: str, flood_policy: str, transport_key: Optional[str] = None, parent_id: Optional[int] = None, last_used: Optional[float] = None) -> Optional[int]:
return self.sqlite_handler.create_transport_key(name, flood_policy, transport_key, parent_id, last_used)
def get_transport_keys(self) -> list:
+26 -3
View File
@@ -684,6 +684,7 @@ class APIEndpoints:
@cherrypy.expose
@cherrypy.tools.json_out()
@cherrypy.tools.json_in()
@cors_enabled
def transport_keys(self):
if cherrypy.request.method == "GET":
@@ -700,16 +701,28 @@ class APIEndpoints:
data = cherrypy.request.json or {}
name = data.get("name")
flood_policy = data.get("flood_policy")
transport_key = data.get("transport_key")
transport_key = data.get("transport_key") # Optional now
parent_id = data.get("parent_id")
last_used = data.get("last_used")
if not name or not flood_policy or not transport_key:
return self._error("Missing required fields: name, flood_policy, transport_key")
if not name or not flood_policy:
return self._error("Missing required fields: name, flood_policy")
if flood_policy not in ["allow", "deny"]:
return self._error("flood_policy must be 'allow' or 'deny'")
# Convert ISO timestamp string to float if provided
if last_used:
try:
from datetime import datetime
dt = datetime.fromisoformat(last_used.replace('Z', '+00:00'))
last_used = dt.timestamp()
except (ValueError, AttributeError):
# If conversion fails, use current time
last_used = time.time()
else:
last_used = time.time()
storage = self._get_storage()
key_id = storage.create_transport_key(name, flood_policy, transport_key, parent_id, last_used)
@@ -723,6 +736,7 @@ class APIEndpoints:
@cherrypy.expose
@cherrypy.tools.json_out()
@cherrypy.tools.json_in()
@cors_enabled
def transport_key(self, key_id):
if cherrypy.request.method == "GET":
@@ -754,6 +768,15 @@ class APIEndpoints:
if flood_policy and flood_policy not in ["allow", "deny"]:
return self._error("flood_policy must be 'allow' or 'deny'")
# Convert ISO timestamp string to float if provided
if last_used:
try:
dt = datetime.fromisoformat(last_used.replace('Z', '+00:00'))
last_used = dt.timestamp()
except (ValueError, AttributeError):
# If conversion fails, leave as None to not update
last_used = None
storage = self._get_storage()
success = storage.update_transport_key(key_id, name, flood_policy, transport_key, parent_id, last_used)