class Mongo::Server::ConnectionPool::Queue
A LIFO queue of connections to be used by the connection pool. This is based on mperham's connection pool.
@note The queue contains active connections that are available for use. It does not track connections that are in use (checked out).

It is easy to confuse two sizes:
- the size of the connection pool: the number of connections in use plus the number of connections available for use;
- the size of the queue: the number of connections that have already been created and are available for use.

API documentation for this class states whether each size refers to the pool or to the queue. Minimum and maximum sizes only make sense for the connection pool, because the size of the queue of available connections is determined by the pool's size constraints and by how many connections are currently checked out.
@since 2.0.0
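The distinction drawn in the note above can be illustrated with a minimal sketch. `ToyPool` is a hypothetical class written for this example only; it is not part of the driver and tracks nothing but counts.

```ruby
# Hypothetical toy model, not the driver's implementation. It only keeps
# counts, to show how the pool size and the queue size relate.
class ToyPool
  attr_reader :pool_size

  def initialize(max_size)
    @max_size = max_size
    @available = []  # the "queue": created connections that are idle
    @pool_size = 0   # available connections plus checked out connections
  end

  # Number of connections ready to be checked out.
  def queue_size
    @available.size
  end

  def check_out
    return @available.shift unless @available.empty?
    raise "pool exhausted" if @pool_size >= @max_size
    @pool_size += 1
    Object.new # stands in for a real connection
  end

  def check_in(connection)
    @available.unshift(connection)
    nil
  end
end

pool = ToyPool.new(5)
a = pool.check_out
b = pool.check_out
pool.check_in(a)

puts "pool size:  #{pool.pool_size}"   # available + checked out
puts "queue size: #{pool.queue_size}"  # available only
```

With two connections created and one checked back in, the pool size counts both connections while the queue size counts only the idle one.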
Constants
- MAX_SIZE
The default max size for the connection pool.
- MIN_SIZE
The default min size for the connection pool.
- WAIT_TIMEOUT
The default timeout, in seconds, to wait for a connection.
Attributes
@return [ Integer ] generation Generation of connections currently being used by the queue.
@since 2.7.0
@api private
@return [ Mutex ] mutex The mutex used for synchronization of access to #queue.
@api private
@return [ Hash ] options The options.
Number of connections in the pool (active connections ready to be checked out plus connections already checked out).
@since 2.7.0
@return [ Array ] queue The underlying array of connections.
@return [ ConditionVariable ] resource The resource.
Public Class Methods
Initialize the new queue. Yields to the block once for each connection in the initial pool, that is, a number of times equal to the minimum pool size.
@example Create the queue.
Mongo::Server::ConnectionPool::Queue.new(max_pool_size: 5) { Connection.new }
@param [ Hash ] options The options.
@option options [ Integer ] :max_pool_size The maximum pool size.
@option options [ Integer ] :min_pool_size The minimum pool size.
@option options [ Float ] :wait_queue_timeout The time to wait, in seconds, for a free connection.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 63
def initialize(options = {}, &block)
  if options[:min_pool_size] && options[:max_pool_size] &&
    options[:min_pool_size] > options[:max_pool_size]
  then
    raise ArgumentError, "Cannot have min size > max size"
  end
  @block = block
  # This is the number of connections in the pool.
  # Includes available connections in the queue and the checked
  # out connections that we don't otherwise track.
  @pool_size = 0
  @options = options
  @generation = 1
  if min_size > max_size
    raise ArgumentError, "min_size (#{min_size}) cannot exceed max_size (#{max_size})"
  end
  @queue = Array.new(min_size) { create_connection }
  @mutex = Mutex.new
  @resource = ConditionVariable.new

  check_count_invariants
end
Public Instance Methods
Close sockets that have been open for longer than the max idle time,
if the option is set.
@example Close the stale sockets
queue.close_stale_sockets!
@since 2.5.0
# File lib/mongo/server/connection_pool/queue.rb, line 294
def close_stale_sockets!
  check_count_invariants
  return unless max_idle_time

  mutex.synchronize do
    i = 0
    while i < queue.length
      connection = queue[i]
      if last_checkin = connection.last_checkin
        if (Time.now - last_checkin) > max_idle_time
          connection.disconnect!
          queue.delete_at(i)
          @pool_size -= 1
          next
        end
      end
      i += 1
    end
  end
ensure
  check_count_invariants
end
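The index-based loop in close_stale_sockets! removes elements while iterating, which is why it skips the increment after a deletion. The following is a minimal standalone sketch of that pattern. `IdleConn` and `prune_idle!` are hypothetical names introduced for illustration; they are not part of the driver.

```ruby
# Hypothetical stand-in for a connection: only the fields the loop needs.
IdleConn = Struct.new(:name, :last_checkin, :closed) do
  def disconnect!
    self.closed = true
  end
end

# Removes connections idle for longer than max_idle_time seconds and
# returns how many were removed. Connections that were never checked in
# (last_checkin is nil) are left alone, as in the method above.
def prune_idle!(queue, max_idle_time, now = Time.now)
  removed = 0
  i = 0
  while i < queue.length
    conn = queue[i]
    if conn.last_checkin && (now - conn.last_checkin) > max_idle_time
      conn.disconnect!
      queue.delete_at(i)
      removed += 1
      next # do not advance: the following element has moved into slot i
    end
    i += 1
  end
  removed
end

now = Time.now
queue = [
  IdleConn.new("fresh", now - 1, false),
  IdleConn.new("stale", now - 120, false),
  IdleConn.new("never_checked_in", nil, false),
]
removed = prune_idle!(queue, 60, now)
puts "removed #{removed}, remaining: #{queue.map(&:name).join(', ')}"
```

In the real method each removal also decrements the pool size, since a closed connection no longer counts toward the pool.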
Retrieves a connection. If there are active connections in the queue, the most recently used connection is returned. Otherwise, if the connection pool size is less than the max size, creates a new connection and returns it. Otherwise, waits up to the wait timeout for a connection to be checked in, and raises Timeout::Error if none becomes available in that time.
@example Dequeue a connection.
queue.dequeue
@return [ Mongo::Server::Connection ] The next connection.
@raise [ Timeout::Error ] If the connection pool is at maximum size and remains so for longer than the wait timeout.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 141
def dequeue
  check_count_invariants
  dequeue_connection
ensure
  check_count_invariants
end
Closes all idle connections in the queue and schedules currently dequeued connections to be closed when they are enqueued back into the queue. The queue remains operational and can create new connections when requested.
@example Disconnect all connections.
queue.disconnect!
@return [ true ] Always true.
@since 2.1.0
# File lib/mongo/server/connection_pool/queue.rb, line 159
def disconnect!
  check_count_invariants
  mutex.synchronize do
    while connection = queue.pop
      connection.disconnect!
      @pool_size -= 1
      if @pool_size < 0
        # This should never happen
        log_warn("ConnectionPool::Queue: connection accounting problem")
        @pool_size = 0
      end
    end
    @generation += 1
    true
  end
ensure
  check_count_invariants
end
Enqueue a connection in the queue.
Only connections created by this queue should be enqueued back into it; however, the queue does not verify whether it originally created the connection being enqueued.
If linting is enabled (see Mongo::Lint), attempting to enqueue connections beyond the pool's capacity will raise Mongo::Error::LintError (since some of those connections must not have originated from the queue into which they are being enqueued). If linting is not enabled, the queue can grow beyond its max size with undefined results.
@example Enqueue a connection.
queue.enqueue(connection)
@param [ Mongo::Server::Connection ] connection The connection.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 197
def enqueue(connection)
  check_count_invariants
  mutex.synchronize do
    if connection.generation == @generation
      queue.unshift(connection.record_checkin!)
      resource.broadcast
    else
      connection.disconnect!
      @pool_size = if @pool_size > 0
        @pool_size - 1
      else
        # This should never happen
        log_warn("ConnectionPool::Queue: unexpected enqueue")
        0
      end
      while @pool_size < min_size
        @pool_size += 1
        queue.unshift(@block.call(@generation))
      end
    end
  end
  nil
ensure
  check_count_invariants
end
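The interplay between disconnect! and enqueue relies on the generation counter: connections checked out before a disconnect carry an older generation and are closed when they come back. A minimal sketch of that idea follows. `GenConn` and `GenerationQueue` are hypothetical names for this illustration and leave out the mutex, pool size accounting, and replenishing to the minimum size.

```ruby
# Hypothetical stand-in for a connection that remembers its generation.
GenConn = Struct.new(:generation, :closed) do
  def disconnect!
    self.closed = true
  end
end

# Hypothetical simplified queue: generation bookkeeping only.
class GenerationQueue
  attr_reader :generation, :available

  def initialize
    @generation = 1
    @available = []
  end

  # New connections are stamped with the current generation.
  def create
    GenConn.new(@generation, false)
  end

  # Close idle connections and invalidate everything currently checked out.
  def disconnect!
    while (conn = @available.pop)
      conn.disconnect!
    end
    @generation += 1
    true
  end

  # Accept connections of the current generation; close older ones.
  def enqueue(conn)
    if conn.generation == @generation
      @available.unshift(conn)
    else
      conn.disconnect!
    end
    nil
  end
end

q = GenerationQueue.new
old = q.create     # checked out before the disconnect
q.disconnect!      # bumps the generation to 2
fresh = q.create   # created after the disconnect
q.enqueue(old)     # stale generation: closed, not queued
q.enqueue(fresh)   # current generation: queued for reuse
puts "old closed: #{old.closed}, queued: #{q.available.size}"
```

This design lets disconnect! return immediately without tracking or interrupting connections that are in use.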
Get a pretty printed string inspection for the queue.
@example Inspect the queue.
queue.inspect
@return [ String ] The queue inspection.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 233
def inspect
  "#<Mongo::Server::ConnectionPool::Queue:0x#{object_id} min_size=#{min_size} max_size=#{max_size} " +
    "wait_timeout=#{wait_timeout} current_size=#{queue_size}>"
end
The maximum number of seconds a socket can remain idle after it has been checked in to the pool.
@example Get the max idle time.
queue.max_idle_time
@return [ Float ] The max socket idle time in seconds.
@since 2.5.0
# File lib/mongo/server/connection_pool/queue.rb, line 283
def max_idle_time
  @max_idle_time ||= options[:max_idle_time]
end
Get the maximum size of the connection pool.
@example Get the max size.
queue.max_size
@return [ Integer ] The maximum size of the connection pool.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 246
def max_size
  @max_size ||= options[:max_pool_size] || [MAX_SIZE, min_size].max
end
Get the minimum size of the connection pool.
@example Get the min size.
queue.min_size
@return [ Integer ] The minimum size of the connection pool.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 258
def min_size
  @min_size ||= options[:min_pool_size] || MIN_SIZE
end
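The fallback logic in max_size and min_size means that a minimum size given without a maximum raises the effective maximum when needed. The sketch below mirrors that resolution in a standalone function. `resolve_sizes` is a hypothetical helper, and the two constant values are placeholders assumed for illustration, since this page does not state the driver's actual defaults.

```ruby
# Placeholder defaults, assumed for this example only. The real values are
# the MAX_SIZE and MIN_SIZE constants defined by the driver.
ASSUMED_MAX_SIZE = 5
ASSUMED_MIN_SIZE = 1

# Hypothetical helper mirroring the fallback order used by min_size and
# max_size: explicit option first, then the default. The default maximum
# is never allowed to fall below the resolved minimum.
def resolve_sizes(options)
  min = options[:min_pool_size] || ASSUMED_MIN_SIZE
  max = options[:max_pool_size] || [ASSUMED_MAX_SIZE, min].max
  [min, max]
end

defaults  = resolve_sizes({})
large_min = resolve_sizes(min_pool_size: 10)
explicit  = resolve_sizes(min_pool_size: 2, max_pool_size: 3)

puts "defaults:  #{defaults.inspect}"
puts "large min: #{large_min.inspect}"
puts "explicit:  #{explicit.inspect}"
```

Note that an explicit maximum smaller than the minimum is not adjusted here; the constructor rejects that combination with ArgumentError.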
Number of connections that the pool has which are ready to be checked out. This is NOT the size of the connection pool (total number of active connections created by the pool).
@since 2.7.0
# File lib/mongo/server/connection_pool/queue.rb, line 110
def size
  mutex.synchronize do
    queue.size
  end
end
The time to wait, in seconds, for a connection to become available.
@example Get the wait timeout.
queue.wait_timeout
@return [ Float ] The queue wait timeout.
@since 2.0.0
# File lib/mongo/server/connection_pool/queue.rb, line 270
def wait_timeout
  @wait_timeout ||= options[:wait_queue_timeout] || WAIT_TIMEOUT
end
Private Instance Methods
# File lib/mongo/server/connection_pool/queue.rb, line 346
def check_count_invariants
  if Mongo::Lint.enabled?
    if pool_size < 0
      raise Error::LintError, 'connection pool queue: underflow'
    end
    if pool_size > max_size
      raise Error::LintError, 'connection pool queue: overflow'
    end
  end
end
# File lib/mongo/server/connection_pool/queue.rb, line 331
def create_connection
  if pool_size < max_size
    @pool_size += 1
    @block.call(@generation)
  end
end
# File lib/mongo/server/connection_pool/queue.rb, line 319
def dequeue_connection
  mutex.synchronize do
    deadline = Time.now + wait_timeout
    loop do
      return queue.shift unless queue.empty?
      connection = create_connection
      return connection if connection
      wait_for_next!(deadline)
    end
  end
end
# File lib/mongo/server/connection_pool/queue.rb, line 338
def wait_for_next!(deadline)
  wait = deadline - Time.now
  if wait <= 0
    raise Timeout::Error.new("Timed out attempting to dequeue connection after #{wait_timeout} sec.")
  end
  resource.wait(mutex, wait)
end
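The private methods dequeue_connection and wait_for_next! together implement a deadline-bounded wait on a condition variable. The following is a minimal standalone sketch of that pattern using only the Ruby standard library. `WaitingQueue` is a hypothetical class for this example; it omits connection creation and pool size limits.

```ruby
require 'timeout'

# Hypothetical simplified queue showing only the deadline-based wait.
class WaitingQueue
  def initialize(wait_timeout)
    @wait_timeout = wait_timeout
    @items = []
    @mutex = Mutex.new
    @resource = ConditionVariable.new
  end

  # Put an item at the front and wake up any waiting consumers.
  def enqueue(item)
    @mutex.synchronize do
      @items.unshift(item)
      @resource.broadcast
    end
    nil
  end

  # Take the most recently enqueued item, waiting until the deadline.
  def dequeue
    @mutex.synchronize do
      deadline = Time.now + @wait_timeout
      loop do
        return @items.shift unless @items.empty?
        wait = deadline - Time.now
        if wait <= 0
          raise Timeout::Error, "Timed out after #{@wait_timeout} sec."
        end
        # Releases the mutex while sleeping. The loop re-checks the queue
        # afterwards, because a wakeup does not guarantee an item is left.
        @resource.wait(@mutex, wait)
      end
    end
  end
end

q = WaitingQueue.new(0.5)
producer = Thread.new { sleep 0.05; q.enqueue(:conn) }
got = q.dequeue
producer.join

timed_out = begin
  WaitingQueue.new(0.05).dequeue
  false
rescue Timeout::Error
  true
end
puts "got #{got.inspect}, empty queue timed out: #{timed_out}"
```

Computing the remaining wait from a fixed deadline on every iteration keeps the total wait bounded by the timeout even if the thread is woken several times without obtaining a connection.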