Skip to content

Add ability to expire resources within the pool #216

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
268 changes: 268 additions & 0 deletions spec/pool_spec.cr
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require "./spec_helper"
require "../src/db/error.cr"

class ShouldSleepingOp
@is_sleeping = false
Expand Down Expand Up @@ -57,6 +58,17 @@ class Closable
end
end

class ClosableWithSignal < Closable
setter signal : Channel(Nil)?

def initialize(@signal = nil)
end

protected def do_close
@signal.try &.send(nil)
end
end

private def create_pool(**options, &factory : -> T) forall T
DB::Pool.new(DB::Pool::Options.new(**options), &factory)
end
Expand Down Expand Up @@ -224,4 +236,260 @@ describe DB::Pool do

all.size.should eq 4
end

it "should expire resources that exceed maximum lifetime on checkout" do
all = [] of Closable
pool = create_pool(
max_pool_size: 2, max_idle_pool_size: 1,
max_lifetime_per_resource: 0.1, expired_resource_sweeper: false
) { Closable.new.tap { |c| all << c } }

sleep 0.2.seconds
ex = expect_raises DB::PoolResourceLifetimeExpired(Closable) do
pool.checkout
end

# Lifetime expiration error should cause the client to be closed
all[0].closed?.should be_true
end

it "should expire resources that exceed maximum idle-time on checkout" do
all = [] of Closable
pool = create_pool(
max_pool_size: 2, max_idle_pool_size: 1,
max_idle_time_per_resource: 0.2, max_lifetime_per_resource: 2.0,
expired_resource_sweeper: false
) { Closable.new.tap { |c| all << c } }

sleep 0.2.seconds

# Idle expiration error should cause the client to be closed
ex = expect_raises DB::PoolResourceIdleExpired(Closable) do
pool.checkout
end

all[0].closed?.should be_true
end

it "should expire resources that exceed maximum lifetime on release" do
all = [] of Closable
pool = create_pool(
max_pool_size: 2,
max_idle_pool_size: 1,
max_lifetime_per_resource: 0.2,
max_idle_time_per_resource: 2.0,
expired_resource_sweeper: false
) { Closable.new.tap { |c| all << c } }

pool.checkout { sleep 0.25.seconds }
pool.stats.lifetime_expired_connections.should eq 1
all[0].closed?.should be_true
end

it "should reset idle-time during release" do
all = [] of Closable
pool = create_pool(
max_pool_size: 2,
initial_pool_size: 0,
max_idle_pool_size: 1,
max_idle_time_per_resource: 0.2,
max_lifetime_per_resource: 0.4,
expired_resource_sweeper: false
) { Closable.new.tap { |c| all << c } }

# Resource gets idled every 0.2 seconds but gets reset upon each release.
pool.checkout do
sleep 0.21.seconds
end

all[0].closed?.should be_false
pool.checkout { sleep 0.2.seconds }

pool.stats.lifetime_expired_connections.should eq(1)
pool.stats.idle_expired_connections.should eq(0)
all[0].closed?.should be_true
all.size.should eq(1)
end

it "Should ensure minimum of initial_pool_size non-expired idle resources on checkout" do
all = [] of Closable

pool = create_pool(
initial_pool_size: 3,
max_pool_size: 5,
max_idle_pool_size: 5,
max_lifetime_per_resource: 1.0,
max_idle_time_per_resource: 0.1,
expired_resource_sweeper: false
) { Closable.new.tap { |c| all << c } }

# Initially have 3 clients
all.size.should eq(3)

sleep 0.2.seconds

# checkout
3.times do |i|
# Each call should cause another resource to be spawned and the old one expired off
# for a minimum of three
ex = expect_raises DB::PoolResourceIdleExpired(Closable) do
pool.checkout { }
end

pool.stats.idle_connections.should eq(3)
pool.stats.idle_expired_connections.should eq(i + 1)
all.size.should eq(3 + (i + 1))
all[i].closed?.should be_true
end
end

it "Should ensure minimum of initial_pool_size non-expired idle resources on release" do
all = [] of Closable

pool = create_pool(
initial_pool_size: 3,
max_pool_size: 5,
max_idle_pool_size: 5,
max_lifetime_per_resource: 0.2,
expired_resource_sweeper: false
) { Closable.new.tap { |c| all << c } }

temp_resource_store = {
pool.checkout,
pool.checkout,
pool.checkout,
}

# Await lifetime expiration
sleep 0.2.seconds
# release
temp_resource_store.each_with_index do |resource, i|
# All three idle connections were checked out
# Each iteration should result in a new idle connection being created
# as the one we release get expired.
pool.release(resource)

pool.stats.idle_connections.should eq(i + 1)
pool.stats.lifetime_expired_connections.should eq(i + 1)
all.size.should eq(3 + (i + 1))
all[i].closed?.should be_true
end
end

it "Should count inflight resources when ensuring minimum of initial_pool_size non-expired resources" do
number_of_factory_calls = 0
toggle_long_inflight = false

close_inflight = Channel(Nil).new
resource_closed_signal = Channel(Nil).new

pool = create_pool(
initial_pool_size: 3,
max_pool_size: 5,
max_idle_pool_size: 5,
max_lifetime_per_resource: 0.25,
expired_resource_sweeper: false
) do
number_of_factory_calls += 1
if toggle_long_inflight
close_inflight.send(nil)
end
ClosableWithSignal.new(resource_closed_signal)
end

toggle_long_inflight = true
temporary_latch = {pool.checkout, pool.checkout, pool.checkout}
spawn { pool.checkout { } }

# Make existing resources stale
sleep 0.25.seconds

pool.stats.idle_connections.should eq(0)
pool.stats.in_flight_connections.should eq(1)

# Release latched resources
temporary_latch.each do |resource|
spawn do
pool.release(resource)
end
end

# If inflight number is used correctly there should only be a total of
# three new pending resources created in total which is used to replace the
# expiring ones.
#
# +1 from the initial checkout (total 3, inflight: 1)
# +0 from the first release (total 2, inflight: 1)
# +1 from the second release (total: 1, inflight: 2)
# +1 from the third release (total: 0, inflight: 3)

3.times do
resource_closed_signal.receive
close_inflight.receive
end

# Should close gracefully and without any errors.
close_inflight.close

number_of_factory_calls.should eq(6)
pool.stats.idle_connections.should eq(3)
pool.stats.open_connections.should eq(3)
pool.stats.in_flight_connections.should eq(0)
pool.stats.lifetime_expired_connections.should eq(3)
end

describe "background expired resource sweeper " do
it "should clear idle resources" do
all = [] of Closable
signal = Channel(Nil).new
pool = create_pool(
initial_pool_size: 0,
max_pool_size: 5,
max_idle_pool_size: 5,
max_idle_time_per_resource: 0.25,
) do
ClosableWithSignal.new.tap { |c| all << c }.tap &.signal = signal
end

# Create 5 resource
5.times {
spawn do
pool.checkout { sleep 0.1.seconds }
end
}

all.each &.closed?.should be_false

5.times do
signal.receive
end

# Gone
all.each &.closed?.should be_true
pool.stats.open_connections.should eq(0)
pool.stats.idle_connections.should eq(0)
end

it "should ensure minimum of initial_pool_size fresh resources" do
all = [] of Closable
signal = Channel(Nil).new
pool = create_pool(
initial_pool_size: 3,
max_pool_size: 5,
max_idle_pool_size: 5,
max_lifetime_per_resource: 2.0,
max_idle_time_per_resource: 0.5,
) { ClosableWithSignal.new.tap { |c| all << c }.tap &.signal = signal }

# The job will replace the three idle expired resources with new ones
3.times { signal.receive }
# Wait for the replenishment process to finish
sleep 0.25.seconds

all.size.should eq(6)
all[..2].each &.closed?.should be_true
all[3..].each &.closed?.should be_false
pool.stats.idle_connections.should eq(3)
end
end
end
12 changes: 12 additions & 0 deletions src/db/error.cr
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,18 @@ module DB
class PoolResourceRefused < Error
end

# Raised when a checked out resource has reached expiration of any kind
class PoolResourceExpired(T) < PoolResourceLost(T)
end

# Raised when a checked out resource has exceeded the maximum lifetime
class PoolResourceLifetimeExpired(T) < PoolResourceExpired(T)
end

# Raised when a checked out resource has idle expired
class PoolResourceIdleExpired(T) < PoolResourceExpired(T)
end

# Raised when an established connection is lost
# probably due to socket/network issues.
# It is used by the connection pool retry logic.
Expand Down
Loading