Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support cursor based incremental #84

Merged
merged 6 commits into from
Aug 7, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
## 0.4.2 - 2022-23-05
## 0.4.4 - 2023-07-21
* [enhancement] Support cursor based incremental [#84](https://github.com/treasure-data/embulk-input-zendesk/pull/84)

## 0.4.3 - 2022-10-21
* [enhancement] Bump up to v0.4.3, built with the Gradle plugin v0.5.5 [#78](https://github.com/treasure-data/embulk-input-zendesk/pull/78)

## 0.4.2 - 2022-05-23
* [enhancement] Catchup embulk v0.10.32 [#77](https://github.com/treasure-data/embulk-input-zendesk/pull/77)

## 0.4.1 - 2022-29-03
## 0.4.1 - 2022-03-29
* [enhancement] Remove deprecated functions [#76](https://github.com/treasure-data/embulk-input-zendesk/pull/76)

## 0.4.0 - 2022-03-03
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Required Embulk version >= 0.9.6.
- **profile_source**: Profile source of user event, required if `target` is `user_events`.
- **user_event_source**: Source of user event, required if `target` is `user_events`.
- **user_event_type**: Type of user event, required if `target` is `user_events`.
- **enable_cursor_based_api**: Use the cursor-based incremental API endpoint when `target` is `tickets` or `users` (boolean, default: `false`)

## Example

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ repositories {
def embulkVersion = '0.10.31'

group = "com.treasuredata.embulk.plugins"
version = "0.4.3-SNAPSHOT"
version = "0.4.4-SNAPSHOT"
description = "Loads records From Zendesk"

sourceCompatibility = 1.8
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.embulk.input.zendesk.models.AuthenticationMethod;
import org.embulk.input.zendesk.models.Target;
import org.embulk.input.zendesk.services.ZendeskChatService;
import org.embulk.input.zendesk.services.ZendeskCursorBasedService;
import org.embulk.input.zendesk.services.ZendeskCustomObjectService;
import org.embulk.input.zendesk.services.ZendeskNPSService;
import org.embulk.input.zendesk.services.ZendeskService;
Expand Down Expand Up @@ -157,6 +158,10 @@ public interface PluginTask
@ConfigDefault("null")
Optional<String> getUserEventSource();

@Config("enable_cursor_based_api")
@ConfigDefault("false")
boolean getEnableCursorBasedApi();

@Config("columns")
SchemaConfig getColumns();
}
Expand Down Expand Up @@ -301,7 +306,7 @@ private JsonNode addAllColumnsToSchema(final JsonNode jsonNode, final Target tar
ConfigDiff configDiff = guessData(jsonNode, target.getJsonName());
ConfigDiff parser = configDiff.getNested("parser");
if (parser.has("columns")) {
JsonNode columns = parser.get(JsonNode.class, "columns");
JsonNode columns = parser.get(JsonNode.class, "columns");
final Iterator<JsonNode> ite = columns.elements();

while (ite.hasNext()) {
Expand Down Expand Up @@ -444,6 +449,7 @@ protected ZendeskService dispatchPerTarget(ZendeskInputPlugin.PluginTask task)
switch (task.getTarget()) {
case TICKETS:
case USERS:
return task.getEnableCursorBasedApi() ? new ZendeskCursorBasedService(task) : new ZendeskSupportAPIService(task);
case ORGANIZATIONS:
case TICKET_METRICS:
case TICKET_EVENTS:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package org.embulk.input.zendesk.services;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.http.HttpStatus;
import org.apache.http.client.utils.URIBuilder;
import org.embulk.config.ConfigException;
import org.embulk.config.TaskReport;
import org.embulk.input.zendesk.RecordImporter;
import org.embulk.input.zendesk.ZendeskInputPlugin;
import org.embulk.input.zendesk.clients.ZendeskRestClient;
import org.embulk.input.zendesk.models.ZendeskException;
import org.embulk.input.zendesk.utils.ZendeskConstants;
import org.embulk.input.zendesk.utils.ZendeskDateUtils;
import org.embulk.input.zendesk.utils.ZendeskUtils;
import org.embulk.spi.DataException;
import org.embulk.spi.Exec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URISyntaxException;
import java.util.Iterator;

import static org.embulk.input.zendesk.ZendeskInputPlugin.CONFIG_MAPPER_FACTORY;

public class ZendeskCursorBasedService
implements ZendeskService
{
private static final Logger logger = LoggerFactory.getLogger(ZendeskNormalServices.class);

protected ZendeskInputPlugin.PluginTask task;

private ZendeskRestClient zendeskRestClient;

public ZendeskCursorBasedService(final ZendeskInputPlugin.PluginTask task)
{
this.task = task;
}

@Override
public boolean isSupportIncremental()
{
return true;
}

@Override
public TaskReport addRecordToImporter(int taskIndex, RecordImporter recordImporter)
{
TaskReport taskReport = CONFIG_MAPPER_FACTORY.newTaskReport();
importData(task, recordImporter, taskReport);

return taskReport;
}

@Override
public JsonNode getDataFromPath(String path, int page, boolean isPreview, long startTime)
{
throw new UnsupportedOperationException();
}

@VisibleForTesting
protected ZendeskRestClient getZendeskRestClient()
{
if (zendeskRestClient == null) {
zendeskRestClient = new ZendeskRestClient();
}
return zendeskRestClient;
}

private void importData(final ZendeskInputPlugin.PluginTask task, final RecordImporter recordImporter, final TaskReport taskReport)
{
long initStartTime = 0;

if (task.getStartTime().isPresent()) {
initStartTime = ZendeskDateUtils.getStartTime(task.getStartTime().get());
}

long nextStartTime = initStartTime;

try {
String path = buildPath(task, initStartTime);

while (true) {
final JsonNode result = fetchResultFromPath(path);

final Iterator<JsonNode> iterator = ZendeskUtils.getListRecords(result, task.getTarget().getJsonName());

int numberOfRecords = 0;
if (result.has(ZendeskConstants.Field.COUNT)) {
numberOfRecords = result.get(ZendeskConstants.Field.COUNT).asInt();
}
hieudion marked this conversation as resolved.
Show resolved Hide resolved

while (iterator.hasNext()) {
final JsonNode recordJsonNode = iterator.next();
fetchSubResourceAndAddToImporter(recordJsonNode, task, recordImporter);

// Store nextStartTime of last item
if (!iterator.hasNext() && task.getIncremental()) {
nextStartTime = ZendeskDateUtils.isoToEpochSecond(recordJsonNode.get(ZendeskConstants.Field.UPDATED_AT).asText());
}
}

if (result.has(ZendeskConstants.Field.END_OF_STREAM)) {
if (result.get(ZendeskConstants.Field.END_OF_STREAM).asBoolean()) {
break;
}
}
else {
throw new DataException("Missing end of stream, please double-check the endpoint");
}
if (Exec.isPreview()) {
break;
}

path = result.get(ZendeskConstants.Field.AFTER_URL).asText();
}

if (!Exec.isPreview() && task.getIncremental()) {
storeStartTimeForConfigDiff(taskReport, nextStartTime);
}
}
catch (Exception e) {
e.printStackTrace();
hieudion marked this conversation as resolved.
Show resolved Hide resolved
}
}

private String buildPath(ZendeskInputPlugin.PluginTask task, long startTime)
throws URISyntaxException
{
return ZendeskUtils.getURIBuilder(task.getLoginUrl())
.setPath(ZendeskConstants.Url.API
+ "/" + "incremental"
+ "/" + task.getTarget().toString()
+ "/" + "cursor.json").build().toString()
+ "?start_time=" + startTime;
}

private JsonNode fetchResultFromPath(String path)
{
final String response = getZendeskRestClient().doGet(path, task, Exec.isPreview());
return ZendeskUtils.parseJsonObject(response);
}

private void fetchSubResourceAndAddToImporter(final JsonNode jsonNode, final ZendeskInputPlugin.PluginTask task, final RecordImporter recordImporter)
{
task.getIncludes().forEach(include -> {
final String relatedObjectName = include.trim();

final URIBuilder uriBuilder = ZendeskUtils.getURIBuilder(task.getLoginUrl())
.setPath(ZendeskConstants.Url.API
+ "/" + task.getTarget().toString()
+ "/" + jsonNode.get(ZendeskConstants.Field.ID).asText()
+ "/" + relatedObjectName + ".json");
try {
final JsonNode result = getDataFromPath(uriBuilder.toString(), 0, false, 0);
if (result != null && result.has(relatedObjectName)) {
((ObjectNode) jsonNode).set(include, result.get(relatedObjectName));
}
}
catch (final ConfigException e) {
// Sometimes we get 404 when having invalid endpoint, so ignore when we get 404 InvalidEndpoint
if (!(e.getCause() instanceof ZendeskException && ((ZendeskException) e.getCause()).getStatusCode() == HttpStatus.SC_NOT_FOUND)) {
throw e;
}
}
});

recordImporter.addRecord(jsonNode);
}

private void storeStartTimeForConfigDiff(final TaskReport taskReport, final long nextStartTime)
{
taskReport.set(ZendeskConstants.Field.START_TIME, nextStartTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ public static class Field
public static final String GENERATED_TIMESTAMP = "generated_timestamp";
public static final String UPDATED_AT = "updated_at";
public static final String ID = "id";
public static final String END_OF_STREAM = "end_of_stream";
public static final String AFTER_URL = "after_url";
}

public static class Url
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static String convertToDateTimeFormat(String datetime, String dateTimeFor
}

// start_time should start from 0
public static long getStartTime(final String time)
public static long getStartTime(final String time)
{
try {
return isoToEpochSecond(time);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.embulk.config.TaskSource;
import org.embulk.input.zendesk.models.Target;
import org.embulk.input.zendesk.services.ZendeskChatService;
import org.embulk.input.zendesk.services.ZendeskCursorBasedService;
import org.embulk.input.zendesk.services.ZendeskCustomObjectService;
import org.embulk.input.zendesk.services.ZendeskNPSService;
import org.embulk.input.zendesk.services.ZendeskService;
Expand Down Expand Up @@ -205,6 +206,25 @@ public void testDispatchPerTargetShouldReturnSupportAPIService()
testReturnSupportAPIService(Target.ORGANIZATIONS);
}

/**
 * With {@code enable_cursor_based_api} set to true, the tickets and the users targets must
 * both be dispatched to {@link ZendeskCursorBasedService}.
 */
@Test
public void testDispatchPerTargetShouldReturn()
{
    zendeskInputPlugin = spy(new ZendeskInputPlugin());

    final ConfigSource configSource = ZendeskTestHelper.getConfigSource("base.yml");
    configSource.set("columns", Collections.EMPTY_LIST);
    configSource.set("enable_cursor_based_api", true);

    // Tickets first, then users: the same order the targets are checked one after another.
    for (final Target cursorTarget : new Target[] {Target.TICKETS, Target.USERS}) {
        configSource.set("target", cursorTarget.name().toLowerCase());
        final ZendeskInputPlugin.PluginTask pluginTask = CONFIG_MAPPER.map(configSource, ZendeskInputPlugin.PluginTask.class);
        final ZendeskService dispatched = zendeskInputPlugin.dispatchPerTarget(pluginTask);
        assertTrue(dispatched instanceof ZendeskCursorBasedService);
    }
}

@Test
public void testDispatchPerTargetShouldReturnNPSService()
{
Expand Down
Loading