Incremental loading of Github events and issues. #16
Merged
Commits (16):
c915697  Update README.md
cbb8c0a  Update README.md (LosD)
eefab14  Issues should now load incrementally properly. (LosD)
97739a5  Revert "Update README.md"
6077fcd  Revert "Update README.md"
8c3b08e  Whoops, shouldn't have committed POM GPG change.
0f52050  Spacing
3e590fe  Stop fetching when reaching existing event.
261184e  Merge https://github.com/uberVU/elasticsearch-river-github into featu…
0c0b569  Avoid error when checking for recent events against empty index.
7d2a598  Whoops, removed GPG again.
22516fe  Use created_at instead of _id, as it does not seem to be as serial as I …
a1f5c8c  Remove optimization and replace with extra sanity checking of "last e…
67fba6a  Bump version and explain changes.
82c5e69  Language cleaned up a bit, and information regarding interval and aut…
15f022f  Revert sanity check, restoring optimization
@@ -8,30 +8,38 @@
 import org.elasticsearch.action.bulk.BulkProcessor;
 import org.elasticsearch.action.bulk.BulkRequest;
 import org.elasticsearch.action.bulk.BulkResponse;
-import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
 import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.action.search.SearchResponse;
 import org.elasticsearch.client.Client;
 import org.elasticsearch.common.Base64;
 import org.elasticsearch.common.inject.Inject;
 import org.elasticsearch.common.settings.ImmutableSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.xcontent.support.XContentMapValues;
+import org.elasticsearch.index.query.FilterBuilders;
+import org.elasticsearch.index.query.FilteredQueryBuilder;
+import org.elasticsearch.index.query.QueryBuilders;
 import org.elasticsearch.indices.IndexAlreadyExistsException;
 import org.elasticsearch.river.AbstractRiverComponent;
 import org.elasticsearch.river.River;
 import org.elasticsearch.river.RiverName;
 import org.elasticsearch.river.RiverSettings;
+import org.elasticsearch.search.sort.FieldSortBuilder;
+import org.elasticsearch.search.sort.SortBuilders;
+import org.elasticsearch.search.sort.SortOrder;

 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.net.HttpURLConnection;
 import java.net.URL;
 import java.net.URLConnection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;

+import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
 import static org.elasticsearch.index.query.QueryBuilders.termQuery;

@@ -41,11 +49,14 @@ public class GitHubRiver extends AbstractRiverComponent implements River {
     private final String index;
     private final String repository;
     private final String owner;
+    private final int userRequestedInterval;
     private final String endpoint;
-    private final int interval;

     private String password;
     private String username;
     private DataStream dataStream;
+    private String eventETag = null;
+    private int pollInterval = 60;

     @SuppressWarnings({"unchecked"})
     @Inject
@@ -61,8 +72,9 @@ public GitHubRiver(RiverName riverName, RiverSettings settings, Client client) {
         Map<String, Object> githubSettings = (Map<String, Object>) settings.settings().get("github");
         owner = XContentMapValues.nodeStringValue(githubSettings.get("owner"), null);
         repository = XContentMapValues.nodeStringValue(githubSettings.get("repository"), null);

         index = String.format("%s&%s", owner, repository);
-        interval = XContentMapValues.nodeIntegerValue(githubSettings.get("interval"), 3600);
+        userRequestedInterval = XContentMapValues.nodeIntegerValue(githubSettings.get("interval"), 60);

         // auth (optional)
         username = null;
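Context for the settings being read here: a river is configured through its `_meta` document, which is where `owner`, `repository`, and the now-renamed `interval` come from. A minimal registration would look roughly like the sketch below (river name and values are illustrative; see the project README for the full option set, including the optional auth block):

```
curl -XPUT 'localhost:9200/_river/my_gh_river/_meta' -d '{
  "type": "github",
  "github": {
    "owner": "uberVU",
    "repository": "elasticsearch-river-github",
    "interval": 60
  }
}'
```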
@@ -89,7 +101,7 @@ public void start() {
             client.admin().indices().prepareCreate(index).setSettings(indexSettings).execute().actionGet();
             logger.info("Created index.");
         } catch (IndexAlreadyExistsException e) {
-            ;
+            logger.info("Index already created");
         } catch (Exception e) {
             logger.error("Exception creating index.", e);
         }
@@ -101,6 +113,7 @@ public void start() {
     @Override
     public void close() {
         dataStream.setRunning(false);
+        dataStream.interrupt();
         logger.info("Stopped GitHub river.");
     }

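Why the added interrupt() matters: setRunning(false) alone is only noticed once the current sleep expires, which can be a long time. A tiny standalone sketch of the effect (class name and timings are illustrative, not from the PR):

```java
public class InterruptDemo {
    public static void main(String[] args) throws InterruptedException {
        Thread river = new Thread(() -> {
            try {
                Thread.sleep(60_000); // stand-in for the river's poll wait
            } catch (InterruptedException e) {
                System.out.println("Wait interrupted, shutting down promptly");
            }
        });
        river.start();
        Thread.sleep(100);  // let the worker reach its sleep
        river.interrupt();  // what close() now does to cut the wait short
        river.join();       // returns almost immediately instead of after 60 s
    }
}
```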
@@ -113,8 +126,35 @@ public DataStream() {
             isRunning = true;
         }

-        private void indexResponse(URLConnection conn, String type) {
-            InputStream input = null;
+        private boolean checkAndUpdateETag(HttpURLConnection conn) throws IOException {
+            if (eventETag != null) {
+                conn.setRequestProperty("If-None-Match", eventETag);
+            }
+
+            String xPollInterval = conn.getHeaderField("X-Poll-Interval");
+            if (xPollInterval != null) {
+                logger.debug("Next GitHub specified minimum polling interval is {} s", xPollInterval);
+                pollInterval = Integer.parseInt(xPollInterval);
+            }
+
+            if (conn.getResponseCode() == 304) {
+                logger.debug("304 {}", conn.getResponseMessage());
+                return false;
+            }
+
+            String eTag = conn.getHeaderField("ETag");
+            if (eTag != null) {
+                logger.debug("New eTag: {}", eTag);
+                eventETag = eTag;
+            }
+
+            return true;
+        }
+
+        String previouslyIndexedEvent = null;
+
+        private void indexResponse(HttpURLConnection conn, String type) {
+            InputStream input;
             try {
                 input = conn.getInputStream();
             } catch (IOException e) {

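For readers unfamiliar with GitHub's conditional requests: checkAndUpdateETag above sends the last seen ETag as If-None-Match, respects the X-Poll-Interval header, and treats 304 Not Modified as "nothing new". The same dance as a self-contained sketch (class name and URL are illustrative):

```java
import java.net.HttpURLConnection;
import java.net.URL;

public class ConditionalPollSketch {
    private static String eventETag; // last ETag seen; null on the first poll

    // Returns true when the endpoint reports new data, false on 304 Not Modified.
    static boolean pollOnce(String endpoint) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(endpoint).openConnection();
        if (eventETag != null) {
            conn.setRequestProperty("If-None-Match", eventETag); // conditional request
        }
        int code = conn.getResponseCode(); // fires the request
        String poll = conn.getHeaderField("X-Poll-Interval");
        if (poll != null) {
            System.out.println("GitHub asks for at least " + poll + " s between polls");
        }
        if (code == 304) {
            return false; // nothing changed since the stored ETag
        }
        String eTag = conn.getHeaderField("ETag");
        if (eTag != null) {
            eventETag = eTag; // remember for the next poll
        }
        return true;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pollOnce("https://api.github.com/repos/uberVU/elasticsearch-river-github/events"));
    }
}
```

A 304 answer also does not count against the API rate limit, which is what makes frequent polling viable.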
@@ -139,10 +179,16 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
                 }
             }).build();

             IndexRequest req = null;
-            for (JsonElement e: array) {
+            for (JsonElement e : array) {
                 if (type.equals("event")) {
                     req = indexEvent(e);
+                    if (req == null) {
+                        logger.debug("Found existing event, all remaining events should already have been indexed");
+                    } else if (previouslyIndexedEvent != null) {
+                        logger.warn("New non-indexed event {}, even though the previous event {} has already been indexed?", req.id(), previouslyIndexedEvent);
+                    }
                 } else if (type.equals("issue")) {
                     req = indexOther(e, "IssueData", true);
                 } else if (type.equals("pullreq")) {

@@ -154,19 +200,35 @@ public void afterBulk(long executionId, BulkRequest request, Throwable failure)
                 } else if (type.equals("collaborator")) {
                     req = indexOther(e, "CollaboratorData");
                 }
-                bp.add(req);
+                if (req != null) {
+                    bp.add(req);
+                }
             }
             bp.close();

             try {
                 input.close();
-            } catch (IOException e) {}
+            } catch (IOException e) {
+                logger.warn("Couldn't close connection?", e);
+            }
         }

+        private boolean isEventIndexed(String id) {
+            return client.prepareGet(index, null, id).get().isExists();
+        }
+
         private IndexRequest indexEvent(JsonElement e) {
             JsonObject obj = e.getAsJsonObject();
             String type = obj.get("type").getAsString();
             String id = obj.get("id").getAsString();

+            if (isEventIndexed(id)) {
+                logger.debug("Found id {} already in the index. No more IDs should be found from now on", id);
+                previouslyIndexedEvent = id;
+                return null;
+            }
+
+            logger.debug("Indexing event {}", id);
             IndexRequest req = new IndexRequest(index)
                     .type(type)
                     .id(id).create(false) // we want to overwrite old items

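The idea behind returning null here: the events feed arrives newest first, so the first already-indexed ID implies everything older is indexed too (the logger.warn above flags the case where that assumption breaks). A toy model of the invariant, with an in-memory set standing in for the Elasticsearch index (all names and IDs are made up):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StopAtKnownEvent {
    public static void main(String[] args) {
        // Event IDs as the API returns them: newest first.
        List<String> fetched = Arrays.asList("105", "104", "103", "102");
        // Stand-in for the Elasticsearch index from a previous run.
        Set<String> indexed = new HashSet<>(Arrays.asList("103", "102", "101"));

        for (String id : fetched) {
            if (indexed.contains(id)) {
                // Everything older was indexed on an earlier run,
                // so the rest of this page (and later pages) can be skipped.
                System.out.println("Hit known event " + id + ", stopping");
                break;
            }
            indexed.add(id);
            System.out.println("Indexed new event " + id);
        }
    }
}
```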
@@ -237,70 +299,135 @@ private String nextPageURL(URLConnection response) {
             return headerData.get("url");
         }

-        private void addAuthHeader(URLConnection request) {
+        private void addAuthHeader(URLConnection connection) {
             if (username == null || password == null) {
                 return;
             }
             String auth = String.format("%s:%s", username, password);
             String encoded = Base64.encodeBytes(auth.getBytes());
-            request.setRequestProperty("Authorization", "Basic " + encoded);
+            connection.setRequestProperty("Authorization", "Basic " + encoded);
         }

+        private boolean getData(String fmt, String type) {
+            return getData(fmt, type, null);
+        }
+
-        private void getData(String fmt, String type) {
+        private boolean getData(String fmt, String type, String since) {
             try {
-                URL url = new URL(String.format(fmt, owner, repository));
-                URLConnection response = url.openConnection();
-                addAuthHeader(response);
-                indexResponse(response, type);
-
-                while (morePagesAvailable(response)) {
-                    url = new URL(nextPageURL(response));
-                    response = url.openConnection();
-                    addAuthHeader(response);
-                    indexResponse(response, type);
-                }
+                URL url;
+                if (since != null) {
+                    url = new URL(String.format(fmt, owner, repository, since));
+                } else {
+                    url = new URL(String.format(fmt, owner, repository));
+                }
+                HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+                addAuthHeader(connection);
+                if (type.equals("event")) {

[Inline review thread on this line]
* Indeed, it is a bit unintuitive to have this here. :)
* Yeah, that was exactly what I meant with "stretching the design" :D

+                    boolean modified = checkAndUpdateETag(connection);
+                    if (!modified) {
+                        return false;
+                    }
+                }
+
+                previouslyIndexedEvent = null;
+                logger.debug("Fetching data of type {}", type);
+                indexResponse(connection, type);
+
+                while (morePagesAvailable(connection)) {
+                    logger.debug("More pages available, fetching next page");
+                    url = new URL(nextPageURL(connection));
+                    connection = (HttpURLConnection) url.openConnection();
+                    addAuthHeader(connection);
+                    indexResponse(connection, type);
+                }
             } catch (Exception e) {
                 logger.error("Exception in getData", e);
             }
+
+            return true;
         }

         private void deleteByType(String type) {
-            DeleteByQueryResponse response = client.prepareDeleteByQuery(index)
+            client.prepareDeleteByQuery(index)
                     .setQuery(termQuery("_type", type))
                     .execute()
                     .actionGet();
         }

+        /**
+         * Gets the creation date of the single newest entry.
+         *
+         * @return ISO8601 formatted time of most recent entry, or null on empty or error.
+         */
+        private String getMostRecentEntry() {
+            long totalEntries = client.prepareCount(index).setQuery(matchAllQuery()).execute().actionGet().getCount();
+            if (totalEntries > 0) {
+                FilteredQueryBuilder updatedAtQuery = QueryBuilders
+                        .filteredQuery(QueryBuilders.matchAllQuery(), FilterBuilders.existsFilter("created_at"));
+                FieldSortBuilder updatedAtSort = SortBuilders.fieldSort("created_at").order(SortOrder.DESC);
+
+                SearchResponse response = client.prepareSearch(index)
+                        .setQuery(updatedAtQuery)
+                        .addSort(updatedAtSort)
+                        .setSize(1)
+                        .execute()
+                        .actionGet();
+
+                String createdAt = (String) response.getHits().getAt(0).getSource().get("created_at");
+                logger.debug("Most recent event was created at {}", createdAt);
+                return createdAt;
+            } else {
+                // getData will get all data on a null.
+                logger.info("No existing entries, assuming first run");
+                return null;
+            }
+        }

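For reference, the search getMostRecentEntry builds corresponds roughly to this Elasticsearch 1.x request body (a sketch, not part of the PR):

```json
{
  "query": {
    "filtered": {
      "query":  { "match_all": {} },
      "filter": { "exists": { "field": "created_at" } }
    }
  },
  "sort": [ { "created_at": { "order": "desc" } } ],
  "size": 1
}
```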
         @Override
         public void run() {
             while (isRunning) {
-                getData(endpoint + "/repos/%s/%s/events?per_page=1000", "event");
-                getData(endpoint + "/repos/%s/%s/issues?per_page=1000", "issue");
-                getData(endpoint + "/repos/%s/%s/issues?state=closed&per_page=1000", "issue");
-
-                // delete pull req data - we are only storing open pull reqs
-                // and when a pull request is closed we have no way of knowing;
-                // this is why we have to delete them and reindex "fresh" ones
-                deleteByType("PullRequestData");
-                getData(endpoint + "/repos/%s/%s/pulls", "pullreq");
-
-                // same for milestones
-                deleteByType("MilestoneData");
-                getData(endpoint + "/repos/%s/%s/milestones?per_page=1000", "milestone");
-
-                // collaborators
-                deleteByType("CollaboratorData");
-                getData(endpoint + "/repos/%s/%s/collaborators?per_page=1000", "collaborator");
-
-                // and for labels - they have IDs based on the MD5 of the contents, so
-                // if a property changes, we get a "new" document
-                deleteByType("LabelData");
-                getData(endpoint + "/repos/%s/%s/labels?per_page=1000", "label");
+                // Must be read before getting new events.
+                String mostRecentEntry = getMostRecentEntry();
+
+                logger.debug("Checking for events");
+                if (getData(endpoint + "/repos/%s/%s/events?per_page=1000",
+                        "event")) {
+                    logger.debug("First run or new events found, fetching rest of the data");
+                    if (mostRecentEntry != null) {
+                        getData(endpoint + "/repos/%s/%s/issues?state=all&per_page=1000&since=%s",
+                                "issue", mostRecentEntry);
+                    } else {
+                        getData(endpoint + "/repos/%s/%s/issues?state=all&per_page=1000",
+                                "issue");
+                    }
+                    // delete pull req data - we are only storing open pull reqs
+                    // and when a pull request is closed we have no way of knowing;
+                    // this is why we have to delete them and reindex "fresh" ones
+                    deleteByType("PullRequestData");
+                    getData(endpoint + "/repos/%s/%s/pulls", "pullreq");
+
+                    // same for milestones
+                    deleteByType("MilestoneData");
+                    getData(endpoint + "/repos/%s/%s/milestones?per_page=1000", "milestone");
+
+                    // collaborators
+                    deleteByType("CollaboratorData");
+                    getData(endpoint + "/repos/%s/%s/collaborators?per_page=1000", "collaborator");
+
+                    // and for labels - they have IDs based on the MD5 of the contents, so
+                    // if a property changes, we get a "new" document
+                    deleteByType("LabelData");
+                    getData(endpoint + "/repos/%s/%s/labels?per_page=1000", "label");
+                } else {
+                    logger.debug("No new events found");
+                }
                 try {
-                    Thread.sleep(interval * 1000); // needs milliseconds
-                } catch (InterruptedException e) {}
+                    int waitTime = Math.max(pollInterval, userRequestedInterval) * 1000;
+                    logger.debug("Waiting {} ms before polling for new events", waitTime);
+                    Thread.sleep(waitTime); // needs milliseconds
+                } catch (InterruptedException e) {
+                    logger.info("Wait interrupted, river was probably stopped");
+                }
             }
         }

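On the new since parameter: for the GitHub issues API it means "only issues updated at or after this timestamp", so passing the newest indexed created_at avoids re-fetching unchanged issues on every cycle. A small sketch of the URL the format string produces (values are illustrative):

```java
public class SinceUrlSketch {
    public static void main(String[] args) {
        // Hypothetical values; the river substitutes owner, repository and
        // the created_at of its newest indexed entry.
        String fmt = "https://api.github.com/repos/%s/%s/issues?state=all&per_page=1000&since=%s";
        String url = String.format(fmt, "uberVU", "elasticsearch-river-github", "2014-05-01T12:00:00Z");
        System.out.println(url);
    }
}
```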
Reviewer: Please add a comment saying that this is now optional (see how it is for auth, below).
Reviewer: Actually, if the user sets a poll interval that is really large (rarely polling), there will be a possibility of losing events, correct? So maybe it's better to remove that argument altogether. What do you think?
LosD: It has always been optional, AFAIK (the default was 3600 before).

I think we should let it stay, especially until we ETag everything. Without it, an unauthenticated user would be practically guaranteed to hit the request limit, and they might not care about the events at all; we don't for our project (we use it to build graphs of issue open time). But we should probably warn that losing events is a very real possibility.
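For context on the limits mentioned here: unauthenticated clients got 60 requests per hour at the time, authenticated clients far more, and conditional requests answered with 304 do not consume the budget. A quick way to inspect the remaining budget (a sketch; the headers are the standard GitHub API rate-limit headers):

```java
import java.net.HttpURLConnection;
import java.net.URL;

public class RateLimitSketch {
    public static void main(String[] args) throws Exception {
        HttpURLConnection conn = (HttpURLConnection)
                new URL("https://api.github.com/rate_limit").openConnection();
        // Unauthenticated clients share a small hourly budget per IP.
        System.out.println("Limit:     " + conn.getHeaderField("X-RateLimit-Limit"));
        System.out.println("Remaining: " + conn.getHeaderField("X-RateLimit-Remaining"));
    }
}
```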
Reviewer: Whoops, my bad, I forgot :). OK, good point.
LosD: I've added a bit about it in the README.