Merge pull request #49 from treasure-data/disable_paging_for_non_incremental_endpoints

Disable paging for non-incremental endpoints
tvhung83 authored Jan 25, 2019
2 parents 5e026ca + 479c030 commit 1dc9cdf
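
In short: `export_parallel` in lib/embulk/input/zendesk/client.rb gains two optional parameters, `dedup` and `paging`, and the generated methods for endpoints that only have a non-incremental API (`ticket_fields`, `ticket_forms`) now pass `paging = false`, so only the first page is requested and its records are yielded once. Below is a minimal, self-contained sketch of that control flow, not the diff itself: the HTTP call is replaced by a stub fetcher, `start_time` is dropped, and the thread pool the real client uses for pages 2..n is reduced to a plain loop.

    PER_PAGE = 100   # Zendesk's documented per-page maximum

    # Condensed, hypothetical stand-in for the client's export_parallel.
    def export_parallel_sketch(fetch, key, dedup = true, paging = true, &block)
      first = fetch.call(1)
      last_page = (first["count"] / PER_PAGE.to_f).ceil
      handler = ->(records) { records.each { |r| block.call(r) } }
      handler.call(dedup ? first[key].uniq { |r| r["id"] } : first[key])
      return unless paging               # non-incremental endpoints stop after page 1

      (2..last_page).each do |page|      # the real client posts these to a thread pool
        records = fetch.call(page)[key]
        handler.call(dedup ? records.uniq { |r| r["id"] } : records)
      end
    end

    # Fake API: claims 250 records in total, always answers with the same three.
    fake = ->(_page) { { "count" => 250, "ticket_fields" => [{ "id" => 1 }, { "id" => 2 }, { "id" => 3 }] } }

    seen = []
    export_parallel_sketch(fake, "ticket_fields", true, false) { |r| seen << r["id"] }
    p seen   # => [1, 2, 3] -- pages 2 and 3 are never requested
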
Showing 3 changed files with 54 additions and 24 deletions.
2 changes: 1 addition & 1 deletion .ruby-version
@@ -1 +1 @@
jruby-9.1.5.0
jruby-9.1.17.0
35 changes: 18 additions & 17 deletions lib/embulk/input/zendesk/client.rb
@@ -104,12 +104,12 @@ def validate_app_marketplace

# they have non-incremental API only
UNAVAILABLE_INCREMENTAL_EXPORT.each do |target|
define_method(target) do |partial = true, start_time = 0, &block|
define_method(target) do |partial = true, start_time = 0, dedup = true, &block|
path = "/api/v2/#{target}.json"
if partial
export(path, target, &block)
else
export_parallel(path, target, start_time, &block)
export_parallel(path, target, start_time, dedup, false, &block)
end
end
end
@@ -129,26 +129,27 @@ def fetch_subresource(record_id, base, target)

private

def export_parallel(path, key, start_time = 0, &block)
def export_parallel(path, key, start_time = 0, dedup = true, paging = true, &block)
per_page = 100 # 100 is maximum https://developer.zendesk.com/rest_api/docs/core/introduction#pagination
first_response = request(path, false, per_page: per_page, page: 1)
first_fetched = JSON.parse(first_response.body)
total_count = first_fetched["count"]
last_page_num = (total_count / per_page.to_f).ceil
Embulk.logger.info "#{key} records=#{total_count} last_page=#{last_page_num}"

first_fetched[key].uniq { |r| r['id'] }.each do |record|
block.call record
end

execute_thread_pool do |pool|
(2..last_page_num).each do |page|
pool.post do
response = request(path, false, per_page: per_page, page: page)
fetched_records = extract_records_from_response(response, key)
Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
fetched_records.uniq { |r| r['id'] }.each do |record|
block.call record
Embulk.logger.info "#{key} records=#{total_count} last_page=#{paging ? last_page_num : 1}"

handler = lambda { |records| records.each { |r| block.call r } }
handler.call(dedup ? first_fetched[key].uniq { |r| r['id'] } : first_fetched[key])

# stop if endpoints have no pagination, i.e. the API returns all records in one response
# (`ticket_fields`, `ticket_forms`)
if paging
execute_thread_pool do |pool|
(2..last_page_num).each do |page|
pool.post do
response = request(path, false, per_page: per_page, page: page)
fetched_records = extract_records_from_response(response, key)
Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
handler.call(dedup ? fetched_records.uniq { |r| r['id'] } : fetched_records)
end
end
end
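
From a caller's point of view nothing changes syntactically: the generated endpoint methods keep accepting `partial` and `start_time` (plus the new optional `dedup`), and an existing `partial = false` invocation simply stops issuing requests for pages 2..n. Hypothetical usage of the real client, where the block stands in for what the Embulk input plugin normally supplies:

    # Assuming `client` is an Embulk::Input::Zendesk::Client configured for the
    # ticket_fields target; with this commit only the first page
    # (per_page: 100, page: 1) of /api/v2/ticket_fields.json is requested.
    client.ticket_fields(false) do |record|
      puts record["id"]
    end
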
41 changes: 35 additions & 6 deletions test/embulk/input/zendesk/test_client.rb
@@ -107,8 +107,7 @@ def client
"",
{
ticket_metrics: records,
next_page: "https://treasuredata.zendesk.com/api/v2/incremental/tickets
.json?include=metric_sets&start_time=1488535542",
next_page: "https://treasuredata.zendesk.com/api/v2/incremental/tickets.json?include=metric_sets&start_time=1488535542",
}.to_json
].join("\r\n")

@@ -299,7 +298,7 @@ def client

test "invoke export when partial=false" do
# Added default `start_time`
mock(client).export_parallel(anything, "ticket_fields", 0)
mock(client).export_parallel(anything, "ticket_fields", 0, true, false) # new args: `dedup`, `paging`
client.ticket_fields(false)
end
end
@@ -312,10 +311,40 @@ def client

test "invoke export when partial=false" do
# Added default `start_time`
mock(client).export_parallel(anything, "ticket_forms", 0)
mock(client).export_parallel(anything, "ticket_forms", 0, true, false) # new args: `dedup`, `paging`)
client.ticket_forms(false)
end
end

sub_test_case "no pagination" do
data("ticket_fields", "ticket_fields")
data("ticket_forms", "ticket_forms")
test "non-incremental targets" do |target|
response = [
"HTTP/1.1 200",
"Content-Type: application/json",
"",
{
target => 200.times.map{|n| {"id" => n}},
count: 200,
next_page: nil,
previous_page: nil,
}.to_json
].join("\r\n")

# mock multiple responses, to simulate real API behavior
@httpclient.test_loopback_http_response << response
@httpclient.test_loopback_http_response << response
counter = Concurrent::AtomicFixnum.new(0)
handler = proc { counter.increment }
# validate expected target
proxy(@httpclient).get("#{login_url}/api/v2/#{target}.json", anything, anything)
# (`partial`, `start_time`, `dedup`, `block`)
client.public_send(target, false, 0, true, &handler)
# only ingest 200 records
assert_equal(200, counter.value)
end
end
end
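
The new "no pagination" case leans on httpclient's loopback test facility: each raw message pushed onto `test_loopback_http_response` is consumed by exactly one request, so queueing two identical 200-record responses while still counting only 200 yielded records shows that a second page was never fetched. A tiny standalone illustration of that mechanism (hypothetical script, not part of the test suite):

    require "httpclient"

    http = HTTPClient.new
    http.test_loopback_http_response << [
      "HTTP/1.1 200 OK",
      "Content-Type: application/json",
      "",
      '{"ok":true}'
    ].join("\r\n")

    # The request never goes out on the wire; it is answered by the canned message above.
    res = http.get("http://example.invalid/anything")
    puts res.body   # => {"ok":true}
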


@@ -629,12 +658,12 @@ def client
"Content-Type: application/json",
"",
{
ticket_fields: [{ id: 1 }],
tickets: [{ id: 1 }],
count: 1
}.to_json
].join("\r\n")
handler = proc { }
client.ticket_fields(false, &handler)
client.tickets(false, &handler)
assert_equal(true, @pool.shutdown?)
end

