diff --git a/Appraisals b/Appraisals index 777037e1..96f20de8 100644 --- a/Appraisals +++ b/Appraisals @@ -1,7 +1,3 @@ -appraise "active_record_7_0" do - gem "activerecord", "~> 7.0.0" -end - appraise "active_record_7_1" do gem "activerecord", "~> 7.1.0" end diff --git a/CHANGELOG.md b/CHANGELOG.md index 78471da8..d56c6995 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Added - [OpenAPI] Add support for blueprint inheritance. Child blueprints now inherit fields from parent blueprints. +- Connection pool improvements (#301). ### Fixed diff --git a/Rakefile b/Rakefile index 55635bc5..a8bb6628 100644 --- a/Rakefile +++ b/Rakefile @@ -17,6 +17,9 @@ task :appraise do |_, args| puts ">> Appraising #{ext_version}" gem_name = ext_version.sub(/_\d+(_\d+)*$/, "") - system "bundle exec appraisal #{ext_version} rspec spec/ext/#{gem_name}/" + + Pathname.new("spec/ext").join(gem_name).glob("*_spec.rb").each do |spec| + system("bundle exec appraisal #{ext_version} rspec #{spec}", exception: true) + end end end diff --git a/lib/rage/ext/active_record/connection_pool.rb b/lib/rage/ext/active_record/connection_pool.rb index 225cd691..5206fad8 100644 --- a/lib/rage/ext/active_record/connection_pool.rb +++ b/lib/rage/ext/active_record/connection_pool.rb @@ -11,7 +11,11 @@ def <<(el) @arr << el end - def shift + def unshift(el) + @arr.unshift(el) + end + + def pop nil end @@ -59,18 +63,27 @@ def __init_rage_extension # in the format of { Fiber => Connection } @__in_use = {} - # a list of all DB connections that are currently idle - @__connections = build_new_connections - # how long a fiber can wait for a connection to become available @__checkout_timeout = checkout_timeout # how long a connection can be idle for before disconnecting @__idle_timeout = respond_to?(:db_config) ? db_config.idle_timeout : @idle_timeout + # pool will maintain at least this many connections + @__min_connections = respond_to?(:min_connections) ? min_connections : 0 + + # connections older than this are automatically disconnected + @__max_age = respond_to?(:max_age) ? max_age : Float::INFINITY + + # seconds between keepalive pings on idle connections + @__keepalive = respond_to?(:keepalive) ? keepalive : nil + # how often should we check for fibers that wait for a connection for too long @__timeout_worker_frequency = 0.5 + # a list of all DB connections that are currently idle + @__connections = build_new_connections + # reject fibers that wait for a connection for more than `@__checkout_timeout` Iodine.run_every((@__timeout_worker_frequency * 1_000).to_i) do if @__blocked.length > 0 @@ -105,6 +118,8 @@ def __init_rage_extension end @release_connection_channel = "ext:ar-connection-released:#{object_id}" + @__owner_thread = Thread.current + @__background_maintenance_disabled = (ActiveRecord.version < Gem::Version.create("8.1")) # resume blocked fibers once connections become available Iodine.subscribe(@release_connection_channel) do @@ -118,6 +133,14 @@ def __init_rage_extension Iodine.on_state(:on_finish) do Iodine.unsubscribe(@release_connection_channel) end + + Iodine.run_every((db_config.reaping_frequency * 1_000).to_i) do + reap + flush + disconnected = retire_old_connections + keep_alive + preconnect if disconnected + end end # Returns true if there is an open connection being used for the current fiber. @@ -127,12 +150,10 @@ def active_connection? # Retrieve the connection associated with the current fiber, or obtain one if necessary. def connection - @__in_use[Fiber.current] ||= @__connections.shift || begin - fiber, blocked_since = Fiber.current, Process.clock_gettime(Process::CLOCK_MONOTONIC) - @__blocked[fiber] = blocked_since + @__in_use[Fiber.current] ||= @__connections.pop || begin + @__blocked[Fiber.current] = Process.clock_gettime(Process::CLOCK_MONOTONIC) Fiber.yield - - @__connections.shift + @__connections.pop end end @@ -149,45 +170,125 @@ def release_connection(owner = Fiber.current) # Recover lost connections for the pool. def reap - crashed_fibers = nil + return unless Thread.current == @__owner_thread - @__in_use.each do |fiber, conn| + dead = nil + + @__in_use.delete_if do |fiber, conn| unless fiber.alive? + (dead ||= []) << [fiber, conn] + true + end + end + + dead&.each do |fiber, conn| + Fiber.schedule do if conn.active? conn.reset! - (crashed_fibers ||= []) << fiber + @__in_use[fiber] = conn + release_connection(fiber) else - @__in_use.delete(fiber) conn.disconnect! __remove__(conn) self.automatic_reconnect = true - @__connections += build_new_connections(1) + @__connections.unshift(*build_new_connections(1)) Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) if @__blocked.length > 0 end end end - - if crashed_fibers - crashed_fibers.each { |fiber| release_connection(fiber) } - end end # Disconnect all connections that have been idle for at least # `minimum_idle` seconds. Connections currently checked out, or that were # checked in less than `minimum_idle` seconds ago, are unaffected. def flush(minimum_idle = @__idle_timeout) - return if minimum_idle.nil? || @__connections.length == 0 + return if Thread.current != @__owner_thread || minimum_idle.nil? || @__connections.length == 0 - current_time, i = Process.clock_gettime(Process::CLOCK_MONOTONIC), 0 - while i < @__connections.length + current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + i = @__connections.length - 1 + connections_to_preserve = @__min_connections + + while i >= 0 conn = @__connections[i] + + if connections_to_preserve > 0 && conn.connected? && conn.verified? + connections_to_preserve -= 1 + i -= 1 + next + end + if conn.__idle_since && current_time - conn.__idle_since >= minimum_idle conn.__idle_since = nil conn.__needs_reconnect = true conn.disconnect! end + i -= 1 + end + end + + # Disconnect connections exceeding `max_age` + def retire_old_connections(max_age = @__max_age) + return false if max_age.nil? || max_age.infinite? + + i, disconnected = 0, false + + while i < @__connections.length + conn = @__connections[i] + if (conn.connection_age || 0) >= conn.pool_jitter(max_age) + conn.disconnect! + disconnected = true + end i += 1 end + + disconnected + end + + # Ping idle connections to prevent firewall/server timeouts + def keep_alive(threshold = @__keepalive) + return if threshold.nil? || @__connections.length == 0 + + to_ping = nil + + @__connections.delete_if do |conn| + if (conn.seconds_since_last_activity || 0) >= conn.pool_jitter(threshold) + (to_ping ||= []) << conn + true + end + end + + to_ping&.each do |conn| + Fiber.schedule do + if conn.active? + @__connections << conn + else + conn.disconnect! + @__connections.unshift(conn) + end + Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) if @__blocked.length > 0 + end + end + end + + # Proactively establish DB connections + def preconnect + return if @__connections.length == 0 || @__background_maintenance_disabled + + active_connections_count = @__in_use.length + @__connections.count { |conn| conn.connected? && conn.verified? } + + while @__min_connections - active_connections_count > 0 + i = @__connections.rindex { |conn| !conn.connected? || !conn.verified? } + return unless i + + Fiber.schedule do + conn = @__connections.delete_at(i) + conn.connect! rescue nil + @__connections << conn + Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) if @__blocked.length > 0 + end + + active_connections_count += 1 + end end # Disconnect all currently idle connections. Connections currently checked out are unaffected. @@ -234,6 +335,8 @@ def stat # Raises `ActiveRecord::ExclusiveConnectionTimeoutError` if unable to gain ownership of all # connections in the pool within a timeout interval (default duration is `checkout_timeout * 2` seconds). def disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0) + return if @__connections.is_a?(BlackHoleList) + # allow request fibers to release connections, but block from acquiring new ones if disconnect_attempts == 0 @__connections = BlackHoleList.new(@__connections) @@ -288,6 +391,10 @@ def lease_connection connection end + def maintainable? + false + end + # Check in a database connection back into the pool, indicating that you no longer need this connection. def checkin(conn) fiber = @__in_use.key(conn) @@ -328,8 +435,12 @@ def discarded? private def build_new_connections(num_connections = size) - (1..num_connections).map do + connections = (1..num_connections).map do __checkout__.tap { |conn| conn.__idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC) } end + + Iodine.defer { preconnect } + + connections end end diff --git a/lib/rage/templates/db-templates/mysql/config-database.yml b/lib/rage/templates/db-templates/mysql/config-database.yml index f9626e5f..5502e48d 100644 --- a/lib/rage/templates/db-templates/mysql/config-database.yml +++ b/lib/rage/templates/db-templates/mysql/config-database.yml @@ -4,7 +4,7 @@ default: &default adapter: mysql2 encoding: utf8mb4 - pool: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %> + max_connections: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %> username: root password: socket: /tmp/mysql.sock diff --git a/lib/rage/templates/db-templates/postgresql/config-database.yml b/lib/rage/templates/db-templates/postgresql/config-database.yml index ddf69450..32c55081 100644 --- a/lib/rage/templates/db-templates/postgresql/config-database.yml +++ b/lib/rage/templates/db-templates/postgresql/config-database.yml @@ -4,7 +4,7 @@ default: &default adapter: postgresql encoding: unicode - pool: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %> + max_connections: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %> development: <<: *default diff --git a/lib/rage/templates/db-templates/sqlite3/config-database.yml b/lib/rage/templates/db-templates/sqlite3/config-database.yml index e52124c4..6a597515 100644 --- a/lib/rage/templates/db-templates/sqlite3/config-database.yml +++ b/lib/rage/templates/db-templates/sqlite3/config-database.yml @@ -3,7 +3,7 @@ # default: &default adapter: sqlite3 - pool: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %> + max_connections: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %> timeout: 5000 development: diff --git a/lib/rage/templates/db-templates/trilogy/config-database.yml b/lib/rage/templates/db-templates/trilogy/config-database.yml index d4bba43f..b62c5410 100644 --- a/lib/rage/templates/db-templates/trilogy/config-database.yml +++ b/lib/rage/templates/db-templates/trilogy/config-database.yml @@ -4,7 +4,7 @@ default: &default adapter: trilogy encoding: utf8mb4 - pool: <%%= ENV.fetch("DB_MAX_THREADS") { 5 } %> + max_connections: <%%= ENV.fetch("DB_MAX_THREADS") { 5 } %> username: root password: socket: /tmp/mysql.sock diff --git a/spec/ext/active_record/connection_pool_spec.rb b/spec/ext/active_record/connection_pool_spec.rb index c311a7b8..1366af1d 100644 --- a/spec/ext/active_record/connection_pool_spec.rb +++ b/spec/ext/active_record/connection_pool_spec.rb @@ -9,19 +9,36 @@ skip("skipping external tests") unless ENV["ENABLE_EXTERNAL_TESTS"] == "true" Fiber.set_scheduler(Rage::FiberScheduler.new) - ActiveRecord::Base.establish_connection(url: (ENV["TEST_PG_URL"]).to_s) - ActiveRecord::Base.connection_pool.extend(Rage::Ext::ActiveRecord::ConnectionPool) - end - after :all do - Fiber.set_scheduler(nil) + # await internal fibers scheduled by the pool + interceptor = Module.new do + private def __intercepted_fibers + @intercepted_fibers ||= [] + end + + def __await_internal_fibers + __intercepted_fibers.clear + yield + Fiber.await(__intercepted_fibers) + end + + def schedule + super.tap { |f| __intercepted_fibers << f } + end + end + + Fiber.singleton_class.prepend(interceptor) end around do |example| + ActiveRecord::Base.establish_connection(db_config) + ActiveRecord::Base.connection_pool.extend(Rage::Ext::ActiveRecord::ConnectionPool) + # we need to init the extension before every test to refresh the subscriptions subject.__init_rage_extension within_reactor do + ensure_preconnected.call example.call -> {} end @@ -33,6 +50,25 @@ end end + let(:pool_size) { 5 } + let(:pool_size_config) do + ActiveRecord.version < Gem::Version.create("8.1") ? { pool: pool_size } : { max_connections: pool_size } + end + let(:db_config) { { url: (ENV["TEST_PG_URL"]).to_s, **pool_size_config } } + let(:ensure_preconnected) do + -> do + 20.downto(0) do |i| + if subject.connections.length != pool_size + sleep 0.1 + elsif i == 0 + raise "Could not connect to the DB" + else + break + end + end + end + end + describe "#with_connection" do it "checks out a connection" do subject.with_connection do |conn| @@ -211,12 +247,11 @@ describe "#reap" do it "reaps connections from dead fibers" do - fiber = Fiber.schedule do + Fiber.schedule do subject.connection # Fiber exits without releasing end - Fiber.await(fiber) expect(subject.stat[:dead]).to eq(1) subject.reap @@ -234,11 +269,47 @@ end Fiber.await(fiber) - subject.reap + + Fiber.__await_internal_fibers { subject.reap } # Connection should be reset and returned to pool expect(subject.stat[:idle]).to eq(subject.size) end + + it "ignores calls from non-owner threads" do + fiber = Fiber.schedule do + subject.connection + # Fiber exits without releasing + end + + Thread.new { subject.reap }.join + + expect(subject.stat[:dead]).to eq(1) + ensure + subject.release_connection(fiber) + end + + context "with min_connections" do + before do + skip("skipping on Active Record < 8.1") if ActiveRecord.version < Gem::Version.create("8.1") + end + + let(:db_config) { super().merge(min_connections: 1) } + + it "preconnects connections" do + Fiber.schedule do + subject.connection + end + + subject.reap + expect(subject.connections.count(&:active?)).to be(0) + + Fiber.pause # wait for `preconnect` to kick in + ensure_preconnected.call # wait for `preconnect` to finish + + expect(subject.connections.last).to be_active + end + end end describe "#with_connection exception handling" do @@ -297,6 +368,268 @@ expect(conn.execute("select 1")).to be_a(PG::Result) end end + + it "flushes connections" do + skip("skipping on Active Record 7.1") if ActiveRecord.version < Gem::Version.create("7.2") + + fiber = Fiber.schedule do + subject.with_connection do |conn| + conn.execute("select 1") + allow(conn).to receive(:__idle_since).and_return(0) + end + end + + Fiber.await(fiber) + expect(subject.connections.count(&:active?)).to eq(1) + + subject.flush + + expect(subject.connections.count(&:active?)).to eq(0) + end + + it "ignores calls from non-owner threads" do + connection = subject.with_connection do |conn| + conn.execute("select 1") + allow(conn).to receive(:__idle_since).and_return(0) + conn + end + + expect(connection).to be_active + + Thread.new { subject.flush }.join + + # connection is still active - `flush` returned early + expect(connection).to be_active + end + + context "with min_connections" do + before do + skip("skipping on Active Record < 8.1") if ActiveRecord.version < Gem::Version.create("8.1") + end + + let(:db_config) { super().merge(min_connections: 1) } + + it "preconnects connections" do + fiber_1 = Fiber.schedule do + subject.with_connection do |conn| + conn.execute("select 1") + allow(conn).to receive(:__idle_since).and_return(0) + end + end + + fiber_2 = Fiber.schedule do + subject.with_connection do |conn| + conn.execute("select 1") + allow(conn).to receive(:__idle_since).and_return(0) + end + end + + Fiber.await([fiber_1, fiber_2]) + expect(subject.connections.count { |conn| !conn.active? }).to eq(pool_size - 2) + + subject.flush + + # one connection has been left active + expect(subject.connections.count { |conn| !conn.active? }).to eq(pool_size - 1) + + # the active one is at the end of the list + expect(subject.connections.last).to be_active + end + end + + context "during disconnect" do + it "doesn't raise error" do + subject.connection + subject.disconnect + + expect { subject.flush }.not_to raise_error + + ensure + subject.release_connection + end + end + end + + describe "#retire_old_connections" do + before do + skip("skipping on Active Record < 8.1") if ActiveRecord.version < Gem::Version.create("8.1") + end + + it "doesn't disconnect connections when no `max_age` is set" do + connection = subject.with_connection do |conn| + conn.execute("select 1") + conn + end + + result = subject.retire_old_connections + + expect(result).to be(false) + expect(connection).to be_active + end + + context "with `max_age` set" do + let(:db_config) { super().merge(max_age: 10) } + + it "disconnects old connections" do + connection = subject.with_connection do |conn| + conn.execute("select 1") + allow(conn).to receive(:connection_age).and_return(100) + conn + end + + result = subject.retire_old_connections + + expect(result).to be(true) + expect(connection).not_to be_active + end + + it "returns false if no connections were disconnected" do + result = subject.retire_old_connections + expect(result).to be(false) + end + end + + context "during disconnect" do + it "doesn't raise error" do + subject.connection + subject.disconnect + + expect { subject.retire_old_connections }.not_to raise_error + + ensure + subject.release_connection + end + end + end + + describe "#keep_alive" do + before do + skip("skipping on Active Record < 8.1") if ActiveRecord.version < Gem::Version.create("8.1") + end + + let(:db_config) { super().merge(keepalive: 10) } + + it "ignores fresh connections" do + subject.connections.each do |conn| + expect(conn).not_to receive(:disconnect!) + end + + subject.keep_alive + expect(subject.connections.length).to eq(pool_size) + end + + it "pings stale connections" do + fiber_1 = Fiber.schedule do + subject.with_connection do |conn| + allow(conn).to receive(:seconds_since_last_activity).and_return(100) + conn.execute("select 1") + end + end + + fiber_2 = Fiber.schedule do + subject.with_connection do |conn| + allow(conn).to receive(:seconds_since_last_activity).and_return(100) + conn.execute("select 1") + end + end + + fiber_3 = Fiber.schedule do + subject.with_connection do |conn| + conn.execute("select 1") + end + end + + Fiber.await([fiber_1, fiber_2, fiber_3]) + + # ensure the last connection on the list is inactive + subject.connections.last.disconnect! + + # pings from `keep_alive` + allow(subject.connections[-2]).to receive(:active?).and_call_original + allow(subject.connections[-3]).to receive(:active?).and_call_original + + Fiber.__await_internal_fibers { subject.keep_alive } + + # ensure active connections are now at the end of the list + expect(subject.connections.length).to eq(pool_size) + expect(subject.connections.last(2).all?(&:active?)).to be(true) + end + + it "disconnects broken connections" do + connection = subject.with_connection do |conn| + allow(conn).to receive(:seconds_since_last_activity).and_return(100) + conn + end + + expect(subject.connections.last).to eq(connection) + + expect(connection).to receive(:disconnect!) + Fiber.__await_internal_fibers { subject.keep_alive } + + # broken connection is pushed to the bottom + expect(subject.connections.first).to eq(connection) + end + + context "during disconnect" do + it "doesn't raise error" do + subject.connection + subject.disconnect + + expect { subject.keep_alive }.not_to raise_error + + ensure + subject.release_connection + end + end + end + + describe "#preconnect" do + before do + skip("skipping on Active Record < 8.1") if ActiveRecord.version < Gem::Version.create("8.1") + end + + context "with `min_connections`" do + let(:db_config) { super().merge(min_connections: 3) } + + it "preconnects connections" do + subject.preconnect + + expect(subject.connections.length).to eq(pool_size) + expect(subject.connections.last(3).all?(&:active?)).to be(true) + end + end + + context "with `min_connection` bigger than number of connections" do + let(:db_config) { super().merge(min_connections: 1_000) } + + it "preconnects connections" do + subject.preconnect + + expect(subject.connections.length).to eq(pool_size) + expect(subject.connections.all?(&:active?)).to be(true) + end + end + + context "with no `min_connections`" do + it "doesn't preconnect connections" do + subject.preconnect + + expect(subject.connections.length).to eq(pool_size) + expect(subject.connections.none?(&:active?)).to be(true) + end + end + + context "during disconnect" do + it "doesn't raise error" do + subject.connection + subject.disconnect + + expect { subject.preconnect }.not_to raise_error + + ensure + subject.release_connection + end + end end describe "#stat" do @@ -377,4 +710,22 @@ end end end + + describe "monitoring" do + before do + skip("skipping on Active Record >= 8.1") if ActiveRecord.version >= Gem::Version.create("8.1") + end + + it "correctly monitors the pool" do + expect { + subject.flush + subject.reap + subject.keep_alive + subject.preconnect + subject.retire_old_connections + }.not_to raise_error + + expect(subject.connections.length).to eq(pool_size) + end + end end diff --git a/spec/ext/active_record/openapi_parser_spec.rb b/spec/ext/active_record/openapi_parser_spec.rb index 36a70294..fa0058ac 100644 --- a/spec/ext/active_record/openapi_parser_spec.rb +++ b/spec/ext/active_record/openapi_parser_spec.rb @@ -10,10 +10,6 @@ setup_test_schema end - after :all do - ActiveRecord::Base.connection.disconnect! - end - def setup_test_schema ActiveRecord::Base.connection.create_table :open_api_test_products, force: true do |t| t.string :name