Recursively ingest federated instances (#353)

* Recursively ingest federated instances

* Keep absent is_private nil during signature verification
This commit is contained in:
l5y
2025-10-15 21:35:37 +02:00
committed by GitHub
parent a32125996c
commit dc2fa9d247
3 changed files with 524 additions and 13 deletions
@@ -351,6 +351,156 @@ module PotatoMesh
[nil, errors]
end
# Parse a remote federation instance payload into canonical attributes.
#
# @param payload [Hash] JSON object describing a remote instance.
# @return [Array<(Hash, String), String>] tuple containing the attribute
# hash and signature when valid or a failure reason when invalid.
def remote_instance_attributes_from_payload(payload)
unless payload.is_a?(Hash)
return [nil, nil, "instance payload is not an object"]
end
id = string_or_nil(payload["id"])
return [nil, nil, "missing instance id"] unless id
domain = sanitize_instance_domain(payload["domain"])
return [nil, nil, "missing instance domain"] unless domain
pubkey = sanitize_public_key_pem(payload["pubkey"])
return [nil, nil, "missing instance public key"] unless pubkey
signature = string_or_nil(payload["signature"])
return [nil, nil, "missing instance signature"] unless signature
private_value = if payload.key?("isPrivate")
payload["isPrivate"]
else
payload["is_private"]
end
private_flag = coerce_boolean(private_value)
if private_flag.nil?
numeric_flag = coerce_integer(private_value)
private_flag = !numeric_flag.to_i.zero? if numeric_flag
end
attributes = {
id: id,
domain: domain,
pubkey: pubkey,
name: string_or_nil(payload["name"]),
version: string_or_nil(payload["version"]),
channel: string_or_nil(payload["channel"]),
frequency: string_or_nil(payload["frequency"]),
latitude: coerce_float(payload["latitude"]),
longitude: coerce_float(payload["longitude"]),
last_update_time: coerce_integer(payload["lastUpdateTime"]),
is_private: private_flag,
}
[attributes, signature, nil]
rescue StandardError => e
[nil, nil, e.message]
end
# Recursively ingest federation records exposed by the supplied domain.
#
# @param db [SQLite3::Database] open database connection used for writes.
# @param domain [String] remote domain to crawl for federation records.
# @param visited [Set<String>] domains processed during this crawl.
# @return [Set<String>] updated set of visited domains.
def ingest_known_instances_from!(db, domain, visited: nil)
sanitized = sanitize_instance_domain(domain)
return visited || Set.new unless sanitized
visited ||= Set.new
return visited if visited.include?(sanitized)
visited << sanitized
payload, metadata = fetch_instance_json(sanitized, "/api/instances")
unless payload.is_a?(Array)
warn_log(
"Failed to load remote federation instances",
context: "federation.instances",
domain: sanitized,
reason: Array(metadata).map(&:to_s).join("; "),
)
return visited
end
payload.each do |entry|
attributes, signature, reason = remote_instance_attributes_from_payload(entry)
unless attributes && signature
warn_log(
"Discarded remote instance entry",
context: "federation.instances",
domain: sanitized,
reason: reason || "invalid payload",
)
next
end
if attributes[:is_private]
debug_log(
"Skipped private remote instance",
context: "federation.instances",
domain: attributes[:domain],
)
next
end
unless verify_instance_signature(attributes, signature, attributes[:pubkey])
warn_log(
"Discarded remote instance entry",
context: "federation.instances",
domain: attributes[:domain],
reason: "invalid signature",
)
next
end
attributes[:is_private] = false if attributes[:is_private].nil?
remote_nodes, node_metadata = fetch_instance_json(attributes[:domain], "/api/nodes")
unless remote_nodes
warn_log(
"Failed to load remote node data",
context: "federation.instances",
domain: attributes[:domain],
reason: Array(node_metadata).map(&:to_s).join("; "),
)
next
end
fresh, freshness_reason = validate_remote_nodes(remote_nodes)
unless fresh
warn_log(
"Discarded remote instance entry",
context: "federation.instances",
domain: attributes[:domain],
reason: freshness_reason || "stale node data",
)
next
end
begin
upsert_instance_record(db, attributes, signature)
ingest_known_instances_from!(db, attributes[:domain], visited: visited)
rescue ArgumentError => e
warn_log(
"Failed to persist remote instance",
context: "federation.instances",
domain: attributes[:domain],
error_class: e.class.name,
error_message: e.message,
)
end
end
visited
end
# Build an HTTP client configured for communication with a remote instance.
#
# @param uri [URI::Generic] target URI describing the remote endpoint.
@@ -217,6 +217,7 @@ module PotatoMesh
db = open_database
upsert_instance_record(db, attributes, signature)
ingest_known_instances_from!(db, attributes[:domain])
debug_log(
"Registered remote instance",
context: "ingest.register",
+373 -13
View File
@@ -1193,18 +1193,28 @@ RSpec.describe "Potato Mesh Sinatra app" do
it "rejects registrations with invalid signatures" do
invalid_payload = instance_payload.merge("signature" => Base64.strict_encode64("invalid"))
expect_any_instance_of(Sinatra::Application).to receive(:warn_log).with(
"Instance registration rejected",
context: "ingest.register",
domain: domain,
reason: "invalid signature",
).at_least(:once)
warning_calls = []
allow_any_instance_of(Sinatra::Application).to receive(:warn_log).and_wrap_original do |method, *args, **kwargs|
warning_calls << [args, kwargs]
method.call(*args, **kwargs)
end
post "/api/instances", invalid_payload.to_json, { "CONTENT_TYPE" => "application/json" }
expect(last_response.status).to eq(400)
expect(JSON.parse(last_response.body)).to eq("error" => "invalid signature")
expect(warning_calls).to include(
[
["Instance registration rejected"],
hash_including(
context: "ingest.register",
domain: domain,
reason: "invalid signature",
),
],
)
with_db(readonly: true) do |db|
count = db.get_first_value("SELECT COUNT(*) FROM instances")
expect(count).to eq(1)
@@ -1223,25 +1233,375 @@ RSpec.describe "Potato Mesh Sinatra app" do
"signature" => restricted_signature,
)
expect_any_instance_of(Sinatra::Application).to receive(:warn_log).with(
"Instance registration rejected",
context: "ingest.register",
domain: restricted_domain,
reason: "restricted IP address",
resolved_ip: an_instance_of(IPAddr),
).at_least(:once)
warning_calls = []
allow_any_instance_of(Sinatra::Application).to receive(:warn_log).and_wrap_original do |method, *args, **kwargs|
warning_calls << [args, kwargs]
method.call(*args, **kwargs)
end
post "/api/instances", restricted_payload.to_json, { "CONTENT_TYPE" => "application/json" }
expect(last_response.status).to eq(400)
expect(JSON.parse(last_response.body)).to eq("error" => "restricted domain")
expect(warning_calls).to include(
[
["Instance registration rejected"],
hash_including(
context: "ingest.register",
domain: restricted_domain,
reason: "restricted IP address",
resolved_ip: an_instance_of(IPAddr),
),
],
)
with_db(readonly: true) do |db|
count = db.get_first_value("SELECT COUNT(*) FROM instances")
expect(count).to eq(1)
end
end
it "ingests federation instances advertised by remote peers" do
ally_key = OpenSSL::PKey::RSA.new(2048)
ally_domain = "ally.mesh"
ally_attributes = {
id: "ally-instance-1",
domain: ally_domain,
pubkey: ally_key.public_key.export,
name: "Ally Mesh",
version: "2.0.0",
channel: "#Allies",
frequency: "433MHz",
latitude: 40.1,
longitude: -74.0,
last_update_time: Time.now.to_i,
is_private: false,
}
ally_signature_payload = canonical_instance_payload(ally_attributes)
ally_signature = Base64.strict_encode64(
ally_key.sign(OpenSSL::Digest::SHA256.new, ally_signature_payload),
)
ally_payload = {
"id" => ally_attributes[:id],
"domain" => ally_domain,
"pubkey" => ally_attributes[:pubkey],
"name" => ally_attributes[:name],
"version" => ally_attributes[:version],
"channel" => ally_attributes[:channel],
"frequency" => ally_attributes[:frequency],
"latitude" => ally_attributes[:latitude],
"longitude" => ally_attributes[:longitude],
"lastUpdateTime" => ally_attributes[:last_update_time],
"isPrivate" => ally_attributes[:is_private],
"signature" => ally_signature,
}
ally_nodes = Array.new(PotatoMesh::Config.remote_instance_min_node_count) do |index|
{ "node_id" => "ally-node-#{index}", "last_heard" => Time.now.to_i - index }
end
allow_any_instance_of(Sinatra::Application).to receive(:fetch_instance_json) do |_instance, host, path|
case [host, path]
when [domain, "/.well-known/potato-mesh"]
[well_known_document, URI("https://#{host}#{path}")]
when [domain, "/api/nodes"]
[remote_nodes, URI("https://#{host}#{path}")]
when [domain, "/api/instances"]
[[ally_payload], URI("https://#{host}#{path}")]
when [ally_domain, "/api/nodes"]
[ally_nodes, URI("https://#{host}#{path}")]
when [ally_domain, "/api/instances"]
[[instance_payload], URI("https://#{host}#{path}")]
else
[nil, []]
end
end
post "/api/instances", instance_payload.to_json, { "CONTENT_TYPE" => "application/json" }
expect(last_response.status).to eq(201)
with_db(readonly: true) do |db|
db.results_as_hash = true
ally_row = db.get_first_row(
"SELECT domain, signature FROM instances WHERE domain = ?",
[ally_domain],
)
remote_row = db.get_first_row(
"SELECT domain, signature FROM instances WHERE domain = ?",
[domain],
)
expect(ally_row).not_to be_nil
expect(ally_row["signature"]).to eq(ally_signature)
expect(remote_row).not_to be_nil
expect(remote_row["signature"]).to eq(instance_signature)
end
end
it "skips remote federation entries that fail validation" do
stale_key = OpenSSL::PKey::RSA.new(2048)
stale_domain = "stale.mesh"
stale_attributes = {
id: "stale-instance",
domain: stale_domain,
pubkey: stale_key.public_key.export,
name: "Stale Mesh",
version: "0.1.0",
channel: "#Stale",
frequency: "868MHz",
latitude: 10.0,
longitude: 20.0,
last_update_time: Time.now.to_i,
is_private: false,
}
stale_signature_payload = canonical_instance_payload(stale_attributes)
stale_signature = Base64.strict_encode64(
stale_key.sign(OpenSSL::Digest::SHA256.new, stale_signature_payload),
)
stale_payload = {
"id" => stale_attributes[:id],
"domain" => stale_domain,
"pubkey" => stale_attributes[:pubkey],
"name" => stale_attributes[:name],
"version" => stale_attributes[:version],
"channel" => stale_attributes[:channel],
"frequency" => stale_attributes[:frequency],
"latitude" => stale_attributes[:latitude],
"longitude" => stale_attributes[:longitude],
"lastUpdateTime" => stale_attributes[:last_update_time],
"isPrivate" => false,
"signature" => stale_signature,
}
private_key = OpenSSL::PKey::RSA.new(2048)
private_domain = "private.mesh"
private_attributes = {
id: "private-instance",
domain: private_domain,
pubkey: private_key.public_key.export,
name: "Private Mesh",
version: "3.0.0",
channel: "#Private",
frequency: "915MHz",
latitude: 0.0,
longitude: 0.0,
last_update_time: Time.now.to_i,
is_private: true,
}
private_signature_payload = canonical_instance_payload(private_attributes)
private_signature = Base64.strict_encode64(
private_key.sign(OpenSSL::Digest::SHA256.new, private_signature_payload),
)
private_payload = {
"id" => private_attributes[:id],
"domain" => private_domain,
"pubkey" => private_attributes[:pubkey],
"name" => private_attributes[:name],
"version" => private_attributes[:version],
"channel" => private_attributes[:channel],
"frequency" => private_attributes[:frequency],
"latitude" => private_attributes[:latitude],
"longitude" => private_attributes[:longitude],
"lastUpdateTime" => private_attributes[:last_update_time],
"isPrivate" => true,
"signature" => private_signature,
}
invalid_key = OpenSSL::PKey::RSA.new(2048)
invalid_payload = {
"id" => "invalid-instance",
"domain" => "invalid.mesh",
"pubkey" => invalid_key.public_key.export,
"name" => "Invalid Mesh",
"version" => "1.0.0",
"channel" => "#Invalid",
"frequency" => "915MHz",
"latitude" => 1.0,
"longitude" => 2.0,
"lastUpdateTime" => Time.now.to_i,
"isPrivate" => false,
"signature" => Base64.strict_encode64("bogus"),
}
unreachable_key = OpenSSL::PKey::RSA.new(2048)
unreachable_domain = "unreachable.mesh"
unreachable_attributes = {
id: "unreachable-instance",
domain: unreachable_domain,
pubkey: unreachable_key.public_key.export,
name: "Unreachable Mesh",
version: "6.0.0",
channel: "#Offline",
frequency: "915MHz",
latitude: 12.0,
longitude: 24.0,
last_update_time: Time.now.to_i,
is_private: false,
}
unreachable_signature_payload = canonical_instance_payload(unreachable_attributes)
unreachable_signature = Base64.strict_encode64(
unreachable_key.sign(OpenSSL::Digest::SHA256.new, unreachable_signature_payload),
)
unreachable_payload = {
"id" => unreachable_attributes[:id],
"domain" => unreachable_domain,
"pubkey" => unreachable_attributes[:pubkey],
"name" => unreachable_attributes[:name],
"version" => unreachable_attributes[:version],
"channel" => unreachable_attributes[:channel],
"frequency" => unreachable_attributes[:frequency],
"latitude" => unreachable_attributes[:latitude],
"longitude" => unreachable_attributes[:longitude],
"lastUpdateTime" => unreachable_attributes[:last_update_time],
"isPrivate" => false,
"signature" => unreachable_signature,
}
offline_domain = "offline.mesh"
offline_key = OpenSSL::PKey::RSA.new(2048)
offline_attributes = {
id: "offline-instance",
domain: offline_domain,
pubkey: offline_key.public_key.export,
name: "Offline Mesh",
version: "4.0.0",
channel: "#Offline",
frequency: "915MHz",
latitude: 5.0,
longitude: 6.0,
last_update_time: Time.now.to_i,
is_private: false,
}
offline_signature_payload = canonical_instance_payload(offline_attributes)
offline_signature = Base64.strict_encode64(
offline_key.sign(OpenSSL::Digest::SHA256.new, offline_signature_payload),
)
offline_payload = {
"id" => offline_attributes[:id],
"domain" => offline_domain,
"pubkey" => offline_attributes[:pubkey],
"name" => offline_attributes[:name],
"version" => offline_attributes[:version],
"channel" => offline_attributes[:channel],
"frequency" => offline_attributes[:frequency],
"latitude" => offline_attributes[:latitude],
"longitude" => offline_attributes[:longitude],
"lastUpdateTime" => offline_attributes[:last_update_time],
"isPrivate" => false,
"signature" => offline_signature,
}
restricted_domain = "127.0.0.1"
restricted_key = OpenSSL::PKey::RSA.new(2048)
restricted_attributes = {
id: "restricted-instance",
domain: restricted_domain,
pubkey: restricted_key.public_key.export,
name: "Restricted Mesh",
version: "5.0.0",
channel: "#Restricted",
frequency: "915MHz",
latitude: 9.0,
longitude: 9.0,
last_update_time: Time.now.to_i,
is_private: false,
}
restricted_signature_payload = canonical_instance_payload(restricted_attributes)
restricted_signature = Base64.strict_encode64(
restricted_key.sign(OpenSSL::Digest::SHA256.new, restricted_signature_payload),
)
restricted_payload = {
"id" => restricted_attributes[:id],
"domain" => restricted_domain,
"pubkey" => restricted_attributes[:pubkey],
"name" => restricted_attributes[:name],
"version" => restricted_attributes[:version],
"channel" => restricted_attributes[:channel],
"frequency" => restricted_attributes[:frequency],
"latitude" => restricted_attributes[:latitude],
"longitude" => restricted_attributes[:longitude],
"lastUpdateTime" => restricted_attributes[:last_update_time],
"isPrivate" => false,
"signature" => restricted_signature,
}
stale_nodes = Array.new(PotatoMesh::Config.remote_instance_min_node_count) do |index|
{ "node_id" => "stale-node-#{index}", "last_heard" => (Time.now.to_i - PotatoMesh::Config.remote_instance_max_node_age) - index - 1 }
end
allow_any_instance_of(Sinatra::Application).to receive(:fetch_instance_json) do |_instance, host, path|
case [host, path]
when [domain, "/.well-known/potato-mesh"]
[well_known_document, URI("https://#{host}#{path}")]
when [domain, "/api/nodes"]
[remote_nodes, URI("https://#{host}#{path}")]
when [domain, "/api/instances"]
[
[
"unexpected",
private_payload,
invalid_payload,
offline_payload,
stale_payload,
restricted_payload,
unreachable_payload,
],
URI("https://#{host}#{path}"),
]
when [offline_domain, "/api/nodes"]
[nil, ["timeout"]]
when [stale_domain, "/api/nodes"]
[stale_nodes, URI("https://#{host}#{path}")]
when [restricted_domain, "/api/nodes"]
[remote_nodes, URI("https://#{host}#{path}")]
when [unreachable_domain, "/api/nodes"]
[remote_nodes, URI("https://#{host}#{path}")]
when [unreachable_domain, "/api/instances"]
[nil, ["connection refused"]]
else
[nil, []]
end
end
warning_calls = []
allow_any_instance_of(Sinatra::Application).to receive(:warn_log).and_wrap_original do |method, *args, **kwargs|
warning_calls << [args, kwargs]
method.call(*args, **kwargs)
end
post "/api/instances", instance_payload.to_json, { "CONTENT_TYPE" => "application/json" }
expect(last_response.status).to eq(201)
with_db(readonly: true) do |db|
domains = db.execute("SELECT domain FROM instances ORDER BY domain").flatten
expect(domains).to include(domain, unreachable_domain)
expect(domains).not_to include(stale_domain, private_domain, "invalid.mesh", offline_domain, restricted_domain)
expect(domains.count { |value| value == domain }).to eq(1)
end
expect(warning_calls).to include(
[
["Failed to load remote federation instances"],
hash_including(context: "federation.instances", domain: unreachable_domain),
],
)
expect(warning_calls).to include(
[
["Discarded remote instance entry"],
hash_including(domain: stale_domain, reason: "node data is stale"),
],
)
expect(warning_calls).to include(
[
["Failed to persist remote instance"],
hash_including(domain: restricted_domain, error_class: "ArgumentError"),
],
)
end
it "accepts signatures when the optional isPrivate field is omitted" do
unsigned_attributes = instance_attributes.merge(is_private: nil)
unsigned_payload_json = canonical_instance_payload(unsigned_attributes)