Skip to content

Commit

Permalink
Remove optimization and replace with extra sanity checking of "last e…
Browse files Browse the repository at this point in the history
…vent always first" theory.

If/when confirmed, this can be reverted.
  • Loading branch information
Dennis Du Krøger committed Nov 18, 2014
1 parent 22516fe commit a1f5c8c
Showing 1 changed file with 21 additions and 13 deletions.
34 changes: 21 additions & 13 deletions src/main/java/com/ubervu/river/github/GitHubRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,15 @@ private boolean checkAndUpdateETag(HttpURLConnection conn) throws IOException {
return true;
}

private boolean indexResponse(HttpURLConnection conn, String type) {
String previouslyIndexedEvent = null;

private void indexResponse(HttpURLConnection conn, String type) {
InputStream input;
try {
input = conn.getInputStream();
} catch (IOException e) {
logger.info("Exception encountered (403 usually is rate limit exceeded): ", e);
return false;
return;
}
JsonStreamParser jsp = new JsonStreamParser(new InputStreamReader(input));

Expand All @@ -177,16 +179,15 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
}).build();

boolean continueIndexing = true;

IndexRequest req = null;
for (JsonElement e: array) {
for (JsonElement e : array) {
if (type.equals("event")) {
req = indexEvent(e);
if (req == null) {
continueIndexing = false;
logger.debug("Found existing event, all remaining events has already been indexed");
break;
logger.debug("Found existing event, all remaining events should already have been indexed");
} else if (previouslyIndexedEvent != null) {
logger.warn("New non-indexed event {}, even though the previous event {} has already been indexed?", req.id(), previouslyIndexedEvent);
}
} else if (type.equals("issue")) {
req = indexOther(e, "IssueData", true);
Expand All @@ -199,7 +200,9 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
} else if (type.equals("collaborator")) {
req = indexOther(e, "CollaboratorData");
}
bp.add(req);
if (req != null) {
bp.add(req);
}
}
bp.close();

Expand All @@ -208,8 +211,6 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
} catch (IOException e) {
logger.warn("Couldn't close connection?", e);
}

return continueIndexing;
}

private boolean isEventIndexed(String id) {
Expand All @@ -222,9 +223,12 @@ private IndexRequest indexEvent(JsonElement e) {
String id = obj.get("id").getAsString();

if (isEventIndexed(id)) {
logger.debug("Found id {} already in the index. No more IDs should be found from now on", id);
previouslyIndexedEvent = id;
return null;
}

logger.debug("Indexing event {}", id);
IndexRequest req = new IndexRequest(index)
.type(type)
.id(id).create(false) // we want to overwrite old items
Expand Down Expand Up @@ -324,13 +328,17 @@ private boolean getData(String fmt, String type, String since) {
return false;
}
}
boolean continueIndexing = indexResponse(connection, type);

while (continueIndexing && morePagesAvailable(connection)) {
previouslyIndexedEvent = null;
logger.debug("Fetching data of type {}", type);
indexResponse(connection, type);

while (morePagesAvailable(connection)) {
logger.debug("More pages available, fetching next page");
url = new URL(nextPageURL(connection));
connection = (HttpURLConnection) url.openConnection();
addAuthHeader(connection);
continueIndexing = indexResponse(connection, type);
indexResponse(connection, type);
}
} catch (Exception e) {
logger.error("Exception in getData", e);
Expand Down

0 comments on commit a1f5c8c

Please sign in to comment.