Add support for parallel exports #52
Conversation
PTAL @labianchin
Codecov Report
@@            Coverage Diff            @@
##             master     #52   +/-   ##
=========================================
  Coverage          ?   88.9%
  Complexity        ?     165
=========================================
  Files             ?      21
  Lines             ?     622
  Branches          ?      43
=========================================
  Hits              ?     553
  Misses            ?      47
  Partials          ?      22
return Lists.newArrayList(
    String.format("SELECT * FROM %s%s%s", this.tableName(), where, limit));

if (parallelism().isPresent() && splitColumn().isPresent()) {
Maybe extract this if branch into a separate method?
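The extraction might look roughly like the sketch below. The class and method names (QuerySketch, buildParallelQueries) are placeholders for illustration only, not the actual dbeam code:

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Illustrative stand-in for the real QueryBuilderArgs; only the structure matters here.
class QuerySketch {
  private final String tableName;
  private final String where;
  private final String limit;
  private final Optional<Integer> parallelism;
  private final Optional<String> splitColumn;

  QuerySketch(String tableName, String where, String limit,
              Optional<Integer> parallelism, Optional<String> splitColumn) {
    this.tableName = tableName;
    this.where = where;
    this.limit = limit;
    this.parallelism = parallelism;
    this.splitColumn = splitColumn;
  }

  List<String> buildQueries(Connection connection) throws SQLException {
    // Delegate the parallel branch to its own method so the common path stays short.
    if (parallelism.isPresent() && splitColumn.isPresent()) {
      return buildParallelQueries(connection);
    }
    List<String> queries = new ArrayList<>();
    queries.add(String.format("SELECT * FROM %s%s%s", tableName, where, limit));
    return queries;
  }

  private List<String> buildParallelQueries(Connection connection) throws SQLException {
    // Placeholder: query min/max of the split column and emit one SELECT per sub-range.
    throw new UnsupportedOperationException("sketch only");
  }
}
```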
final ResultSet resultSet = statement.executeQuery(query);
resultSet.first();
[main] INFO com.spotify.dbeam.avro.BeamJdbcAvroSchema - Elapsed time to schema 0.452 seconds
[main] ERROR com.spotify.dbeam.jobs.ExceptionHandling - Failure:
org.postgresql.util.PSQLException: Operation requires a scrollable ResultSet, but this ResultSet is FORWARD_ONLY.
at org.postgresql.jdbc.PgResultSet.checkScrollable(PgResultSet.java:280)
at org.postgresql.jdbc.PgResultSet.first(PgResultSet.java:355)
at com.spotify.dbeam.args.QueryBuilderArgs.findSplitLimits(QueryBuilderArgs.java:178)
at com.spotify.dbeam.args.QueryBuilderArgs.buildQueries(QueryBuilderArgs.java:139)
at com.spotify.dbeam.jobs.JdbcAvroJob.prepareExport(JdbcAvroJob.java:93)
at com.spotify.dbeam.jobs.JdbcAvroJob.runExport(JdbcAvroJob.java:134)
at com.spotify.dbeam.jobs.JdbcAvroJob.main(JdbcAvroJob.java:142)
Maybe use checkState(resultSet.next(), "Min/Max query returned empty results") instead.
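For reference, a minimal sketch of the suggested change, assuming Guava's Preconditions is on the classpath and a min/max query over the split column; the method signature and query shape are assumptions for illustration, not the actual dbeam implementation. Unlike first(), next() works on a FORWARD_ONLY ResultSet, so the driver no longer needs a scrollable cursor:

```java
import static com.google.common.base.Preconditions.checkState;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

class SplitLimitsSketch {
  // Sketch: read MIN/MAX of the split column and fail fast with a clear message
  // when the query unexpectedly returns no rows.
  static long[] findSplitLimits(Connection connection, String table, String splitColumn)
      throws SQLException {
    // The exact min/max query shape is an assumption made for this sketch.
    final String query =
        String.format("SELECT MIN(%s), MAX(%s) FROM %s", splitColumn, splitColumn, table);
    try (Statement statement = connection.createStatement();
         ResultSet resultSet = statement.executeQuery(query)) {
      checkState(resultSet.next(), "Min/Max query returned empty results");
      return new long[] {resultSet.getLong(1), resultSet.getLong(2)};
    }
  }
}
```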
Good catch. I'll add a check.
@labianchin this is now ready for merging. I would update the documentation and add best practices separately.
Let's ship this, try it a bit, and then iterate a bit more on it. We might still be missing metrics and maybe some tests, but given this is a new feature and it does not break existing flows, we can ship and try.
* add draft support for parallel exports
* separate functions for split queries
* fix doc
* fix logic and add test cases
* add check for -ve parallelism
* refactor metering for parallel exports
* check result set
* rename to query parallelism to not conflict with beam's targetParallelism
* fix record gauge reporting
* change order of e2e test
* fix metering for parallel queries
* add e2e test for parallel query
First draft adding support for parallel exports based on an int/long splitting column.
Implements #51
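For readers new to the idea, here is a minimal sketch of the range-splitting approach the PR describes: divide the [min, max] range of the splitting column into roughly equal buckets and emit one query per bucket, so the buckets can be exported in parallel. All names, the query shape, and the boundary handling below are illustrative assumptions, not the actual dbeam code:

```java
import java.util.ArrayList;
import java.util.List;

class SplitQueriesSketch {
  // Sketch: split [min, max] of the split column into `parallelism` contiguous
  // buckets and build one SELECT per bucket. Overflow of (max - min) is ignored
  // here for brevity.
  static List<String> splitQueries(
      String table, String splitColumn, long min, long max, int parallelism) {
    final List<String> queries = new ArrayList<>();
    final long range = max - min + 1;
    final long bucket = Math.max(1, (range + parallelism - 1) / parallelism);
    for (long lower = min; lower <= max; lower += bucket) {
      final long upper = Math.min(lower + bucket - 1, max);
      queries.add(
          String.format(
              "SELECT * FROM %s WHERE %s >= %d AND %s <= %d",
              table, splitColumn, lower, splitColumn, upper));
    }
    return queries;
  }

  public static void main(String[] args) {
    // Example with hypothetical values: 4 parallel queries over ids 1..1000.
    splitQueries("users", "id", 1, 1000, 4).forEach(System.out::println);
  }
}
```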