Skip to content

Commit

Permalink
add method to list streams
Browse files Browse the repository at this point in the history
  • Loading branch information
Lorna Barber authored and SimbaGithub committed Jul 4, 2022
1 parent 152dd60 commit ed621d9
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,39 @@ public enum DBMetadataResultSetMetadata {
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR)),

GET_STREAMS(
Arrays.asList(
"created_on",
"name",
"database_name",
"schema_name",
"owner",
"comment",
"table_name",
"source_type",
"base_tables",
"type",
"stale",
"mode",
"stale_after"),
Arrays.asList(
"TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT",
"TEXT", "TEXT"),
Arrays.asList(
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR,
Types.VARCHAR)),
;

private List<String> columnNames;
Expand Down
126 changes: 126 additions & 0 deletions src/main/java/net/snowflake/client/jdbc/SnowflakeDatabaseMetaData.java
Original file line number Diff line number Diff line change
Expand Up @@ -2571,6 +2571,132 @@ public ResultSet getTypeInfo() throws SQLException {
statement);
}

/**
* Function to return a list of streams
*
* @param originalCatalog catalog name
* @param originalSchemaPattern schema name pattern
* @param streamName stream name
* @return a result set
* @throws SQLException if any SQL error occurs.
*/
public ResultSet getStreams(
String originalCatalog, String originalSchemaPattern, String streamName) throws SQLException {
logger.debug(
"public ResultSet getStreams(String catalog={}, String schemaPattern={}"
+ "String streamName={}",
originalCatalog,
originalSchemaPattern,
streamName);
raiseSQLExceptionIfConnectionIsClosed();
Statement statement = connection.createStatement();

// apply session context when catalog is unspecified
SFPair<String, String> resPair = applySessionContext(originalCatalog, originalSchemaPattern);
final String catalog = resPair.left;
final String schemaPattern = resPair.right;

final Pattern compiledSchemaPattern = Wildcard.toRegexPattern(schemaPattern, true);
final Pattern compiledStreamNamePattern = Wildcard.toRegexPattern(streamName, true);

String showCommand = "show streams";

if (streamName != null
&& !streamName.isEmpty()
&& !streamName.trim().equals("%")
&& !streamName.trim().equals(".*")) {
showCommand += " like '" + streamName + "'";
}

if (catalog == null) {
showCommand += " in account";
} else if (catalog.isEmpty()) {
return SnowflakeDatabaseMetaDataResultSet.getEmptyResultSet(GET_STREAMS, statement);
} else {
String catalogEscaped = escapeSqlQuotes(catalog);
if (schemaPattern == null || isSchemaNameWildcardPattern(schemaPattern)) {
showCommand += " in database \"" + catalogEscaped + "\"";
} else if (schemaPattern.isEmpty()) {
return SnowflakeDatabaseMetaDataResultSet.getEmptyResultSet(GET_STREAMS, statement);
} else {
String schemaUnescaped = unescapeChars(schemaPattern);
if (streamName == null || Wildcard.isWildcardPatternStr(streamName)) {
showCommand += " in schema \"" + catalogEscaped + "\".\"" + schemaUnescaped + "\"";
} else if (streamName.isEmpty()) {
return SnowflakeDatabaseMetaDataResultSet.getEmptyResultSet(GET_STREAMS, statement);
} else {
String streamNameUnescaped = unescapeChars(streamName);
showCommand +=
" in table \""
+ catalogEscaped
+ "\".\""
+ schemaUnescaped
+ "\".\""
+ streamNameUnescaped
+ "\"";
}
}
}

logger.debug("sql command to get stream metadata: {}", showCommand);

ResultSet resultSet =
executeAndReturnEmptyResultIfNotFound(statement, showCommand, GET_STREAMS);
sendInBandTelemetryMetadataMetrics(
resultSet, "getStreams", originalCatalog, originalSchemaPattern, streamName, "none");

return new SnowflakeDatabaseMetaDataQueryResultSet(GET_STREAMS, resultSet, statement) {
@Override
public boolean next() throws SQLException {
logger.debug("public boolean next()");
incrementRow();

// iterate throw the show streams result until we find an entry
// that matches the stream name
while (showObjectResultSet.next()) {
String createdOn = showObjectResultSet.getString(1);
String name = showObjectResultSet.getString(2);
String databaseName = showObjectResultSet.getString(3);
String schemaName = showObjectResultSet.getString(4);
String owner = showObjectResultSet.getString(5);
String comment = showObjectResultSet.getString(6);
String tableName = showObjectResultSet.getString(7);
String sourceType = showObjectResultSet.getString(8);
String baseTables = showObjectResultSet.getString(9);
String type = showObjectResultSet.getString(10);
String stale = showObjectResultSet.getString(11);
String mode = showObjectResultSet.getString(12);
String staleAfter = showObjectResultSet.getString(13);

if ((compiledStreamNamePattern == null
|| compiledStreamNamePattern.matcher(streamName).matches())
&& (compiledSchemaPattern == null
|| compiledSchemaPattern.matcher(schemaName).matches())
&& (compiledStreamNamePattern == null
|| compiledStreamNamePattern.matcher(streamName).matches())) {
logger.debug("Found a matched column:" + tableName + "." + streamName);
nextRow[0] = createdOn;
nextRow[1] = name;
nextRow[2] = databaseName;
nextRow[3] = schemaName;
nextRow[4] = owner;
nextRow[5] = comment;
nextRow[6] = tableName;
nextRow[7] = sourceType;
nextRow[8] = baseTables;
nextRow[9] = type;
nextRow[10] = stale;
nextRow[11] = mode;
nextRow[12] = staleAfter;
return true;
}
}
close();
return false;
}
};
}

@Override
public ResultSet getIndexInfo(
String catalog, String schema, String table, boolean unique, boolean approximate)
Expand Down

0 comments on commit ed621d9

Please sign in to comment.