Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for making a request and receiving the response as a stream. #983

Merged
merged 3 commits into from
Jun 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions lib/stripe/api_operations/request.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,28 @@ module Request
module ClassMethods
def execute_resource_request(method, url,
params = {}, opts = {})
execute_resource_request_internal(
:execute_request, method, url, params, opts
)
end

def execute_resource_request_stream(method, url,
params = {}, opts = {},
&read_body_chunk_block)
execute_resource_request_internal(
:execute_request_stream,
method,
url,
params,
opts,
&read_body_chunk_block
)
end

private def execute_resource_request_internal(client_request_method_sym,
method, url,
params, opts,
&read_body_chunk_block)
params ||= {}

error_on_invalid_params(params)
Expand All @@ -22,10 +44,12 @@ def execute_resource_request(method, url,
client = headers.delete(:client)
# Assume all remaining opts must be headers

resp, opts[:api_key] = client.execute_request(
resp, opts[:api_key] = client.send(
client_request_method_sym,
method, url,
api_base: api_base, api_key: api_key,
headers: headers, params: params
headers: headers, params: params,
&read_body_chunk_block
)

# Hash#select returns an array before 1.9
Expand Down Expand Up @@ -89,6 +113,15 @@ def self.included(base)
self.class.execute_resource_request(method, url, params, opts)
end

protected def execute_resource_request_stream(method, url,
params = {}, opts = {},
&read_body_chunk_block)
opts = @opts.merge(Util.normalize_opts(opts))
self.class.execute_resource_request_stream(
method, url, params, opts, &read_body_chunk_block
)
end

# See notes on `alias` above.
alias request execute_resource_request
end
Expand Down
8 changes: 8 additions & 0 deletions lib/stripe/api_resource.rb
Original file line number Diff line number Diff line change
Expand Up @@ -115,5 +115,13 @@ def self.retrieve(id, opts = {})
Util.convert_to_stripe_object(resp.data, opts)
end
end

protected def request_stream(method:, path:, params:, opts: {},
&read_body_chunk_block)
resp, = execute_resource_request_stream(
method, path, params, opts, &read_body_chunk_block
)
resp
end
end
end
19 changes: 17 additions & 2 deletions lib/stripe/connection_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ def connection_for(uri)

# Executes an HTTP request to the given URI with the given method. Also
# allows a request body, headers, and query string to be specified.
def execute_request(method, uri, body: nil, headers: nil, query: nil)
def execute_request(method, uri, body: nil, headers: nil, query: nil,
&block)
# Perform some basic argument validation because it's easy to get
# confused between strings and hashes for things like body and query
# parameters.
Expand All @@ -92,8 +93,22 @@ def execute_request(method, uri, body: nil, headers: nil, query: nil)
u.path
end

method_name = method.to_s.upcase
has_response_body = method_name != "HEAD"
request = Net::HTTPGenericRequest.new(
method_name,
(body ? true : false),
has_response_body,
path,
headers
)

@mutex.synchronize do
connection.send_request(method.to_s.upcase, path, body, headers)
# The block parameter is special here. If a block is provided, the block
# is invoked with the Net::HTTPResponse. However, the body will not have
# been read yet in the block, and can be streamed by calling
# HTTPResponse#read_body.
connection.request(request, body, &block)
end
end

Expand Down
177 changes: 120 additions & 57 deletions lib/stripe/stripe_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -213,62 +213,9 @@ def request

def execute_request(method, path,
api_base: nil, api_key: nil, headers: {}, params: {})
raise ArgumentError, "method should be a symbol" \
unless method.is_a?(Symbol)
raise ArgumentError, "path should be a string" \
unless path.is_a?(String)

api_base ||= config.api_base
api_key ||= config.api_key
params = Util.objects_to_ids(params)

check_api_key!(api_key)

body_params = nil
query_params = nil
case method
when :get, :head, :delete
query_params = params
else
body_params = params
end

query_params, path = merge_query_params(query_params, path)

headers = request_headers(api_key, method)
.update(Util.normalize_headers(headers))
url = api_url(path, api_base)

# Merge given query parameters with any already encoded in the path.
query = query_params ? Util.encode_parameters(query_params) : nil

# Encoding body parameters is a little more complex because we may have
# to send a multipart-encoded body. `body_log` is produced separately as
# a log-friendly variant of the encoded form. File objects are displayed
# as such instead of as their file contents.
body, body_log =
body_params ? encode_body(body_params, headers) : [nil, nil]

# stores information on the request we're about to make so that we don't
# have to pass as many parameters around for logging.
context = RequestLogContext.new
context.account = headers["Stripe-Account"]
context.api_key = api_key
context.api_version = headers["Stripe-Version"]
context.body = body_log
context.idempotency_key = headers["Idempotency-Key"]
context.method = method
context.path = path
context.query = query

http_resp = execute_request_with_rescues(method, api_base, context) do
self.class
.default_connection_manager(config)
.execute_request(method, url,
body: body,
headers: headers,
query: query)
end
http_resp, api_key = execute_request_internal(
method, path, api_base, api_key, headers, params
)

begin
resp = StripeResponse.from_net_http(http_resp)
Expand All @@ -284,6 +231,38 @@ def execute_request(method, path,
[resp, api_key]
end

# Executes a request and returns the body as a stream instead of converting
# it to a StripeObject. This should be used for any request where we expect
# an arbitrary binary response.
#
# A `read_body_chunk` block can be passed, which will be called repeatedly
# with the body chunks read from the socket.
#
# If a block is passed, a StripeHeadersOnlyResponse is returned as the
# block is expected to do all the necessary body processing. If no block is
# passed, then a StripeStreamResponse is returned containing an IO stream
# with the response body.
def execute_request_stream(method, path,
api_base: nil, api_key: nil,
headers: {}, params: {},
&read_body_chunk_block)
unless block_given?
raise ArgumentError,
"execute_request_stream requires a read_body_chunk_block"
end

http_resp, api_key = execute_request_internal(
method, path, api_base, api_key, headers, params, &read_body_chunk_block
)

# When the read_body_chunk_block is given, we no longer have access to the
# response body at this point and so return a response object containing
# only the headers. This is because the body was consumed by the block.
resp = StripeHeadersOnlyResponse.from_net_http(http_resp)

[resp, api_key]
end

def store_last_response(object_id, resp)
return unless last_response_has_key?(object_id)

Expand Down Expand Up @@ -451,6 +430,83 @@ def self.maybe_gc_connection_managers
pruned_contexts.count
end

private def execute_request_internal(method, path,
api_base, api_key, headers, params,
&read_body_chunk_block)
raise ArgumentError, "method should be a symbol" \
unless method.is_a?(Symbol)
raise ArgumentError, "path should be a string" \
unless path.is_a?(String)

api_base ||= config.api_base
api_key ||= config.api_key
params = Util.objects_to_ids(params)

check_api_key!(api_key)

body_params = nil
query_params = nil
case method
when :get, :head, :delete
query_params = params
else
body_params = params
end

query_params, path = merge_query_params(query_params, path)

headers = request_headers(api_key, method)
.update(Util.normalize_headers(headers))
url = api_url(path, api_base)

# Merge given query parameters with any already encoded in the path.
query = query_params ? Util.encode_parameters(query_params) : nil

# Encoding body parameters is a little more complex because we may have
# to send a multipart-encoded body. `body_log` is produced separately as
# a log-friendly variant of the encoded form. File objects are displayed
# as such instead of as their file contents.
body, body_log =
body_params ? encode_body(body_params, headers) : [nil, nil]

# stores information on the request we're about to make so that we don't
# have to pass as many parameters around for logging.
context = RequestLogContext.new
context.account = headers["Stripe-Account"]
context.api_key = api_key
context.api_version = headers["Stripe-Version"]
context.body = body_log
context.idempotency_key = headers["Idempotency-Key"]
context.method = method
context.path = path
context.query = query

# A block can be passed in to read the content directly from the response.
# We want to execute this block only when the response was actually
# successful. When it wasn't, we defer to the standard error handling as
# we have to read the body and parse the error JSON.
response_block =
if block_given?
lambda do |response|
unless should_handle_as_error(response.code.to_i)
response.read_body(&read_body_chunk_block)
end
end
end

http_resp = execute_request_with_rescues(method, api_base, context) do
self.class
.default_connection_manager(config)
.execute_request(method, url,
body: body,
headers: headers,
query: query,
&response_block)
end

[http_resp, api_key]
end

private def api_url(url = "", api_base = nil)
(api_base || config.api_base) + url
end
Expand Down Expand Up @@ -490,6 +546,7 @@ def self.maybe_gc_connection_managers
# that's more condusive to logging.
flattened_params =
flattened_params.map { |k, v| [k, v.is_a?(String) ? v : v.to_s] }.to_h

else
body = Util.encode_parameters(body_params)
end
Expand All @@ -503,6 +560,10 @@ def self.maybe_gc_connection_managers
[body, body_log]
end

private def should_handle_as_error(http_status)
http_status >= 400
end

private def execute_request_with_rescues(method, api_base, context)
num_retries = 0

Expand All @@ -520,7 +581,9 @@ def self.maybe_gc_connection_managers
http_status = resp.code.to_i
context = context.dup_from_response_headers(resp)

handle_error_response(resp, context) if http_status >= 400
if should_handle_as_error(http_status)
handle_error_response(resp, context)
end

log_response(context, request_start, http_status, resp.body)
notify_request_end(context, request_duration, http_status,
Expand Down
Loading