Add --sqlFile argument to support user-provided SQL query #66

Merged: 18 commits, Aug 5, 2019

rulle-io
Contributor

The idea was to minimize changes to the existing code.
The --tableName argument is left mandatory, so a table name must still be provided even when the user supplies their own query.

@rulle-io rulle-io closed this Jun 11, 2019
@rulle-io rulle-io reopened this Jun 11, 2019
@rulle-io
Contributor Author

@labianchin

Collaborator

@labianchin labianchin left a comment

This looks like some big changes. I will have a look later this week.

But have you considered using SQL views instead of a SQL file (i.e. pointing --table to the view name)?

I think @anish749 can also have a look since it touches the parallel queries parts.

pom.xml Outdated
@@ -126,7 +126,7 @@
<jackson.version>2.9.8</jackson.version>
<slf4j.version>1.7.25</slf4j.version>
<auto-value.version>1.5.3</auto-value.version>
- <guava.version>20.0</guava.version>
+ <guava.version>21.0</guava.version>
Collaborator

Is that needed? I think we need to be careful to use a version compatible with the Beam SDK.

Contributor

Let's keep this change in a separate PR.

Contributor Author

Addressed.

@rulle-io
Contributor Author

@labianchin The use case I am trying to cover is when a user cannot create view(s) in a DB and has rights to execute a SELECT query only.
Moreover, this approach is much more flexible:

  • one can join multiple tables
  • add custom selection criteria
  • select only certain columns

/**
* Wrapper class for raw SQL query (SELECT statement).
*/
public class SqlQueryWrapper implements Serializable {
Contributor

Generic Comment, since this PR changes the way the query is being constructed:

Would a query builder pattern make sense?
I am thinking:
DbeamQueryBuilder as a class,
fromTable and fromQueryFile as Smart Constructors.

withLimit, withSplitColumn, withPartitionColumn as modifiers for the query.

getExtractQueries, and getMinMaxSplitQuery to trigger building the actual query and returning a String.

The primary aim is to have a better way to compose complex queries by adding things together.

In the implementation, every query modifier forms a new subquery.

val source = OneOf(userProvidedQuery, tableName, currentQueryState) // delegated logic for finding out the source.

withPartitionColumnModifier means SELECT * FROM ($source) s WHERE $partitionColumnModifier
with.... means SELECT * FROM ($source) s WHERE $....

The source is a disjunction, which can itself be another query. So the final query would look like:

SELECT * FROM (
  SELECT * FROM (
    user_provided_query
  ) x WHERE partitionColumn > '....' 
) y WHERE splitColumn > ... AND splitColumn < ...

Adding a query modifier puts the previous query in a subquery and queries from it.
This would allow a more structured way to manage modifiers and sources.

Another way might be to use CTEs.
Since we are dealing with RDBMSs here, I think it is safe to assume that query planners will push down the predicates as needed.
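
As a rough illustration of the subquery-per-modifier idea above, here is a minimal Java sketch. The class name `SubqueryBuilderSketch`, the `where` modifier, and the `s0`, `s1` aliases are hypothetical and are not taken from this PR; the sketch also assumes the target database accepts aliased subqueries and a trailing `LIMIT` clause.

```java
/** Hypothetical sketch of the proposed builder; not the code merged in this PR. */
final class SubqueryBuilderSketch {
  private final String query; // current query state, without a trailing ';'
  private final int depth;    // used to generate a distinct alias per nesting level

  private SubqueryBuilderSketch(String query, int depth) {
    this.query = query;
    this.depth = depth;
  }

  /** Smart constructor: start from a plain table name. */
  static SubqueryBuilderSketch fromTable(String tableName) {
    return new SubqueryBuilderSketch("SELECT * FROM " + tableName, 0);
  }

  /** Smart constructor: start from a user-provided query. */
  static SubqueryBuilderSketch fromSqlQuery(String sqlQuery) {
    // Strip trailing whitespace and semicolons so the query can be nested.
    return new SubqueryBuilderSketch(sqlQuery.replaceAll("[\\s;]+$", ""), 0);
  }

  /** Modifier: wraps the current query in a subquery and filters it. */
  SubqueryBuilderSketch where(String condition) {
    return new SubqueryBuilderSketch(
        "SELECT * FROM (" + query + ") s" + depth + " WHERE " + condition, depth + 1);
  }

  /** Modifier: wraps the current query in a subquery and limits the row count. */
  SubqueryBuilderSketch withLimit(long limit) {
    return new SubqueryBuilderSketch(
        "SELECT * FROM (" + query + ") s" + depth + " LIMIT " + limit, depth + 1);
  }

  /** Returns the composed query as a string. */
  String build() {
    return query;
  }

  public static void main(String[] args) {
    String sql = fromSqlQuery("SELECT id, name FROM users;")
        .where("created >= '2019-06-01'")
        .withLimit(10L)
        .build();
    System.out.println(sql);
  }
}
```

Because each modifier returns a new immutable instance, the same base query can be reused to derive several extract queries without one affecting another.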

Contributor Author

@rulle-io rulle-io Jun 16, 2019

@anish749 I addressed some comments.

@anish749
Contributor

This PR adds some nice and useful functionality, which is great.

It does touch the query builder logic, so we might need to test it out well. I need to take a closer look at the logic. I've mostly skimmed through this PR.

Historically, we have addressed this problem with views that abstract away joins and filters and expose only the columns that need to be extracted. In other words, the SQL query does not live in a file but in the database as code, which is version-managed, and the CI/CD system that manages database migrations also manages the view's code. That setup feels easier to deal with.

I do understand the need for a sqlFile, and it is much more flexible when it comes to extraction. It also allows dbeam to be used for ad hoc queries.

I am not sure it would make versioning and deploying SQL in files any easier. When we think about deployment, the files would need to go into custom Docker images, since only the file path is passed as an argument.

@anish749
Contributor

anish749 commented Jun 17, 2019 via email

private DbeamQueryBuilder(final String sqlQuery) {
String uppSql = sqlQuery.toUpperCase();
if (!uppSql.startsWith("SELECT")) {
throw new IllegalArgumentException("Sql query should start with SELECT");
Contributor

@anish749 anish749 Jun 28, 2019

This check might be too restrictive.
Some databases permit adding settings at a query level, which influence how a query is executed.

E.g., in Postgres:

SET work_mem = '256MB';
SELECT * FROM users ... INNER JOIN .... ;
RESET work_mem;

Ideally we shouldn't be parsing / validating SQL queries.

Also, one can use CTEs, which means the query will most likely not start with a SELECT.
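
To make the point concrete, here is a small Java sketch of a prefix check in the same spirit as the one under review, run against a few valid queries. The class name `SelectPrefixCheckDemo`, the method name, and the sample queries are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical demo of why a "starts with SELECT" check rejects valid queries. */
final class SelectPrefixCheckDemo {
  /** Same idea as the check under review: upper-case the query and test its prefix. */
  static boolean startsWithSelect(String sqlQuery) {
    return sqlQuery.toUpperCase().startsWith("SELECT");
  }

  public static void main(String[] args) {
    List<String> queries = Arrays.asList(
        "SELECT * FROM users",
        // A CTE starts with WITH, not SELECT.
        "WITH recent AS (SELECT * FROM users WHERE id > 10) SELECT * FROM recent",
        // A leading comment line is also rejected.
        "-- export users\nSELECT * FROM users",
        // So is leading whitespace.
        "  select * from users");
    for (String q : queries) {
      System.out.println(startsWithSelect(q) + " <- " + q.replace("\n", "\\n"));
    }
  }
}
```

Only the first query passes the check, even though all four are ordinary read-only statements.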

Contributor Author

Right. Addressed.

Contributor

@anish749 anish749 left a comment

I had a few comments. I think we should iterate a little more on this
and have some integration testing on at least Postgres and MySQL before merging.

throw new IllegalArgumentException("Sql query missing FROM clause");
}

// TODO: may be check that LIMIT is not present;
Contributor

One way to tackle this is to enclose the user-submitted query inside a subquery or a CTE.

E.g.:

SELECT * FROM ($user_query) u WHERE 1 = 1 

Contributor

Note that we might want to strip off the trailing ; before using this in a sub query and then add the ; again later.

Contributor Author

Agree.
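
The two suggestions in this thread (wrap the user query in a subquery, and strip the trailing `;` first) could be combined roughly as follows. This is only a sketch: the class and method names are hypothetical, and the `u` alias and `WHERE 1 = 1` tail simply follow the example given above.

```java
/** Hypothetical sketch: strip the trailing ';' and wrap the user query in a subquery. */
final class WrapUserQuerySketch {
  /** Removes trailing whitespace and semicolons. */
  static String stripTrailing(String sqlQuery) {
    // Inside a character class '|' would be a literal pipe, so it is not needed here.
    return sqlQuery.replaceAll("[\\s;]+$", "");
  }

  /** Wraps the user query so further conditions can be appended with AND. */
  static String wrap(String userQuery) {
    return "SELECT * FROM (" + stripTrailing(userQuery) + ") u WHERE 1 = 1";
  }

  /** Appends a condition and puts the terminating ';' back at the very end. */
  static String finish(String wrappedQuery, String condition) {
    return wrappedQuery + " AND " + condition + ";";
  }

  public static void main(String[] args) {
    String wrapped = wrap("SELECT id FROM users LIMIT 100;\n");
    System.out.println(finish(wrapped, "id > 5"));
  }
}
```

Since the user query ends up inside parentheses, a `LIMIT` it already contains stays scoped to the subquery and does not clash with clauses added outside.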


public DbeamQueryBuilder withPartitionCondition(
String partitionColumn, String startPointIncl, String endPointExcl) {
sqlBuilder.append(createSqlPartitionCondition(partitionColumn, startPointIncl, endPointExcl));
Contributor

Another way to handle this, if we go for subqueries, is:

SELECT * FROM ($current_query) xyz WHERE  ${createSqlPartitionCondition(partitionColumn, startPointIncl, endPointExcl)}
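
A minimal Java sketch of that variant is shown below. The parameter names follow the signature in the diff, but the body of `partitionCondition` is an assumption (a half-open range on the partition column), not the PR's actual `createSqlPartitionCondition`. Values are interpolated without escaping, so the sketch assumes trusted input.

```java
/** Hypothetical sketch of applying a partition condition via a subquery. */
final class PartitionConditionSketch {
  /** Builds a half-open range condition: start inclusive, end exclusive. */
  static String partitionCondition(
      String partitionColumn, String startPointIncl, String endPointExcl) {
    return String.format(
        "%s >= '%s' AND %s < '%s'",
        partitionColumn, startPointIncl, partitionColumn, endPointExcl);
  }

  /** Wraps the current query in a subquery and filters it by partition. */
  static String withPartitionCondition(
      String currentQuery, String partitionColumn, String startPointIncl, String endPointExcl) {
    return "SELECT * FROM (" + currentQuery + ") xyz WHERE "
        + partitionCondition(partitionColumn, startPointIncl, endPointExcl);
  }

  public static void main(String[] args) {
    System.out.println(
        withPartitionCondition(
            "SELECT * FROM events", "created", "2019-06-01", "2019-06-02"));
  }
}
```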

return sqlQuery.replaceAll("[\\s;]+$", "");
}

public DbeamQueryBuilder withLimit(Optional<Integer> limitOpt) {
Contributor

We might want to use Long for LIMIT?

Contributor Author

Sure.

* Generates a new query to get MIN/MAX values for splitColumn.
*
* @param splitColumn column to use
* @param minSplitColumnName MIN() column value alias
Contributor

I have a feeling we can do away with passing minSplitColumnName / maxSplitColumnName and just have them fixed here. I think there should be only one caller for this.

Contributor Author

We use them later in the code.
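
For context, a MIN/MAX bounds query with caller-supplied aliases might look like the sketch below. The class name, the `bounds` subquery alias, and the exact SQL shape are assumptions made for illustration; the aliases matter because a caller would typically read the two values back from the result set by those names.

```java
/** Hypothetical sketch of a MIN/MAX query over the split column. */
final class MinMaxSplitQuerySketch {
  /**
   * Builds a query returning the smallest and largest splitColumn values,
   * exposed under the given aliases.
   */
  static String minMaxSplitQuery(
      String baseQuery, String splitColumn, String minSplitColumnName, String maxSplitColumnName) {
    return String.format(
        "SELECT MIN(%s) AS %s, MAX(%s) AS %s FROM (%s) bounds",
        splitColumn, minSplitColumnName, splitColumn, maxSplitColumnName, baseQuery);
  }

  public static void main(String[] args) {
    System.out.println(
        minMaxSplitQuery("SELECT * FROM users", "id", "min_s", "max_s"));
  }
}
```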

@@ -38,6 +38,11 @@

void setTable(String value);

@Description("A local file containing the SQL SELECT query.")
Contributor

I think the file reader we have would allow us to read the file from GCS. If so we should update this description.

Contributor Author

Addressed.

Contributor Author

Updated.

private static Optional<String> resolveSqlQueryParameter(JdbcExportPipelineOptions options)
throws IOException {
if (options.getSqlFile() != null) {
return Optional.ofNullable(new ParameterFileReader().readAsResource(options.getSqlFile()));
Contributor

Do we need Optional.ofNullable here? When does readAsResource return null? I think it should be reported to the user instead of the exception getting swallowed.

Contributor Author

Addressed.
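
One way to read the review comment: an absent option maps to `Optional.empty()`, while a read failure propagates to the caller instead of being turned into an empty value. The sketch below uses plain `java.nio.file` for a local path as a stand-in; it is not dbeam's `ParameterFileReader`, and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Optional;

/** Hypothetical sketch of resolving an optional SQL file parameter. */
final class SqlFileParameterSketch {
  /**
   * Returns the file content if a path was given, or Optional.empty() if not.
   * A missing or unreadable file surfaces as an IOException to the caller.
   */
  static Optional<String> resolveSqlQuery(String sqlFile) throws IOException {
    if (sqlFile == null) {
      return Optional.empty();
    }
    // readAllBytes never returns null, so Optional.of is sufficient here.
    byte[] bytes = Files.readAllBytes(Paths.get(sqlFile));
    return Optional.of(new String(bytes, StandardCharsets.UTF_8));
  }

  public static void main(String[] args) throws IOException {
    // Without an argument this prints Optional.empty.
    System.out.println(resolveSqlQuery(args.length > 0 ? args[0] : null));
  }
}
```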

}

@Test
public void testItRemovesTrailingSymbols() {
Contributor

👍

@@ -78,7 +104,7 @@ class JdbcExportArgsTest extends FlatSpec with Matchers {
}
}
it should "parse correctly with missing password parameter" in {
- val options = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=some_table")
+ val options = optionsFromArgs("--connectionUrl=jdbc:postgresql://some_db --table=" + "some_table")
Contributor

unintended change?

Contributor Author

Yes.

}

boolean isContainsWhere = false;
if (uppSql.toUpperCase().contains("WHERE ")) {
Contributor

toUpperCase() on something already upper case.

Contributor Author

Addressed.

Collaborator

@labianchin labianchin left a comment

This seems to be heading in a good direction. Thanks @ra1861!

I was initially a bit skeptical about how this feature would behave with other parameters. From your tests I can now see that it can work well.

I left a few comments. Also, please rebase: there are some changes to the h2 driver for which I had to fix tests (see 0882999).

/**
* Wrapper class for raw SQL query.
*/
public class DbeamQueryBuilder implements Serializable {
Collaborator

Nice abstraction! It is great to decouple how queries are built. In the future, another DbeamQueryBase could be extended for very specific JDBC drivers (e.g. MS SQL Server).

One nitpick: we don't need to prefix the class names with Dbeam; the package already provides namespacing.

Contributor Author

Done.

.setTableName(tableName)
.setBaseSqlQuery(baseSqlQuery)
.setPartitionPeriod(Days.ONE)
.build();
Collaborator

Could this be like the following:

return create(tableName, Optional.empty());

So we avoid a bit of duplication.

Contributor Author

Right, done.
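
The delegation suggested here is the usual overloaded-factory pattern. The sketch below uses a plain class rather than the builder in the diff; `QueryArgsSketch` and its fields are hypothetical stand-ins for illustration only.

```java
import java.util.Optional;

/** Hypothetical sketch: the short factory delegates to the full one. */
final class QueryArgsSketch {
  private final String tableName;
  private final Optional<String> sqlQuery;

  private QueryArgsSketch(String tableName, Optional<String> sqlQuery) {
    this.tableName = tableName;
    this.sqlQuery = sqlQuery;
  }

  /** Table-only variant: no duplicated construction logic, just delegation. */
  static QueryArgsSketch create(String tableName) {
    return create(tableName, Optional.empty());
  }

  /** Full variant: the single place where the object is actually built. */
  static QueryArgsSketch create(String tableName, Optional<String> sqlQuery) {
    return new QueryArgsSketch(tableName, sqlQuery);
  }

  /** Uses the user-provided query if present, else selects from the table. */
  String baseSqlQuery() {
    return sqlQuery.orElse("SELECT * FROM " + tableName);
  }

  public static void main(String[] args) {
    System.out.println(create("users").baseSqlQuery());
    System.out.println(create("users", Optional.of("SELECT id FROM users")).baseSqlQuery());
  }
}
```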

- private long[] findInputBounds(Connection connection, String tableName, String partitionCondition,
- String splitColumn)
+ private long[] findInputBounds(
+ Connection connection, DbeamQueryBuilder baseSqlQuery, String splitColumn)
Collaborator

baseSqlQuery -> baseSqlQueryBuilder?

Contributor Author

Done.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

- class PasswordReader {
+ class PasswordReader extends ParameterFileReader {
Collaborator

No need for extends here.

Just update the calls to ParameterFileReader.readFromFile(...) given this is a static method.

Contributor Author

Right, reverted. No need to have this functionality here.

private static Optional<String> resolveSqlQueryParameter(JdbcExportPipelineOptions options)
throws IOException {
if (options.getSqlFile() != null) {
return Optional.of(new ParameterFileReader().readAsResource(options.getSqlFile()));
Collaborator

I am confused here...

Does using readAsResource() mean that users have to bundle the sqlFile into the JAR?

Would it make more sense to use Beam's FileSystems, so that users can point to gs://some-bucket/some-object.sql in the same way as for the password file?

Contributor Author

Right, changed code to adhere to existing methods.


Assert.assertEquals(expected, actual);
}
}
Collaborator

Thanks for keeping tests extensive! Some other scenarios we could consider:

  1. Multi line queries
  2. First line with comments (before SELECT)
  3. Queries with CTE (don't think we will be able to support)
  4. ...

Contributor Author

Added.

private def queriesForBounds2(
min: Long, max: Long, parallelism: Int, splitColumn: String, queryFormat: String): java.util.List[String] = {
val queries = QueryBuilderArgs.queriesForBounds(min, max, parallelism, splitColumn, DbeamQueryBuilder.fromTablename(tablename))
val q2 = queries.asScala.map(x => x.toString()).toList.asJava
Collaborator

Is this asScala ... asJava necessary? Isn't queries / QueryBuilderArgs.queriesForBounds() already java.util.List[String]?

rulle-io added 18 commits July 18, 2019 23:51
Tests fixed.
Create a dedicated class for sqlQuery.
Apply google code formatting.
Create a dedicated class for sqlQuery.
Move all SQL mangling into one file.
Tests fixed.
Apply google code formatting.
Add Builder/like methods withX()/build().
More unit-tests.
Refactoring of SQL parameters handling logic.
Add more tests.
Restore 'maven-enforcer-plugin.version' value.
Change type of 'limit' parameter.
Unit-tests are rewritten.
Add javadocs.
Adjust test to work with absolute file paths.
@codecov

codecov bot commented Jul 18, 2019

Codecov Report

Merging #66 into master will increase coverage by 0.11%.
The diff coverage is 93.18%.

@@             Coverage Diff              @@
##             master      #66      +/-   ##
============================================
+ Coverage      89.7%   89.81%   +0.11%     
- Complexity      177      202      +25     
============================================
  Files            22       23       +1     
  Lines           680      766      +86     
  Branches         52       53       +1     
============================================
+ Hits            610      688      +78     
- Misses           47       53       +6     
- Partials         23       25       +2

@labianchin labianchin merged commit 95e1347 into spotify:master Aug 5, 2019
3 participants