diff --git a/web/lib/potato_mesh/application/federation.rb b/web/lib/potato_mesh/application/federation.rb index b97cdc1..3868716 100644 --- a/web/lib/potato_mesh/application/federation.rb +++ b/web/lib/potato_mesh/application/federation.rb @@ -351,6 +351,156 @@ module PotatoMesh [nil, errors] end + # Parse a remote federation instance payload into canonical attributes. + # + # @param payload [Hash] JSON object describing a remote instance. + # @return [Array<(Hash, String), String>] tuple containing the attribute + # hash and signature when valid or a failure reason when invalid. + def remote_instance_attributes_from_payload(payload) + unless payload.is_a?(Hash) + return [nil, nil, "instance payload is not an object"] + end + + id = string_or_nil(payload["id"]) + return [nil, nil, "missing instance id"] unless id + + domain = sanitize_instance_domain(payload["domain"]) + return [nil, nil, "missing instance domain"] unless domain + + pubkey = sanitize_public_key_pem(payload["pubkey"]) + return [nil, nil, "missing instance public key"] unless pubkey + + signature = string_or_nil(payload["signature"]) + return [nil, nil, "missing instance signature"] unless signature + + private_value = if payload.key?("isPrivate") + payload["isPrivate"] + else + payload["is_private"] + end + private_flag = coerce_boolean(private_value) + if private_flag.nil? + numeric_flag = coerce_integer(private_value) + private_flag = !numeric_flag.to_i.zero? if numeric_flag + end + + attributes = { + id: id, + domain: domain, + pubkey: pubkey, + name: string_or_nil(payload["name"]), + version: string_or_nil(payload["version"]), + channel: string_or_nil(payload["channel"]), + frequency: string_or_nil(payload["frequency"]), + latitude: coerce_float(payload["latitude"]), + longitude: coerce_float(payload["longitude"]), + last_update_time: coerce_integer(payload["lastUpdateTime"]), + is_private: private_flag, + } + + [attributes, signature, nil] + rescue StandardError => e + [nil, nil, e.message] + end + + # Recursively ingest federation records exposed by the supplied domain. + # + # @param db [SQLite3::Database] open database connection used for writes. + # @param domain [String] remote domain to crawl for federation records. + # @param visited [Set] domains processed during this crawl. + # @return [Set] updated set of visited domains. + def ingest_known_instances_from!(db, domain, visited: nil) + sanitized = sanitize_instance_domain(domain) + return visited || Set.new unless sanitized + + visited ||= Set.new + return visited if visited.include?(sanitized) + + visited << sanitized + + payload, metadata = fetch_instance_json(sanitized, "/api/instances") + unless payload.is_a?(Array) + warn_log( + "Failed to load remote federation instances", + context: "federation.instances", + domain: sanitized, + reason: Array(metadata).map(&:to_s).join("; "), + ) + return visited + end + + payload.each do |entry| + attributes, signature, reason = remote_instance_attributes_from_payload(entry) + unless attributes && signature + warn_log( + "Discarded remote instance entry", + context: "federation.instances", + domain: sanitized, + reason: reason || "invalid payload", + ) + next + end + + if attributes[:is_private] + debug_log( + "Skipped private remote instance", + context: "federation.instances", + domain: attributes[:domain], + ) + next + end + + unless verify_instance_signature(attributes, signature, attributes[:pubkey]) + warn_log( + "Discarded remote instance entry", + context: "federation.instances", + domain: attributes[:domain], + reason: "invalid signature", + ) + next + end + + attributes[:is_private] = false if attributes[:is_private].nil? + + remote_nodes, node_metadata = fetch_instance_json(attributes[:domain], "/api/nodes") + unless remote_nodes + warn_log( + "Failed to load remote node data", + context: "federation.instances", + domain: attributes[:domain], + reason: Array(node_metadata).map(&:to_s).join("; "), + ) + next + end + + fresh, freshness_reason = validate_remote_nodes(remote_nodes) + unless fresh + warn_log( + "Discarded remote instance entry", + context: "federation.instances", + domain: attributes[:domain], + reason: freshness_reason || "stale node data", + ) + next + end + + begin + upsert_instance_record(db, attributes, signature) + ingest_known_instances_from!(db, attributes[:domain], visited: visited) + rescue ArgumentError => e + warn_log( + "Failed to persist remote instance", + context: "federation.instances", + domain: attributes[:domain], + error_class: e.class.name, + error_message: e.message, + ) + end + end + + visited + end + # Build an HTTP client configured for communication with a remote instance. # # @param uri [URI::Generic] target URI describing the remote endpoint. diff --git a/web/lib/potato_mesh/application/routes/ingest.rb b/web/lib/potato_mesh/application/routes/ingest.rb index f144f9b..49dbaa1 100644 --- a/web/lib/potato_mesh/application/routes/ingest.rb +++ b/web/lib/potato_mesh/application/routes/ingest.rb @@ -217,6 +217,7 @@ module PotatoMesh db = open_database upsert_instance_record(db, attributes, signature) + ingest_known_instances_from!(db, attributes[:domain]) debug_log( "Registered remote instance", context: "ingest.register", diff --git a/web/spec/app_spec.rb b/web/spec/app_spec.rb index c4619c2..b45dbc2 100644 --- a/web/spec/app_spec.rb +++ b/web/spec/app_spec.rb @@ -1193,18 +1193,28 @@ RSpec.describe "Potato Mesh Sinatra app" do it "rejects registrations with invalid signatures" do invalid_payload = instance_payload.merge("signature" => Base64.strict_encode64("invalid")) - expect_any_instance_of(Sinatra::Application).to receive(:warn_log).with( - "Instance registration rejected", - context: "ingest.register", - domain: domain, - reason: "invalid signature", - ).at_least(:once) + warning_calls = [] + allow_any_instance_of(Sinatra::Application).to receive(:warn_log).and_wrap_original do |method, *args, **kwargs| + warning_calls << [args, kwargs] + method.call(*args, **kwargs) + end post "/api/instances", invalid_payload.to_json, { "CONTENT_TYPE" => "application/json" } expect(last_response.status).to eq(400) expect(JSON.parse(last_response.body)).to eq("error" => "invalid signature") + expect(warning_calls).to include( + [ + ["Instance registration rejected"], + hash_including( + context: "ingest.register", + domain: domain, + reason: "invalid signature", + ), + ], + ) + with_db(readonly: true) do |db| count = db.get_first_value("SELECT COUNT(*) FROM instances") expect(count).to eq(1) @@ -1223,25 +1233,375 @@ RSpec.describe "Potato Mesh Sinatra app" do "signature" => restricted_signature, ) - expect_any_instance_of(Sinatra::Application).to receive(:warn_log).with( - "Instance registration rejected", - context: "ingest.register", - domain: restricted_domain, - reason: "restricted IP address", - resolved_ip: an_instance_of(IPAddr), - ).at_least(:once) + warning_calls = [] + allow_any_instance_of(Sinatra::Application).to receive(:warn_log).and_wrap_original do |method, *args, **kwargs| + warning_calls << [args, kwargs] + method.call(*args, **kwargs) + end post "/api/instances", restricted_payload.to_json, { "CONTENT_TYPE" => "application/json" } expect(last_response.status).to eq(400) expect(JSON.parse(last_response.body)).to eq("error" => "restricted domain") + expect(warning_calls).to include( + [ + ["Instance registration rejected"], + hash_including( + context: "ingest.register", + domain: restricted_domain, + reason: "restricted IP address", + resolved_ip: an_instance_of(IPAddr), + ), + ], + ) + with_db(readonly: true) do |db| count = db.get_first_value("SELECT COUNT(*) FROM instances") expect(count).to eq(1) end end + it "ingests federation instances advertised by remote peers" do + ally_key = OpenSSL::PKey::RSA.new(2048) + ally_domain = "ally.mesh" + ally_attributes = { + id: "ally-instance-1", + domain: ally_domain, + pubkey: ally_key.public_key.export, + name: "Ally Mesh", + version: "2.0.0", + channel: "#Allies", + frequency: "433MHz", + latitude: 40.1, + longitude: -74.0, + last_update_time: Time.now.to_i, + is_private: false, + } + ally_signature_payload = canonical_instance_payload(ally_attributes) + ally_signature = Base64.strict_encode64( + ally_key.sign(OpenSSL::Digest::SHA256.new, ally_signature_payload), + ) + ally_payload = { + "id" => ally_attributes[:id], + "domain" => ally_domain, + "pubkey" => ally_attributes[:pubkey], + "name" => ally_attributes[:name], + "version" => ally_attributes[:version], + "channel" => ally_attributes[:channel], + "frequency" => ally_attributes[:frequency], + "latitude" => ally_attributes[:latitude], + "longitude" => ally_attributes[:longitude], + "lastUpdateTime" => ally_attributes[:last_update_time], + "isPrivate" => ally_attributes[:is_private], + "signature" => ally_signature, + } + + ally_nodes = Array.new(PotatoMesh::Config.remote_instance_min_node_count) do |index| + { "node_id" => "ally-node-#{index}", "last_heard" => Time.now.to_i - index } + end + + allow_any_instance_of(Sinatra::Application).to receive(:fetch_instance_json) do |_instance, host, path| + case [host, path] + when [domain, "/.well-known/potato-mesh"] + [well_known_document, URI("https://#{host}#{path}")] + when [domain, "/api/nodes"] + [remote_nodes, URI("https://#{host}#{path}")] + when [domain, "/api/instances"] + [[ally_payload], URI("https://#{host}#{path}")] + when [ally_domain, "/api/nodes"] + [ally_nodes, URI("https://#{host}#{path}")] + when [ally_domain, "/api/instances"] + [[instance_payload], URI("https://#{host}#{path}")] + else + [nil, []] + end + end + + post "/api/instances", instance_payload.to_json, { "CONTENT_TYPE" => "application/json" } + + expect(last_response.status).to eq(201) + + with_db(readonly: true) do |db| + db.results_as_hash = true + ally_row = db.get_first_row( + "SELECT domain, signature FROM instances WHERE domain = ?", + [ally_domain], + ) + remote_row = db.get_first_row( + "SELECT domain, signature FROM instances WHERE domain = ?", + [domain], + ) + + expect(ally_row).not_to be_nil + expect(ally_row["signature"]).to eq(ally_signature) + expect(remote_row).not_to be_nil + expect(remote_row["signature"]).to eq(instance_signature) + end + end + + it "skips remote federation entries that fail validation" do + stale_key = OpenSSL::PKey::RSA.new(2048) + stale_domain = "stale.mesh" + stale_attributes = { + id: "stale-instance", + domain: stale_domain, + pubkey: stale_key.public_key.export, + name: "Stale Mesh", + version: "0.1.0", + channel: "#Stale", + frequency: "868MHz", + latitude: 10.0, + longitude: 20.0, + last_update_time: Time.now.to_i, + is_private: false, + } + stale_signature_payload = canonical_instance_payload(stale_attributes) + stale_signature = Base64.strict_encode64( + stale_key.sign(OpenSSL::Digest::SHA256.new, stale_signature_payload), + ) + stale_payload = { + "id" => stale_attributes[:id], + "domain" => stale_domain, + "pubkey" => stale_attributes[:pubkey], + "name" => stale_attributes[:name], + "version" => stale_attributes[:version], + "channel" => stale_attributes[:channel], + "frequency" => stale_attributes[:frequency], + "latitude" => stale_attributes[:latitude], + "longitude" => stale_attributes[:longitude], + "lastUpdateTime" => stale_attributes[:last_update_time], + "isPrivate" => false, + "signature" => stale_signature, + } + + private_key = OpenSSL::PKey::RSA.new(2048) + private_domain = "private.mesh" + private_attributes = { + id: "private-instance", + domain: private_domain, + pubkey: private_key.public_key.export, + name: "Private Mesh", + version: "3.0.0", + channel: "#Private", + frequency: "915MHz", + latitude: 0.0, + longitude: 0.0, + last_update_time: Time.now.to_i, + is_private: true, + } + private_signature_payload = canonical_instance_payload(private_attributes) + private_signature = Base64.strict_encode64( + private_key.sign(OpenSSL::Digest::SHA256.new, private_signature_payload), + ) + private_payload = { + "id" => private_attributes[:id], + "domain" => private_domain, + "pubkey" => private_attributes[:pubkey], + "name" => private_attributes[:name], + "version" => private_attributes[:version], + "channel" => private_attributes[:channel], + "frequency" => private_attributes[:frequency], + "latitude" => private_attributes[:latitude], + "longitude" => private_attributes[:longitude], + "lastUpdateTime" => private_attributes[:last_update_time], + "isPrivate" => true, + "signature" => private_signature, + } + + invalid_key = OpenSSL::PKey::RSA.new(2048) + invalid_payload = { + "id" => "invalid-instance", + "domain" => "invalid.mesh", + "pubkey" => invalid_key.public_key.export, + "name" => "Invalid Mesh", + "version" => "1.0.0", + "channel" => "#Invalid", + "frequency" => "915MHz", + "latitude" => 1.0, + "longitude" => 2.0, + "lastUpdateTime" => Time.now.to_i, + "isPrivate" => false, + "signature" => Base64.strict_encode64("bogus"), + } + + unreachable_key = OpenSSL::PKey::RSA.new(2048) + unreachable_domain = "unreachable.mesh" + unreachable_attributes = { + id: "unreachable-instance", + domain: unreachable_domain, + pubkey: unreachable_key.public_key.export, + name: "Unreachable Mesh", + version: "6.0.0", + channel: "#Offline", + frequency: "915MHz", + latitude: 12.0, + longitude: 24.0, + last_update_time: Time.now.to_i, + is_private: false, + } + unreachable_signature_payload = canonical_instance_payload(unreachable_attributes) + unreachable_signature = Base64.strict_encode64( + unreachable_key.sign(OpenSSL::Digest::SHA256.new, unreachable_signature_payload), + ) + unreachable_payload = { + "id" => unreachable_attributes[:id], + "domain" => unreachable_domain, + "pubkey" => unreachable_attributes[:pubkey], + "name" => unreachable_attributes[:name], + "version" => unreachable_attributes[:version], + "channel" => unreachable_attributes[:channel], + "frequency" => unreachable_attributes[:frequency], + "latitude" => unreachable_attributes[:latitude], + "longitude" => unreachable_attributes[:longitude], + "lastUpdateTime" => unreachable_attributes[:last_update_time], + "isPrivate" => false, + "signature" => unreachable_signature, + } + + offline_domain = "offline.mesh" + offline_key = OpenSSL::PKey::RSA.new(2048) + offline_attributes = { + id: "offline-instance", + domain: offline_domain, + pubkey: offline_key.public_key.export, + name: "Offline Mesh", + version: "4.0.0", + channel: "#Offline", + frequency: "915MHz", + latitude: 5.0, + longitude: 6.0, + last_update_time: Time.now.to_i, + is_private: false, + } + offline_signature_payload = canonical_instance_payload(offline_attributes) + offline_signature = Base64.strict_encode64( + offline_key.sign(OpenSSL::Digest::SHA256.new, offline_signature_payload), + ) + offline_payload = { + "id" => offline_attributes[:id], + "domain" => offline_domain, + "pubkey" => offline_attributes[:pubkey], + "name" => offline_attributes[:name], + "version" => offline_attributes[:version], + "channel" => offline_attributes[:channel], + "frequency" => offline_attributes[:frequency], + "latitude" => offline_attributes[:latitude], + "longitude" => offline_attributes[:longitude], + "lastUpdateTime" => offline_attributes[:last_update_time], + "isPrivate" => false, + "signature" => offline_signature, + } + + restricted_domain = "127.0.0.1" + restricted_key = OpenSSL::PKey::RSA.new(2048) + restricted_attributes = { + id: "restricted-instance", + domain: restricted_domain, + pubkey: restricted_key.public_key.export, + name: "Restricted Mesh", + version: "5.0.0", + channel: "#Restricted", + frequency: "915MHz", + latitude: 9.0, + longitude: 9.0, + last_update_time: Time.now.to_i, + is_private: false, + } + restricted_signature_payload = canonical_instance_payload(restricted_attributes) + restricted_signature = Base64.strict_encode64( + restricted_key.sign(OpenSSL::Digest::SHA256.new, restricted_signature_payload), + ) + restricted_payload = { + "id" => restricted_attributes[:id], + "domain" => restricted_domain, + "pubkey" => restricted_attributes[:pubkey], + "name" => restricted_attributes[:name], + "version" => restricted_attributes[:version], + "channel" => restricted_attributes[:channel], + "frequency" => restricted_attributes[:frequency], + "latitude" => restricted_attributes[:latitude], + "longitude" => restricted_attributes[:longitude], + "lastUpdateTime" => restricted_attributes[:last_update_time], + "isPrivate" => false, + "signature" => restricted_signature, + } + + stale_nodes = Array.new(PotatoMesh::Config.remote_instance_min_node_count) do |index| + { "node_id" => "stale-node-#{index}", "last_heard" => (Time.now.to_i - PotatoMesh::Config.remote_instance_max_node_age) - index - 1 } + end + + allow_any_instance_of(Sinatra::Application).to receive(:fetch_instance_json) do |_instance, host, path| + case [host, path] + when [domain, "/.well-known/potato-mesh"] + [well_known_document, URI("https://#{host}#{path}")] + when [domain, "/api/nodes"] + [remote_nodes, URI("https://#{host}#{path}")] + when [domain, "/api/instances"] + [ + [ + "unexpected", + private_payload, + invalid_payload, + offline_payload, + stale_payload, + restricted_payload, + unreachable_payload, + ], + URI("https://#{host}#{path}"), + ] + when [offline_domain, "/api/nodes"] + [nil, ["timeout"]] + when [stale_domain, "/api/nodes"] + [stale_nodes, URI("https://#{host}#{path}")] + when [restricted_domain, "/api/nodes"] + [remote_nodes, URI("https://#{host}#{path}")] + when [unreachable_domain, "/api/nodes"] + [remote_nodes, URI("https://#{host}#{path}")] + when [unreachable_domain, "/api/instances"] + [nil, ["connection refused"]] + else + [nil, []] + end + end + + warning_calls = [] + allow_any_instance_of(Sinatra::Application).to receive(:warn_log).and_wrap_original do |method, *args, **kwargs| + warning_calls << [args, kwargs] + method.call(*args, **kwargs) + end + + post "/api/instances", instance_payload.to_json, { "CONTENT_TYPE" => "application/json" } + + expect(last_response.status).to eq(201) + + with_db(readonly: true) do |db| + domains = db.execute("SELECT domain FROM instances ORDER BY domain").flatten + expect(domains).to include(domain, unreachable_domain) + expect(domains).not_to include(stale_domain, private_domain, "invalid.mesh", offline_domain, restricted_domain) + expect(domains.count { |value| value == domain }).to eq(1) + end + + expect(warning_calls).to include( + [ + ["Failed to load remote federation instances"], + hash_including(context: "federation.instances", domain: unreachable_domain), + ], + ) + expect(warning_calls).to include( + [ + ["Discarded remote instance entry"], + hash_including(domain: stale_domain, reason: "node data is stale"), + ], + ) + expect(warning_calls).to include( + [ + ["Failed to persist remote instance"], + hash_including(domain: restricted_domain, error_class: "ArgumentError"), + ], + ) + end + it "accepts signatures when the optional isPrivate field is omitted" do unsigned_attributes = instance_attributes.merge(is_private: nil) unsigned_payload_json = canonical_instance_payload(unsigned_attributes)