From ba373e3a529b5ed3a248e8be4c2e250767d9f1d8 Mon Sep 17 00:00:00 2001 From: Kristofer Gaudel Date: Tue, 16 Dec 2025 13:42:07 -0500 Subject: [PATCH 01/10] update Gemfile with async gems --- Gemfile | 3 +++ Gemfile.lock | 57 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/Gemfile b/Gemfile index 3bfe979f6..477da7c22 100644 --- a/Gemfile +++ b/Gemfile @@ -5,6 +5,9 @@ source "https://rubygems.org" gem "rake" gem "rake-compiler" gem "concurrent-ruby" +gem "async" +gem "async-bus" +gem "async-container-supervisor" group :test do gem "benchmark-memory" diff --git a/Gemfile.lock b/Gemfile.lock index b2e0c9748..6794e4a6b 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -33,6 +33,32 @@ GEM remote: https://rubygems.org/ specs: ast (2.4.3) + async (2.35.0) + console (~> 1.29) + fiber-annotation + io-event (~> 1.11) + metrics (~> 0.12) + traces (~> 0.18) + async-bus (0.3.1) + async + io-endpoint + io-stream + msgpack + async-container (0.27.7) + async (~> 2.22) + async-container-supervisor (0.9.2) + async-service + io-endpoint + memory (~> 0.7) + memory-leak (~> 0.5) + process-metrics + async-service (0.16.0) + async + async-container (~> 0.16) + string-format (~> 0.2) + bake (0.24.1) + bigdecimal + samovar (~> 2.1) base64 (0.3.0) benchmark (0.4.1) benchmark-ips (2.14.0) @@ -43,12 +69,20 @@ GEM coderay (1.1.3) concurrent-ruby (1.3.5) connection_pool (2.5.4) + console (1.34.2) + fiber-annotation + fiber-local (~> 1.1) + json date (3.4.1) debug (1.11.0) irb (~> 1.10) reline (>= 0.3.8) drb (2.2.3) erb (5.0.1) + fiber-annotation (0.2.0) + fiber-local (1.1.0) + fiber-storage + fiber-storage (1.0.1) google-protobuf (4.32.0-aarch64-linux-gnu) bigdecimal rake (>= 13) @@ -81,6 +115,9 @@ GEM i18n (1.14.7) concurrent-ruby (~> 1.0) io-console (0.8.0) + io-endpoint (0.16.0) + io-event (1.14.2) + io-stream (0.11.1) irb (1.15.2) pp (>= 0.6.0) rdoc (>= 4.0.0) @@ -89,12 +126,20 @@ GEM language_server-protocol (3.17.0.5) lint_roller (1.1.0) logger (1.7.0) + mapping (1.1.3) matrix (0.4.3) + memory (0.12.0) + bake (~> 0.15) + console + msgpack + memory-leak (0.7.0) memory_profiler (1.1.0) method_source (1.1.0) + metrics (0.15.0) minitest (5.25.5) mocha (2.7.1) ruby2_keywords (>= 0.0.5) + msgpack (1.8.0) mutex_m (0.3.0) mysql2 (0.5.6) parallel (1.27.0) @@ -105,6 +150,10 @@ GEM prettyprint prettyprint (0.2.0) prism (1.4.0) + process-metrics (0.8.0) + console (~> 1.8) + json (~> 2) + samovar (~> 2.1) pry (0.15.2) coderay (~> 1.1) method_source (~> 1.0) @@ -160,10 +209,15 @@ GEM ruby2_keywords (0.0.5) rubystats (0.4.1) matrix + samovar (2.4.1) + console (~> 1.0) + mapping (~> 1.0) securerandom (0.4.1) + string-format (0.2.0) stringio (3.1.7) timeout (0.4.3) toxiproxy (2.0.2) + traces (0.18.2) trilogy (2.9.0) tzinfo (2.0.6) concurrent-ruby (~> 1.0) @@ -187,6 +241,9 @@ PLATFORMS DEPENDENCIES activerecord! + async + async-bus + async-container-supervisor benchmark-ips benchmark-memory bigdecimal From 9fc13e070bafe72534523c3f084d7f59904476b2 Mon Sep 17 00:00:00 2001 From: Kristofer Gaudel Date: Tue, 16 Dec 2025 14:29:04 -0500 Subject: [PATCH 02/10] implement WAL --- Gemfile | 2 + Gemfile.lock | 4 ++ lib/semian/pod_pid/wal.rb | 130 ++++++++++++++++++++++++++++++++++++++ test/pod_pid/wal_test.rb | 120 +++++++++++++++++++++++++++++++++++ 4 files changed, 256 insertions(+) create mode 100644 lib/semian/pod_pid/wal.rb create mode 100644 test/pod_pid/wal_test.rb diff --git a/Gemfile b/Gemfile index 477da7c22..6fca8145b 100644 --- a/Gemfile +++ b/Gemfile @@ -8,6 +8,8 @@ gem "concurrent-ruby" gem "async" gem "async-bus" gem "async-container-supervisor" +gem "digest-crc" +gem "msgpack" group :test do gem "benchmark-memory" diff --git a/Gemfile.lock b/Gemfile.lock index 6794e4a6b..21d8298e7 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -77,6 +77,8 @@ GEM debug (1.11.0) irb (~> 1.10) reline (>= 0.3.8) + digest-crc (0.7.0) + rake (>= 12.0.0, < 14.0.0) drb (2.2.3) erb (5.0.1) fiber-annotation (0.2.0) @@ -249,12 +251,14 @@ DEPENDENCIES bigdecimal concurrent-ruby debug + digest-crc grpc (= 1.74.1) hiredis (~> 0.6) hiredis-client (>= 0.12.0) memory_profiler minitest mocha + msgpack mutex_m mysql2 (~> 0.5) pry-byebug diff --git a/lib/semian/pod_pid/wal.rb b/lib/semian/pod_pid/wal.rb new file mode 100644 index 000000000..6ec313c75 --- /dev/null +++ b/lib/semian/pod_pid/wal.rb @@ -0,0 +1,130 @@ +# frozen_string_literal: true + +require "digest/crc32" +require "msgpack" +require "fileutils" + +# The WAL uses the binary format described below: +# +# +----------+---------------+-----------------+------------------+-------+-------+ +# | CRC (4B) | Key Size (2B) | Value Size (2B) | Timestamp (8B) | Key | Value | +# +----------+---------------+-----------------+------------------+-------+-------+ +# +# CRC (4 bytes) +# - CRC32 checksum computed over the payload (everything after CRC) +# - Used to detect corrupted entries during replay +# - Packed as big-endian unsigned 32-bit integer (N) +# +# Key Size (2 bytes) +# - Length of the Key field in bytes +# - Packed as big-endian unsigned 16-bit integer (S>) +# - Max key size: 65,535 bytes +# +# Value Size (2 bytes) +# - Length of the Value field in bytes +# - Packed as big-endian unsigned 16-bit integer (S>) +# - Max value size: 65,535 bytes +# +# Timestamp (8 bytes) +# - Microseconds since Unix epoch when entry was written +# - Packed as big-endian unsigned 64-bit integer (Q>) +# - Used to determine which entry is latest per resource +# +# Key (variable) +# - Resource name as UTF-8 encoded string (e.g., "mysql", "redis_cache") +# +# Value (variable) +# - PID state serialized with MessagePack +# - Contains: rejection_rate, integral, smoother_value, observation_count +# +module Semian + module PodPID + class WAL + DEFAULT_PATH = "/tmp/semian_pid.wal" + + HEADER_SIZE = 16 + + attr_reader :path + + def initialize(path = DEFAULT_PATH) + @path = path + @mutex = Mutex.new + ensure_directory_exists + end + + def write(resource, state) + @mutex.synchronize do + key = resource.to_s.encode("UTF-8") + value = MessagePack.pack(state) + timestamp = (Time.now.to_f * 1_000_000).to_i + + payload = [key.bytesize, value.bytesize, timestamp].pack("S>S>Q>") + key + value + crc = Digest::CRC32.checksum(payload) + + File.open(@path, "ab") do |f| + f.write([crc].pack("N") + payload) + end + end + end + + def replay + return 0 unless File.exist?(@path) + + entries_by_resource = {} + + @mutex.synchronize do + File.open(@path, "rb") do |f| + until f.eof? + break unless (entry = read_entry(f)) + + resource, state, _timestamp = entry + entries_by_resource[resource] = state + end + end + end + + entries_by_resource.each do |resource, state| + yield(resource, state) if block_given? + end + + entries_by_resource.size + end + + def truncate + @mutex.synchronize do + File.truncate(@path, 0) if File.exist?(@path) + end + end + + private + + def read_entry(file) + header = file.read(4) + return unless header&.bytesize == 4 + + crc = header.unpack1("N") + + sizes_and_ts = file.read(12) + return unless sizes_and_ts&.bytesize == 12 + + key_size, value_size, timestamp = sizes_and_ts.unpack("S>S>Q>") + + key = file.read(key_size) + value = file.read(value_size) + return unless key&.bytesize == key_size && value&.bytesize == value_size + + payload = sizes_and_ts + key + value + expected_crc = Digest::CRC32.checksum(payload) + + return unless crc == expected_crc + + [key, MessagePack.unpack(value, symbolize_keys: true), timestamp] + end + + def ensure_directory_exists + dir = File.dirname(@path) + FileUtils.mkdir_p(dir) unless File.directory?(dir) + end + end + end +end diff --git a/test/pod_pid/wal_test.rb b/test/pod_pid/wal_test.rb new file mode 100644 index 000000000..1d4fad2eb --- /dev/null +++ b/test/pod_pid/wal_test.rb @@ -0,0 +1,120 @@ +# frozen_string_literal: true + +require "rubygems" +require "bundler/setup" +require "minitest/autorun" +require "tempfile" +require "digest/crc32" +require "msgpack" + +require_relative "../../lib/semian/pod_pid/wal" + +module Semian + module PodPID + class WALTest < Minitest::Test + def setup + @temp_file = Tempfile.new("semian_wal_test") + @wal_path = @temp_file.path + @temp_file.close + File.delete(@wal_path) if File.exist?(@wal_path) + @wal = WAL.new(@wal_path) + end + + def teardown + File.delete(@wal_path) if File.exist?(@wal_path) + @temp_file&.unlink + end + + def test_write_and_replay_single_entry + state = { rejection_rate: 0.25, integral: 1.5 } + @wal.write("mysql", state) + + entries = [] + @wal.replay { |resource, s| entries << [resource, s] } + + assert_equal(1, entries.size) + assert_equal("mysql", entries[0][0]) + assert_equal(0.25, entries[0][1][:rejection_rate]) + assert_equal(1.5, entries[0][1][:integral]) + end + + def test_replay_returns_last_entry_per_resource + @wal.write("mysql", { rejection_rate: 0.1, integral: 0.5 }) + @wal.write("redis", { rejection_rate: 0.2, integral: 1.0 }) + @wal.write("mysql", { rejection_rate: 0.3, integral: 1.5 }) + + entries = {} + @wal.replay { |resource, state| entries[resource] = state } + + assert_equal(2, entries.size) + assert_equal(0.3, entries["mysql"][:rejection_rate]) + assert_equal(1.5, entries["mysql"][:integral]) + assert_equal(0.2, entries["redis"][:rejection_rate]) + assert_equal(1.0, entries["redis"][:integral]) + end + + def test_truncate_clears_log + @wal.write("mysql", { rejection_rate: 0.5 }) + @wal.truncate + + count = @wal.replay { |_r, _s| } + + assert_equal(0, count) + end + + def test_binary_format_structure + state = { rejection_rate: 0.25, integral: 1.5 } + @wal.write("mysql", state) + + File.open(@wal_path, "rb") do |f| + crc = f.read(4).unpack1("N") + key_size = f.read(2).unpack1("S>") + value_size = f.read(2).unpack1("S>") + timestamp = f.read(8).unpack1("Q>") + key = f.read(key_size) + value_bytes = f.read(value_size) + + assert_equal(5, key_size) + assert_operator(value_size, :>, 0) + assert_operator(timestamp, :>, 0) + assert_equal("mysql", key) + + value = MessagePack.unpack(value_bytes, symbolize_keys: true) + + assert_equal(0.25, value[:rejection_rate]) + assert_equal(1.5, value[:integral]) + + payload = [key_size, value_size, timestamp].pack("S>S>Q>") + key + value_bytes + + assert_equal(Digest::CRC32.checksum(payload), crc) + assert(f.eof?) + end + end + + def test_corrupted_entry_is_skipped + @wal.write("mysql", { rejection_rate: 0.25 }) + File.open(@wal_path, "r+b") { |f| f.write([0xDEADBEEF].pack("N")) } + + entries = [] + @wal.replay { |resource, state| entries << [resource, state] } + + assert_equal(0, entries.size) + end + + def test_empty_file_replay_returns_zero + count = @wal.replay { |_r, _s| } + + assert_equal(0, count) + end + + def test_replay_without_block_returns_count + @wal.write("mysql", { rejection_rate: 0.1 }) + @wal.write("redis", { rejection_rate: 0.2 }) + + count = @wal.replay + + assert_equal(2, count) + end + end + end +end From 81294168be273f5523f0d7a71d0c2ffd4d62e5eb Mon Sep 17 00:00:00 2001 From: Kristofer Gaudel Date: Tue, 16 Dec 2025 15:38:02 -0500 Subject: [PATCH 03/10] PID state service --- lib/semian/pod_pid/state_service.rb | 132 ++++++++++++++++++++++++++++ test/pod_pid/state_service_test.rb | 112 +++++++++++++++++++++++ 2 files changed, 244 insertions(+) create mode 100644 lib/semian/pod_pid/state_service.rb create mode 100644 test/pod_pid/state_service_test.rb diff --git a/lib/semian/pod_pid/state_service.rb b/lib/semian/pod_pid/state_service.rb new file mode 100644 index 000000000..71627ca35 --- /dev/null +++ b/lib/semian/pod_pid/state_service.rb @@ -0,0 +1,132 @@ +# frozen_string_literal: true + +require_relative "../pid_controller" +require_relative "../simple_sliding_window" +require_relative "wal" + +module Semian + module PodPID + class StateService + attr_reader :resources, :config + + def initialize( + kp:, + ki:, + kd:, + window_size:, + sliding_interval:, + initial_error_rate:, + wal_path: WAL::DEFAULT_PATH, + on_rejection_rate_change: nil + ) + @config = { + kp: kp, + ki: ki, + kd: kd, + window_size: window_size, + sliding_interval: sliding_interval, + initial_error_rate: initial_error_rate, + } + @resources = {} + @mutex = Mutex.new + @wal = WAL.new(wal_path) + @on_rejection_rate_change = on_rejection_rate_change + @running = false + + restore_from_wal + end + + def record_observation(resource, outcome) + controller = ensure_resource(resource) + controller.record_request(outcome) + end + + def rejection_rate(resource) + controller = @resources[resource.to_s] + controller&.rejection_rate || 0.0 + end + + def start_update_loop + @running = true + @update_thread = Thread.new do + while @running + sleep(@config[:sliding_interval]) + update_all_resources + end + end + end + + def stop_update_loop + @running = false + @update_thread&.join(5) + @update_thread = nil + end + + def update_all_resources + @mutex.synchronize do + @resources.each do |name, controller| + old_rate = controller.rejection_rate + controller.update + new_rate = controller.rejection_rate + + if old_rate != new_rate + persist_state(name, controller) + notify_rejection_rate_change(name, new_rate) + end + end + end + end + + def metrics(resource) + controller = @resources[resource.to_s] + controller&.metrics + end + + private + + def ensure_resource(resource) + name = resource.to_s + @mutex.synchronize do + @resources[name] ||= create_controller + end + end + + def create_controller + Semian::ThreadSafe::PIDController.new( + kp: @config[:kp], + ki: @config[:ki], + kd: @config[:kd], + window_size: @config[:window_size], + sliding_interval: @config[:sliding_interval], + initial_error_rate: @config[:initial_error_rate], + implementation: Semian::ThreadSafe, + ) + end + + def restore_from_wal + count = @wal.replay do |resource, state| + controller = ensure_resource(resource) + apply_state(controller, state) + end + @wal.truncate if count > 0 + end + + def apply_state(controller, state) + controller.instance_variable_set(:@rejection_rate, state[:rejection_rate]) if state[:rejection_rate] + controller.instance_variable_set(:@integral, state[:integral]) if state[:integral] + end + + def persist_state(resource, controller) + state = { + rejection_rate: controller.rejection_rate, + integral: controller.instance_variable_get(:@integral), + } + @wal.write(resource, state) + end + + def notify_rejection_rate_change(resource, new_rate) + @on_rejection_rate_change&.call(resource, new_rate) + end + end + end +end diff --git a/test/pod_pid/state_service_test.rb b/test/pod_pid/state_service_test.rb new file mode 100644 index 000000000..1bbf6ce2d --- /dev/null +++ b/test/pod_pid/state_service_test.rb @@ -0,0 +1,112 @@ +# frozen_string_literal: true + +require "rubygems" +require "bundler/setup" +require "minitest/autorun" +require "tempfile" +require "forwardable" + +require_relative "../../lib/semian/pod_pid/state_service" + +module Semian + module PodPID + class StateServiceTest < Minitest::Test + def setup + @temp_file = Tempfile.new("semian_state_service_test") + @wal_path = @temp_file.path + @temp_file.close + File.delete(@wal_path) if File.exist?(@wal_path) + + @service = StateService.new( + kp: 1.0, + ki: 0.2, + kd: 0.0, + window_size: 10, + sliding_interval: 1, + initial_error_rate: 0.05, + wal_path: @wal_path, + ) + end + + def teardown + @service.stop_update_loop + File.delete(@wal_path) if File.exist?(@wal_path) + @temp_file&.unlink + end + + def test_record_observation_creates_resource + @service.record_observation("mysql", :success) + + assert(@service.resources.key?("mysql")) + end + + def test_record_observation_tracks_outcomes + 5.times { @service.record_observation("mysql", :success) } + 3.times { @service.record_observation("mysql", :error) } + 2.times { @service.record_observation("mysql", :rejected) } + + metrics = @service.metrics("mysql") + + assert_equal(5, metrics[:current_window_requests][:success]) + assert_equal(3, metrics[:current_window_requests][:error]) + assert_equal(2, metrics[:current_window_requests][:rejected]) + end + + def test_rejection_rate_returns_zero_for_unknown_resource + assert_equal(0.0, @service.rejection_rate("unknown")) + end + + def test_update_all_resources_computes_rejection_rate + 10.times { @service.record_observation("mysql", :error) } + @service.update_all_resources + + assert_operator(@service.rejection_rate("mysql"), :>, 0.0) + end + + def test_wal_persists_on_rejection_rate_change + 10.times { @service.record_observation("mysql", :error) } + @service.update_all_resources + + assert_operator(File.size(@wal_path), :>, 0) + end + + def test_restores_state_from_wal + 10.times { @service.record_observation("mysql", :error) } + @service.update_all_resources + old_rate = @service.rejection_rate("mysql") + + new_service = StateService.new( + kp: 1.0, + ki: 0.2, + kd: 0.0, + window_size: 10, + sliding_interval: 1, + initial_error_rate: 0.05, + wal_path: @wal_path, + ) + + assert_equal(old_rate, new_service.rejection_rate("mysql")) + end + + def test_on_rejection_rate_change_callback + notified = [] + service = StateService.new( + kp: 1.0, + ki: 0.2, + kd: 0.0, + window_size: 10, + sliding_interval: 1, + initial_error_rate: 0.05, + wal_path: @wal_path, + on_rejection_rate_change: ->(resource, rate) { notified << [resource, rate] }, + ) + + 10.times { service.record_observation("mysql", :error) } + service.update_all_resources + + assert_equal(1, notified.size) + assert_equal("mysql", notified[0][0]) + end + end + end +end From 2fbc697fd522c957086a7807ce4d401b32bef5ac Mon Sep 17 00:00:00 2001 From: Kristofer Gaudel Date: Tue, 16 Dec 2025 16:56:14 -0500 Subject: [PATCH 04/10] implement state service, bus controller and methods for RPC --- lib/semian/pod_pid.rb | 11 +++++ lib/semian/pod_pid/client.rb | 60 +++++++++++++++++++++++++ lib/semian/pod_pid/controller.rb | 68 +++++++++++++++++++++++++++++ lib/semian/pod_pid/state_service.rb | 33 ++++++-------- lib/semian/pod_pid/wal.rb | 2 - test/pod_pid/client_test.rb | 56 ++++++++++++++++++++++++ test/pod_pid/state_service_test.rb | 24 ++++------ 7 files changed, 216 insertions(+), 38 deletions(-) create mode 100644 lib/semian/pod_pid.rb create mode 100644 lib/semian/pod_pid/client.rb create mode 100644 lib/semian/pod_pid/controller.rb create mode 100644 test/pod_pid/client_test.rb diff --git a/lib/semian/pod_pid.rb b/lib/semian/pod_pid.rb new file mode 100644 index 000000000..b01f5f1c4 --- /dev/null +++ b/lib/semian/pod_pid.rb @@ -0,0 +1,11 @@ +# frozen_string_literal: true + +require_relative "pod_pid/wal" +require_relative "pod_pid/state_service" +require_relative "pod_pid/controller" +require_relative "pod_pid/client" + +module Semian + module PodPID + end +end diff --git a/lib/semian/pod_pid/client.rb b/lib/semian/pod_pid/client.rb new file mode 100644 index 000000000..a8ecafed0 --- /dev/null +++ b/lib/semian/pod_pid/client.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +require "async" +require "async/bus" +require "async/bus/client" + +module Semian + module PodPID + class Client < Async::Bus::Client + attr_reader :rejection_rates + + def initialize(**options) + super(**options) + @rejection_rates = {} + @mutex = Mutex.new + @state_service = nil + end + + def should_reject?(resource) + rate = @rejection_rates[resource.to_s] || 0.0 + rand < rate + end + + def rejection_rate(resource) + @rejection_rates[resource.to_s] || 0.0 + end + + def update_rejection_rate(resource, rate) + @mutex.synchronize do + @rejection_rates[resource.to_s] = rate + end + end + + def record_observation(resource, outcome) + return false unless @state_service + + @state_service.record_observation(resource.to_s, outcome.to_s) + true + rescue StandardError + false + end + + def metrics(resource) + return unless @state_service + + @state_service.metrics(resource.to_s) + rescue StandardError + nil + end + + protected + + def connected!(connection) + @state_service = connection[:pid_controller] + connection.bind(:client, self) + @state_service.register_client(connection[:client]) + end + end + end +end diff --git a/lib/semian/pod_pid/controller.rb b/lib/semian/pod_pid/controller.rb new file mode 100644 index 000000000..333b9086d --- /dev/null +++ b/lib/semian/pod_pid/controller.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +require "async" +require "async/bus" +require_relative "state_service" + +module Semian + module PodPID + class Controller < Async::Bus::Controller + def initialize(state_service) + super() + @state_service = state_service + @clients = [] + @clients_mutex = Mutex.new + + @state_service.on_rejection_rate_change = ->(resource, rate) { + broadcast_rejection_rate(resource, rate) + } + end + + def record_observation(resource, outcome) + @state_service.record_observation(resource, outcome) + end + + def rejection_rate(resource) + @state_service.rejection_rate(resource) + end + + def metrics(resource) + @state_service.metrics(resource) + end + + def register_client(client_proxy) + @clients_mutex.synchronize { @clients << client_proxy } + end + + def unregister_client(client_proxy) + @clients_mutex.synchronize { @clients.delete(client_proxy) } + end + + class << self + def start(state_service) + controller = new(state_service) + + Async do |task| + server = Async::Bus::Server.new + task.async { state_service.run_update_loop } + server.accept do |connection| + connection.bind(:pid_controller, controller) + end + end + end + end + + private + + def broadcast_rejection_rate(resource, rate) + @clients_mutex.synchronize do + @clients.each do |client| + client.update_rejection_rate(resource, rate) + rescue StandardError + # Client disconnected + end + end + end + end + end +end diff --git a/lib/semian/pod_pid/state_service.rb b/lib/semian/pod_pid/state_service.rb index 71627ca35..ec05adf9a 100644 --- a/lib/semian/pod_pid/state_service.rb +++ b/lib/semian/pod_pid/state_service.rb @@ -8,6 +8,7 @@ module Semian module PodPID class StateService attr_reader :resources, :config + attr_accessor :on_rejection_rate_change def initialize( kp:, @@ -38,7 +39,7 @@ def initialize( def record_observation(resource, outcome) controller = ensure_resource(resource) - controller.record_request(outcome) + controller.record_request(outcome.to_sym) end def rejection_rate(resource) @@ -46,20 +47,21 @@ def rejection_rate(resource) controller&.rejection_rate || 0.0 end - def start_update_loop + def metrics(resource) + controller = @resources[resource.to_s] + controller&.metrics + end + + def run_update_loop @running = true - @update_thread = Thread.new do - while @running - sleep(@config[:sliding_interval]) - update_all_resources - end + while @running + sleep(@config[:sliding_interval]) + update_all_resources end end - def stop_update_loop + def stop @running = false - @update_thread&.join(5) - @update_thread = nil end def update_all_resources @@ -71,17 +73,12 @@ def update_all_resources if old_rate != new_rate persist_state(name, controller) - notify_rejection_rate_change(name, new_rate) + @on_rejection_rate_change&.call(name, new_rate) end end end end - def metrics(resource) - controller = @resources[resource.to_s] - controller&.metrics - end - private def ensure_resource(resource) @@ -123,10 +120,6 @@ def persist_state(resource, controller) } @wal.write(resource, state) end - - def notify_rejection_rate_change(resource, new_rate) - @on_rejection_rate_change&.call(resource, new_rate) - end end end end diff --git a/lib/semian/pod_pid/wal.rb b/lib/semian/pod_pid/wal.rb index 6ec313c75..b24af6716 100644 --- a/lib/semian/pod_pid/wal.rb +++ b/lib/semian/pod_pid/wal.rb @@ -42,8 +42,6 @@ module PodPID class WAL DEFAULT_PATH = "/tmp/semian_pid.wal" - HEADER_SIZE = 16 - attr_reader :path def initialize(path = DEFAULT_PATH) diff --git a/test/pod_pid/client_test.rb b/test/pod_pid/client_test.rb new file mode 100644 index 000000000..403c39037 --- /dev/null +++ b/test/pod_pid/client_test.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +require "rubygems" +require "bundler/setup" +require "minitest/autorun" +require "forwardable" +require "async" +require "async/bus" + +require_relative "../../lib/semian/pod_pid" + +module Semian + module PodPID + class ClientTest < Minitest::Test + def setup + @client = Client.new + end + + def test_should_reject_returns_false_when_rate_is_zero + refute(@client.should_reject?("mysql")) + end + + def test_should_reject_respects_cached_rate + @client.update_rejection_rate("mysql", 1.0) + + assert(@client.should_reject?("mysql")) + end + + def test_rejection_rate_returns_zero_for_unknown_resource + assert_equal(0.0, @client.rejection_rate("unknown")) + end + + def test_update_rejection_rate_updates_cache + @client.update_rejection_rate("mysql", 0.5) + + assert_equal(0.5, @client.rejection_rate("mysql")) + end + + def test_record_observation_returns_false_when_not_connected + refute(@client.record_observation("mysql", :success)) + end + + def test_multiple_resources_have_independent_rates + @client.update_rejection_rate("mysql", 0.3) + @client.update_rejection_rate("redis", 0.7) + + assert_equal(0.3, @client.rejection_rate("mysql")) + assert_equal(0.7, @client.rejection_rate("redis")) + end + + def test_metrics_returns_nil_when_not_connected + assert_nil(@client.metrics("mysql")) + end + end + end +end diff --git a/test/pod_pid/state_service_test.rb b/test/pod_pid/state_service_test.rb index 1bbf6ce2d..a792c5552 100644 --- a/test/pod_pid/state_service_test.rb +++ b/test/pod_pid/state_service_test.rb @@ -29,7 +29,7 @@ def setup end def teardown - @service.stop_update_loop + @service.stop File.delete(@wal_path) if File.exist?(@wal_path) @temp_file&.unlink end @@ -89,23 +89,15 @@ def test_restores_state_from_wal end def test_on_rejection_rate_change_callback - notified = [] - service = StateService.new( - kp: 1.0, - ki: 0.2, - kd: 0.0, - window_size: 10, - sliding_interval: 1, - initial_error_rate: 0.05, - wal_path: @wal_path, - on_rejection_rate_change: ->(resource, rate) { notified << [resource, rate] }, - ) + updates = [] + @service.on_rejection_rate_change = ->(resource, rate) { updates << [resource, rate] } - 10.times { service.record_observation("mysql", :error) } - service.update_all_resources + 10.times { @service.record_observation("mysql", :error) } + @service.update_all_resources - assert_equal(1, notified.size) - assert_equal("mysql", notified[0][0]) + assert_equal(1, updates.size) + assert_equal("mysql", updates[0][0]) + assert_operator(updates[0][1], :>, 0.0) end end end From 1d9f182913af6490a78dcf221c2027587d0bb222 Mon Sep 17 00:00:00 2001 From: Kristofer Gaudel Date: Wed, 17 Dec 2025 10:52:15 -0500 Subject: [PATCH 05/10] default config for PID --- lib/semian.rb | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/lib/semian.rb b/lib/semian.rb index d350fcca1..10b1b79a9 100644 --- a/lib/semian.rb +++ b/lib/semian.rb @@ -113,6 +113,15 @@ module Semian self.default_permissions = 0660 self.default_force_config_validation = false + DEFAULT_PID_CONFIG = { + kp: 1.0, + ki: 0.2, + kd: 0.0, + window_size: 10, + sliding_interval: 1, + initial_error_rate: 0.05, + }.freeze + # We only allow disabling thread-safety for parts of the code that are on the hot path. # Since locking there could have a significant impact. Everything else is enforced thread safety def thread_safe? @@ -343,13 +352,12 @@ def create_adaptive_circuit_breaker(name, is_child: false, **options) cls = is_child ? DualCircuitBreaker::ChildAdaptiveCircuitBreaker : AdaptiveCircuitBreaker cls.new( name: name, - exceptions: Array(exceptions) + [::Semian::BaseError], - kp: 1.0, - ki: 0.2, - kd: 0.0, - window_size: 10, - sliding_interval: 1, - initial_error_rate: options[:initial_error_rate] || 0.05, + kp: DEFAULT_PID_CONFIG[:kp], + ki: DEFAULT_PID_CONFIG[:ki], + kd: DEFAULT_PID_CONFIG[:kd], + window_size: DEFAULT_PID_CONFIG[:window_size], + sliding_interval: DEFAULT_PID_CONFIG[:sliding_interval], + initial_error_rate: options[:initial_error_rate] || DEFAULT_PID_CONFIG[:initial_error_rate], implementation: implementation(**options), ) end From 54d08251f7d4f7c3d553dc0e400d35eabc6472d7 Mon Sep 17 00:00:00 2001 From: Kristofer Gaudel Date: Wed, 17 Dec 2025 10:57:33 -0500 Subject: [PATCH 06/10] implement services for PID state --- bin/semian-pid-service | 17 +++++++++++++++++ lib/semian/pod_pid/service.rb | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100755 bin/semian-pid-service create mode 100644 lib/semian/pod_pid/service.rb diff --git a/bin/semian-pid-service b/bin/semian-pid-service new file mode 100755 index 000000000..043ff7c17 --- /dev/null +++ b/bin/semian-pid-service @@ -0,0 +1,17 @@ +#!/usr/bin/env async-service +# frozen_string_literal: true + +require "bundler/setup" +require "async/container/supervisor" +require "semian" +require "semian/pod_pid" +require "semian/pod_pid/service" + +service "pid-state-service" do + service_class Semian::PodPID::Service + include Async::Container::Supervisor::Supervised +end + +service "supervisor" do + include Async::Container::Supervisor::Environment +end diff --git a/lib/semian/pod_pid/service.rb b/lib/semian/pod_pid/service.rb new file mode 100644 index 000000000..da543d8d7 --- /dev/null +++ b/lib/semian/pod_pid/service.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +require "async/service/generic" +require "async/container/supervisor" +require_relative "state_service" +require_relative "controller" + +module Semian + module PodPID + class Service < Async::Service::Generic + def setup(container) + super + + container.run(name: self.class.name, count: 1, restart: true) do |instance| + Async do + if @environment.implements?(Async::Container::Supervisor::Supervised) + @evaluator.make_supervised_worker(instance).run + end + + instance.ready! + + config = Semian::DEFAULT_PID_CONFIG.merge( + wal_path: ENV.fetch("SEMIAN_PID_WAL_PATH", WAL::DEFAULT_PATH), + ) + + state_service = StateService.new(**config) + Controller.start(state_service) + end + end + end + end + end +end + From 932fe65cdef7046934e2981a39e931406aecf180 Mon Sep 17 00:00:00 2001 From: Kristofer Gaudel Date: Wed, 17 Dec 2025 11:06:43 -0500 Subject: [PATCH 07/10] integrate pod ACB into Semian --- lib/semian.rb | 14 +++++++ lib/semian/pod_adaptive_circuit_breaker.rb | 49 ++++++++++++++++++++++ 2 files changed, 63 insertions(+) create mode 100644 lib/semian/pod_adaptive_circuit_breaker.rb diff --git a/lib/semian.rb b/lib/semian.rb index 10b1b79a9..8ce569b76 100644 --- a/lib/semian.rb +++ b/lib/semian.rb @@ -12,6 +12,7 @@ require "semian/resource" require "semian/circuit_breaker" require "semian/adaptive_circuit_breaker" +require "semian/pod_adaptive_circuit_breaker" require "semian/dual_circuit_breaker" require "semian/protected_resource" require "semian/unprotected_resource" @@ -231,6 +232,8 @@ def register(name, **options) circuit_breaker = if options[:dual_circuit_breaker] create_dual_circuit_breaker(name, **options) + elsif options[:pod_adaptive_circuit_breaker] + create_pod_adaptive_circuit_breaker(name, **options) elsif options[:adaptive_circuit_breaker] create_adaptive_circuit_breaker(name, **options) else @@ -362,6 +365,17 @@ def create_adaptive_circuit_breaker(name, is_child: false, **options) ) end + def create_pod_adaptive_circuit_breaker(name, **options) + return if ENV.key?("SEMIAN_CIRCUIT_BREAKER_DISABLED") || ENV.key?("SEMIAN_POD_ADAPTIVE_CIRCUIT_BREAKER_DISABLED") + + exceptions = options[:exceptions] || [] + PodAdaptiveCircuitBreaker.new( + name: name, + exceptions: Array(exceptions) + [::Semian::BaseError], + client: options[:pod_pid_client], + ) + end + def create_circuit_breaker(name, is_child: false, **options) return if ENV.key?("SEMIAN_CIRCUIT_BREAKER_DISABLED") return unless options.fetch(:circuit_breaker, true) diff --git a/lib/semian/pod_adaptive_circuit_breaker.rb b/lib/semian/pod_adaptive_circuit_breaker.rb new file mode 100644 index 000000000..fa2bed415 --- /dev/null +++ b/lib/semian/pod_adaptive_circuit_breaker.rb @@ -0,0 +1,49 @@ +# frozen_string_literal: true + +require_relative "adaptive_circuit_breaker" +require_relative "pod_pid/client" + +module Semian + class PodAdaptiveCircuitBreaker < AdaptiveCircuitBreaker + attr_reader :client + + def initialize(name:, exceptions: [], client: nil) + initialize_behaviour(name: name) + @exceptions = exceptions + @client = client || PodPID::Client.new + @stopped = false + @pid_controller = ClientAdapter.new(@name, @client) + end + + def destroy; end # No-op to satisfy AdaptiveCircuitBreaker + + private + + def start_pid_controller_update_thread; end + + class ClientAdapter + def initialize(resource_name, client) + @resource_name = resource_name + @client = client + end + + def record_request(outcome) + @client.record_observation(@resource_name, outcome) + end + + def should_reject? + @client.should_reject?(@resource_name) + end + + def rejection_rate + @client.rejection_rate(@resource_name) + end + + def metrics + @client.metrics(@resource_name) + end + + def reset; end # No-op to satisfy interface + end + end +end From f2bb2a208fdd7046611ffe1c6127210fef329c69 Mon Sep 17 00:00:00 2001 From: Kristofer Gaudel Date: Wed, 17 Dec 2025 12:30:12 -0500 Subject: [PATCH 08/10] implement example --- examples/pod_adaptive_circuit_breaker_demo.rb | 166 ++++++++++++++++++ lib/semian/pod_pid/client.rb | 10 +- lib/semian/pod_pid/controller.rb | 7 +- 3 files changed, 181 insertions(+), 2 deletions(-) create mode 100755 examples/pod_adaptive_circuit_breaker_demo.rb diff --git a/examples/pod_adaptive_circuit_breaker_demo.rb b/examples/pod_adaptive_circuit_breaker_demo.rb new file mode 100755 index 000000000..30d39d934 --- /dev/null +++ b/examples/pod_adaptive_circuit_breaker_demo.rb @@ -0,0 +1,166 @@ +#!/usr/bin/env ruby +# frozen_string_literal: true + +require "bundler/setup" +require "semian" +require "semian/pod_pid" +require "async" +require "async/bus/server" + +NUM_WORKERS = 4 +PHASES = [ + { name: "Phase 1: Healthy Service", count: 100, success_rate: 1.0 }, + { name: "Phase 2: Degraded Service", count: 200, success_rate: 0.5 }, + { name: "Phase 3: Failing Service", count: 200, success_rate: 0.0 }, + { name: "Phase 4: Service Recovery", count: 300, success_rate: 1.0 }, +].freeze + +def run_worker(worker_id, phase_pipe_read, result_pipe_write) + Sync do |task| + client = Semian::PodPID::Client.new + client.connect + + result_pipe_write.puts("READY") + + while (line = phase_pipe_read.gets) + phase = line.strip.split(",") + break if phase[0] == "DONE" + + count = phase[1].to_i + success_rate = phase[2].to_f + + stats = { success: 0, error: 0, rejected: 0 } + + count.times do |i| + task.sleep(0.001) if (i % 10).zero? + + if client.should_reject?("mysql") + client.record_observation("mysql", :rejected) + stats[:rejected] += 1 + elsif rand < success_rate + client.record_observation("mysql", :success) + stats[:success] += 1 + else + client.record_observation("mysql", :error) + stats[:error] += 1 + end + end + + task.sleep(0.1) + result_pipe_write.puts("#{stats[:success]},#{stats[:error]},#{stats[:rejected]},#{client.rejection_rate('mysql')}") + end + + client.disconnect + end + exit!(0) +end + +puts "=== Pod Adaptive Circuit Breaker Demo (Multi-Process) ===\n\n" + +server_ready_read, server_ready_write = IO.pipe + +server_pid = fork do + server_ready_read.close + + Sync do |task| + state_service = Semian::PodPID::StateService.new( + kp: Semian::DEFAULT_PID_CONFIG[:kp], + ki: Semian::DEFAULT_PID_CONFIG[:ki], + kd: Semian::DEFAULT_PID_CONFIG[:kd], + window_size: Semian::DEFAULT_PID_CONFIG[:window_size], + sliding_interval: Semian::DEFAULT_PID_CONFIG[:sliding_interval], + initial_error_rate: Semian::DEFAULT_PID_CONFIG[:initial_error_rate], + ) + + controller = Semian::PodPID::Controller.new(state_service) + server = Async::Bus::Server.new + + task.async do + server.accept do |connection| + connection.bind(:pid_controller, controller) + end + end + + task.async do + loop do + task.sleep(0.05) + state_service.update_all_resources + end + end + + server_ready_write.puts("READY") + server_ready_write.close + + sleep + end +end + +server_ready_write.close +server_ready_read.gets +puts "PID state service started (PID: #{server_pid})\n\n" + +workers = [] +phase_pipes = [] +result_pipes = [] + +NUM_WORKERS.times do |i| + phase_read, phase_write = IO.pipe + result_read, result_write = IO.pipe + + pid = fork do + phase_write.close + result_read.close + run_worker(i + 1, phase_read, result_write) + end + + phase_read.close + result_write.close + + workers << pid + phase_pipes << phase_write + result_pipes << result_read +end + +puts "Waiting for #{NUM_WORKERS} worker processes to connect..." +result_pipes.each { |pipe| pipe.gets } +puts "All workers connected!\n\n" + +PHASES.each do |phase| + puts "#{phase[:name]} (#{phase[:count]} requests per worker)" + puts "-" * 50 + + phase_pipes.each do |pipe| + pipe.puts("PHASE,#{phase[:count]},#{phase[:success_rate]}") + end + + total = { success: 0, error: 0, rejected: 0 } + rates = [] + + result_pipes.each_with_index do |pipe, i| + result = pipe.gets.strip.split(",") + total[:success] += result[0].to_i + total[:error] += result[1].to_i + total[:rejected] += result[2].to_i + rates << result[3].to_f + puts " Worker #{i + 1} (PID #{workers[i]}): #{result[0]} success, #{result[1]} errors, #{result[2]} rejected, rate=#{(result[3].to_f * 100).round(2)}%" + end + + puts " Total: #{total[:success]} success, #{total[:error]} errors, #{total[:rejected]} rejected" + puts " All workers synchronized: #{rates.uniq.size == 1}\n\n" +end + +puts "=== Demo Complete ===\n" + +phase_pipes.each do |pipe| + pipe.puts("DONE") + pipe.close +end +result_pipes.each(&:close) + +sleep(0.5) + +(workers + [server_pid]).each do |pid| + Process.kill("KILL", pid) rescue nil +end + +Process.waitall diff --git a/lib/semian/pod_pid/client.rb b/lib/semian/pod_pid/client.rb index a8ecafed0..3ae45b763 100644 --- a/lib/semian/pod_pid/client.rb +++ b/lib/semian/pod_pid/client.rb @@ -48,12 +48,20 @@ def metrics(resource) nil end + def disconnect + @state_service&.unregister_client(@client_proxy) + close + rescue StandardError + # Ignore errors during disconnect + end + protected def connected!(connection) @state_service = connection[:pid_controller] + @client_proxy = connection[:client] connection.bind(:client, self) - @state_service.register_client(connection[:client]) + @state_service.register_client(@client_proxy) end end end diff --git a/lib/semian/pod_pid/controller.rb b/lib/semian/pod_pid/controller.rb index 333b9086d..3c0e4d774 100644 --- a/lib/semian/pod_pid/controller.rb +++ b/lib/semian/pod_pid/controller.rb @@ -2,6 +2,7 @@ require "async" require "async/bus" +require "async/bus/server" require_relative "state_service" module Semian @@ -55,12 +56,16 @@ def start(state_service) private def broadcast_rejection_rate(resource, rate) + dead_clients = [] + @clients_mutex.synchronize do @clients.each do |client| client.update_rejection_rate(resource, rate) rescue StandardError - # Client disconnected + dead_clients << client end + + @clients -= dead_clients end end end From 47b832cbe1e8dc508d47e95a601efc731cd515fe Mon Sep 17 00:00:00 2001 From: Kristofer Gaudel Date: Wed, 17 Dec 2025 12:33:39 -0500 Subject: [PATCH 09/10] replace comment with nil --- lib/semian/pod_pid/client.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/semian/pod_pid/client.rb b/lib/semian/pod_pid/client.rb index 3ae45b763..de009793f 100644 --- a/lib/semian/pod_pid/client.rb +++ b/lib/semian/pod_pid/client.rb @@ -52,7 +52,7 @@ def disconnect @state_service&.unregister_client(@client_proxy) close rescue StandardError - # Ignore errors during disconnect + nil end protected From 5fe34f7ff5f230cd04a802628bb494d77867948c Mon Sep 17 00:00:00 2001 From: Kristofer Gaudel Date: Wed, 17 Dec 2025 12:49:10 -0500 Subject: [PATCH 10/10] only load `PodAdaptiveCircuitBreaker` when used, minor linting --- examples/net_http/08_dual_circuit_breaker.rb | 2 +- examples/pod_adaptive_circuit_breaker_demo.rb | 8 +++++--- lib/semian.rb | 4 +++- lib/semian/pod_adaptive_circuit_breaker.rb | 2 +- lib/semian/pod_pid/service.rb | 1 - 5 files changed, 10 insertions(+), 7 deletions(-) diff --git a/examples/net_http/08_dual_circuit_breaker.rb b/examples/net_http/08_dual_circuit_breaker.rb index 8e8c970bc..7c639de21 100644 --- a/examples/net_http/08_dual_circuit_breaker.rb +++ b/examples/net_http/08_dual_circuit_breaker.rb @@ -66,7 +66,7 @@ def print_semian_state Semian::DualCircuitBreaker.adaptive_circuit_breaker_selector(->(_resource) { ExperimentFlags.use_adaptive_circuit_breaker? }) # Configure Semian with dual circuit breaker -Semian::NetHTTP.semian_configuration = proc do |host, port| +Semian::NetHTTP.semian_configuration = proc do |host, _port| # Example: only enable for specific host if host == "shopify-debug.com" { diff --git a/examples/pod_adaptive_circuit_breaker_demo.rb b/examples/pod_adaptive_circuit_breaker_demo.rb index 30d39d934..9ad6d1337 100755 --- a/examples/pod_adaptive_circuit_breaker_demo.rb +++ b/examples/pod_adaptive_circuit_breaker_demo.rb @@ -47,7 +47,7 @@ def run_worker(worker_id, phase_pipe_read, result_pipe_write) end task.sleep(0.1) - result_pipe_write.puts("#{stats[:success]},#{stats[:error]},#{stats[:rejected]},#{client.rejection_rate('mysql')}") + result_pipe_write.puts("#{stats[:success]},#{stats[:error]},#{stats[:rejected]},#{client.rejection_rate("mysql")}") end client.disconnect @@ -122,7 +122,7 @@ def run_worker(worker_id, phase_pipe_read, result_pipe_write) end puts "Waiting for #{NUM_WORKERS} worker processes to connect..." -result_pipes.each { |pipe| pipe.gets } +result_pipes.each(&:gets) puts "All workers connected!\n\n" PHASES.each do |phase| @@ -160,7 +160,9 @@ def run_worker(worker_id, phase_pipe_read, result_pipe_write) sleep(0.5) (workers + [server_pid]).each do |pid| - Process.kill("KILL", pid) rescue nil + Process.kill("KILL", pid) +rescue + nil end Process.waitall diff --git a/lib/semian.rb b/lib/semian.rb index 8ce569b76..54f033330 100644 --- a/lib/semian.rb +++ b/lib/semian.rb @@ -12,7 +12,6 @@ require "semian/resource" require "semian/circuit_breaker" require "semian/adaptive_circuit_breaker" -require "semian/pod_adaptive_circuit_breaker" require "semian/dual_circuit_breaker" require "semian/protected_resource" require "semian/unprotected_resource" @@ -100,6 +99,8 @@ module Semian extend self extend Instrumentable + autoload :PodAdaptiveCircuitBreaker, "semian/pod_adaptive_circuit_breaker" + BaseError = Class.new(StandardError) SyscallError = Class.new(BaseError) TimeoutError = Class.new(BaseError) @@ -355,6 +356,7 @@ def create_adaptive_circuit_breaker(name, is_child: false, **options) cls = is_child ? DualCircuitBreaker::ChildAdaptiveCircuitBreaker : AdaptiveCircuitBreaker cls.new( name: name, + exceptions: Array(exceptions) + [::Semian::BaseError], kp: DEFAULT_PID_CONFIG[:kp], ki: DEFAULT_PID_CONFIG[:ki], kd: DEFAULT_PID_CONFIG[:kd], diff --git a/lib/semian/pod_adaptive_circuit_breaker.rb b/lib/semian/pod_adaptive_circuit_breaker.rb index fa2bed415..6f58e499f 100644 --- a/lib/semian/pod_adaptive_circuit_breaker.rb +++ b/lib/semian/pod_adaptive_circuit_breaker.rb @@ -7,7 +7,7 @@ module Semian class PodAdaptiveCircuitBreaker < AdaptiveCircuitBreaker attr_reader :client - def initialize(name:, exceptions: [], client: nil) + def initialize(name:, exceptions: [], client: nil) # rubocop:disable Lint/MissingSuper initialize_behaviour(name: name) @exceptions = exceptions @client = client || PodPID::Client.new diff --git a/lib/semian/pod_pid/service.rb b/lib/semian/pod_pid/service.rb index da543d8d7..f954f9b07 100644 --- a/lib/semian/pod_pid/service.rb +++ b/lib/semian/pod_pid/service.rb @@ -31,4 +31,3 @@ def setup(container) end end end -