mirror of
https://github.com/l5yth/potato-mesh.git
synced 2026-03-28 17:42:48 +01:00
web: daemonize federation worker pool to avoid deadlocks on stuck announcments (#610)
* web: daemonize federation worker pool to avoid deadlocks on stuck announcments * web: address review comments * web: address review comments
This commit is contained in:
@@ -177,6 +177,7 @@ module PotatoMesh
|
||||
pool = PotatoMesh::App::WorkerPool.new(
|
||||
size: PotatoMesh::Config.federation_worker_pool_size,
|
||||
max_queue: PotatoMesh::Config.federation_worker_queue_capacity,
|
||||
task_timeout: PotatoMesh::Config.federation_task_timeout_seconds,
|
||||
name: "potato-mesh-fed",
|
||||
)
|
||||
|
||||
@@ -442,6 +443,8 @@ module PotatoMesh
|
||||
end
|
||||
end
|
||||
thread.name = "potato-mesh-federation" if thread.respond_to?(:name=)
|
||||
# Allow shutdown even if the announcement loop is still sleeping.
|
||||
thread.daemon = true if thread.respond_to?(:daemon=)
|
||||
set(:federation_thread, thread)
|
||||
thread
|
||||
end
|
||||
@@ -474,6 +477,8 @@ module PotatoMesh
|
||||
end
|
||||
thread.name = "potato-mesh-federation-initial" if thread.respond_to?(:name=)
|
||||
thread.report_on_exception = false if thread.respond_to?(:report_on_exception=)
|
||||
# Avoid blocking process shutdown during delayed startup announcements.
|
||||
thread.daemon = true if thread.respond_to?(:daemon=)
|
||||
set(:initial_federation_thread, thread)
|
||||
thread
|
||||
end
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
|
||||
# frozen_string_literal: true
|
||||
|
||||
require "timeout"
|
||||
|
||||
module PotatoMesh
|
||||
module App
|
||||
# WorkerPool executes submitted blocks using a bounded set of Ruby threads.
|
||||
@@ -124,8 +126,9 @@ module PotatoMesh
|
||||
#
|
||||
# @param size [Integer] number of worker threads to spawn.
|
||||
# @param max_queue [Integer, nil] optional upper bound on queued jobs.
|
||||
# @param task_timeout [Numeric, nil] optional per-task execution timeout.
|
||||
# @param name [String] prefix assigned to worker thread names.
|
||||
def initialize(size:, max_queue: nil, name: "worker-pool")
|
||||
def initialize(size:, max_queue: nil, task_timeout: nil, name: "worker-pool")
|
||||
raise ArgumentError, "size must be positive" unless size.is_a?(Integer) && size.positive?
|
||||
|
||||
@name = name
|
||||
@@ -133,6 +136,7 @@ module PotatoMesh
|
||||
@threads = []
|
||||
@stopped = false
|
||||
@mutex = Mutex.new
|
||||
@task_timeout = normalize_task_timeout(task_timeout)
|
||||
spawn_workers(size)
|
||||
end
|
||||
|
||||
@@ -192,23 +196,45 @@ module PotatoMesh
|
||||
worker = Thread.new do
|
||||
Thread.current.name = "#{@name}-#{index}" if Thread.current.respond_to?(:name=)
|
||||
Thread.current.report_on_exception = false if Thread.current.respond_to?(:report_on_exception=)
|
||||
# Daemon threads allow the process to exit even if a job is stuck.
|
||||
Thread.current.daemon = true if Thread.current.respond_to?(:daemon=)
|
||||
|
||||
loop do
|
||||
task, block = @queue.pop
|
||||
break if task.equal?(STOP_SIGNAL)
|
||||
|
||||
begin
|
||||
result = block.call
|
||||
result = if @task_timeout
|
||||
Timeout.timeout(@task_timeout, TaskTimeoutError, "task exceeded timeout") do
|
||||
block.call
|
||||
end
|
||||
else
|
||||
block.call
|
||||
end
|
||||
task.fulfill(result)
|
||||
rescue StandardError => e
|
||||
task.reject(e)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@threads << worker
|
||||
end
|
||||
end
|
||||
|
||||
# Normalize the per-task timeout into a positive float value.
|
||||
#
|
||||
# @param task_timeout [Numeric, nil] candidate timeout value.
|
||||
# @return [Float, nil] positive timeout in seconds or nil when disabled.
|
||||
def normalize_task_timeout(task_timeout)
|
||||
return nil if task_timeout.nil?
|
||||
|
||||
value = Float(task_timeout)
|
||||
return nil unless value.positive?
|
||||
|
||||
value
|
||||
rescue ArgumentError, TypeError
|
||||
nil
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
@@ -18,8 +18,13 @@ require "spec_helper"
|
||||
require "timeout"
|
||||
|
||||
RSpec.describe PotatoMesh::App::WorkerPool do
|
||||
def with_pool(size: 2, queue: 2)
|
||||
pool = PotatoMesh::App::WorkerPool.new(size: size, max_queue: queue, name: "spec-pool")
|
||||
def with_pool(size: 2, queue: 2, task_timeout: nil)
|
||||
pool = PotatoMesh::App::WorkerPool.new(
|
||||
size: size,
|
||||
max_queue: queue,
|
||||
task_timeout: task_timeout,
|
||||
name: "spec-pool",
|
||||
)
|
||||
yield pool
|
||||
ensure
|
||||
pool&.shutdown(timeout: 0.5)
|
||||
@@ -33,6 +38,20 @@ RSpec.describe PotatoMesh::App::WorkerPool do
|
||||
end
|
||||
end
|
||||
|
||||
it "fails tasks that exceed the configured timeout" do
|
||||
with_pool(task_timeout: 0.01) do |pool|
|
||||
task = pool.schedule { sleep 0.05; :late }
|
||||
expect { task.wait(timeout: 1) }.to raise_error(described_class::TaskTimeoutError)
|
||||
end
|
||||
end
|
||||
|
||||
it "ignores invalid timeout values" do
|
||||
with_pool(task_timeout: "nope") do |pool|
|
||||
task = pool.schedule { sleep 0.01; :ok }
|
||||
expect(task.wait(timeout: 1)).to eq(:ok)
|
||||
end
|
||||
end
|
||||
|
||||
it "propagates exceptions raised by the job block" do
|
||||
with_pool do |pool|
|
||||
task = pool.schedule { raise ArgumentError, "boom" }
|
||||
|
||||
Reference in New Issue
Block a user