diff --git a/src/ConnectionPool.jl b/src/ConnectionPool.jl index 8a4b69613..59ca690ca 100644 --- a/src/ConnectionPool.jl +++ b/src/ConnectionPool.jl @@ -25,7 +25,7 @@ and `closewrite`, with corresponding `startread` and `closeread`. module ConnectionPool export Connection, Transaction, - getconnection, getrawstream, inactiveseconds + newconnection, getrawstream, inactiveseconds using ..IOExtras, ..Sockets @@ -38,6 +38,9 @@ const default_connection_limit = 8 const default_pipeline_limit = 16 const nolimit = typemax(Int) +include("connectionpools.jl") +using .ConnectionPools + # certain operations, like locking Channels and Conditions # is only supported in >= 1.3 macro v1_3(expr, elses=nothing) @@ -64,7 +67,6 @@ A `TCPSocket` or `SSLContext` connection to a HTTP `host` and `port`. Fields: - `host::String` - `port::String`, exactly as specified in the URI (i.e. may be empty). -- `pipeline_limit`, number of requests to send before waiting for responses. - `idle_timeout`, No. of seconds to maintain connection after last transaction. - `peerip`, remote IP adress (used for debug/log messages). - `peerport`, remote TCP port number (used for debug/log messages). @@ -74,37 +76,20 @@ Fields: - `buffer::IOBuffer`, left over bytes read from the connection after the end of a response header (or chunksize). These bytes are usually part of the response body. -- `sequence`, number of most recent `Transaction`. -- `writecount`, number of Messages that have been written, protected by `writelock` -- `writelock`, lock writecount and writebusy, and signal that `writecount` was incremented. -- `writebusy`, whether a Transaction currently holds the Connection write lock, protected by `writelock` -- `readcount`, number of Messages that have been read, protected by `readlock` -- `readlock`, lock readcount and readbusy, and signal that `readcount` was incremented. -- `readbusy`, whether a Transaction currently holds the Connection read lock, protectecd by `readlock` - `timestamp`, time data was last received. """ -mutable struct Connection{T <: IO} +mutable struct Connection host::String port::String - pipeline_limit::Int idle_timeout::Int require_ssl_verification::Bool peerip::IPAddr # for debugging/logging peerport::UInt16 # for debugging/logging localport::UInt16 # debug only - io::T + io::IO clientconnection::Bool buffer::IOBuffer - sequence::Threads.Atomic{Int} - writecount::Int - writelock::Cond # protects the writecount and writebusy fields, notifies on closewrite - writebusy::Bool - readcount::Int - readlock::Cond # protects the readcount and readbusy fields, notifies on closeread - readbusy::Bool timestamp::Float64 - closelock::ReentrantLock - closed::Bool end """ @@ -112,50 +97,44 @@ end Used for "hashing" a Connection object on just the key properties necessary for determining connection re-useability. That is, when a new request calls `getconnection`, we take the -request parameters of what socket type, the host and port, any pipeline_limit and if ssl +request parameters of what socket type, the host and port, and if ssl verification is required, and if an existing Connection was already created with the exact same parameters, we can re-use it (as long as it's not already being used, obviously). """ function hashconn end -hashconn(x::Connection{T}) where {T} = hashconn(T, x.host, x.port, x.pipeline_limit, x.require_ssl_verification, x.clientconnection) -hashconn(T, host, port, pipeline_limit, require_ssl_verification, client) = hash(T, hash(host, hash(port, hash(pipeline_limit, hash(require_ssl_verification, hash(client, UInt(0))))))) +hashconn(x::Connection) = hashconn(typeof(x.io), x.host, x.port, x.require_ssl_verification, x.clientconnection) +hashconn(T, host, port, require_ssl_verification, client) = hash(T, hash(host, hash(port, hash(require_ssl_verification, hash(client, UInt(0)))))) + +@enum TransactionState Pre Busy Post """ Transaction -A single pipelined HTTP Request/Response transaction. +A single HTTP Request/Response transaction. Fields: - `c`, the shared [`Connection`](@ref) used for this `Transaction`. - - `sequence::Int`, identifies this `Transaction` among the others that share `c`. - - `writebusy::Bool`, whether this Transaction holds its parent Connection write lock, protected by c.writelock - - `readbusy::Bool`, whether this Transaction holds its parent Connection read lock, protected by c.readlock """ -mutable struct Transaction{T <: IO} <: IO - c::Connection{T} - sequence::Int - writebusy::Bool - readbusy::Bool +mutable struct Transaction <: IO + c::Connection + writestate::TransactionState + readstate::TransactionState end Connection(host::AbstractString, port::AbstractString, - pipeline_limit::Int, idle_timeout::Int, - require_ssl_verification::Bool, io::T, client=true) where T <: IO = - Connection{T}(host, port, - pipeline_limit, idle_timeout, + idle_timeout::Int, + require_ssl_verification::Bool, io::IO, client=true) = + Connection(host, port, + idle_timeout, require_ssl_verification, safe_getpeername(io)..., localport(io), - io, client, PipeBuffer(), Threads.Atomic{Int}(0), - 0, Cond(), false, - 0, Cond(), false, - time(), ReentrantLock(), false) + io, client, PipeBuffer(), time()) Connection(io; require_ssl_verification::Bool=true) = - Connection("", "", default_pipeline_limit, 0, require_ssl_verification, io, false) + Connection("", "", 0, require_ssl_verification, io, false) -Transaction(c::Connection{T}) where T <: IO = - Transaction{T}(c, (Threads.atomic_add!(c.sequence, 1)), false, false) +Transaction(c::Connection) = Transaction(c, Pre, Pre) function client_transaction(c) t = Transaction(c) @@ -175,18 +154,7 @@ Base.unsafe_write(t::Transaction, p::Ptr{UInt8}, n::UInt) = unsafe_write(t.c.io, p, n) Base.isopen(c::Connection) = isopen(c.io) - -Base.isopen(t::Transaction) = isopen(t.c) && - readcount(t.c) <= t.sequence && - writecount(t.c) <= t.sequence - -writebusy(c::Connection) = @v1_3 lock(() -> c.writebusy, c.writelock) c.writebusy -writecount(c::Connection) = @v1_3 lock(() -> c.writecount, c.writelock) c.writecount -readbusy(c::Connection) = @v1_3 lock(() -> c.readbusy, c.readlock) c.readbusy -readcount(c::Connection) = @v1_3 lock(() -> c.readcount, c.readlock) c.readcount - -writebusy(t::Transaction) = @v1_3 lock(() -> t.writebusy, t.c.writelock) t.writebusy -readbusy(t::Transaction) = @v1_3 lock(() -> t.readbusy, t.c.readlock) t.readbusy +Base.isopen(t::Transaction) = isopen(t.c) """ flush(c::Connection) @@ -208,12 +176,6 @@ function Base.flush(c::Connection) end end -""" -Is `c` currently in use or expecting a response to request already sent? -""" -isbusy(c::Connection) = isopen(c) && (writebusy(c) || readbusy(c) || - writecount(c) > readcount(c)) - function Base.eof(t::Transaction) @require isreadable(t) || !isopen(t) if bytesavailable(t) > 0 @@ -226,8 +188,8 @@ Base.bytesavailable(t::Transaction) = bytesavailable(t.c) Base.bytesavailable(c::Connection) = bytesavailable(c.buffer) + bytesavailable(c.io) -Base.isreadable(t::Transaction) = readbusy(t) -Base.iswritable(t::Transaction) = writebusy(t) +Base.isreadable(t::Transaction) = t.readstate == Busy +Base.iswritable(t::Transaction) = t.writestate == Busy function Base.read(t::Transaction, nb::Int) nb = min(nb, bytesavailable(t)) @@ -305,22 +267,12 @@ end Wait for prior pending writes to complete. """ function IOExtras.startwrite(t::Transaction) - @require !iswritable(t) - - @v1_3 lock(t.c.writelock) - try - while writecount(t.c) != t.sequence - @debug 1 "⏳ Wait write: $t" - wait(t.c.writelock) - end - t.writebusy = true - t.c.writebusy = true - @ensure iswritable(t) - @debug 2 "👁 Start write:$t" - finally - @v1_3 unlock(t.c.writelock) + @require t.writestate == Pre + while bytesavailable(t.c.io) > 0 + readavailable(t.c.io) end - + t.writestate = Busy + @debug 2 "👁 Start write:$t" return end @@ -331,46 +283,20 @@ Signal that an entire Request Message has been written to the `Transaction`. """ function IOExtras.closewrite(t::Transaction) @require iswritable(t) - - @v1_3 lock(t.c.writelock) - try - t.writebusy = false - t.c.writecount += 1 ;@debug 2 "🗣 Write done: $t" - t.c.writebusy = false - notify(t.c.writelock) - @ensure !iswritable(t) - finally - @v1_3 unlock(t.c.writelock) - end + t.writestate = Post + @debug 2 "🗣 Write done: $t" flush(t.c) - release(t.c) - return end """ startread(::Transaction) - -Wait for prior pending reads to complete. """ function IOExtras.startread(t::Transaction) - @require !isreadable(t) - + @require t.readstate == Pre t.c.timestamp = time() - @v1_3 lock(t.c.readlock) - try - while readcount(t.c) != t.sequence - @debug 1 "⏳ Wait read: $t" - wait(t.c.readlock) - end - t.readbusy = true - t.c.readbusy = true - @debug 2 "👁 Start read: $t" - @ensure isreadable(t) - finally - @v1_3 unlock(t.c.readlock) - end - + t.readstate = Busy + @debug 2 "👁 Start read: $t" return end @@ -378,46 +304,31 @@ end closeread(::Transaction) Signal that an entire Response Message has been read from the `Transaction`. - -Increment `readcount` and wake up tasks waiting in `startread`. """ function IOExtras.closeread(t::Transaction) - @require isreadable(t) - - @v1_3 lock(t.c.readlock) - try - t.readbusy = false - t.c.readcount += 1 ;@debug 2 "✉️ Read done: $t" - t.c.readbusy = false - notify(t.c.readlock) - @ensure !isreadable(t) - if !isbusy(t.c) - @async monitor_idle_connection(t.c) - end - finally - @v1_3 unlock(t.c.readlock) - end - release(t.c) - + @require t.readstate == Busy + t.readstate = Post + @debug 2 "✉️ Read done: $t" + t.c.clientconnection && release(POOL, hashconn(t.c), t.c) return end -""" -Wait for `c` to receive data or reach EOF. -Close `c` on EOF or if response data arrives when no request was sent. -""" -function monitor_idle_connection(c::Connection) - if eof(c.io) ;@debug 2 "💀 Closed: $c" - close(c.io) - elseif !isbusy(c) ;@debug 1 "😈 Idle RX!!: $c" - close(c.io) - end -end +# """ +# Wait for `c` to receive data or reach EOF. +# Close `c` on EOF or if response data arrives when no request was sent. +# """ +# function monitor_idle_connection(c::Connection) +# if eof(c.io) ;@debug 2 "💀 Closed: $c" +# close(c.io) +# elseif !isbusy(c) ;@debug 1 "😈 Idle RX!!: $c" +# close(c.io) +# end +# end -function monitor_idle_connection(c::Connection{SSLContext}) - # MbedTLS.jl monitors idle connections for TLS close_notify messages. - # https://github.com/JuliaWeb/MbedTLS.jl/pull/145 -end +# function monitor_idle_connection(c::Connection{SSLContext}) +# # MbedTLS.jl monitors idle connections for TLS close_notify messages. +# # https://github.com/JuliaWeb/MbedTLS.jl/pull/145 +# end Base.wait_close(t::Transaction) = Base.wait_close(tcpsocket(t.c.io)) @@ -461,82 +372,16 @@ end Close all connections in `pool`. """ function closeall() - lock(POOL.lock) do - for pod in values(POOL.conns) - @v1_3 lock(pod.conns) - while isready(pod.conns) - close(take!(pod.conns)) - end - pod.numactive = 0 - @v1_3 unlock(pod.conns) - end - end - return -end - -mutable struct Pod - conns::Channel{Connection} - numactive::Int -end - -Pod() = Pod(Channel{Connection}(Inf), 0) - -function decr!(pod::Pod) - @v1_3 @assert islocked(pod.conns.cond_take) - pod.numactive -= 1 - return -end - -function incr!(pod::Pod) - @v1_3 @assert islocked(pod.conns.cond_take) - pod.numactive += 1 - return -end - - -function release(c::Connection) - h = hashconn(c) - if haskey(POOL.conns, h) - pod = getpod(POOL, h) - @debug 2 "returning connection to pod: $c" - put!(pod.conns, c) - end + ConnectionPools.reset!(POOL) return end -# "release" a Connection, without returning to pod for re-use -# used for https proxy tunnel upgrades which shouldn't be reused -function kill!(c::Connection) - h = hashconn(c) - if haskey(POOL.conns, h) - pod = getpod(POOL, h) - @v1_3 lock(pod.conns) - try - decr!(pod) - finally - @v1_3 unlock(pod.conns) - end - end - return -end - -struct Pool - lock::ReentrantLock - conns::Dict{UInt, Pod} -end - """ POOL Global connection pool keeping track of active connections. """ -const POOL = Pool(ReentrantLock(), Dict{UInt, Pod}()) - -function getpod(pool::Pool, x) - lock(pool.lock) do - get!(() -> Pod(), pool.conns, x) - end -end +const POOL = Pool(Connection) """ getconnection(type, host, port) -> Connection @@ -544,7 +389,7 @@ end Find a reusable `Connection` in the `pool`, or create a new `Connection` if required. """ -function getconnection(::Type{Transaction{T}}, +function newconnection(::Type{T}, host::AbstractString, port::AbstractString; connection_limit::Int=default_connection_limit, @@ -552,83 +397,20 @@ function getconnection(::Type{Transaction{T}}, idle_timeout::Int=0, reuse_limit::Int=nolimit, require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"), - kw...)::Transaction{T} where T <: IO - pod = getpod(POOL, hashconn(T, host, port, pipeline_limit, require_ssl_verification, true)) - @v1_3 lock(pod.conns) - try - while isready(pod.conns) - conn = take!(pod.conns) - if isvalid(pod, conn, reuse_limit, pipeline_limit) - # this is a reuseable connection, so use it - @debug 2 "1 reusing connection: $conn" - return client_transaction(conn) - end - end - # If there are not too many connections to this host:port, - # create a new connection... - if pod.numactive < connection_limit - return newconnection(pod, T, host, port, pipeline_limit, - require_ssl_verification, idle_timeout; kw...) - end - # wait for a Connection to be released - while true - conn = take!(pod.conns) - if isvalid(pod, conn, reuse_limit, pipeline_limit) - # this is a reuseable connection, so use it - @debug 2 "2 reusing connection: $conn" - return client_transaction(conn) - elseif pod.numactive < connection_limit - return newconnection(pod, T, host, port, pipeline_limit, - require_ssl_verification, idle_timeout; kw...) - end - end - finally - @v1_3 unlock(pod.conns) - end -end - -function isvalid(pod, conn, reuse_limit, pipeline_limit) - # Close connections that have reached the reuse limit... - if reuse_limit != nolimit - if readcount(conn) >= reuse_limit && !readbusy(conn) - @debug 2 "💀 overuse: $conn" - close(conn.io) - end + kw...)::Transaction where {T <: IO} + conn = acquire( + POOL, + hashconn(T, host, port, require_ssl_verification, true); + max=connection_limit, + idle=idle_timeout, + reuse=reuse_limit) do + Connection(host, port, + idle_timeout, require_ssl_verification, + getconnection(T, host, port; + require_ssl_verification=require_ssl_verification, kw...) + ) end - # Close connections that have reached the timeout limit... - if conn.idle_timeout > 0 - if !isbusy(conn) && inactiveseconds(conn) > conn.idle_timeout - @debug 2 "💀 idle timeout: $conn" - close(conn.io) - end - end - # For closed connections, we decrease active count in pod, and "continue" - # which effectively drops the connection - if !isopen(conn.io) - close(conn) - lock(conn.closelock) do - if !conn.closed - conn.closed = true - decr!(pod) - end - end - return false - end - # If we've hit our pipeline_limit, can't use this one, but don't close - if (writecount(conn) - readcount(conn)) >= pipeline_limit + 1 - return false - end - - return !writebusy(conn) -end - -function newconnection(pod, T, host, port, pipeline_limit, require_ssl_verification, idle_timeout; kw...) - io = getconnection(T, host, port; - require_ssl_verification=require_ssl_verification, kw...) - c = Connection(host, port, pipeline_limit, idle_timeout, require_ssl_verification, io) - incr!(pod) - @debug 1 "🔗 New: $c" - return client_transaction(c) + return client_transaction(conn) end function keepalive!(tcp) @@ -746,15 +528,19 @@ function sslconnection(tcp::TCPSocket, host::AbstractString; return io end -function sslupgrade(t::Transaction{TCPSocket}, +function sslupgrade(t::Transaction, host::AbstractString; require_ssl_verification::Bool=NetworkOptions.verify_host(host, "SSL"), - kw...)::Transaction{SSLContext} + kw...)::Transaction + # first we release the original connection, but we don't want it to be + # reused in the pool, because we're hijacking the TCPSocket + release(POOL, hashconn(t.c), t.c; return_for_reuse=false) + # now we hijack the TCPSocket and upgrade to SSLContext tls = sslconnection(t.c.io, host; require_ssl_verification=require_ssl_verification, kw...) - c = Connection(tls; require_ssl_verification=require_ssl_verification) - kill!(t.c) + conn = Connection(host, "", 0, require_ssl_verification, tls) + c = acquire(() -> conn, POOL, hashconn(conn)) return client_transaction(c) end diff --git a/src/ConnectionRequest.jl b/src/ConnectionRequest.jl index 80de0c4b8..635f11dd4 100644 --- a/src/ConnectionRequest.jl +++ b/src/ConnectionRequest.jl @@ -79,19 +79,15 @@ function request(::Type{ConnectionPoolLayer{Next}}, url::URI, req, body; end end - IOType = ConnectionPool.Transaction{sockettype(url, socket_type)} + IOType = sockettype(url, socket_type) local io try - io = getconnection(IOType, url.host, url.port; + io = newconnection(IOType, url.host, url.port; reuse_limit=reuse_limit, kw...) catch e rethrow(isioerror(e) ? IOError(e, "during request($url)") : e) end - if io.sequence >= reuse_limit - defaultheader!(req, "Connection" => "close") - end - try if proxy !== nothing && target_url.scheme == "https" # tunnel request @@ -107,8 +103,7 @@ function request(::Type{ConnectionPoolLayer{Next}}, url::URI, req, body; r = request(Next, io, req, body; kw...) - if (io.sequence >= reuse_limit - || (proxy !== nothing && target_url.scheme == "https")) + if proxy !== nothing && target_url.scheme == "https" close(io) end @@ -132,7 +127,6 @@ function connect_tunnel(io, target_url, req) end request = Request("CONNECT", target, headers) writeheaders(io, request) - startread(io) readheaders(io, request.response) return request.response end diff --git a/src/connectionpools.jl b/src/connectionpools.jl new file mode 100644 index 000000000..5567fa7fb --- /dev/null +++ b/src/connectionpools.jl @@ -0,0 +1,233 @@ +module ConnectionPools + +export Pod, Pool, acquire, release + +import Base: acquire, release + +connectionid(x) = objectid(x) + +""" + ConnectionTracker(conn::T) + +Wraps a `Connection` of type `T`. +A `Connection` object must support the following interface: + * `isopen(x)`: check if a `Connection` object is still open and can be used + * `close(x)`: close a `Connection` object; `isopen(x)` should return false after calling `close` + * `ConnectionPools.connectionid(x)`: optional method to distinguish `Connection` objects from each other; by default, calls `objectid(x)`, which is valid for `mutable struct` objects + +The `idle` field is a timestamp to track when a `Connection` was returned to a `Pod` and became idle. + +The `count` field keeps track of how many times the connection has been used. +""" +mutable struct ConnectionTracker{T} + conn::T + idle::Float64 + count::Int +end + +ConnectionTracker(conn::T) where {T} = ConnectionTracker(conn, time(), 1) + +""" + Pod(T; max::Int, idle::Int, reuse::Int) + +A threadsafe object for managing a pool of and the reuse of `Connection` objects (see [`ConnectionTracker`](@ref)). + +A Pod manages a collection of `Connection`s and the following keyword arguments allow configuring the management thereof: + + * `max::Int=typemax(Int64)`: controls the max # of currently acquired `Connection`s allowed + * `idle::Int=typemax(Int64)`: controls the max # of seconds a `Connection` may be idle before it should be closed and not reused + * `reuse::Int=typemax(Int64)`: controls the max # of times a `Connection` may be reused before it should be closed + +After creating a `Pod`, `Connection`s can be acquired by calling [`acquire`](@ref) and MUST +be subsequently released by calling [`release`](@ref). +""" +struct Pod{T} + # this lock/condition protects the `conns` Vector and `active` Dict + # no changes to either field should be made without holding this lock + lock::Threads.Condition + conns::Vector{ConnectionTracker{T}} + active::Dict{Any, ConnectionTracker{T}} + max::Int + idle::Int + reuse::Int +end + +const MAX = typemax(Int64) + +function Pod(T; max::Int=MAX, idle::Int=MAX, reuse::Int=MAX) + return Pod(Threads.Condition(), ConnectionTracker{T}[], Dict{Any, ConnectionTracker{T}}(), max, idle, reuse) +end + +# check if an idle `Connection` is still valid to be reused +function isvalid(pod::Pod{C}, conn::ConnectionTracker{C}) where {C} + if (time() - conn.idle) > pod.idle + # if the connection has been idle too long, close it + close(conn.conn) + elseif conn.count >= pod.reuse + # if the connection has been used too many times, close it + close(conn.conn) + elseif isopen(conn.conn) + # otherwise, if the connection is open, this is a valid connection we can use! + return true + end + return false +end + +""" + acquire(f, pod::Pod{C}) -> C + +Check first for existing `Connection`s in a `Pod` still valid to reuse, +and if so, return one. If no existing `Connection` is available for reuse, +call the provided function `f()`, which must return a new connection instance of type `C`. +This new connection instance will be tracked by the `Pod` and MUST be returned to the `Pod` +after use by calling `release(pod, conn)`. +""" +function acquire(f, pod::Pod) + lock(pod.lock) + try + # if there are already connections in the pod that have been + # returned, let's check if they're still valid and can be used directly + while !isempty(pod.conns) + # Pod connections are FIFO, so grab the earliest returned connection + conn = popfirst!(pod.conns) + if isvalid(pod, conn) + # connection is valid! increment its usage count + # and move the ConnectionTracker to the `active` Dict tracker + conn.count += 1 + pod.active[connectionid(conn.conn)] = conn + return conn.conn + end + end + # There were no existing connections able to be reused + # If there are not too many already-active connections, create new + if length(pod.active) < pod.max + conn = ConnectionTracker(f()) + pod.active[connectionid(conn.conn)] = conn + return conn.conn + end + # If we reach here, there were no valid idle connections and too many + # currently-active connections, so we need to wait until a connection + # is released back to the Pod + while true + # this `wait` call will block on our Pod `lock` condition + # until a connection is `release`ed and the condition + # is notified + conn = wait(pod.lock) + if conn !== nothing + if isvalid(pod, conn) + # connection just returned to the Pod is valid and can be reused + conn.count += 1 + pod.active[connectionid(conn.conn)] = conn + return conn.conn + end + end + # if the Connection just returned to the Pod wasn't valid, the active + # count at least went down, so we should be able to create a new one + if length(pod.active) < pod.max + conn = ConnectionTracker(f()) + pod.active[connectionid(conn.conn)] = conn + return conn.conn + end + # If for some reason there were still too many active connections, let's + # start the loop back over waiting for idle connections to be returned + # Hey, we get it, writing threadsafe code can be hard + end + finally + unlock(pod.lock) + end +end + +function release(pod::Pod{C}, conn::C; return_for_reuse::Bool=true) where {C} + lock(pod.lock) + try + # We first want to look up this connection object in our + # Pod `active` Dict that tracks active connections + id = connectionid(conn) + # if, for some reason, it's not in our `active` tracking Dict + # then something is wrong; you're trying to release a `Connection` + # that this Pod currently doesn't think is active + if !haskey(pod.active, id) + error("invalid connection pool release call; each acquired connection should be `release`ed ONLY once") + end + cp_conn = pod.active[id] + # remove the ConnectionTracker from our `active` Dict tracker + delete!(pod.active, id) + if return_for_reuse + # reset the idle timestamp of the ConnectionTracker + cp_conn.idle = time() + # now we put the connection back in the pod idle queue + push!(pod.conns, cp_conn) + # and notify our Pod condition that a connection has been returned + # in order to "wake up" any `wait`ers looking for a new connection + notify(pod.lock, cp_conn) + else + notify(pod.lock, nothing) + end + finally + unlock(pod.lock) + end + return +end + +""" + Pool(T) + +A threadsafe convenience object for managing multiple [`Pod`](@ref)s of connections. +A `Pod` of reuseable connections will be looked up by the `key` when calling `acquire(f, pool, key)`. +""" +struct Pool{C} + lock::ReentrantLock + pods::Dict{Any, Pod{C}} +end + +Pool(C) = Pool(ReentrantLock(), Dict{Any, Pod{C}}()) + +""" + acquire(f, pool::Pool{C}, key; max::Int, idle::Int, reuse::Int) -> C + +Get a connection from a `pool`, looking up a `Pod` of reuseable connections +by the provided `key`. If no `Pod` exists for the given key yet, one will be +created and passed the `max`, `idle`, and `reuse` keyword arguments if provided. +The provided function `f` must create a new connection instance of type `C`. +The acquired connection MUST be returned to the pool by calling `release(pool, key, conn)` exactly once. +""" +function acquire(f, pool::Pool{C}, key; kw...) where {C} + pod = lock(pool.lock) do + get!(() -> Pod(C; kw...), pool.pods, key) + end + return acquire(f, pod) +end + +""" + release(pool::Pool{C}, key, conn::C) + +Return an acquired connection to a `pool` with the same `key` provided when it was acquired. +""" +function release(pool::Pool{C}, key, conn::C; kw...) where {C} + pod = lock(pool.lock) do + pool.pods[key] + end + release(pod, conn; kw...) + return +end + +function reset!(pool::Pool) + lock(pool.lock) do + for pod in values(pool.pods) + lock(pod.lock) do + foreach(pod.conns) do conn + close(conn.conn) + end + empty!(pod.conns) + for conn in values(pod.active) + close(conn.conn) + end + empty!(pod.active) + end + end + empty!(pool.pods) + end + return +end + +end # module diff --git a/test/runtests.jl b/test/runtests.jl index 08f16e3a5..3e831cbcc 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,7 +1,7 @@ using Test, HTTP, JSON const dir = joinpath(dirname(pathof(HTTP)), "..", "test") -include("resources/TestRequest.jl") +include(joinpath(dir, "resources/TestRequest.jl")) @testset "HTTP" begin for f in [