Extend test to: serialization/deserialization with multi-byte paths

- Functionality of Packet.apply_path_hash_mode and get_path_hashes
- Engine flood_forward and direct_forward with real multi-byte encoded packets
- PacketBuilder.create_trace payload structure and TraceHandler parsing
- Enforcement of max-hop boundaries per hash size
This commit is contained in:
Lloyd
2026-03-11 14:23:29 +00:00
parent 155575865a
commit 596c96d1f4
26 changed files with 1551 additions and 31 deletions

View File

@@ -1003,8 +1003,15 @@ class RepeaterHandler(BaseHandler):
# Get neighbors from database
neighbors = self.storage.get_neighbors() if self.storage else {}
# Format local_hash respecting path_hash_mode
phm = self.config.get("mesh", {}).get("path_hash_mode", 0)
_bc = {0: 1, 1: 2, 2: 3}.get(phm, 1)
_hc = _bc * 2
_val = int.from_bytes(bytes(self.local_hash_bytes[:_bc]), "big")
local_hash_str = f"0x{_val:0{_hc}x}"
stats = {
"local_hash": f"0x{self.local_hash:02x}",
"local_hash": local_hash_str,
"duplicate_cache_size": len(self.seen_packets),
"cache_ttl": self.cache_ttl,
"rx_count": self.rx_count,

View File

@@ -241,6 +241,19 @@ class APIEndpoints:
cherrypy.response.headers["Allow"] = "POST"
raise cherrypy.HTTPError(405, "Method not allowed. This endpoint requires POST.")
def _fmt_hash(self, pubkey: bytes) -> str:
"""Format a node hash as a hex string respecting the configured path_hash_mode.
path_hash_mode 0 (default) → 1-byte "0x19"
path_hash_mode 1 → 2-byte "0x1927"
path_hash_mode 2 → 3-byte "0x192722"
"""
mode = self.config.get("mesh", {}).get("path_hash_mode", 0)
byte_count = {0: 1, 1: 2, 2: 3}.get(mode, 1)
hex_chars = byte_count * 2
value = int.from_bytes(bytes(pubkey[:byte_count]), "big")
return f"0x{value:0{hex_chars}X}"
def _get_time_range(self, hours):
end_time = int(time.time())
return end_time - (hours * 3600), end_time
@@ -2354,7 +2367,7 @@ class APIEndpoints:
if runtime_info:
identity_obj, config, identity_type = runtime_info
identity_config["runtime"] = {
"hash": f"0x{identity_obj.get_public_key()[0]:02X}",
"hash": self._fmt_hash(identity_obj.get_public_key()),
"address": identity_obj.get_address_bytes().hex(),
"type": identity_type,
"registered": True,
@@ -3090,7 +3103,7 @@ class APIEndpoints:
{
"name": "repeater",
"type": "repeater",
"hash": f"0x{repeater_hash:02X}",
"hash": self._fmt_hash(self.daemon_instance.local_identity.get_public_key()),
"max_clients": repeater_acl.max_clients,
"authenticated_clients": repeater_acl.get_num_clients(),
"has_admin_password": bool(repeater_acl.admin_password),
@@ -3109,7 +3122,7 @@ class APIEndpoints:
{
"name": name,
"type": "room_server",
"hash": f"0x{hash_byte:02X}",
"hash": self._fmt_hash(identity.get_public_key()),
"max_clients": acl.max_clients,
"authenticated_clients": acl.get_num_clients(),
"has_admin_password": bool(acl.admin_password),
@@ -3173,7 +3186,7 @@ class APIEndpoints:
identity_map[repeater_hash] = {
"name": "repeater",
"type": "repeater",
"hash": f"0x{repeater_hash:02X}",
"hash": self._fmt_hash(self.daemon_instance.local_identity.get_public_key()),
}
# Add room servers
@@ -3182,7 +3195,7 @@ class APIEndpoints:
identity_map[hash_byte] = {
"name": name,
"type": "room_server",
"hash": f"0x{hash_byte:02X}",
"hash": self._fmt_hash(identity.get_public_key()),
}
# Filter by identity if requested

View File

@@ -1 +1 @@
import{a as p,b as n,g as m,e as t,s as g,t as s,j as d,p as l}from"./index-DPDXVCFJ.js";const f={class:"flex items-center justify-between mb-4"},w={class:"text-xl font-semibold text-content-primary dark:text-content-primary"},v={class:"mb-6"},h={key:0,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},y={key:1,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},C={key:2,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},B={class:"text-content-secondary dark:text-content-primary/80 text-base leading-relaxed"},j={class:"flex gap-3"},_=p({__name:"ConfirmDialog",props:{show:{type:Boolean},title:{default:"Confirm Action"},message:{},confirmText:{default:"Confirm"},cancelText:{default:"Cancel"},variant:{default:"warning"}},emits:["close","confirm"],setup(c,{emit:b}){const o=c,r=b,u=i=>{i.target===i.currentTarget&&r("close")},k={danger:"bg-red-100 dark:bg-red-500/20 border-red-500/30 text-red-600 dark:text-red-400",warning:"bg-yellow-100 dark:bg-yellow-500/20 border-yellow-500/30 text-yellow-600 dark:text-yellow-400",info:"bg-blue-500/20 border-blue-500/30 text-blue-600 dark:text-blue-400"},x={danger:"bg-red-500 hover:bg-red-600",warning:"bg-yellow-500 hover:bg-yellow-600",info:"bg-blue-500 hover:bg-blue-600"};return(i,e)=>o.show?(l(),n("div",{key:0,onClick:u,class:"fixed inset-0 bg-black/40 backdrop-blur-lg z-[99999] flex items-center justify-center p-4",style:{"backdrop-filter":"blur(8px) saturate(180%)",position:"fixed",top:"0",left:"0",right:"0",bottom:"0"}},[t("div",{class:"bg-white dark:bg-surface-elevated backdrop-blur-xl rounded-[20px] p-6 w-full max-w-md border border-stroke-subtle dark:border-white/10",onClick:e[3]||(e[3]=g(()=>{},["stop"]))},[t("div",f,[t("h3",w,s(o.title),1),t("button",{onClick:e[0]||(e[0]=a=>r("close")),class:"text-content-secondary dark:text-content-muted hover:text-content-primary dark:hover:text-content-primary transition-colors"},e[4]||(e[4]=[t("svg",{class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M6 18L18 6M6 6l12 12"})],-1)]))]),t("div",v,[t("div",{class:d(["inline-flex p-3 rounded-xl mb-4",k[o.variant]])},[o.variant==="danger"?(l(),n("svg",h,e[5]||(e[5]=[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z"},null,-1)]))):o.variant==="warning"?(l(),n("svg",y,e[6]||(e[6]=[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z"},null,-1)]))):(l(),n("svg",C,e[7]||(e[7]=[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"},null,-1)])))],2),t("p",B,s(o.message),1)]),t("div",j,[t("button",{onClick:e[1]||(e[1]=a=>r("close")),class:"flex-1 px-4 py-3 rounded-xl bg-background-mute dark:bg-white/5 hover:bg-stroke-subtle dark:hover:bg-white/10 text-content-primary dark:text-content-primary transition-all duration-200 border border-stroke-subtle dark:border-stroke/10"},s(o.cancelText),1),t("button",{onClick:e[2]||(e[2]=a=>r("confirm")),class:d(["flex-1 px-4 py-3 rounded-xl text-white transition-all duration-200",x[o.variant]])},s(o.confirmText),3)])])])):m("",!0)}});export{_};
import{a as p,b as n,g as m,e as t,s as g,t as s,j as d,p as l}from"./index-oQ7o_f66.js";const f={class:"flex items-center justify-between mb-4"},w={class:"text-xl font-semibold text-content-primary dark:text-content-primary"},v={class:"mb-6"},h={key:0,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},y={key:1,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},C={key:2,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},B={class:"text-content-secondary dark:text-content-primary/80 text-base leading-relaxed"},j={class:"flex gap-3"},_=p({__name:"ConfirmDialog",props:{show:{type:Boolean},title:{default:"Confirm Action"},message:{},confirmText:{default:"Confirm"},cancelText:{default:"Cancel"},variant:{default:"warning"}},emits:["close","confirm"],setup(c,{emit:b}){const o=c,r=b,u=i=>{i.target===i.currentTarget&&r("close")},k={danger:"bg-red-100 dark:bg-red-500/20 border-red-500/30 text-red-600 dark:text-red-400",warning:"bg-yellow-100 dark:bg-yellow-500/20 border-yellow-500/30 text-yellow-600 dark:text-yellow-400",info:"bg-blue-500/20 border-blue-500/30 text-blue-600 dark:text-blue-400"},x={danger:"bg-red-500 hover:bg-red-600",warning:"bg-yellow-500 hover:bg-yellow-600",info:"bg-blue-500 hover:bg-blue-600"};return(i,e)=>o.show?(l(),n("div",{key:0,onClick:u,class:"fixed inset-0 bg-black/40 backdrop-blur-lg z-[99999] flex items-center justify-center p-4",style:{"backdrop-filter":"blur(8px) saturate(180%)",position:"fixed",top:"0",left:"0",right:"0",bottom:"0"}},[t("div",{class:"bg-white dark:bg-surface-elevated backdrop-blur-xl rounded-[20px] p-6 w-full max-w-md border border-stroke-subtle dark:border-white/10",onClick:e[3]||(e[3]=g(()=>{},["stop"]))},[t("div",f,[t("h3",w,s(o.title),1),t("button",{onClick:e[0]||(e[0]=a=>r("close")),class:"text-content-secondary dark:text-content-muted hover:text-content-primary dark:hover:text-content-primary transition-colors"},e[4]||(e[4]=[t("svg",{class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M6 18L18 6M6 6l12 12"})],-1)]))]),t("div",v,[t("div",{class:d(["inline-flex p-3 rounded-xl mb-4",k[o.variant]])},[o.variant==="danger"?(l(),n("svg",h,e[5]||(e[5]=[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z"},null,-1)]))):o.variant==="warning"?(l(),n("svg",y,e[6]||(e[6]=[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z"},null,-1)]))):(l(),n("svg",C,e[7]||(e[7]=[t("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"},null,-1)])))],2),t("p",B,s(o.message),1)]),t("div",j,[t("button",{onClick:e[1]||(e[1]=a=>r("close")),class:"flex-1 px-4 py-3 rounded-xl bg-background-mute dark:bg-white/5 hover:bg-stroke-subtle dark:hover:bg-white/10 text-content-primary dark:text-content-primary transition-all duration-200 border border-stroke-subtle dark:border-stroke/10"},s(o.cancelText),1),t("button",{onClick:e[2]||(e[2]=a=>r("confirm")),class:d(["flex-1 px-4 py-3 rounded-xl text-white transition-all duration-200",x[o.variant]])},s(o.confirmText),3)])])])):m("",!0)}});export{_};

View File

@@ -1 +1 @@
import{a as e,b as r,i as o,p as n}from"./index-DPDXVCFJ.js";const d=e({name:"HelpView",__name:"Help",setup(a){return(i,t)=>(n(),r("div",null,t[0]||(t[0]=[o('<div class="glass-card backdrop-blur border border-stroke-subtle dark:border-white/10 rounded-[15px] p-8"><h1 class="text-content-primary dark:text-content-primary text-2xl font-semibold mb-6">Help &amp; Documentation</h1><div class="text-center py-12"><div class="text-primary mb-6"><svg class="w-20 h-20 mx-auto mb-4" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 6.253v13m0-13C10.832 5.477 9.246 5 7.5 5S4.168 5.477 3 6.253v13C4.168 18.477 5.754 18 7.5 18s3.332.477 4.5 1.253m0-13C13.168 5.477 14.754 5 16.5 5c1.746 0 3.332.477 4.5 1.253v13C19.832 18.477 18.246 18 16.5 18c-1.746 0-3.332.477-4.5 1.253"></path></svg></div><h2 class="text-content-primary dark:text-content-primary text-xl font-medium mb-3">pyMC Repeater Wiki</h2><p class="text-content-secondary dark:text-content-muted mb-8 max-w-md mx-auto"> Access documentation, setup guides, troubleshooting tips, and community resources on our official wiki. </p><a href="https://github.com/rightup/pyMC_Repeater/wiki" target="_blank" rel="noopener noreferrer" class="inline-flex items-center gap-2 bg-primary hover:bg-primary/80 text-white dark:text-background font-medium py-3 px-6 rounded-xl transition-colors duration-200"><svg class="w-5 h-5" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10 6H6a2 2 0 00-2 2v10a2 2 0 002 2h10a2 2 0 002-2v-4M14 4h6m0 0v6m0-6L10 14"></path></svg> Visit Wiki Documentation </a><div class="mt-8 text-xs text-content-muted dark:text-content-muted"> Opens in a new tab </div></div></div>',1)])))}});export{d as default};
import{a as e,b as r,i as o,p as n}from"./index-oQ7o_f66.js";const d=e({name:"HelpView",__name:"Help",setup(a){return(i,t)=>(n(),r("div",null,t[0]||(t[0]=[o('<div class="glass-card backdrop-blur border border-stroke-subtle dark:border-white/10 rounded-[15px] p-8"><h1 class="text-content-primary dark:text-content-primary text-2xl font-semibold mb-6">Help &amp; Documentation</h1><div class="text-center py-12"><div class="text-primary mb-6"><svg class="w-20 h-20 mx-auto mb-4" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 6.253v13m0-13C10.832 5.477 9.246 5 7.5 5S4.168 5.477 3 6.253v13C4.168 18.477 5.754 18 7.5 18s3.332.477 4.5 1.253m0-13C13.168 5.477 14.754 5 16.5 5c1.746 0 3.332.477 4.5 1.253v13C19.832 18.477 18.246 18 16.5 18c-1.746 0-3.332.477-4.5 1.253"></path></svg></div><h2 class="text-content-primary dark:text-content-primary text-xl font-medium mb-3">pyMC Repeater Wiki</h2><p class="text-content-secondary dark:text-content-muted mb-8 max-w-md mx-auto"> Access documentation, setup guides, troubleshooting tips, and community resources on our official wiki. </p><a href="https://github.com/rightup/pyMC_Repeater/wiki" target="_blank" rel="noopener noreferrer" class="inline-flex items-center gap-2 bg-primary hover:bg-primary/80 text-white dark:text-background font-medium py-3 px-6 rounded-xl transition-colors duration-200"><svg class="w-5 h-5" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10 6H6a2 2 0 00-2 2v10a2 2 0 002 2h10a2 2 0 002-2v-4M14 4h6m0 0v6m0-6L10 14"></path></svg> Visit Wiki Documentation </a><div class="mt-8 text-xs text-content-muted dark:text-content-muted"> Opens in a new tab </div></div></div>',1)])))}});export{d as default};

File diff suppressed because one or more lines are too long

View File

@@ -1 +1 @@
import{a as k,b as o,g,e as r,j as a,t as p,s as x,p as s}from"./index-DPDXVCFJ.js";const f={class:"mb-6"},m={key:0,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},v={key:1,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},h={key:2,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},w={class:"text-content-secondary dark:text-content-primary/80 text-base leading-relaxed"},C={class:"flex"},B=k({__name:"MessageDialog",props:{show:{type:Boolean},message:{},variant:{default:"success"}},emits:["close"],setup(i,{emit:d}){const t=i,l=d,c=n=>{n.target===n.currentTarget&&l("close")},b={success:"bg-green-100 dark:bg-green-500/20 border-green-600/40 dark:border-green-500/30 text-green-600 dark:text-green-400",error:"bg-red-100 dark:bg-red-500/20 border-red-500/30 text-red-600 dark:text-red-400",info:"bg-blue-500/20 border-blue-500/30 text-blue-600 dark:text-blue-400"},u={success:"bg-green-500 hover:bg-green-600",error:"bg-red-500 hover:bg-red-600",info:"bg-blue-500 hover:bg-blue-600"};return(n,e)=>t.show?(s(),o("div",{key:0,onClick:c,class:"fixed inset-0 bg-black/40 backdrop-blur-lg z-[99999] flex items-center justify-center p-4",style:{"backdrop-filter":"blur(8px) saturate(180%)",position:"fixed",top:"0",left:"0",right:"0",bottom:"0"}},[r("div",{class:"bg-white dark:bg-surface-elevated backdrop-blur-xl rounded-[20px] p-6 w-full max-w-md border border-stroke-subtle dark:border-white/10",onClick:e[1]||(e[1]=x(()=>{},["stop"]))},[r("div",f,[r("div",{class:a(["inline-flex p-3 rounded-xl mb-4",b[t.variant]])},[t.variant==="success"?(s(),o("svg",m,e[2]||(e[2]=[r("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M5 13l4 4L19 7"},null,-1)]))):t.variant==="error"?(s(),o("svg",v,e[3]||(e[3]=[r("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M6 18L18 6M6 6l12 12"},null,-1)]))):(s(),o("svg",h,e[4]||(e[4]=[r("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"},null,-1)])))],2),r("p",w,p(t.message),1)]),r("div",C,[r("button",{onClick:e[0]||(e[0]=y=>l("close")),class:a(["flex-1 px-4 py-3 rounded-xl text-white transition-all duration-200",u[t.variant]])}," OK ",2)])])])):g("",!0)}});export{B as _};
import{a as k,b as o,g,e as r,j as a,t as p,s as x,p as s}from"./index-oQ7o_f66.js";const f={class:"mb-6"},m={key:0,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},v={key:1,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},h={key:2,class:"w-6 h-6",fill:"none",stroke:"currentColor",viewBox:"0 0 24 24"},w={class:"text-content-secondary dark:text-content-primary/80 text-base leading-relaxed"},C={class:"flex"},B=k({__name:"MessageDialog",props:{show:{type:Boolean},message:{},variant:{default:"success"}},emits:["close"],setup(i,{emit:d}){const t=i,l=d,c=n=>{n.target===n.currentTarget&&l("close")},b={success:"bg-green-100 dark:bg-green-500/20 border-green-600/40 dark:border-green-500/30 text-green-600 dark:text-green-400",error:"bg-red-100 dark:bg-red-500/20 border-red-500/30 text-red-600 dark:text-red-400",info:"bg-blue-500/20 border-blue-500/30 text-blue-600 dark:text-blue-400"},u={success:"bg-green-500 hover:bg-green-600",error:"bg-red-500 hover:bg-red-600",info:"bg-blue-500 hover:bg-blue-600"};return(n,e)=>t.show?(s(),o("div",{key:0,onClick:c,class:"fixed inset-0 bg-black/40 backdrop-blur-lg z-[99999] flex items-center justify-center p-4",style:{"backdrop-filter":"blur(8px) saturate(180%)",position:"fixed",top:"0",left:"0",right:"0",bottom:"0"}},[r("div",{class:"bg-white dark:bg-surface-elevated backdrop-blur-xl rounded-[20px] p-6 w-full max-w-md border border-stroke-subtle dark:border-white/10",onClick:e[1]||(e[1]=x(()=>{},["stop"]))},[r("div",f,[r("div",{class:a(["inline-flex p-3 rounded-xl mb-4",b[t.variant]])},[t.variant==="success"?(s(),o("svg",m,e[2]||(e[2]=[r("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M5 13l4 4L19 7"},null,-1)]))):t.variant==="error"?(s(),o("svg",v,e[3]||(e[3]=[r("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M6 18L18 6M6 6l12 12"},null,-1)]))):(s(),o("svg",h,e[4]||(e[4]=[r("path",{"stroke-linecap":"round","stroke-linejoin":"round","stroke-width":"2",d:"M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"},null,-1)])))],2),r("p",w,p(t.message),1)]),r("div",C,[r("button",{onClick:e[0]||(e[0]=y=>l("close")),class:a(["flex-1 px-4 py-3 rounded-xl text-white transition-all duration-200",u[t.variant]])}," OK ",2)])])])):g("",!0)}});export{B as _};

View File

@@ -1,4 +1,4 @@
import{L as J,a as zl,r as ut,o as Hl,$ as Ul,P as Rn,D as ql,b as tt,e as Z,g as Yt,t as Is,w as Kl,v as Vl,X as Ji,j as Tn,s as jl,p as it,x as Gl}from"./index-DPDXVCFJ.js";/**
import{L as J,a as zl,r as ut,o as Hl,$ as Ul,P as Rn,D as ql,b as tt,e as Z,g as Yt,t as Is,w as Kl,v as Vl,X as Ji,j as Tn,s as jl,p as it,x as Gl}from"./index-oQ7o_f66.js";/**
* Copyright (c) 2014-2024 The xterm.js authors. All rights reserved.
* @license MIT
*

View File

@@ -1 +1 @@
import{M as x,c as s}from"./index-DPDXVCFJ.js";const l={7:-7.5,8:-10,9:-12.5,10:-15,11:-17.5,12:-20},d=-116,i=8,u=5;function y(t,e){return t-e}function S(t){return l[t]??l[i]}function f(t,e){const r=e+u;if(t<=e){const o=t<=e-5?0:1;return{bars:o,color:"text-red-600 dark:text-red-400",snr:t,quality:o===0?"none":"poor"}}if(t<r){const n=(t-e)/u<.5?2:3;return{bars:n,color:n===2?"text-orange-600 dark:text-orange-400":"text-yellow-600 dark:text-yellow-400",snr:t,quality:"fair"}}const a=t-r>=10?5:4;return{bars:a,color:a===5?"text-green-600 dark:text-green-400":"text-green-600 dark:text-green-300",snr:t,quality:a===5?"excellent":"good"}}function N(){const t=x(),e=s(()=>t.noiseFloorDbm??d),r=s(()=>t.stats?.config?.radio?.spreading_factor??i),c=s(()=>S(r.value));return{getSignalQuality:o=>{if(!o||o>0||o<-120)return{bars:0,color:"text-gray-400 dark:text-gray-500",snr:-999,quality:"none"};const n=y(o,e.value),g=Math.max(-30,Math.min(20,n));return f(g,c.value)},noiseFloor:e,spreadingFactor:r,minSNR:c}}export{N as u};
import{M as x,c as s}from"./index-oQ7o_f66.js";const l={7:-7.5,8:-10,9:-12.5,10:-15,11:-17.5,12:-20},d=-116,i=8,u=5;function y(t,e){return t-e}function S(t){return l[t]??l[i]}function f(t,e){const r=e+u;if(t<=e){const o=t<=e-5?0:1;return{bars:o,color:"text-red-600 dark:text-red-400",snr:t,quality:o===0?"none":"poor"}}if(t<r){const n=(t-e)/u<.5?2:3;return{bars:n,color:n===2?"text-orange-600 dark:text-orange-400":"text-yellow-600 dark:text-yellow-400",snr:t,quality:"fair"}}const a=t-r>=10?5:4;return{bars:a,color:a===5?"text-green-600 dark:text-green-400":"text-green-600 dark:text-green-300",snr:t,quality:a===5?"excellent":"good"}}function N(){const t=x(),e=s(()=>t.noiseFloorDbm??d),r=s(()=>t.stats?.config?.radio?.spreading_factor??i),c=s(()=>S(r.value));return{getSignalQuality:o=>{if(!o||o>0||o<-120)return{bars:0,color:"text-gray-400 dark:text-gray-500",snr:-999,quality:"none"};const n=y(o,e.value),g=Math.max(-30,Math.min(20,n));return f(g,c.value)},noiseFloor:e,spreadingFactor:r,minSNR:c}}export{N as u};

View File

@@ -8,7 +8,7 @@
<link rel="preconnect" href="https://fonts.googleapis.com">
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin>
<link href="https://fonts.googleapis.com/css2?family=Noto+Sans:wght@400;500;600;700&display=swap" rel="stylesheet">
<script type="module" crossorigin src="/assets/index-DPDXVCFJ.js"></script>
<script type="module" crossorigin src="/assets/index-oQ7o_f66.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-CZAQFiLW.css">
</head>
<body>

View File

@@ -1,5 +1,5 @@
"""
Comprehensive tests for pyMC_Repeater engine.py — RepeaterHandler.
tests for pyMC_Repeater engine.py — RepeaterHandler.
Covers: flood_forward, direct_forward, process_packet, duplicate detection,
mark_seen, validate_packet, packet scoring, TX delay, cache management,

View File

@@ -0,0 +1,720 @@
"""
Tests for flood packet loop detection and duplicate suppression.
Exercises the real RepeaterHandler engine with real pymc_core Packet/PathUtils
objects to verify:
- Duplicate packet suppression via calculate_packet_hash (SHA256-based)
- Loop detection modes (off, minimal, moderate, strict) with real path bytes
- Flood re-forwarding prevention (own hash already in path)
- Multi-byte hash mode interaction with loop/dedup
- Global flood policy enforcement
- mark_seen / is_duplicate cache behaviour
- do_not_retransmit flag handling
"""
from unittest.mock import MagicMock, patch
import pytest
from pymc_core.protocol import Packet, PathUtils
from pymc_core.protocol.constants import (
MAX_PATH_SIZE,
ROUTE_TYPE_FLOOD,
ROUTE_TYPE_TRANSPORT_FLOOD,
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
LOCAL_HASH_BYTES = bytes([0xAB, 0xCD, 0xEF])
def _make_flood_packet(
path_bytes: bytes = b"",
hash_size: int = 1,
hash_count: int = 0,
payload: bytes = b"\x01\x02\x03\x04",
) -> Packet:
"""Create a real flood Packet with the given path encoding."""
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.path = bytearray(path_bytes)
pkt.path_len = PathUtils.encode_path_len(hash_size, hash_count)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
return pkt
def _make_handler(
loop_detect="off",
path_hash_mode=0,
local_hash_bytes=None,
global_flood_allow=True,
):
"""Create a RepeaterHandler with real engine logic, mocking only hardware."""
lhb = local_hash_bytes or LOCAL_HASH_BYTES
config = {
"repeater": {
"mode": "forward",
"cache_ttl": 3600,
"use_score_for_tx": False,
"score_threshold": 0.3,
"send_advert_interval_hours": 0,
"node_name": "test-node",
},
"mesh": {
"global_flood_allow": global_flood_allow,
"loop_detect": loop_detect,
"path_hash_mode": path_hash_mode,
},
"delays": {"tx_delay_factor": 1.0, "direct_tx_delay_factor": 0.5},
"duty_cycle": {"max_airtime_per_minute": 3600, "enforcement_enabled": True},
"radio": {
"spreading_factor": 8,
"bandwidth": 125000,
"coding_rate": 8,
"preamble_length": 17,
},
}
dispatcher = MagicMock()
dispatcher.radio = MagicMock(
spreading_factor=8,
bandwidth=125000,
coding_rate=8,
preamble_length=17,
frequency=915000000,
tx_power=14,
)
dispatcher.local_identity = MagicMock()
with (
patch("repeater.engine.StorageCollector"),
patch("repeater.engine.RepeaterHandler._start_background_tasks"),
):
from repeater.engine import RepeaterHandler
h = RepeaterHandler(config, dispatcher, lhb[0], local_hash_bytes=lhb)
return h
# ===================================================================
# 1. Duplicate suppression — real packet hash (SHA256)
# ===================================================================
class TestDuplicateSuppression:
"""Verify duplicate packets are detected via real calculate_packet_hash."""
def test_same_packet_forwarded_twice_is_duplicate(self):
"""Forwarding the same packet a second time must be rejected as duplicate."""
h = _make_handler()
pkt1 = _make_flood_packet(payload=b"\xDE\xAD")
result1 = h.flood_forward(pkt1)
assert result1 is not None
# Same content in a fresh Packet object
pkt2 = _make_flood_packet(payload=b"\xDE\xAD")
result2 = h.flood_forward(pkt2)
assert result2 is None
assert pkt2.drop_reason == "Duplicate"
def test_different_payload_not_duplicate(self):
"""Packets with different payloads have different hashes."""
h = _make_handler()
pkt1 = _make_flood_packet(payload=b"\x01\x02")
assert h.flood_forward(pkt1) is not None
pkt2 = _make_flood_packet(payload=b"\x03\x04")
assert h.flood_forward(pkt2) is not None
def test_mark_seen_makes_is_duplicate_true(self):
"""mark_seen records the hash; is_duplicate finds it."""
h = _make_handler()
pkt = _make_flood_packet(payload=b"\xAA\xBB")
assert not h.is_duplicate(pkt)
h.mark_seen(pkt)
assert h.is_duplicate(pkt)
def test_packet_hash_uses_real_sha256(self):
"""Verify the hash comes from real Packet.calculate_packet_hash."""
pkt = _make_flood_packet(payload=b"\x01\x02\x03")
hash_bytes = pkt.calculate_packet_hash()
assert isinstance(hash_bytes, bytes)
assert len(hash_bytes) > 0
# Same content → same hash
pkt2 = _make_flood_packet(payload=b"\x01\x02\x03")
assert pkt2.calculate_packet_hash() == hash_bytes
def test_different_path_same_payload_same_hash(self):
"""
Packet hash is based on payload_type + payload (not path),
except for TRACE packets. Two flood packets with different paths
but same payload have the same hash.
"""
pkt_a = _make_flood_packet(path_bytes=b"\x11", hash_size=1, hash_count=1,
payload=b"\xFF")
pkt_b = _make_flood_packet(path_bytes=b"\x22", hash_size=1, hash_count=1,
payload=b"\xFF")
assert pkt_a.calculate_packet_hash() == pkt_b.calculate_packet_hash()
def test_seen_cache_eviction(self):
"""When cache exceeds max_cache_size, oldest entries are evicted."""
h = _make_handler()
h.max_cache_size = 3
packets = [_make_flood_packet(payload=bytes([i, i + 1])) for i in range(5)]
for p in packets:
h.mark_seen(p)
# Oldest entries (0, 1) should have been evicted
assert not h.is_duplicate(packets[0])
assert not h.is_duplicate(packets[1])
# Recent entries still present
assert h.is_duplicate(packets[2])
assert h.is_duplicate(packets[3])
assert h.is_duplicate(packets[4])
# ===================================================================
# 2. Loop detection modes — 1-byte hash
# ===================================================================
class TestLoopDetection1Byte:
"""Loop detection with 1-byte hash paths (mode 0)."""
def test_loop_detect_off_allows_own_hash(self):
"""With loop_detect=off, packet with our hash in path is forwarded."""
h = _make_handler(loop_detect="off",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path contains our 1-byte hash (0xAB) once
pkt = _make_flood_packet(b"\xAB", hash_size=1, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
def test_loop_detect_strict_blocks_single_occurrence(self):
"""strict mode (threshold=1): one occurrence of our hash → loop."""
h = _make_handler(loop_detect="strict",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\xAB", hash_size=1, hash_count=1)
result = h.flood_forward(pkt)
assert result is None
assert "loop" in pkt.drop_reason.lower()
def test_loop_detect_moderate_allows_one_occurrence(self):
"""moderate mode (threshold=2): one occurrence is fine."""
h = _make_handler(loop_detect="moderate",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\xAB", hash_size=1, hash_count=2)
result = h.flood_forward(pkt)
assert result is not None
def test_loop_detect_moderate_blocks_two_occurrences(self):
"""moderate mode (threshold=2): two occurrences → loop."""
h = _make_handler(loop_detect="moderate",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\xAB\x11\xAB", hash_size=1, hash_count=3)
result = h.flood_forward(pkt)
assert result is None
assert "loop" in pkt.drop_reason.lower()
def test_loop_detect_minimal_allows_three_occurrences(self):
"""minimal mode (threshold=4): three occurrences still OK."""
h = _make_handler(loop_detect="minimal",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\xAB\x11\xAB\x22\xAB", hash_size=1, hash_count=5)
result = h.flood_forward(pkt)
assert result is not None
def test_loop_detect_minimal_blocks_four_occurrences(self):
"""minimal mode (threshold=4): four occurrences → loop."""
h = _make_handler(loop_detect="minimal",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(
b"\xAB\x11\xAB\x22\xAB\x33\xAB", hash_size=1, hash_count=7
)
result = h.flood_forward(pkt)
assert result is None
assert "loop" in pkt.drop_reason.lower()
def test_loop_detect_no_match_passes(self):
"""Strict mode still passes if our hash is not in the path."""
h = _make_handler(loop_detect="strict",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22\x33", hash_size=1, hash_count=3)
result = h.flood_forward(pkt)
assert result is not None
# ===================================================================
# 3. Own-hash re-forwarding prevention
# ===================================================================
class TestOwnHashReForwarding:
"""
After a repeater flood_forwards a packet (appending its own hash),
receiving that same packet back should be handled correctly.
"""
def test_forward_then_receive_again_is_duplicate(self):
"""
After forwarding, the packet hash is in seen_packets.
Receiving an identical packet is a duplicate.
"""
h = _make_handler(loop_detect="off")
pkt = _make_flood_packet(b"\x11", hash_size=1, hash_count=1,
payload=b"\xAA\xBB")
result = h.flood_forward(pkt)
assert result is not None
# The original packet's payload hash was marked seen
# Receiving same original packet again (before our hop was appended)
pkt2 = _make_flood_packet(b"\x11", hash_size=1, hash_count=1,
payload=b"\xAA\xBB")
result2 = h.flood_forward(pkt2)
assert result2 is None
assert pkt2.drop_reason == "Duplicate"
def test_strict_detects_own_hash_after_flood_chain(self):
"""
If the forwarded packet (with our hash appended) loops back to us,
strict mode detects our hash in the path.
"""
our_hash = bytes([0xAB, 0xCD, 0xEF])
h = _make_handler(loop_detect="strict", local_hash_bytes=our_hash)
# Original packet arrives, we forward (appending 0xAB)
pkt = _make_flood_packet(b"\x11", hash_size=1, hash_count=1,
payload=b"\xDD\xEE")
result = h.flood_forward(pkt)
assert result is not None
# Now path is [0x11, 0xAB], and this exact payload is in seen_packets
# Suppose another node re-forwards it with an additional hop,
# so it's a new payload in the packet hash sense (different path iteration)
# but path contains our hash 0xAB
looped_pkt = _make_flood_packet(
b"\x11\xAB\x22", hash_size=1, hash_count=3,
payload=b"\xDD\xEE\xFF" # different payload → not a duplicate
)
result2 = h.flood_forward(looped_pkt)
assert result2 is None
assert "loop" in looped_pkt.drop_reason.lower()
def test_off_mode_still_catches_duplicate_after_own_forward(self):
"""Even with loop_detect=off, duplicate suppression still works."""
h = _make_handler(loop_detect="off")
pkt = _make_flood_packet(payload=b"\x42\x43")
assert h.flood_forward(pkt) is not None
pkt2 = _make_flood_packet(payload=b"\x42\x43")
result = h.flood_forward(pkt2)
assert result is None
assert pkt2.drop_reason == "Duplicate"
# ===================================================================
# 4. Multi-byte hash + loop detection
# ===================================================================
class TestLoopDetectionMultiByte:
"""
Loop detection currently counts byte-level matches against local_hash
(single int). In multi-byte mode the per-hop hash is >1 byte, so
individual bytes in the path may coincidentally match.
These tests verify the actual engine behaviour.
"""
def test_2_byte_mode_strict_byte_level_match(self):
"""
In 2-byte mode with strict, _is_flood_looped scans individual bytes.
If local_hash (0xAB) appears as a byte anywhere in the 2-byte path
entries, it counts as a match.
"""
h = _make_handler(loop_detect="strict",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path: 2-byte hop [0xAB, 0x11] — byte 0xAB appears once
pkt = _make_flood_packet(b"\xAB\x11", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
# strict threshold=1, 0xAB appears once in raw bytes → loop detected
assert result is None
assert "loop" in pkt.drop_reason.lower()
def test_2_byte_mode_off_ignores_byte_match(self):
"""With loop_detect=off, even byte-level 0xAB matches are ignored."""
h = _make_handler(loop_detect="off",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\xAB\x11", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
def test_2_byte_no_local_hash_byte_passes_strict(self):
"""If local_hash byte doesn't appear anywhere in the 2-byte path, strict passes."""
h = _make_handler(loop_detect="strict",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path: [0x11, 0x22] — no 0xAB byte
pkt = _make_flood_packet(b"\x11\x22", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
def test_3_byte_mode_local_hash_byte_in_path(self):
"""In 3-byte mode, the 0xAB byte anywhere triggers strict loop detection."""
h = _make_handler(loop_detect="strict",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# 3-byte hop: [0x11, 0xAB, 0x33] — 0xAB in the middle
pkt = _make_flood_packet(b"\x11\xAB\x33", hash_size=3, hash_count=1)
result = h.flood_forward(pkt)
assert result is None
def test_moderate_multi_byte_counts_all_byte_occurrences(self):
"""
moderate threshold=2. With 2-byte hops, each byte is counted
independently, so two occurrences of 0xAB across different hops
triggers the loop.
"""
h = _make_handler(loop_detect="moderate",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Two 2-byte hops: [0xAB, 0x11, 0xAB, 0x22] — 0xAB appears twice
pkt = _make_flood_packet(b"\xAB\x11\xAB\x22", hash_size=2, hash_count=2)
result = h.flood_forward(pkt)
assert result is None
assert "loop" in pkt.drop_reason.lower()
def test_2_byte_flood_forward_appends_correctly(self):
"""
After flood_forward in 2-byte mode, verify the path contains
only the expected bytes (no extra, no corruption).
"""
h = _make_handler(loop_detect="off",
local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 2
hashes = result.get_path_hashes()
assert hashes == [b"\x11\x22", b"\xAB\xCD"]
# ===================================================================
# 5. Global flood policy
# ===================================================================
class TestGlobalFloodPolicy:
"""Test global_flood_allow=False blocks flood packets."""
def test_global_flood_disabled_drops_flood(self):
h = _make_handler(global_flood_allow=False)
pkt = _make_flood_packet(payload=b"\x01\x02")
result = h.flood_forward(pkt)
assert result is None
assert pkt.drop_reason is not None
def test_global_flood_enabled_allows_flood(self):
h = _make_handler(global_flood_allow=True)
pkt = _make_flood_packet(payload=b"\x01\x02")
result = h.flood_forward(pkt)
assert result is not None
def test_transport_flood_without_codes_drops(self):
"""ROUTE_TYPE_TRANSPORT_FLOOD with global_flood_allow=False and no valid codes."""
h = _make_handler(global_flood_allow=False)
# Nullify the storage to ensure transport code check fails
h.storage = None
pkt = Packet()
pkt.header = ROUTE_TYPE_TRANSPORT_FLOOD
pkt.path = bytearray()
pkt.path_len = PathUtils.encode_path_len(1, 0)
pkt.payload = bytearray(b"\x01\x02")
pkt.payload_len = 2
pkt.transport_codes = [0x1234, 0x5678]
result = h.flood_forward(pkt)
assert result is None
# ===================================================================
# 6. do_not_retransmit flag
# ===================================================================
class TestDoNotRetransmit:
"""Verify packets flagged do_not_retransmit are dropped."""
def test_flagged_packet_dropped(self):
h = _make_handler()
pkt = _make_flood_packet(payload=b"\x01\x02")
pkt.mark_do_not_retransmit()
result = h.flood_forward(pkt)
assert result is None
assert "retransmit" in pkt.drop_reason.lower()
def test_unflagged_packet_passes(self):
h = _make_handler()
pkt = _make_flood_packet(payload=b"\x01\x02")
assert not pkt.is_marked_do_not_retransmit()
result = h.flood_forward(pkt)
assert result is not None
# ===================================================================
# 7. validate_packet edge cases
# ===================================================================
class TestValidatePacket:
"""Test packet validation rules used by flood_forward."""
def test_empty_payload_rejected(self):
h = _make_handler()
pkt = _make_flood_packet(payload=b"")
pkt.payload = bytearray()
result = h.flood_forward(pkt)
assert result is None
assert "Empty payload" in (pkt.drop_reason or "")
def test_max_path_size_rejected(self):
"""Path at MAX_PATH_SIZE (64 bytes) is rejected before forwarding."""
h = _make_handler()
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.path = bytearray(range(64))
# Manually set path_len to bypass encode_path_len validation
pkt.path_len = PathUtils.encode_path_len(1, 63)
pkt.payload = bytearray(b"\x01\x02")
pkt.payload_len = 2
# validate_packet checks len(path) >= MAX_PATH_SIZE
result = h.flood_forward(pkt)
assert result is None
def test_none_payload_rejected(self):
h = _make_handler()
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.payload = None
result = h.flood_forward(pkt)
assert result is None
def test_hop_count_63_rejected(self):
"""At 63 hops (1-byte mode), appending would overflow 6-bit counter."""
h = _make_handler()
# 63 bytes of path with hash_count=63
pkt = _make_flood_packet(bytes(63), hash_size=1, hash_count=63)
result = h.flood_forward(pkt)
assert result is None
assert "maximum" in (pkt.drop_reason or "").lower() or \
"exceed" in (pkt.drop_reason or "").lower()
# ===================================================================
# 8. Serialization round-trip after dedup/loop decisions
# ===================================================================
class TestSerializationAfterForward:
"""
Verify packets that survive loop/dedup checks can serialize
and deserialize correctly, preserving the appended path.
"""
def test_forwarded_1_byte_round_trips(self):
h = _make_handler(loop_detect="moderate",
local_hash_bytes=bytes([0x42, 0x00, 0x00]))
pkt = _make_flood_packet(b"\x11\x22", hash_size=1, hash_count=2,
payload=b"\xAA\xBB")
result = h.flood_forward(pkt)
assert result is not None
wire = result.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_count() == 3
assert pkt2.get_path_hashes() == [b"\x11", b"\x22", b"\x42"]
assert pkt2.get_payload() == b"\xAA\xBB"
def test_forwarded_2_byte_round_trips(self):
h = _make_handler(loop_detect="off",
local_hash_bytes=bytes([0xAA, 0xBB, 0xCC]))
pkt = _make_flood_packet(b"\x11\x22", hash_size=2, hash_count=1,
payload=b"\xDE\xAD")
result = h.flood_forward(pkt)
assert result is not None
wire = result.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 2
assert pkt2.get_path_hash_count() == 2
assert pkt2.get_path_hashes() == [b"\x11\x22", b"\xAA\xBB"]
def test_forwarded_3_byte_round_trips(self):
h = _make_handler(loop_detect="off",
local_hash_bytes=bytes([0xAA, 0xBB, 0xCC]))
pkt = _make_flood_packet(b"\x11\x22\x33", hash_size=3, hash_count=1,
payload=b"\xBE\xEF")
result = h.flood_forward(pkt)
assert result is not None
wire = result.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 3
assert pkt2.get_path_hash_count() == 2
assert pkt2.get_path_hashes() == [b"\x11\x22\x33", b"\xAA\xBB\xCC"]
# ===================================================================
# 9. Multi-repeater flood chain with loop detection
# ===================================================================
class TestFloodChainLoopDetection:
"""
Simulate a flood packet traversing multiple repeaters and verify
loop detection works across the chain.
"""
def test_three_repeater_chain_no_loop(self):
"""A→B→C with distinct hashes: no loop at any step (strict mode)."""
hashes = [
bytes([0x11, 0x00, 0x00]),
bytes([0x22, 0x00, 0x00]),
bytes([0x33, 0x00, 0x00]),
]
handlers = [_make_handler(loop_detect="strict", local_hash_bytes=h) for h in hashes]
pkt = _make_flood_packet(payload=b"\xFE\xED")
for i, h in enumerate(handlers):
result = h.flood_forward(pkt)
assert result is not None, f"repeater {i} unexpectedly dropped packet"
# Use different payload to avoid dedup between handlers
# (In real life they'd be on different nodes)
pkt = _make_flood_packet(
path_bytes=bytes(result.path),
hash_size=1,
hash_count=result.get_path_hash_count(),
payload=b"\xFE\xED" + bytes([i + 1]),
)
assert pkt.get_path_hash_count() == 3
def test_circular_topology_strict_blocks_loop(self):
"""
A→B→C→A: when the packet returns to A, strict mode detects
A's hash (0x11) already in the path.
"""
hash_a = bytes([0x11, 0x00, 0x00])
hash_b = bytes([0x22, 0x00, 0x00])
hash_c = bytes([0x33, 0x00, 0x00])
h_a = _make_handler(loop_detect="strict", local_hash_bytes=hash_a)
h_b = _make_handler(loop_detect="strict", local_hash_bytes=hash_b)
h_c = _make_handler(loop_detect="strict", local_hash_bytes=hash_c)
# A originates
pkt = _make_flood_packet(payload=b"\x01\x02\x03")
pkt = h_a.flood_forward(pkt)
assert pkt is not None # path: [0x11]
# B forwards (new payload to avoid dedup)
pkt_b = _make_flood_packet(
bytes(pkt.path), hash_size=1,
hash_count=pkt.get_path_hash_count(),
payload=b"\x01\x02\x03\x04",
)
pkt_b = h_b.flood_forward(pkt_b)
assert pkt_b is not None # path: [0x11, 0x22]
# C forwards
pkt_c = _make_flood_packet(
bytes(pkt_b.path), hash_size=1,
hash_count=pkt_b.get_path_hash_count(),
payload=b"\x01\x02\x03\x04\x05",
)
pkt_c = h_c.flood_forward(pkt_c)
assert pkt_c is not None # path: [0x11, 0x22, 0x33]
# Back to A — 0x11 is already in path → strict blocks it
pkt_a2 = _make_flood_packet(
bytes(pkt_c.path), hash_size=1,
hash_count=pkt_c.get_path_hash_count(),
payload=b"\x01\x02\x03\x04\x05\x06",
)
result = h_a.flood_forward(pkt_a2)
assert result is None
assert "loop" in pkt_a2.drop_reason.lower()
def test_circular_topology_off_allows_loop_but_dedup_catches(self):
"""
With loop_detect=off and the exact same payload, duplicate
suppression catches the re-visit even without loop detection.
"""
h = _make_handler(loop_detect="off", local_hash_bytes=bytes([0x11, 0x00, 0x00]))
pkt = _make_flood_packet(payload=b"\xAA\xBB")
assert h.flood_forward(pkt) is not None
# Same payload comes back
pkt2 = _make_flood_packet(payload=b"\xAA\xBB")
result = h.flood_forward(pkt2)
assert result is None
assert pkt2.drop_reason == "Duplicate"
def test_two_byte_chain_loop_detected(self):
"""2-byte mode: circular path A→B→A detected via byte-level scan."""
hash_a = bytes([0xAA, 0xBB, 0x00])
hash_b = bytes([0xCC, 0xDD, 0x00])
h_a = _make_handler(loop_detect="strict", local_hash_bytes=hash_a)
h_b = _make_handler(loop_detect="strict", local_hash_bytes=hash_b)
# A forwards (2-byte mode)
pkt = _make_flood_packet(b"", hash_size=2, hash_count=0, payload=b"\x01\x02")
pkt = h_a.flood_forward(pkt)
assert pkt is not None # path: [0xAA, 0xBB]
# B forwards
pkt_b = _make_flood_packet(
bytes(pkt.path), hash_size=2,
hash_count=pkt.get_path_hash_count(),
payload=b"\x01\x02\x03",
)
pkt_b = h_b.flood_forward(pkt_b)
assert pkt_b is not None # path: [0xAA, 0xBB, 0xCC, 0xDD]
# Back to A — byte 0xAA is in path → strict detects it
pkt_a2 = _make_flood_packet(
bytes(pkt_b.path), hash_size=2,
hash_count=pkt_b.get_path_hash_count(),
payload=b"\x01\x02\x03\x04",
)
result = h_a.flood_forward(pkt_a2)
assert result is None
assert "loop" in pkt_a2.drop_reason.lower()
# ===================================================================
# 10. Normalize loop_detect_mode edge cases
# ===================================================================
class TestNormalizeLoopDetectMode:
"""Verify _normalize_loop_detect_mode handles edge cases."""
def test_uppercase_normalized(self):
h = _make_handler(loop_detect="STRICT")
assert h.loop_detect_mode == "strict"
def test_whitespace_stripped(self):
h = _make_handler(loop_detect=" moderate ")
assert h.loop_detect_mode == "moderate"
def test_invalid_defaults_to_off(self):
h = _make_handler(loop_detect="invalid_value")
assert h.loop_detect_mode == "off"
def test_numeric_defaults_to_off(self):
h = _make_handler(loop_detect=42)
assert h.loop_detect_mode == "off"
def test_none_defaults_to_off(self):
h = _make_handler(loop_detect=None)
assert h.loop_detect_mode == "off"

View File

@@ -0,0 +1,780 @@
"""
Integration tests for multi-byte path hash support using real pymc_core protocol objects.
Exercises actual Packet, PathUtils, PacketBuilder, and engine forwarding
rather than mocking the protocol layer. Covers:
- PathUtils encode/decode round-trips for all hash sizes
- Packet serialization/deserialization with multi-byte paths
- Packet.apply_path_hash_mode and get_path_hashes
- Engine flood_forward with real multi-byte encoded packets
- Engine direct_forward with real multi-byte encoded packets
- PacketBuilder.create_trace payload structure + TraceHandler parsing
- Max-hop boundary enforcement per hash size
"""
import struct
from collections import OrderedDict
from unittest.mock import MagicMock, patch
import pytest
from pymc_core.protocol import Packet, PacketBuilder, PathUtils
from pymc_core.protocol.constants import (
MAX_PATH_SIZE,
PATH_HASH_COUNT_MASK,
PATH_HASH_SIZE_SHIFT,
PAYLOAD_TYPE_TRACE,
PH_TYPE_SHIFT,
ROUTE_TYPE_DIRECT,
ROUTE_TYPE_FLOOD,
)
from pymc_core.node.handlers.trace import TraceHandler
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
LOCAL_HASH_BYTES = bytes([0xAB, 0xCD, 0xEF])
def _make_flood_packet(path_bytes: bytes, hash_size: int, hash_count: int,
payload: bytes = b"\x01\x02\x03\x04") -> Packet:
"""Create a real flood Packet with the given multi-byte path encoding."""
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.path = bytearray(path_bytes)
pkt.path_len = PathUtils.encode_path_len(hash_size, hash_count)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
return pkt
def _make_direct_packet(path_bytes: bytes, hash_size: int, hash_count: int,
payload: bytes = b"\x01\x02\x03\x04") -> Packet:
"""Create a real direct-routed Packet."""
pkt = Packet()
pkt.header = ROUTE_TYPE_DIRECT
pkt.path = bytearray(path_bytes)
pkt.path_len = PathUtils.encode_path_len(hash_size, hash_count)
pkt.payload = bytearray(payload)
pkt.payload_len = len(payload)
return pkt
def _make_handler(path_hash_mode=0, local_hash_bytes=None):
"""Create a real RepeaterHandler with minimal mocking (only radio/storage)."""
lhb = local_hash_bytes or LOCAL_HASH_BYTES
config = {
"repeater": {
"mode": "forward",
"cache_ttl": 3600,
"use_score_for_tx": False,
"score_threshold": 0.3,
"send_advert_interval_hours": 0,
"node_name": "test-node",
},
"mesh": {
"global_flood_allow": True,
"loop_detect": "off",
"path_hash_mode": path_hash_mode,
},
"delays": {"tx_delay_factor": 1.0, "direct_tx_delay_factor": 0.5},
"duty_cycle": {"max_airtime_per_minute": 3600, "enforcement_enabled": True},
"radio": {
"spreading_factor": 8,
"bandwidth": 125000,
"coding_rate": 8,
"preamble_length": 17,
},
}
dispatcher = MagicMock()
dispatcher.radio = MagicMock(
spreading_factor=8, bandwidth=125000, coding_rate=8,
preamble_length=17, frequency=915000000, tx_power=14,
)
dispatcher.local_identity = MagicMock()
with (
patch("repeater.engine.StorageCollector"),
patch("repeater.engine.RepeaterHandler._start_background_tasks"),
):
from repeater.engine import RepeaterHandler
h = RepeaterHandler(config, dispatcher, lhb[0], local_hash_bytes=lhb)
return h
# ===================================================================
# 1. PathUtils — encode/decode round-trips
# ===================================================================
class TestPathUtilsRoundTrip:
"""Verify PathUtils encode/decode for all valid hash sizes and hop counts."""
@pytest.mark.parametrize("hash_size", [1, 2, 3])
def test_encode_decode_hash_size(self, hash_size):
encoded = PathUtils.encode_path_len(hash_size, 0)
assert PathUtils.get_path_hash_size(encoded) == hash_size
assert PathUtils.get_path_hash_count(encoded) == 0
@pytest.mark.parametrize("hash_size,count", [
(1, 1), (1, 10), (1, 63),
(2, 1), (2, 15), (2, 32),
(3, 1), (3, 10), (3, 21),
])
def test_encode_decode_round_trip(self, hash_size, count):
encoded = PathUtils.encode_path_len(hash_size, count)
assert PathUtils.get_path_hash_size(encoded) == hash_size
assert PathUtils.get_path_hash_count(encoded) == count
assert PathUtils.get_path_byte_len(encoded) == hash_size * count
@pytest.mark.parametrize("hash_size", [1, 2, 3])
def test_encode_zero_hops(self, hash_size):
encoded = PathUtils.encode_path_len(hash_size, 0)
assert PathUtils.get_path_byte_len(encoded) == 0
assert PathUtils.is_valid_path_len(encoded)
def test_encode_preserves_bit_layout(self):
"""Verify the actual bit layout: bits 6-7 = (hash_size-1), bits 0-5 = count."""
for hs in (1, 2, 3):
for count in (0, 1, 30, 63):
if count * hs > MAX_PATH_SIZE:
continue
encoded = PathUtils.encode_path_len(hs, count)
assert (encoded >> PATH_HASH_SIZE_SHIFT) == hs - 1
assert (encoded & PATH_HASH_COUNT_MASK) == count
def test_1_byte_backward_compatible(self):
"""For hash_size=1, encoded byte == raw hop count (legacy compat)."""
for count in range(64):
encoded = PathUtils.encode_path_len(1, count)
assert encoded == count
def test_encode_hop_count_overflow_raises(self):
with pytest.raises(ValueError, match="hop count must be 0-63"):
PathUtils.encode_path_len(1, 64)
@pytest.mark.parametrize("hash_size,max_hops", [(1, 63), (2, 32), (3, 21)])
def test_max_hops_boundary(self, hash_size, max_hops):
at_max = PathUtils.encode_path_len(hash_size, max_hops)
assert PathUtils.is_path_at_max_hops(at_max)
assert PathUtils.is_valid_path_len(at_max)
@pytest.mark.parametrize("hash_size,below_max", [(1, 62), (2, 31), (3, 20)])
def test_below_max_hops_not_at_max(self, hash_size, below_max):
encoded = PathUtils.encode_path_len(hash_size, below_max)
assert not PathUtils.is_path_at_max_hops(encoded)
def test_zero_path_len_not_at_max(self):
assert not PathUtils.is_path_at_max_hops(0)
def test_invalid_path_len_too_many_bytes(self):
"""33 hops of 2 bytes = 66, exceeds MAX_PATH_SIZE=64."""
encoded = PathUtils.encode_path_len(2, 33)
assert not PathUtils.is_valid_path_len(encoded)
def test_hash_size_4_reserved_invalid(self):
"""hash_size=4 (bits 6-7 = 0b11) is reserved and invalid."""
# Manually construct: (3 << 6) | 1 = 0xC1
raw = (3 << PATH_HASH_SIZE_SHIFT) | 1
assert not PathUtils.is_valid_path_len(raw)
# ===================================================================
# 2. Packet — multi-byte path serialization round-trip
# ===================================================================
class TestPacketMultiBytePath:
"""Verify Packet write_to/read_from preserves multi-byte path encoding."""
def test_1_byte_path_round_trip(self):
pkt = _make_flood_packet(b"\xAA\xBB\xCC", hash_size=1, hash_count=3)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 1
assert pkt2.get_path_hash_count() == 3
assert bytes(pkt2.path) == b"\xAA\xBB\xCC"
def test_2_byte_path_round_trip(self):
path = b"\xAA\xBB\xCC\xDD" # 2 hops of 2 bytes
pkt = _make_flood_packet(path, hash_size=2, hash_count=2)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 2
assert pkt2.get_path_hash_count() == 2
assert bytes(pkt2.path) == path
def test_3_byte_path_round_trip(self):
path = b"\xAA\xBB\xCC\xDD\xEE\xFF" # 2 hops of 3 bytes
pkt = _make_flood_packet(path, hash_size=3, hash_count=2)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 3
assert pkt2.get_path_hash_count() == 2
assert bytes(pkt2.path) == path
def test_empty_path_2_byte_mode(self):
pkt = _make_flood_packet(b"", hash_size=2, hash_count=0)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 2
assert pkt2.get_path_hash_count() == 0
assert bytes(pkt2.path) == b""
def test_payload_preserved_after_multibyte_path(self):
"""Payload bytes after a multi-byte path are correctly sliced."""
payload = b"\xDE\xAD\xBE\xEF"
path = b"\x11\x22\x33\x44\x55\x66"
pkt = _make_flood_packet(path, hash_size=3, hash_count=2, payload=payload)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_payload() == payload
def test_path_len_byte_on_wire(self):
"""The encoded path_len byte on the wire has the correct bit pattern."""
pkt = _make_flood_packet(b"\x11\x22\x33\x44", hash_size=2, hash_count=2)
wire = pkt.write_to()
# Wire: header(1) + path_len(1) + path(4) + payload(4)
# For ROUTE_TYPE_FLOOD (no transport codes), path_len is at index 1
path_len_on_wire = wire[1]
assert PathUtils.get_path_hash_size(path_len_on_wire) == 2
assert PathUtils.get_path_hash_count(path_len_on_wire) == 2
class TestPacketGetPathHashes:
"""Verify Packet.get_path_hashes splits path into per-hop byte entries."""
def test_1_byte_hashes(self):
pkt = _make_flood_packet(b"\xAA\xBB\xCC", hash_size=1, hash_count=3)
hashes = pkt.get_path_hashes()
assert hashes == [b"\xAA", b"\xBB", b"\xCC"]
def test_2_byte_hashes(self):
pkt = _make_flood_packet(b"\xAA\xBB\xCC\xDD", hash_size=2, hash_count=2)
hashes = pkt.get_path_hashes()
assert hashes == [b"\xAA\xBB", b"\xCC\xDD"]
def test_3_byte_hashes(self):
pkt = _make_flood_packet(
b"\xAA\xBB\xCC\xDD\xEE\xFF", hash_size=3, hash_count=2
)
hashes = pkt.get_path_hashes()
assert hashes == [b"\xAA\xBB\xCC", b"\xDD\xEE\xFF"]
def test_empty_path(self):
pkt = _make_flood_packet(b"", hash_size=2, hash_count=0)
assert pkt.get_path_hashes() == []
def test_hashes_hex_output(self):
pkt = _make_flood_packet(b"\x0A\x0B\x0C\x0D", hash_size=2, hash_count=2)
hex_hashes = pkt.get_path_hashes_hex()
assert hex_hashes == ["0A0B", "0C0D"]
class TestPacketApplyPathHashMode:
"""Verify Packet.apply_path_hash_mode sets encoding for 0-hop packets."""
@pytest.mark.parametrize("mode,expected_hash_size", [(0, 1), (1, 2), (2, 3)])
def test_apply_mode_sets_hash_size(self, mode, expected_hash_size):
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.payload = bytearray(b"\x01")
pkt.payload_len = 1
pkt.apply_path_hash_mode(mode)
assert pkt.get_path_hash_size() == expected_hash_size
assert pkt.get_path_hash_count() == 0
def test_apply_mode_skips_nonzero_hop_count(self):
"""Mode should not be re-applied if path already has hops."""
pkt = _make_flood_packet(b"\xAA\xBB", hash_size=2, hash_count=1)
original_path_len = pkt.path_len
pkt.apply_path_hash_mode(0) # try to override to 1-byte
assert pkt.path_len == original_path_len # unchanged
def test_apply_mode_skips_trace_packets(self):
"""Trace packets should never have path_hash_mode applied."""
pkt = PacketBuilder.create_trace(tag=1, auth_code=2, flags=0)
pkt.apply_path_hash_mode(2)
# Trace packet path_len stays 0 (no routing path)
assert pkt.path_len == 0
def test_apply_invalid_mode_raises(self):
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
with pytest.raises(ValueError, match="path_hash_mode must be 0, 1, or 2"):
pkt.apply_path_hash_mode(3)
class TestPacketSetPath:
"""Verify Packet.set_path with explicit path_len_encoded."""
def test_set_path_with_encoded_len(self):
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
path = b"\xAA\xBB\xCC\xDD"
encoded = PathUtils.encode_path_len(2, 2)
pkt.set_path(path, path_len_encoded=encoded)
assert pkt.get_path_hash_size() == 2
assert pkt.get_path_hash_count() == 2
assert bytes(pkt.path) == path
def test_set_path_without_encoded_defaults_1_byte(self):
"""Without explicit path_len_encoded, defaults to 1-byte hash_size."""
pkt = Packet()
pkt.header = ROUTE_TYPE_FLOOD
pkt.set_path(b"\xAA\xBB\xCC")
assert pkt.get_path_hash_size() == 1
assert pkt.get_path_hash_count() == 3
# ===================================================================
# 3. Engine flood_forward — real Packet objects
# ===================================================================
class TestFloodForwardMultiByte:
"""Test RepeaterHandler.flood_forward with real multi-byte Packet objects."""
def test_1_byte_mode_appends_single_byte(self):
h = _make_handler(path_hash_mode=0, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11", hash_size=1, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 2
assert result.get_path_hash_size() == 1
hashes = result.get_path_hashes()
assert hashes[0] == b"\x11"
assert hashes[1] == b"\xAB" # first byte of local_hash_bytes
def test_2_byte_mode_appends_two_bytes(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 2
assert result.get_path_hash_size() == 2
hashes = result.get_path_hashes()
assert hashes[0] == b"\x11\x22"
assert hashes[1] == b"\xAB\xCD"
def test_3_byte_mode_appends_three_bytes(self):
h = _make_handler(path_hash_mode=2, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22\x33", hash_size=3, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 2
assert result.get_path_hash_size() == 3
hashes = result.get_path_hashes()
assert hashes[0] == b"\x11\x22\x33"
assert hashes[1] == b"\xAB\xCD\xEF"
def test_empty_path_gets_local_hash_appended(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"", hash_size=2, hash_count=0)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 1
hashes = result.get_path_hashes()
assert hashes[0] == b"\xAB\xCD"
def test_path_len_re_encoded_after_forward(self):
"""After appending, path_len byte should encode (hash_size, count+1)."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22\x33\x44", hash_size=2, hash_count=2)
result = h.flood_forward(pkt)
assert result is not None
expected_path_len = PathUtils.encode_path_len(2, 3)
assert result.path_len == expected_path_len
def test_forwarded_packet_serializes_correctly(self):
"""The forwarded packet should serialize and deserialize cleanly."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_flood_packet(b"\x11\x22", hash_size=2, hash_count=1)
result = h.flood_forward(pkt)
assert result is not None
wire = result.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 2
assert pkt2.get_path_hash_count() == 2
assert pkt2.get_path_hashes() == [b"\x11\x22", b"\xAB\xCD"]
def test_flood_rejects_at_max_hops_2_byte(self):
"""At 32 hops (2-byte mode), flood_forward should drop the packet."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
path = bytes([0x00, 0x01] * 32) # 32 hops × 2 bytes = 64 bytes
pkt = _make_flood_packet(path, hash_size=2, hash_count=32)
result = h.flood_forward(pkt)
assert result is None
assert pkt.drop_reason is not None
def test_flood_rejects_at_max_hops_3_byte(self):
"""At 21 hops (3-byte mode), adding one more would exceed MAX_PATH_SIZE."""
h = _make_handler(path_hash_mode=2, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
path = bytes([0x00, 0x01, 0x02] * 21) # 21 hops × 3 bytes = 63 bytes
pkt = _make_flood_packet(path, hash_size=3, hash_count=21)
result = h.flood_forward(pkt)
assert result is None
assert pkt.drop_reason is not None
def test_flood_allows_below_max_2_byte(self):
"""At 31 hops (2-byte), one more should succeed."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
path = bytes(range(62)) # 31 hops × 2 bytes
pkt = _make_flood_packet(path, hash_size=2, hash_count=31)
result = h.flood_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 32
def test_flood_rejects_empty_payload(self):
h = _make_handler(path_hash_mode=0)
pkt = _make_flood_packet(b"", hash_size=1, hash_count=0, payload=b"")
pkt.payload = bytearray()
result = h.flood_forward(pkt)
assert result is None
assert "Empty payload" in (pkt.drop_reason or "")
# ===================================================================
# 4. Engine direct_forward — real Packet objects
# ===================================================================
class TestDirectForwardMultiByte:
"""Test RepeaterHandler.direct_forward with real multi-byte Packet objects."""
def test_1_byte_match_strips_first_hop(self):
h = _make_handler(path_hash_mode=0, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path: [0xAB, 0x11] — first hop matches local_hash_bytes[0]
pkt = _make_direct_packet(b"\xAB\x11", hash_size=1, hash_count=2)
result = h.direct_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 1
assert result.get_path_hash_size() == 1
assert result.get_path_hashes() == [b"\x11"]
def test_2_byte_match_strips_first_hop(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path: [0xAB,0xCD, 0x11,0x22] — first 2-byte hop matches local_hash_bytes[:2]
pkt = _make_direct_packet(b"\xAB\xCD\x11\x22", hash_size=2, hash_count=2)
result = h.direct_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 1
assert result.get_path_hash_size() == 2
assert result.get_path_hashes() == [b"\x11\x22"]
def test_3_byte_match_strips_first_hop(self):
h = _make_handler(path_hash_mode=2, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(
b"\xAB\xCD\xEF\x11\x22\x33", hash_size=3, hash_count=2
)
result = h.direct_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 1
assert result.get_path_hash_size() == 3
assert result.get_path_hashes() == [b"\x11\x22\x33"]
def test_2_byte_mismatch_rejects(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
# Path: [0xFF,0xEE, ...] — first 2-byte hop doesn't match
pkt = _make_direct_packet(b"\xFF\xEE\x11\x22", hash_size=2, hash_count=2)
result = h.direct_forward(pkt)
assert result is None
assert "not for us" in (pkt.drop_reason or "")
def test_path_len_re_encoded_after_strip(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(b"\xAB\xCD\x11\x22\x33\x44", hash_size=2, hash_count=3)
result = h.direct_forward(pkt)
assert result is not None
expected_path_len = PathUtils.encode_path_len(2, 2)
assert result.path_len == expected_path_len
def test_last_hop_strips_to_empty(self):
"""When only one hop remains and it matches, path becomes empty."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(b"\xAB\xCD", hash_size=2, hash_count=1)
result = h.direct_forward(pkt)
assert result is not None
assert result.get_path_hash_count() == 0
assert bytes(result.path) == b""
def test_forwarded_direct_serializes_correctly(self):
"""After stripping, the packet should serialize/deserialize cleanly."""
h = _make_handler(path_hash_mode=2, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(
b"\xAB\xCD\xEF\x11\x22\x33\x44\x55\x66", hash_size=3, hash_count=3
)
result = h.direct_forward(pkt)
assert result is not None
wire = result.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_path_hash_size() == 3
assert pkt2.get_path_hash_count() == 2
assert pkt2.get_path_hashes() == [b"\x11\x22\x33", b"\x44\x55\x66"]
def test_no_path_rejects(self):
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(b"", hash_size=2, hash_count=0)
result = h.direct_forward(pkt)
assert result is None
assert "no path" in (pkt.drop_reason or "").lower()
def test_path_too_short_for_hash_size(self):
"""If path has fewer bytes than hash_size, reject."""
h = _make_handler(path_hash_mode=1, local_hash_bytes=bytes([0xAB, 0xCD, 0xEF]))
pkt = _make_direct_packet(b"\xAB", hash_size=2, hash_count=1)
# path has 1 byte but hash_size is 2
result = h.direct_forward(pkt)
assert result is None
# ===================================================================
# 5. Flood → Direct — multi-hop forwarding chain
# ===================================================================
class TestMultiHopForwardingChain:
"""Simulate a multi-hop path being built and then consumed."""
def test_flood_chain_builds_path_then_direct_consumes(self):
"""
Simulate: node_A floods → repeater_1 forwards → repeater_2 forwards
Then the return direct packet strips hops in reverse order.
"""
node_a_hash = bytes([0x11, 0x22, 0x33])
rep1_hash = bytes([0xAA, 0xBB, 0xCC])
rep2_hash = bytes([0xDD, 0xEE, 0xFF])
# Step 1: node_A creates a flood packet with 0 hops, 2-byte mode
pkt = _make_flood_packet(b"", hash_size=2, hash_count=0)
# Step 2: repeater_1 flood_forward adds its 2-byte hash
h1 = _make_handler(path_hash_mode=1, local_hash_bytes=rep1_hash)
pkt = h1.flood_forward(pkt)
assert pkt is not None
assert pkt.get_path_hashes() == [rep1_hash[:2]]
# Step 3: repeater_2 flood_forward adds its 2-byte hash
h2 = _make_handler(path_hash_mode=1, local_hash_bytes=rep2_hash)
pkt = h2.flood_forward(pkt)
assert pkt is not None
assert pkt.get_path_hashes() == [rep1_hash[:2], rep2_hash[:2]]
# Verify the path serializes correctly
wire = pkt.write_to()
pkt_rx = Packet()
pkt_rx.read_from(wire)
assert pkt_rx.get_path_hash_size() == 2
assert pkt_rx.get_path_hash_count() == 2
# Step 4: Now simulate a direct reply going back through the path
# The path should be [rep1, rep2] — direct packet addressed to rep1 first
# (Direct packets strip from the front)
direct_pkt = _make_direct_packet(
bytes(pkt_rx.path), hash_size=2, hash_count=2,
payload=b"\xFE\xED"
)
# repeater_1 strips its hop
result = h1.direct_forward(direct_pkt)
assert result is not None
assert result.get_path_hash_count() == 1
assert result.get_path_hashes() == [rep2_hash[:2]]
# repeater_2 strips its hop
result = h2.direct_forward(result)
assert result is not None
assert result.get_path_hash_count() == 0
assert bytes(result.path) == b""
# ===================================================================
# 6. PacketBuilder.create_trace — payload structure
# ===================================================================
class TestTracePacketStructure:
"""Verify real trace packet creation and payload structure."""
def test_create_trace_basic(self):
pkt = PacketBuilder.create_trace(tag=0x12345678, auth_code=0xDEADBEEF, flags=0x01)
assert pkt.get_payload_type() == PAYLOAD_TYPE_TRACE
assert pkt.path_len == 0
assert len(pkt.path) == 0
payload = pkt.get_payload()
assert len(payload) == 9
tag, auth_code, flags = struct.unpack("<IIB", payload[:9])
assert tag == 0x12345678
assert auth_code == 0xDEADBEEF
assert flags == 0x01
def test_create_trace_with_path_bytes(self):
"""Trace path goes into payload, not routing path."""
path_bytes = [0xAA, 0xBB, 0xCC, 0xDD]
pkt = PacketBuilder.create_trace(
tag=1, auth_code=2, flags=0, path=path_bytes
)
payload = pkt.get_payload()
assert len(payload) == 9 + 4
# Routing path stays empty
assert pkt.path_len == 0
assert len(pkt.path) == 0
# Path bytes are in the payload after the 9-byte header
assert list(payload[9:]) == path_bytes
def test_trace_is_direct_route(self):
pkt = PacketBuilder.create_trace(tag=0, auth_code=0, flags=0)
assert pkt.is_route_direct()
def test_trace_apply_path_hash_mode_is_noop(self):
"""apply_path_hash_mode should not alter trace packets."""
pkt = PacketBuilder.create_trace(tag=0, auth_code=0, flags=0)
pkt.apply_path_hash_mode(2)
assert pkt.path_len == 0 # unchanged
def test_trace_serialization_round_trip(self):
path_bytes = [0x11, 0x22, 0x33]
pkt = PacketBuilder.create_trace(tag=42, auth_code=99, flags=3, path=path_bytes)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
assert pkt2.get_payload_type() == PAYLOAD_TYPE_TRACE
payload = pkt2.get_payload()
tag, auth_code, flags = struct.unpack("<IIB", payload[:9])
assert tag == 42
assert auth_code == 99
assert flags == 3
assert list(payload[9:]) == path_bytes
# ===================================================================
# 7. TraceHandler._parse_trace_payload — real parsing
# ===================================================================
class TestTracePayloadParsing:
"""Verify TraceHandler._parse_trace_payload with real trace payloads."""
def _make_trace_handler(self):
handler = object.__new__(TraceHandler)
return handler
def test_parse_basic_trace(self):
th = self._make_trace_handler()
payload = struct.pack("<IIB", 0x12345678, 0xAABBCCDD, 0x05)
result = th._parse_trace_payload(payload)
assert result["valid"]
assert result["tag"] == 0x12345678
assert result["auth_code"] == 0xAABBCCDD
assert result["flags"] == 0x05
assert result["trace_path"] == []
def test_parse_trace_with_1_byte_path(self):
th = self._make_trace_handler()
payload = struct.pack("<IIB", 1, 2, 0) + bytes([0xAA, 0xBB, 0xCC])
result = th._parse_trace_payload(payload)
assert result["valid"]
assert result["trace_path"] == [0xAA, 0xBB, 0xCC]
assert result["path_length"] == 3
def test_parse_trace_with_multibyte_path_is_flat(self):
"""
Trace path is raw bytes in payload — _parse_trace_payload returns it flat.
Multi-byte grouping is NOT done at the trace parser level.
"""
th = self._make_trace_handler()
# 2 hops of 2-byte hashes → 4 flat bytes in the payload
path = bytes([0xAA, 0xBB, 0xCC, 0xDD])
payload = struct.pack("<IIB", 10, 20, 0) + path
result = th._parse_trace_payload(payload)
assert result["valid"]
# Returns flat list, not grouped
assert result["trace_path"] == [0xAA, 0xBB, 0xCC, 0xDD]
assert result["path_length"] == 4
def test_parse_from_real_packet(self):
"""Create a trace with PacketBuilder, serialize, deserialize, then parse."""
th = self._make_trace_handler()
trace_path = [0x11, 0x22, 0x33, 0x44, 0x55, 0x66]
pkt = PacketBuilder.create_trace(
tag=100, auth_code=200, flags=7, path=trace_path
)
wire = pkt.write_to()
pkt2 = Packet()
pkt2.read_from(wire)
result = th._parse_trace_payload(pkt2.get_payload())
assert result["valid"]
assert result["tag"] == 100
assert result["auth_code"] == 200
assert result["flags"] == 7
assert result["trace_path"] == trace_path
def test_parse_too_short_payload(self):
th = self._make_trace_handler()
result = th._parse_trace_payload(b"\x01\x02\x03")
assert "error" in result
assert "too short" in result["error"].lower()
# ===================================================================
# 8. Wire-level verification — manual byte inspection
# ===================================================================
class TestWireLevelEncoding:
"""Verify exact byte layout of serialized multi-byte path packets."""
def test_2_byte_mode_wire_format(self):
"""
ROUTE_TYPE_FLOOD (no transport codes):
[header(1)] [path_len(1)] [path(N)] [payload(M)]
"""
pkt = _make_flood_packet(
b"\xAA\xBB\xCC\xDD", hash_size=2, hash_count=2,
payload=b"\xFE"
)
wire = pkt.write_to()
assert wire[0] == ROUTE_TYPE_FLOOD # header
path_len = wire[1]
assert PathUtils.get_path_hash_size(path_len) == 2
assert PathUtils.get_path_hash_count(path_len) == 2
assert wire[2:6] == b"\xAA\xBB\xCC\xDD" # path bytes
assert wire[6:] == b"\xFE" # payload
def test_3_byte_mode_wire_format(self):
pkt = _make_flood_packet(
b"\x11\x22\x33\x44\x55\x66", hash_size=3, hash_count=2,
payload=b"\xAA"
)
wire = pkt.write_to()
assert wire[0] == ROUTE_TYPE_FLOOD
path_len = wire[1]
assert PathUtils.get_path_hash_size(path_len) == 3
assert PathUtils.get_path_hash_count(path_len) == 2
assert wire[2:8] == b"\x11\x22\x33\x44\x55\x66"
assert wire[8:] == b"\xAA"
def test_1_byte_mode_backward_compat_wire(self):
"""1-byte mode: path_len byte on wire == hop count (legacy format)."""
pkt = _make_flood_packet(b"\xAA\xBB", hash_size=1, hash_count=2)
wire = pkt.write_to()
assert wire[1] == 2 # path_len == hop_count for 1-byte mode
def test_read_from_2_byte_wire(self):
"""Manually construct wire bytes and verify read_from parses correctly."""
# header=ROUTE_TYPE_FLOOD, path_len=encode(2, 2), path=4 bytes, payload=2 bytes
path_len = PathUtils.encode_path_len(2, 2)
wire = bytes([ROUTE_TYPE_FLOOD, path_len]) + b"\xAA\xBB\xCC\xDD" + b"\xFE\xED"
pkt = Packet()
pkt.read_from(wire)
assert pkt.get_path_hash_size() == 2
assert pkt.get_path_hash_count() == 2
assert pkt.get_path_hashes() == [b"\xAA\xBB", b"\xCC\xDD"]
assert pkt.get_payload() == b"\xFE\xED"