Skip to content

Commit

Permalink
Merge #396
Browse files Browse the repository at this point in the history
396: Alternate Downloads.jl-based backend (alternative to HTTP.jl) r=mattBrzezinski a=ericphanson

I was running into issues with HTTP when trying to make many concurrent requests to s3, so I thought I'd try libcurl through the Downloads.jl stdlib. Turned out to be pretty easy to at least get something basic working, e.g. I can pull down a test file from s3 with no problems with this little bit of code-- and you can even pass a `Downloads.Downloader` along to have more control over the client. I haven't had a chance to do more thorough testing, but I thought I'd put the PR up so maybe we can see what CI says to test out the Downloads-based backend. (Right now Downloads.jl is set to the default in this PR for testing purposes, but I think we should keep HTTP.jl as the default unless it really is a big upgrade).

Co-authored-by: Eric Hanson <5846501+ericphanson@users.noreply.github.com>
Co-authored-by: Chris de Graaf <me@cdg.dev>
  • Loading branch information
3 people authored Aug 13, 2021
2 parents 6ce7fe5 + 1c6c1bf commit 768bb57
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 15 deletions.
5 changes: 3 additions & 2 deletions .github/workflows/CI.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ jobs:
test:
name: Julia ${{ matrix.version }} - ${{ matrix.os }} - ${{ matrix.arch }}
runs-on: ${{ matrix.os }}
timeout-minutes: 30
continue-on-error: ${{ matrix.version == 'nightly' }}
strategy:
fail-fast: false
Expand All @@ -25,9 +26,9 @@ jobs:
arch:
- x64
include:
# Add a 1.0 job just to make sure we still support it
# Add a job using the earliest version of Julia supported by this package
- os: ubuntu-latest
version: 1.0.5
version: 1.3
arch: x64
# Add a 1.5 job because that's what Invenia actually uses
- os: ubuntu-latest
Expand Down
3 changes: 2 additions & 1 deletion Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ version = "1.56.0"
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
Compat = "34da2185-b29b-5c13-b0c7-acf172513d20"
Dates = "ade2ca70-3891-5945-98fb-dc099432e06a"
Downloads = "f43a241f-c20a-4ad4-852c-f6b1247861c6"
GitHub = "bc5e4493-9b4d-5f90-b8aa-2b2bcaad7a26"
HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3"
IniFile = "83e8ac13-25f8-5344-8a64-a9f2b223428f"
Expand All @@ -32,7 +33,7 @@ OrderedCollections = "1"
Retry = "0.3, 0.4"
URIs = "1"
XMLDict = "0.3, 0.4"
julia = "1"
julia = "1.3"

[extras]
Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f"
Expand Down
2 changes: 2 additions & 0 deletions src/AWS.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module AWS
using Compat: Compat, @something
using Base64
using Dates
using Downloads: Downloads, Downloader, Curl
using HTTP
using MbedTLS
using Mocking
Expand Down Expand Up @@ -33,6 +34,7 @@ include("AWSMetadata.jl")

include(joinpath("utilities", "request.jl"))
include(joinpath("utilities", "sign.jl"))
include(joinpath("utilities", "downloads_backend.jl"))


using ..AWSExceptions
Expand Down
2 changes: 1 addition & 1 deletion src/AWSCredentials.jl
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ function credentials_from_webtoken()
role_creds["SessionToken"],
assumed_role_user["Arn"];
expiry=DateTime(rstrip(role_creds["Expiration"], 'Z')),
renew=credentials_from_webtoken
renew=credentials_from_webtoken,
)
end

Expand Down
115 changes: 115 additions & 0 deletions src/utilities/downloads_backend.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
struct DownloadsBackend <: AWS.AbstractBackend
downloader::Union{Nothing, Downloads.Downloader}
end

DownloadsBackend() = DownloadsBackend(nothing)

const AWS_DOWNLOADER = Ref{Union{Nothing, Downloader}}(nothing)
const AWS_DOWNLOAD_LOCK = ReentrantLock()

# Here we mimic Download.jl's own setup for using a global downloader.
# We do this to have our own downloader (separate from Downloads.jl's global downloader)
# because we add a hook to avoid redirects in order to try to match the HTTPBackend's
# implementation, and we don't want to mutate the global downloader from Downloads.jl.
# https://github.com/JuliaLang/Downloads.jl/blob/84e948c02b8a0625552a764bf90f7d2ee97c949c/src/Downloads.jl#L293-L301
function get_downloader(downloader=nothing)
lock(AWS_DOWNLOAD_LOCK) do
yield() # let other downloads finish
downloader isa Downloader && return
while true
downloader = AWS_DOWNLOADER[]
downloader isa Downloader && return
D = Downloader()
D.easy_hook = (easy, info) -> Curl.setopt(easy, Curl.CURLOPT_FOLLOWLOCATION, false)
AWS_DOWNLOADER[] = D
end
end
return downloader
end

# https://github.com/JuliaWeb/HTTP.jl/blob/2a03ca76376162ffc3423ba7f15bd6d966edff9b/src/MessageRequest.jl#L84-L85
body_length(x::AbstractVector{UInt8}) = length(x)
body_length(x::AbstractString) = sizeof(x)

read_body(x::IOBuffer) = take!(x)
function read_body(x::IO)
close(x)
return read(x)
end

function _http_request(backend::DownloadsBackend, request)
# If we pass `output`, older versions of Downloads.jl will
# expect a message body in the response. Specifically, it sets
# <https://curl.se/libcurl/c/CURLOPT_NOBODY.html>
# only when we do not pass the `output` argument.
# See <https://github.com/JuliaLang/Downloads.jl/issues/131>.
#
# When the method is `HEAD`, the response may have a Content-Length
# but not send any content back (which appears to be correct,
# <https://stackoverflow.com/a/18925736/12486544>).
#
# Thus, if we did not set `CURLOPT_NOBODY`, and it gets a Content-Length
# back, it will hang waiting for that body.
#
# Therefore, we do not pass an `output` when the `request_method` is `HEAD`.
# (Note: this is fixed on the latest Downloads.jl, but we include this workaround
# for compatability).
if request.response_stream === nothing
request.response_stream = IOBuffer()
end
output_arg = if request.request_method == "HEAD"
NamedTuple()
else
(; output=request.response_stream)
end

# If we're going to return the stream, we don't want to read the body into an
# HTTP.Response we're never going to use. If we do that, the returned stream
# will have no data available (and reading from it could hang forever).
body_arg = if request.request_method == "HEAD" || request.return_stream
() -> NamedTuple()
else
() -> (; body = read_body(request.response_stream))
end

# HTTP.jl sets this header automatically.
request.headers["Content-Length"] = string(body_length(request.content))

# We pass an `input` only when we have content we wish to send.
input = IOBuffer()
input_arg = if !isempty(request.content)
write(input, request.content)
(; input=input)
else
NamedTuple()
end

@repeat 4 try
downloader = @something(backend.downloader, get_downloader())
# set the hook so that we don't follow redirects. Only
# need to do this on per-request downloaders, because we
# set our global one with this hook already.
if backend.downloader !== nothing && downloader.easy_hook === nothing
downloader.easy_hook = (easy, info) -> Curl.setopt(easy, Curl.CURLOPT_FOLLOWLOCATION, false)
end

# We seekstart on every attempt, otherwise every attempt
# but the first will send an empty payload.
seekstart(input)

response = Downloads.request(request.url; input_arg..., output_arg...,
method = request.request_method,
headers = request.headers, verbose=false, throw=true,
downloader=downloader)

http_response = HTTP.Response(response.status, response.headers; body_arg()..., request=nothing)

if HTTP.iserror(http_response)
target = HTTP.resource(HTTP.URI(request.url))
throw(HTTP.StatusError(http_response.status, request.request_method, target, http_response))
end
return http_response
catch e
@delay_retry if ((isa(e, HTTP.StatusError) && AWS._http_status(e) >= 500) || isa(e, Downloads.RequestError)) end
end
end
14 changes: 10 additions & 4 deletions test/AWS.jl
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,16 @@ end
end

@testset "set user agent" begin
old_user_agent = AWS.user_agent[]
new_user_agent = "new user agent"

@test AWS.user_agent[] == "AWS.jl/1.0.0"
set_user_agent(new_user_agent)
@test AWS.user_agent[] == new_user_agent
try
@test AWS.user_agent[] == "AWS.jl/1.0.0"
set_user_agent(new_user_agent)
@test AWS.user_agent[] == new_user_agent
finally
set_user_agent(old_user_agent)
end
end

@testset "sign" begin
Expand Down Expand Up @@ -365,6 +370,7 @@ end
api_version="api_version",
request_method="GET",
url="https://s3.us-east-1.amazonaws.com/sample-bucket",
backend=AWS.HTTPBackend(),
)
apply(Patches._http_options_patch) do
# No default options
Expand Down Expand Up @@ -393,7 +399,7 @@ end
api_version="api_version",
request_method="GET",
url="https://s3.us-east-1.amazonaws.com/sample-bucket",
backend = TestBackend(4)
backend = TestBackend(4),
)
@test AWS._http_request(request.backend, request) == 4

Expand Down
2 changes: 1 addition & 1 deletion test/AWSCredentials.jl
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ end
@test result.user_arn == "$(role_arn)/$(session_name)"
@test result.renew == credentials_from_webtoken
expiry = result.expiry

sleep(0.1)
result = check_credentials(result)

@test result.access_key_id == access_key
Expand Down
6 changes: 5 additions & 1 deletion test/issues.jl
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ end
# => BUG: header `response_stream` is pushed into the query...
io = Base.BufferStream()
S3.get_object(bucket_name, file_name, Dict("response_stream"=>io, "return_stream"=>true))
@test String(read(io)) == body
if bytesavailable(io) > 0
@test String(readavailable(io)) == body
else
@test "no body data was available" == body
end

finally
S3.delete_object(bucket_name, file_name)
Expand Down
15 changes: 10 additions & 5 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,20 @@ function _now_formatted()
end

@testset "AWS.jl" begin
include("AWS.jl")
include("AWSCredentials.jl")
include("AWSExceptions.jl")
include("AWSMetadataUtilities.jl")
include("issues.jl")
include("test_pkg.jl")
include("utilities.jl")

if haskey(ENV, "TEST_MINIO")
include("minio.jl")
backends = [AWS.HTTPBackend, AWS.DownloadsBackend]
@testset "Backend: $(nameof(backend))" for backend in backends
AWS.DEFAULT_BACKEND[] = backend()
include("AWS.jl")
include("AWSCredentials.jl")
include("issues.jl")

if haskey(ENV, "TEST_MINIO")
include("minio.jl")
end
end
end

0 comments on commit 768bb57

Please sign in to comment.