Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable paging for non-incremental endpoints #49

Merged
merged 4 commits into from
Jan 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ruby-version
Original file line number Diff line number Diff line change
@@ -1 +1 @@
jruby-9.1.5.0
jruby-9.1.17.0
35 changes: 18 additions & 17 deletions lib/embulk/input/zendesk/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -104,12 +104,12 @@ def validate_app_marketplace

# they have non-incremental API only
UNAVAILABLE_INCREMENTAL_EXPORT.each do |target|
define_method(target) do |partial = true, start_time = 0, &block|
define_method(target) do |partial = true, start_time = 0, dedup = true, &block|
path = "/api/v2/#{target}.json"
if partial
export(path, target, &block)
else
export_parallel(path, target, start_time, &block)
export_parallel(path, target, start_time, dedup, false, &block)
end
end
end
Expand All @@ -129,26 +129,27 @@ def fetch_subresource(record_id, base, target)

private

def export_parallel(path, key, start_time = 0, &block)
def export_parallel(path, key, start_time = 0, dedup = true, paging = true, &block)
per_page = 100 # 100 is maximum https://developer.zendesk.com/rest_api/docs/core/introduction#pagination
first_response = request(path, false, per_page: per_page, page: 1)
first_fetched = JSON.parse(first_response.body)
total_count = first_fetched["count"]
last_page_num = (total_count / per_page.to_f).ceil
Embulk.logger.info "#{key} records=#{total_count} last_page=#{last_page_num}"

first_fetched[key].uniq { |r| r['id'] }.each do |record|
block.call record
end

execute_thread_pool do |pool|
(2..last_page_num).each do |page|
pool.post do
response = request(path, false, per_page: per_page, page: page)
fetched_records = extract_records_from_response(response, key)
Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
fetched_records.uniq { |r| r['id'] }.each do |record|
block.call record
Embulk.logger.info "#{key} records=#{total_count} last_page=#{paging ? last_page_num : 1}"

handler = lambda { |records| records.each { |r| block.call r } }
handler.call(dedup ? first_fetched[key].uniq { |r| r['id'] } : first_fetched[key])

# stop if endpoints have no pagination, ie. API returns all records
# `ticket_fields`, `ticket_forms`
if paging
execute_thread_pool do |pool|
(2..last_page_num).each do |page|
pool.post do
response = request(path, false, per_page: per_page, page: page)
fetched_records = extract_records_from_response(response, key)
Embulk.logger.info "Fetched #{key} on page=#{page} >>> size: #{fetched_records.length}"
handler.call(dedup ? fetched_records.uniq { |r| r['id'] } : fetched_records)
end
end
end
Expand Down
41 changes: 35 additions & 6 deletions test/embulk/input/zendesk/test_client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,7 @@ def client
"",
{
ticket_metrics: records,
next_page: "https://treasuredata.zendesk.com/api/v2/incremental/tickets
.json?include=metric_sets&start_time=1488535542",
next_page: "https://treasuredata.zendesk.com/api/v2/incremental/tickets.json?include=metric_sets&start_time=1488535542",
}.to_json
].join("\r\n")

Expand Down Expand Up @@ -299,7 +298,7 @@ def client

test "invoke export when partial=false" do
# Added default `start_time`
mock(client).export_parallel(anything, "ticket_fields", 0)
mock(client).export_parallel(anything, "ticket_fields", 0, true, false) # new args: `dedup`, `paging`
client.ticket_fields(false)
end
end
Expand All @@ -312,10 +311,40 @@ def client

test "invoke export when partial=false" do
# Added default `start_time`
mock(client).export_parallel(anything, "ticket_forms", 0)
mock(client).export_parallel(anything, "ticket_forms", 0, true, false) # new args: `dedup`, `paging`)
client.ticket_forms(false)
end
end

sub_test_case "no pagination" do
data("ticket_fields", "ticket_fields")
data("ticket_forms", "ticket_forms")
test "non-incremental targets" do |target|
response = [
"HTTP/1.1 200",
"Content-Type: application/json",
"",
{
target => 200.times.map{|n| {"id" => n}},
count: 200,
next_page: nil,
previous_page: nil,
}.to_json
].join("\r\n")

# mock multiple responses, to simulate real API behavior
@httpclient.test_loopback_http_response << response
@httpclient.test_loopback_http_response << response
counter = Concurrent::AtomicFixnum.new(0)
handler = proc { counter.increment }
# validate expected target
proxy(@httpclient).get("#{login_url}/api/v2/#{target}.json",anything,anything)
# (`partial`, `start_time`, `default`, `block`)
client.public_send(target, false, 0, true, &handler)
# only ingest 200 records
assert_equal(200, counter.value)
end
end
end


Expand Down Expand Up @@ -629,12 +658,12 @@ def client
"Content-Type: application/json",
"",
{
ticket_fields: [{ id: 1 }],
tickets: [{ id: 1 }],
count: 1
}.to_json
].join("\r\n")
handler = proc { }
client.ticket_fields(false, &handler)
client.tickets(false, &handler)
assert_equal(true, @pool.shutdown?)
end

Expand Down