Skip to content

Commit

Permalink
Merge pull request #16 from LosD/feature/incremental-update
Browse files Browse the repository at this point in the history
Incremental loading of Github events and issues.
  • Loading branch information
mihneadb committed Nov 19, 2014
2 parents e8b07ad + 15f022f commit fb5ec3f
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 53 deletions.
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Works for private repos as well if you provide authentication.
Assuming you have elasticsearch's `bin` folder in your `PATH`:

```
plugin -i com.ubervu/elasticsearch-river-github/1.7.0
plugin -i com.ubervu/elasticsearch-river-github/1.7.1
```

Otherwise, you have to find the directory yourself. It should be
Expand All @@ -32,7 +32,7 @@ curl -XPUT localhost:9200/_river/my_gh_river/_meta -d '{
"github": {
"owner": "gabrielfalcao",
"repository": "lettuce",
"interval": 3600,
"interval": 60,
"authentication": {
"username": "MYUSER", # or token
"password": "MYPASSWORD" # or x-oauth-basic when using a token
Expand All @@ -42,9 +42,11 @@ curl -XPUT localhost:9200/_river/my_gh_river/_meta -d '{
}'
```

Interval is given in seconds and it changes how often the river looks for new data.
_interval_ is optional, given in seconds and changes how often the river looks for new data. Since 1.7.1 the default value has been reduced to one minute as we now only load issues and events that has changed, which should decrease API calls and improve the time to update quite significantly. The actual polling interval will be affected by GitHub's minimum allowed polling interval, which is normally 60 seconds, but may increase when servers are busy.

The authentication bit is optional. It helps with the API rate limit and when accessing private data. You can use your own GitHub credentials or a token. When using a token, fill in the token as the username and `x-oauth-basic` as the password, as the [docs](http://developer.github.com/v3/auth/#basic-authentication) mention.
_authentication_ is optional and helps with the API rate limit (5000 requests/hour instead of 60 requests/hour) and when accessing private data. You can use your own GitHub credentials or a token. When using a token, fill in the token as the username and `x-oauth-basic` as the password, as the [docs](http://developer.github.com/v3/auth/#basic-authentication) mention.

If you do not use _authentication_, you may want to set _interval_ to a higher value, like 900 (every 15 minutes), as the GitHub rate limit will probably be breached when using low values. This is __not__ recommended if you require the GitHub events without holes, as Github only allows access to the last 300 events. In that case, authenticating is highly recommended. _This will probably change in a later version, at least for repositories without too much traffic, as we should be able to check for changes before loading most types of entries._

##Deleting the river

Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.ubervu</groupId>
<artifactId>elasticsearch-river-github</artifactId>
<version>1.7.0</version>
<version>1.7.1</version>
<packaging>jar</packaging>
<description>Github River for ElasticSearch</description>
<inceptionYear>2014</inceptionYear>
Expand Down
215 changes: 167 additions & 48 deletions src/main/java/com/ubervu/river/github/GitHubRiver.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,30 +8,38 @@
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.Base64;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.FilteredQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndexAlreadyExistsException;
import org.elasticsearch.river.AbstractRiverComponent;
import org.elasticsearch.river.River;
import org.elasticsearch.river.RiverName;
import org.elasticsearch.river.RiverSettings;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilders;
import org.elasticsearch.search.sort.SortOrder;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLConnection;
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;


Expand All @@ -41,11 +49,14 @@ public class GitHubRiver extends AbstractRiverComponent implements River {
private final String index;
private final String repository;
private final String owner;
private final int userRequestedInterval;
private final String endpoint;
private final int interval;

private String password;
private String username;
private DataStream dataStream;
private String eventETag = null;
private int pollInterval = 60;

@SuppressWarnings({"unchecked"})
@Inject
Expand All @@ -61,8 +72,9 @@ public GitHubRiver(RiverName riverName, RiverSettings settings, Client client) {
Map<String, Object> githubSettings = (Map<String, Object>) settings.settings().get("github");
owner = XContentMapValues.nodeStringValue(githubSettings.get("owner"), null);
repository = XContentMapValues.nodeStringValue(githubSettings.get("repository"), null);

index = String.format("%s&%s", owner, repository);
interval = XContentMapValues.nodeIntegerValue(githubSettings.get("interval"), 3600);
userRequestedInterval = XContentMapValues.nodeIntegerValue(githubSettings.get("interval"), 60);

// auth (optional)
username = null;
Expand All @@ -89,7 +101,7 @@ public void start() {
client.admin().indices().prepareCreate(index).setSettings(indexSettings).execute().actionGet();
logger.info("Created index.");
} catch (IndexAlreadyExistsException e) {
;
logger.info("Index already created");
} catch (Exception e) {
logger.error("Exception creating index.", e);
}
Expand All @@ -101,6 +113,7 @@ public void start() {
@Override
public void close() {
dataStream.setRunning(false);
dataStream.interrupt();
logger.info("Stopped GitHub river.");
}

Expand All @@ -113,13 +126,38 @@ public DataStream() {
isRunning = true;
}

private void indexResponse(URLConnection conn, String type) {
InputStream input = null;
private boolean checkAndUpdateETag(HttpURLConnection conn) throws IOException {
if (eventETag != null) {
conn.setRequestProperty("If-None-Match", eventETag);
}

String xPollInterval = conn.getHeaderField("X-Poll-Interval");
if (xPollInterval != null) {
logger.debug("Next GitHub specified minimum polling interval is {} s", xPollInterval);
pollInterval = Integer.parseInt(xPollInterval);
}

if (conn.getResponseCode() == 304) {
logger.debug("304 {}", conn.getResponseMessage());
return false;
}

String eTag = conn.getHeaderField("ETag");
if (eTag != null) {
logger.debug("New eTag: {}", eTag);
eventETag = eTag;
}

return true;
}

private boolean indexResponse(HttpURLConnection conn, String type) {
InputStream input;
try {
input = conn.getInputStream();
} catch (IOException e) {
logger.info("Exception encountered (403 usually is rate limit exceeded): ", e);
return;
return false;
}
JsonStreamParser jsp = new JsonStreamParser(new InputStreamReader(input));

Expand All @@ -139,10 +177,17 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
}
}).build();

boolean continueIndexing = true;

IndexRequest req = null;
for (JsonElement e: array) {
if (type.equals("event")) {
req = indexEvent(e);
if (req == null) {
continueIndexing = false;
logger.debug("Found existing event, all remaining events has already been indexed");
break;
}
} else if (type.equals("issue")) {
req = indexOther(e, "IssueData", true);
} else if (type.equals("pullreq")) {
Expand All @@ -160,13 +205,26 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)

try {
input.close();
} catch (IOException e) {}
} catch (IOException e) {
logger.warn("Couldn't close connection?", e);
}

return continueIndexing;
}

private boolean isEventIndexed(String id) {
return client.prepareGet(index, null, id).get().isExists();
}

private IndexRequest indexEvent(JsonElement e) {
JsonObject obj = e.getAsJsonObject();
String type = obj.get("type").getAsString();
String id = obj.get("id").getAsString();

if (isEventIndexed(id)) {
return null;
}

IndexRequest req = new IndexRequest(index)
.type(type)
.id(id).create(false) // we want to overwrite old items
Expand Down Expand Up @@ -237,70 +295,131 @@ private String nextPageURL(URLConnection response) {
return headerData.get("url");
}

private void addAuthHeader(URLConnection request) {
private void addAuthHeader(URLConnection connection) {
if (username == null || password == null) {
return;
}
String auth = String.format("%s:%s", username, password);
String encoded = Base64.encodeBytes(auth.getBytes());
request.setRequestProperty("Authorization", "Basic " + encoded);
connection.setRequestProperty("Authorization", "Basic " + encoded);
}

private boolean getData(String fmt, String type) {
return getData(fmt, type, null);
}

private void getData(String fmt, String type) {
private boolean getData(String fmt, String type, String since) {
try {
URL url = new URL(String.format(fmt, owner, repository));
URLConnection response = url.openConnection();
addAuthHeader(response);
indexResponse(response, type);

while (morePagesAvailable(response)) {
url = new URL(nextPageURL(response));
response = url.openConnection();
addAuthHeader(response);
indexResponse(response, type);
URL url;
if (since != null) {
url = new URL(String.format(fmt, owner, repository, since));
} else {
url = new URL(String.format(fmt, owner, repository));
}
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
addAuthHeader(connection);
if (type.equals("event")) {
boolean modified = checkAndUpdateETag(connection);
if (!modified) {
return false;
}
}
boolean continueIndexing = indexResponse(connection, type);

while (continueIndexing && morePagesAvailable(connection)) {
url = new URL(nextPageURL(connection));
connection = (HttpURLConnection) url.openConnection();
addAuthHeader(connection);
continueIndexing = indexResponse(connection, type);
}
} catch (Exception e) {
logger.error("Exception in getData", e);
}

return true;
}

private void deleteByType(String type) {
DeleteByQueryResponse response = client.prepareDeleteByQuery(index)
client.prepareDeleteByQuery(index)
.setQuery(termQuery("_type", type))
.execute()
.actionGet();
}

/**
* Gets the creation data of the single newest entry.
*
* @return ISO8601 formatted time of most recent entry, or null on empty or error.
*/
private String getMostRecentEntry() {
long totalEntries = client.prepareCount(index).setQuery(matchAllQuery()).execute().actionGet().getCount();
if (totalEntries > 0) {
FilteredQueryBuilder updatedAtQuery = QueryBuilders
.filteredQuery(QueryBuilders.matchAllQuery(), FilterBuilders.existsFilter("created_at"));
FieldSortBuilder updatedAtSort = SortBuilders.fieldSort("created_at").order(SortOrder.DESC);

SearchResponse response = client.prepareSearch(index)
.setQuery(updatedAtQuery)
.addSort(updatedAtSort)
.setSize(1)
.execute()
.actionGet();

String createdAt = (String) response.getHits().getAt(0).getSource().get("created_at");
logger.debug("Most recent event was created at {}", createdAt);
return createdAt;
} else {
// getData will get all data on a null.
logger.info("No existing entries, assuming first run");
return null;
}
}

@Override
public void run() {
while (isRunning) {
getData(endpoint + "/repos/%s/%s/events?per_page=1000", "event");
getData(endpoint + "/repos/%s/%s/issues?per_page=1000", "issue");
getData(endpoint + "/repos/%s/%s/issues?state=closed&per_page=1000", "issue");

// delete pull req data - we are only storing open pull reqs
// and when a pull request is closed we have no way of knowing;
// this is why we have to delete them and reindex "fresh" ones
deleteByType("PullRequestData");
getData(endpoint + "/repos/%s/%s/pulls", "pullreq");

// same for milestones
deleteByType("MilestoneData");
getData(endpoint + "/repos/%s/%s/milestones?per_page=1000", "milestone");

// collaborators
deleteByType("CollaboratorData");
getData(endpoint + "/repos/%s/%s/collaborators?per_page=1000", "collaborator");

// and for labels - they have IDs based on the MD5 of the contents, so
// if a property changes, we get a "new" document
deleteByType("LabelData");
getData(endpoint + "/repos/%s/%s/labels?per_page=1000", "label");


// Must be read before getting new events.
String mostRecentEntry = getMostRecentEntry();

logger.debug("Checking for events");
if (getData(endpoint + "/repos/%s/%s/events?per_page=1000",
"event")) {
logger.debug("First run or new events found, fetching rest of the data");
if (mostRecentEntry != null) {
getData(endpoint + "/repos/%s/%s/issues?state=all&per_page=1000&since=%s",
"issue", mostRecentEntry);
} else {
getData(endpoint + "/repos/%s/%s/issues?state=all&per_page=1000",
"issue");
}
// delete pull req data - we are only storing open pull reqs
// and when a pull request is closed we have no way of knowing;
// this is why we have to delete them and reindex "fresh" ones
deleteByType("PullRequestData");
getData(endpoint + "/repos/%s/%s/pulls", "pullreq");

// same for milestones
deleteByType("MilestoneData");
getData(endpoint + "/repos/%s/%s/milestones?per_page=1000", "milestone");

// collaborators
deleteByType("CollaboratorData");
getData(endpoint + "/repos/%s/%s/collaborators?per_page=1000", "collaborator");

// and for labels - they have IDs based on the MD5 of the contents, so
// if a property changes, we get a "new" document
deleteByType("LabelData");
getData(endpoint + "/repos/%s/%s/labels?per_page=1000", "label");
} else {
logger.debug("No new events found");
}
try {
Thread.sleep(interval * 1000); // needs milliseconds
} catch (InterruptedException e) {}
int waitTime = Math.max(pollInterval, userRequestedInterval) * 1000;
logger.debug("Waiting {} ms before polling for new events", waitTime);
Thread.sleep(waitTime); // needs milliseconds
} catch (InterruptedException e) {
logger.info("Wait interrupted, river was probably stopped");
}
}
}

Expand Down

0 comments on commit fb5ec3f

Please sign in to comment.