
Merge pull request #38 from treasure-data/ensure_pool_shutdown
Ensure to shutdown thread pool
tvhung83 authored Jul 19, 2017
2 parents 53ed5f1 + 5ef77ca commit d941a0c
Showing 2 changed files with 136 additions and 78 deletions.
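The heart of the change is a small wrapper that guarantees the `Concurrent::ThreadPoolExecutor` is shut down whether the work succeeds or raises. Here is a minimal standalone sketch of the pattern, assuming concurrent-ruby's standard API; the method name `run_in_pool` is illustrative, the plugin's version is `execute_thread_pool` in the diff below:

```ruby
require 'concurrent'

# Sketch of the ensure-based pool lifecycle this PR introduces.
# `run_in_pool` is an illustrative name, not part of the plugin.
def run_in_pool
  pool = Concurrent::ThreadPoolExecutor.new(min_threads: 10, max_threads: 100)
  yield pool
ensure
  # Runs whether the block succeeded or raised, so worker threads are never leaked.
  pool&.shutdown              # stop accepting new tasks
  pool&.wait_for_termination  # block until already-queued tasks finish
end

run_in_pool do |pool|
  (1..4).each { |page| pool.post { puts "fetching page #{page}" } }
end
```

Before this change, `pool.shutdown` / `pool.wait_for_termination` were only reached on the success path, so an exception mid-export could leave the pool's threads running.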
157 changes: 79 additions & 78 deletions lib/embulk/input/zendesk/client.rb
@@ -31,7 +31,7 @@ def httpclient
end
end

def get_pool
def create_pool
Concurrent::ThreadPoolExecutor.new(
min_threads: 10,
max_threads: 100,
@@ -69,36 +69,25 @@ def validate_target
end

# these targets have both an incremental and a non-incremental API
%w(tickets users organizations).each do |target|
# 170717: `ticket_events` can use the standard endpoint format now, i.e. `<target>.json`
%w(tickets ticket_events users organizations).each do |target|
define_method(target) do |partial = true, start_time = 0, &block|
# Always use incremental_export; its behavior differs from the plain (non-incremental) export.
incremental_export("/api/v2/incremental/#{target}.json", target, start_time, [], partial, &block)
end
end

# they have incremental API only
%w(ticket_events).each do |target|
define_method(target) do |partial = true, start_time = 0, &block|
path = "/api/v2/incremental/#{target}"
incremental_export(path, target, start_time, [], partial, &block)
end
end

# Ticket metrics need to be exported using both the non-incremental and the incremental ticket endpoints.
# We support this by filtering out ticket_metrics whose created_at is earlier than start_time,
# while passing the incremental start time to the incremental ticket/ticket_metrics export.
%w(ticket_metrics).each do |target|
define_method(target) do |partial = true, start_time = 0, &block|
path = "/api/v2/incremental/tickets.json"
if partial
path = "/api/v2/#{target}.json"
# For a partial export we need to use the old endpoint: the new endpoint returns both tickets and
# ticket metrics, tickets first, so the current approach of cutting off the response packet won't work.
# Since partial is only used for preview and guess, this should be fine.
export(path, target, &block)
else
incremental_export(path, "metric_sets", start_time, [], partial,{include: "metric_sets"}, &block)
end
define_method('ticket_metrics') do |partial = true, start_time = 0, &block|
if partial
# For a partial export we need to use the old endpoint: the new endpoint returns both tickets and
# ticket metrics, tickets first, so the current approach of cutting off the response packet won't work.
# Since partial is only used for preview and guess, this should be fine.
export('/api/v2/ticket_metrics.json', 'ticket_metrics', &block)
else
incremental_export('/api/v2/incremental/tickets.json', 'metric_sets', start_time, [], partial, { include: 'metric_sets' }, &block)
end
end

@@ -139,24 +128,23 @@ def export_parallel(path, key, start_time = 0, &block)

first_fetched[key].uniq { |r| r['id'] }.each do |record|
block.call record
# known_ticket_ids: collect fetched ticket IDs, to exclude them in the next step
end

pool = get_pool
(2..last_page_num).each do |page|
pool.post do
response = request(path, per_page: per_page, page: page)
fetched_records = extract_records_from_response(response, key)
Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
fetched_records.uniq { |r| r['id'] }.each do |record|
block.call record
retryer.with_retry do
execute_thread_pool do |pool|
(2..last_page_num).each do |page|
pool.post do
response = request(path, per_page: per_page, page: page)
fetched_records = extract_records_from_response(response, key)
Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
fetched_records.uniq { |r| r['id'] }.each do |record|
block.call record
end
end
end
end
end

pool.shutdown
pool.wait_for_termination

nil # deliberately return nil here, unlike incremental_export
end

@@ -177,60 +165,57 @@ def export(path, key, page = 1, &block)
end
end

def incremental_export(path, key, start_time = 0, known_ids = [], partial = true,query = {}, &block)
def incremental_export(path, key, start_time = 0, known_ids = [], partial = true, query = {}, &block)
query.merge!(start_time: start_time)
if partial
records = request_partial(path, query.merge({start_time: start_time})).first(5)
records = request_partial(path, query).first(5)
records.uniq{|r| r["id"]}.each do |record|
block.call record
end
return
end

pool = get_pool
last_data = loop do
start_fetching = Time.now
response = request(path, query.merge({start_time: start_time}))
begin
data = JSON.parse(response.body)
rescue => e
raise Embulk::DataError.new(e)
end
actual_fetched = 0
records = data[key]
records.each do |record|
# https://developer.zendesk.com/rest_api/docs/core/incremental_export#excluding-system-updates
# "generated_timestamp" will be updated when Zendesk internal changing
# "updated_at" will be updated when ticket data was changed
# start_time for query parameter will be processed on Zendesk with generated_timestamp,
# but it was calculated by record' updated_at time.
# So the doesn't changed record from previous import would be appear by Zendesk internal changes.
# We ignore record that has updated_at <= start_time
if start_time && record["generated_timestamp"] && record["updated_at"]
updated_at = Time.parse(record["updated_at"])
next if updated_at <= Time.at(start_time)
end

# de-duplicated records.
# https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes
# https://github.com/zendesk/zendesk_api_client_rb/issues/251
next if known_ids.include?(record["id"])
retryer.with_retry do
execute_thread_pool do |pool|
loop do
start_fetching = Time.now
response = request(path, query)
actual_fetched = 0
data = JSON.parse(response.body)
# the expected key is occasionally missing from the response => retry
raise TempError, "No '#{key}' found in JSON response" unless data.key? key
data[key].each do |record|
# https://developer.zendesk.com/rest_api/docs/core/incremental_export#excluding-system-updates
# "generated_timestamp" will be updated when Zendesk internal changing
# "updated_at" will be updated when ticket data was changed
# start_time for query parameter will be processed on Zendesk with generated_timestamp,
# but it was calculated by record' updated_at time.
# So the doesn't changed record from previous import would be appear by Zendesk internal changes.
# We ignore record that has updated_at <= start_time
if start_time && record["generated_timestamp"] && record["updated_at"]
updated_at = Time.parse(record["updated_at"])
next if updated_at <= Time.at(start_time)
end

# de-duplicated records.
# https://developer.zendesk.com/rest_api/docs/core/incremental_export#usage-notes
# https://github.com/zendesk/zendesk_api_client_rb/issues/251
next if known_ids.include?(record["id"])

known_ids << record["id"]
pool.post { block.call record }
actual_fetched += 1
end
Embulk.logger.info "Fetched #{actual_fetched} records from start_time:#{start_time} (#{Time.at(start_time)}) within #{Time.now.to_i - start_fetching.to_i} seconds"
start_time = data["end_time"]

known_ids << record["id"]
pool.post { yield(record) }
actual_fetched += 1
# NOTE: If count is less than 1000, then stop paginating.
# Otherwise, use the next_page URL to get the next page of results.
# https://developer.zendesk.com/rest_api/docs/core/incremental_export#pagination
break data if data["count"] < 1000
end
end
Embulk.logger.info "Fetched #{actual_fetched} records from start_time:#{start_time} (#{Time.at(start_time)}) within #{Time.now.to_i - start_fetching.to_i} seconds"
start_time = data["end_time"]

# NOTE: If count is less than 1000, then stop paginating.
# Otherwise, use the next_page URL to get the next page of results.
# https://developer.zendesk.com/rest_api/docs/core/incremental_export#pagination
break data if data["count"] < 1000
end

pool.shutdown
pool.wait_for_termination
last_data
end

def extract_records_from_response(response, key)
@@ -387,6 +372,22 @@ def handle_response(status_code, headers, body)
end
end

def execute_thread_pool(&block)
pool = create_pool
block.call pool
rescue TempError => t
raise t # re-raise as-is so retryer.with_retry can retry it
rescue => e
raise Embulk::DataError.new(e) # anything else is treated as a non-retryable data error
ensure
Embulk.logger.debug 'ThreadPool shutting down...'
pool.shutdown
pool.wait_for_termination
Embulk.logger.debug "ThreadPool shutdown? #{pool.shutdown?}"
end
end

class TempError < StandardError
end
end
end
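A side note on why `execute_thread_pool` builds a fresh pool via `create_pool` inside `retryer.with_retry` instead of reusing one: once `shutdown` has been called, a `Concurrent::ThreadPoolExecutor` permanently rejects new work. A quick standalone illustration, assuming the executor's default `:abort` fallback policy:

```ruby
require 'concurrent'

pool = Concurrent::ThreadPoolExecutor.new(max_threads: 2)
pool.shutdown
pool.wait_for_termination

begin
  pool.post { :too_late } # a shut-down executor cannot be restarted
rescue Concurrent::RejectedExecutionError
  puts 'rejected: each retry attempt needs a fresh pool'
end
```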
57 changes: 57 additions & 0 deletions test/embulk/input/zendesk/test_client.rb
@@ -572,6 +572,63 @@ def stub_response(status, headers = [], body_json = nil)
end
end

sub_test_case "ensure thread pool is shutdown with/without errors, retry for TempError" do
def client
@client ||= Client.new(login_url: login_url, auth_method: "oauth", access_token: access_token, retry_limit: 1, retry_initial_wait_sec: 0)
end

setup do
stub(Embulk).logger { Logger.new(File::NULL) }
@httpclient = client.httpclient
stub(client).httpclient { @httpclient }
@pool = Concurrent::ThreadPoolExecutor.new
stub(client).create_pool { @pool }
end
test "should shutdown pool - without error" do
@httpclient.test_loopback_http_response << [
"HTTP/1.1 200",
"Content-Type: application/json",
"",
{
ticket_fields: [{ id: 1 }],
count: 1
}.to_json
].join("\r\n")
handler = proc { }
client.ticket_fields(false, &handler)
assert_equal(true, @pool.shutdown?)
end

test "should shutdown pool - with TempError (retry)" do
response = [
"HTTP/1.1 200",
"Content-Type: application/json",
"",
{ }.to_json # missing the required key `tickets` => raises TempError
].join("\r\n")
@httpclient.test_loopback_http_response << response
@httpclient.test_loopback_http_response << response # retry 1
assert_raise(TempError) do
client.tickets(false)
end
assert_equal(true, @pool.shutdown?)
end

test "should shutdown pool - with DataError (no retry)" do
response = [
"HTTP/1.1 400", # unhandled error, wrapped in DataError
"Content-Type: application/json",
"",
{ }.to_json
].join("\r\n")
@httpclient.test_loopback_http_response << response
assert_raise(DataError) do
client.tickets(false)
end
assert_equal(true, @pool.shutdown?)
end
end

def login_url
"http://example.com"
end
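The new tests lean on the httpclient gem's loopback facility: raw HTTP response strings pushed onto `test_loopback_http_response` are consumed by subsequent requests instead of hitting the network, which is how the 200/400 cases above are simulated. A minimal sketch (URL and payload are arbitrary):

```ruby
require 'httpclient'
require 'json'

http = HTTPClient.new
http.test_loopback_http_response << [
  'HTTP/1.1 200',
  'Content-Type: application/json',
  '',
  { tickets: [], count: 0 }.to_json
].join("\r\n")

# The request never leaves the process; it pops the queued response instead.
body = http.get('http://example.com/api/v2/incremental/tickets.json').body
puts JSON.parse(body)['count'] # => 0
```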
