Incremental loading of Github events and issues. #16

Merged
merged 16 commits on Nov 19, 2014

Changes from 12 commits
215 changes: 167 additions & 48 deletions src/main/java/com/ubervu/river/github/GitHubRiver.java
@@ -8,30 +8,38 @@
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.FilteredQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;


@@ -41,11 +49,14 @@ public class GitHubRiver extends AbstractRiverComponent implements River {
private final String index;
private final String repository;
private final String owner;
private final int userRequestedInterval;
private final String endpoint;
private final int interval;

private String password;
private String username;
private DataStream dataStream;
private String eventETag = null;
private int pollInterval = 60;

@SuppressWarnings({"unchecked"})
@Inject
@@ -61,8 +72,9 @@ public GitHubRiver(RiverName riverName, RiverSettings settings, Client client) {
Map<String, Object> githubSettings = (Map<String, Object>) settings.settings().get("github");
owner = XContentMapValues.nodeStringValue(githubSettings.get("owner"), null);
repository = XContentMapValues.nodeStringValue(githubSettings.get("repository"), null);

index = String.format("%s&%s", owner, repository);
interval = XContentMapValues.nodeIntegerValue(githubSettings.get("interval"), 3600);
userRequestedInterval = XContentMapValues.nodeIntegerValue(githubSettings.get("interval"), 60);

// auth (optional)
username = null;
@@ -89,7 +101,7 @@ public void start() {
client.admin().indices().prepareCreate(index).setSettings(indexSettings).execute().actionGet();
logger.info("Created index.");
} catch (IndexAlreadyExistsException e) {
;
logger.info("Index already created");
} catch (Exception e) {
logger.error("Exception creating index.", e);
}
@@ -101,6 +113,7 @@ public void start() {
@Override
public void close() {
dataStream.setRunning(false);
dataStream.interrupt();
logger.info("Stopped GitHub river.");
}

@@ -113,13 +126,38 @@ public DataStream() {
isRunning = true;
}

private void indexResponse(URLConnection conn, String type) {
InputStream input = null;
private boolean checkAndUpdateETag(HttpURLConnection conn) throws IOException {
if (eventETag != null) {
conn.setRequestProperty("If-None-Match", eventETag);
}

String xPollInterval = conn.getHeaderField("X-Poll-Interval");
if (xPollInterval != null) {
logger.debug("Next GitHub specified minimum polling interval is {} s", xPollInterval);
pollInterval = Integer.parseInt(xPollInterval);
}

if (conn.getResponseCode() == 304) {
logger.debug("304 {}", conn.getResponseMessage());
return false;
}

String eTag = conn.getHeaderField("ETag");
if (eTag != null) {
logger.debug("New eTag: {}", eTag);
eventETag = eTag;
}

return true;
}

private boolean indexResponse(HttpURLConnection conn, String type) {
InputStream input;
try {
input = conn.getInputStream();
} catch (IOException e) {
logger.info("Exception encountered (403 usually is rate limit exceeded): ", e);
return;
return false;
}
JsonStreamParser jsp = new JsonStreamParser(new InputStreamReader(input));

@@ -139,10 +177,17 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
}).build();

boolean continueIndexing = true;

IndexRequest req = null;
for (JsonElement e: array) {
if (type.equals("event")) {
req = indexEvent(e);
if (req == null) {
continueIndexing = false;
logger.debug("Found existing event, all remaining events has already been indexed");
Contributor:

Let's be 100% sure about this :D. Wouldn't want to miss any events!

Contributor (author):

Yeah, that would be bad.

We could run through them all in the beginning, just to be sure (though we lose quite a bit of the speedup). If we still make the check, we could log a warning (or at least an info) if an unindexed event is registered after the continueIndexing flag (which should probably be renamed in that case) has been set. If we see none like this for a few releases, it should be safe to enable the optimization.

A middle ground could also be to run through the rest of the current block, and keep the continueIndexing warning. If there are any bugs or bad assumptions, there would still be a chance of losing events, but at least it would be quite a bit smaller, as the problem would have to hit the boundary between blocks for things like off-by-one bugs to have an effect. If "last event first" doesn't hold in all cases, then all bets are of course off.

It's actually a liiiitle strange that they didn't implement "since" for events.

Contributor:

Why don't we try the first way you suggested for 1-2 days and watch the logs? If we see no occurrences, we should be fine.

re: since for events - maybe there is a way. I suggest you shoot an email to support@github.com, I've done it a couple times and they were very helpful. Maybe there's something that we are missing?

Contributor (author):

I sent them a mail, so let's see what they say. I'll make the change tonight. :)
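
A minimal, hypothetical sketch of the "middle ground" discussed above, for illustration only and not part of this diff: finish scanning the current block instead of breaking immediately, and warn if an unindexed event turns up after an already-indexed one. The class name EventScanSketch is made up; isEventIndexed and indexEvent stand in for the methods added in this pull request.

import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

// Hypothetical sketch, not part of the GitHubRiver change itself.
abstract class EventScanSketch {
    // Stand-ins for GitHubRiver.DataStream#isEventIndexed and #indexEvent from this diff.
    abstract boolean isEventIndexed(String id);
    abstract void indexEvent(JsonObject event);

    /**
     * Scans one block (page) of events. Returns true if every event was new and older
     * pages should still be fetched, false once an already-indexed event has been seen.
     */
    boolean scanBlock(JsonArray events) {
        boolean sawIndexedEvent = false;
        for (JsonElement e : events) {
            JsonObject obj = e.getAsJsonObject();
            String id = obj.get("id").getAsString();
            if (isEventIndexed(id)) {
                sawIndexedEvent = true; // remember it, but keep scanning the rest of the block
                continue;
            }
            if (sawIndexedEvent) {
                // Should not happen if events really arrive newest-first; if it does, the
                // early-exit optimization would have dropped this event, so log loudly.
                System.err.println("WARN: unindexed event " + id + " found after an indexed one");
            }
            indexEvent(obj);
        }
        return !sawIndexedEvent;
    }
}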

break;
}
} else if (type.equals("issue")) {
req = indexOther(e, "IssueData", true);
} else if (type.equals("pullreq")) {
@@ -160,13 +205,26 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)

try {
input.close();
} catch (IOException e) {}
} catch (IOException e) {
logger.warn("Couldn't close connection?", e);
}

return continueIndexing;
}

private boolean isEventIndexed(String id) {
return client.prepareGet(index, null, id).get().isExists();
}

private IndexRequest indexEvent(JsonElement e) {
JsonObject obj = e.getAsJsonObject();
String type = obj.get("type").getAsString();
String id = obj.get("id").getAsString();

if (isEventIndexed(id)) {
return null;
}

IndexRequest req = new IndexRequest(index)
.type(type)
.id(id).create(false) // we want to overwrite old items
@@ -237,70 +295,131 @@ private String nextPageURL(URLConnection response) {
return headerData.get("url");
}

private void addAuthHeader(URLConnection request) {
private void addAuthHeader(URLConnection connection) {
if (username == null || password == null) {
return;
}
String auth = String.format("%s:%s", username, password);
String encoded = Base64.encodeBytes(auth.getBytes());
request.setRequestProperty("Authorization", "Basic " + encoded);
connection.setRequestProperty("Authorization", "Basic " + encoded);
}

private boolean getData(String fmt, String type) {
return getData(fmt, type, null);
}

private void getData(String fmt, String type) {
private boolean getData(String fmt, String type, String since) {
try {
URL url = new URL(String.format(fmt, owner, repository));
URLConnection response = url.openConnection();
addAuthHeader(response);
indexResponse(response, type);

while (morePagesAvailable(response)) {
url = new URL(nextPageURL(response));
response = url.openConnection();
addAuthHeader(response);
indexResponse(response, type);
URL url;
if (since != null) {
url = new URL(String.format(fmt, owner, repository, since));
} else {
url = new URL(String.format(fmt, owner, repository));
}
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
addAuthHeader(connection);
if (type.equals("event")) {
Contributor:

Indeed, it is a bit unintuitive to have this here. :)

Contributor (author):

Yeah, that was exactly what I meant with "stretching the design" :D

boolean modified = checkAndUpdateETag(connection);
if (!modified) {
return false;
}
}
boolean continueIndexing = indexResponse(connection, type);

while (continueIndexing && morePagesAvailable(connection)) {
url = new URL(nextPageURL(connection));
connection = (HttpURLConnection) url.openConnection();
addAuthHeader(connection);
continueIndexing = indexResponse(connection, type);
}
} catch (Exception e) {
logger.error("Exception in getData", e);
}

return true;
}

private void deleteByType(String type) {
DeleteByQueryResponse response = client.prepareDeleteByQuery(index)
client.prepareDeleteByQuery(index)
.setQuery(termQuery("_type", type))
.execute()
.actionGet();
}

/**
* Gets the creation date of the single newest entry.
*
* @return ISO8601 formatted time of the most recent entry, or null if the index is empty or on error.
*/
private String getMostRecentEntry() {
long totalEntries = client.prepareCount(index).setQuery(matchAllQuery()).execute().actionGet().getCount();
if (totalEntries > 0) {
FilteredQueryBuilder updatedAtQuery = QueryBuilders
.filteredQuery(QueryBuilders.matchAllQuery(), FilterBuilders.existsFilter("created_at"));
FieldSortBuilder updatedAtSort = SortBuilders.fieldSort("created_at").order(SortOrder.DESC);

SearchResponse response = client.prepareSearch(index)
.setQuery(updatedAtQuery)
.addSort(updatedAtSort)
.setSize(1)
.execute()
.actionGet();

String createdAt = (String) response.getHits().getAt(0).getSource().get("created_at");
logger.debug("Most recent event was created at {}", createdAt);
return createdAt;
} else {
// getData will get all data on a null.
logger.info("No existing entries, assuming first run");
return null;
}
}

@Override
public void run() {
while (isRunning) {
getData(endpoint + "/repos/%s/%s/events?per_page=1000", "event");
getData(endpoint + "/repos/%s/%s/issues?per_page=1000", "issue");
getData(endpoint + "/repos/%s/%s/issues?state=closed&per_page=1000", "issue");

// delete pull req data - we are only storing open pull reqs
// and when a pull request is closed we have no way of knowing;
// this is why we have to delete them and reindex "fresh" ones
deleteByType("PullRequestData");
getData(endpoint + "/repos/%s/%s/pulls", "pullreq");

// same for milestones
deleteByType("MilestoneData");
getData(endpoint + "/repos/%s/%s/milestones?per_page=1000", "milestone");

// collaborators
deleteByType("CollaboratorData");
getData(endpoint + "/repos/%s/%s/collaborators?per_page=1000", "collaborator");

// and for labels - they have IDs based on the MD5 of the contents, so
// if a property changes, we get a "new" document
deleteByType("LabelData");
getData(endpoint + "/repos/%s/%s/labels?per_page=1000", "label");


// Must be read before getting new events.
String mostRecentEntry = getMostRecentEntry();

logger.debug("Checking for events");
if (getData(endpoint + "/repos/%s/%s/events?per_page=1000",
"event")) {
logger.debug("First run or new events found, fetching rest of the data");
if (mostRecentEntry != null) {
getData(endpoint + "/repos/%s/%s/issues?state=all&per_page=1000&since=%s",
"issue", mostRecentEntry);
} else {
getData(endpoint + "/repos/%s/%s/issues?state=all&per_page=1000",
"issue");
}
// delete pull req data - we are only storing open pull reqs
// and when a pull request is closed we have no way of knowing;
// this is why we have to delete them and reindex "fresh" ones
deleteByType("PullRequestData");
getData(endpoint + "/repos/%s/%s/pulls", "pullreq");

// same for milestones
deleteByType("MilestoneData");
getData(endpoint + "/repos/%s/%s/milestones?per_page=1000", "milestone");

// collaborators
deleteByType("CollaboratorData");
getData(endpoint + "/repos/%s/%s/collaborators?per_page=1000", "collaborator");

// and for labels - they have IDs based on the MD5 of the contents, so
// if a property changes, we get a "new" document
deleteByType("LabelData");
getData(endpoint + "/repos/%s/%s/labels?per_page=1000", "label");
} else {
logger.debug("No new events found");
}
try {
Thread.sleep(interval * 1000); // needs milliseconds
} catch (InterruptedException e) {}
int waitTime = Math.max(pollInterval, userRequestedInterval) * 1000;
logger.debug("Waiting {} ms before polling for new events", waitTime);
Thread.sleep(waitTime); // needs milliseconds
} catch (InterruptedException e) {
logger.info("Wait interrupted, river was probably stopped");
}
}
}
