Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions Appraisals
Original file line number Diff line number Diff line change
@@ -1,7 +1,3 @@
appraise "active_record_7_0" do
gem "activerecord", "~> 7.0.0"
end

appraise "active_record_7_1" do
gem "activerecord", "~> 7.1.0"
end
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
### Added

- [OpenAPI] Add support for blueprint inheritance. Child blueprints now inherit fields from parent blueprints.
- Connection pool improvements (#301).

### Fixed

Expand Down
5 changes: 4 additions & 1 deletion Rakefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ task :appraise do |_, args|
puts ">> Appraising #{ext_version}"

gem_name = ext_version.sub(/_\d+(_\d+)*$/, "")
system "bundle exec appraisal #{ext_version} rspec spec/ext/#{gem_name}/"

Pathname.new("spec/ext").join(gem_name).glob("*_spec.rb").each do |spec|
system("bundle exec appraisal #{ext_version} rspec #{spec}", exception: true)
end
end
end
155 changes: 133 additions & 22 deletions lib/rage/ext/active_record/connection_pool.rb
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ def <<(el)
@arr << el
end

def shift
def unshift(el)
@arr.unshift(el)
end

def pop
nil
end

Expand Down Expand Up @@ -59,18 +63,27 @@ def __init_rage_extension
# in the format of { Fiber => Connection }
@__in_use = {}

# a list of all DB connections that are currently idle
@__connections = build_new_connections

# how long a fiber can wait for a connection to become available
@__checkout_timeout = checkout_timeout

# how long a connection can be idle for before disconnecting
@__idle_timeout = respond_to?(:db_config) ? db_config.idle_timeout : @idle_timeout

# pool will maintain at least this many connections
@__min_connections = respond_to?(:min_connections) ? min_connections : 0

# connections older than this are automatically disconnected
@__max_age = respond_to?(:max_age) ? max_age : Float::INFINITY

# seconds between keepalive pings on idle connections
@__keepalive = respond_to?(:keepalive) ? keepalive : nil

# how often should we check for fibers that wait for a connection for too long
@__timeout_worker_frequency = 0.5

# a list of all DB connections that are currently idle
@__connections = build_new_connections

# reject fibers that wait for a connection for more than `@__checkout_timeout`
Iodine.run_every((@__timeout_worker_frequency * 1_000).to_i) do
if @__blocked.length > 0
Expand Down Expand Up @@ -105,6 +118,8 @@ def __init_rage_extension
end

@release_connection_channel = "ext:ar-connection-released:#{object_id}"
@__owner_thread = Thread.current
@__background_maintenance_disabled = (ActiveRecord.version < Gem::Version.create("8.1"))

# resume blocked fibers once connections become available
Iodine.subscribe(@release_connection_channel) do
Expand All @@ -118,6 +133,14 @@ def __init_rage_extension
Iodine.on_state(:on_finish) do
Iodine.unsubscribe(@release_connection_channel)
end

Iodine.run_every((db_config.reaping_frequency * 1_000).to_i) do
reap
flush
disconnected = retire_old_connections
keep_alive
preconnect if disconnected
end
end

# Returns true if there is an open connection being used for the current fiber.
Expand All @@ -127,12 +150,10 @@ def active_connection?

# Retrieve the connection associated with the current fiber, or obtain one if necessary.
def connection
@__in_use[Fiber.current] ||= @__connections.shift || begin
fiber, blocked_since = Fiber.current, Process.clock_gettime(Process::CLOCK_MONOTONIC)
@__blocked[fiber] = blocked_since
@__in_use[Fiber.current] ||= @__connections.pop || begin
@__blocked[Fiber.current] = Process.clock_gettime(Process::CLOCK_MONOTONIC)
Fiber.yield

@__connections.shift
@__connections.pop
end
end

Expand All @@ -149,45 +170,125 @@ def release_connection(owner = Fiber.current)

# Recover lost connections for the pool.
def reap
crashed_fibers = nil
return unless Thread.current == @__owner_thread

@__in_use.each do |fiber, conn|
dead = nil

@__in_use.delete_if do |fiber, conn|
unless fiber.alive?
(dead ||= []) << [fiber, conn]
true
end
end

dead&.each do |fiber, conn|
Fiber.schedule do
if conn.active?
conn.reset!
(crashed_fibers ||= []) << fiber
@__in_use[fiber] = conn
release_connection(fiber)
else
@__in_use.delete(fiber)
conn.disconnect!
__remove__(conn)
self.automatic_reconnect = true
@__connections += build_new_connections(1)
@__connections.unshift(*build_new_connections(1))
Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) if @__blocked.length > 0
end
end
end

if crashed_fibers
crashed_fibers.each { |fiber| release_connection(fiber) }
end
end

# Disconnect all connections that have been idle for at least
# `minimum_idle` seconds. Connections currently checked out, or that were
# checked in less than `minimum_idle` seconds ago, are unaffected.
def flush(minimum_idle = @__idle_timeout)
return if minimum_idle.nil? || @__connections.length == 0
return if Thread.current != @__owner_thread || minimum_idle.nil? || @__connections.length == 0

current_time, i = Process.clock_gettime(Process::CLOCK_MONOTONIC), 0
while i < @__connections.length
current_time = Process.clock_gettime(Process::CLOCK_MONOTONIC)
i = @__connections.length - 1
connections_to_preserve = @__min_connections

while i >= 0
conn = @__connections[i]

if connections_to_preserve > 0 && conn.connected? && conn.verified?
connections_to_preserve -= 1
i -= 1
next
end

if conn.__idle_since && current_time - conn.__idle_since >= minimum_idle
conn.__idle_since = nil
conn.__needs_reconnect = true
conn.disconnect!
end
i -= 1
end
end

# Disconnect connections exceeding `max_age`
def retire_old_connections(max_age = @__max_age)
return false if max_age.nil? || max_age.infinite?

i, disconnected = 0, false

while i < @__connections.length
conn = @__connections[i]
if (conn.connection_age || 0) >= conn.pool_jitter(max_age)
conn.disconnect!
disconnected = true
end
i += 1
end

disconnected
end

# Ping idle connections to prevent firewall/server timeouts
def keep_alive(threshold = @__keepalive)
return if threshold.nil? || @__connections.length == 0

to_ping = nil

@__connections.delete_if do |conn|
if (conn.seconds_since_last_activity || 0) >= conn.pool_jitter(threshold)
(to_ping ||= []) << conn
true
end
end

to_ping&.each do |conn|
Fiber.schedule do
if conn.active?
@__connections << conn
else
conn.disconnect!
@__connections.unshift(conn)
end
Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) if @__blocked.length > 0
end
end
end

# Proactively establish DB connections
def preconnect
return if @__connections.length == 0 || @__background_maintenance_disabled

active_connections_count = @__in_use.length + @__connections.count { |conn| conn.connected? && conn.verified? }

while @__min_connections - active_connections_count > 0
i = @__connections.rindex { |conn| !conn.connected? || !conn.verified? }
return unless i

Fiber.schedule do
conn = @__connections.delete_at(i)
conn.connect! rescue nil
@__connections << conn
Iodine.publish(@release_connection_channel, "", Iodine::PubSub::PROCESS) if @__blocked.length > 0
end

active_connections_count += 1
end
end

# Disconnect all currently idle connections. Connections currently checked out are unaffected.
Expand Down Expand Up @@ -234,6 +335,8 @@ def stat
# Raises `ActiveRecord::ExclusiveConnectionTimeoutError` if unable to gain ownership of all
# connections in the pool within a timeout interval (default duration is `checkout_timeout * 2` seconds).
def disconnect(raise_on_acquisition_timeout = true, disconnect_attempts = 0)
return if @__connections.is_a?(BlackHoleList)

# allow request fibers to release connections, but block from acquiring new ones
if disconnect_attempts == 0
@__connections = BlackHoleList.new(@__connections)
Expand Down Expand Up @@ -288,6 +391,10 @@ def lease_connection
connection
end

def maintainable?
false
end

# Check in a database connection back into the pool, indicating that you no longer need this connection.
def checkin(conn)
fiber = @__in_use.key(conn)
Expand Down Expand Up @@ -328,8 +435,12 @@ def discarded?
private

def build_new_connections(num_connections = size)
(1..num_connections).map do
connections = (1..num_connections).map do
__checkout__.tap { |conn| conn.__idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC) }
end

Iodine.defer { preconnect }

connections
end
end
2 changes: 1 addition & 1 deletion lib/rage/templates/db-templates/mysql/config-database.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
default: &default
adapter: mysql2
encoding: utf8mb4
pool: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %>
max_connections: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %>
username: root
password:
socket: /tmp/mysql.sock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
default: &default
adapter: postgresql
encoding: unicode
pool: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %>
max_connections: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %>

development:
<<: *default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#
default: &default
adapter: sqlite3
pool: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %>
max_connections: <%%= ENV.fetch("DB_MAX_CONNECTIONS") { 5 } %>
timeout: 5000

development:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
default: &default
adapter: trilogy
encoding: utf8mb4
pool: <%%= ENV.fetch("DB_MAX_THREADS") { 5 } %>
max_connections: <%%= ENV.fetch("DB_MAX_THREADS") { 5 } %>
username: root
password:
socket: /tmp/mysql.sock
Expand Down
Loading