From ed621d9866f5c696539dc2817ec89c5e5034ef9e Mon Sep 17 00:00:00 2001 From: Lorna Barber Date: Thu, 2 Jun 2022 14:47:24 -0700 Subject: [PATCH] add method to list streams --- .../jdbc/DBMetadataResultSetMetadata.java | 33 +++++ .../jdbc/SnowflakeDatabaseMetaData.java | 126 ++++++++++++++++++ 2 files changed, 159 insertions(+) diff --git a/src/main/java/net/snowflake/client/jdbc/DBMetadataResultSetMetadata.java b/src/main/java/net/snowflake/client/jdbc/DBMetadataResultSetMetadata.java index e69945079..7cbabe584 100644 --- a/src/main/java/net/snowflake/client/jdbc/DBMetadataResultSetMetadata.java +++ b/src/main/java/net/snowflake/client/jdbc/DBMetadataResultSetMetadata.java @@ -296,6 +296,39 @@ public enum DBMetadataResultSetMetadata { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR)), + + GET_STREAMS( + Arrays.asList( + "created_on", + "name", + "database_name", + "schema_name", + "owner", + "comment", + "table_name", + "source_type", + "base_tables", + "type", + "stale", + "mode", + "stale_after"), + Arrays.asList( + "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", "TEXT", + "TEXT", "TEXT"), + Arrays.asList( + Types.VARCHAR, + Types.VARCHAR, + Types.VARCHAR, + Types.VARCHAR, + Types.VARCHAR, + Types.VARCHAR, + Types.VARCHAR, + Types.VARCHAR, + Types.VARCHAR, + Types.VARCHAR, + Types.VARCHAR, + Types.VARCHAR, + Types.VARCHAR)), ; private List columnNames; diff --git a/src/main/java/net/snowflake/client/jdbc/SnowflakeDatabaseMetaData.java b/src/main/java/net/snowflake/client/jdbc/SnowflakeDatabaseMetaData.java index d0f357ddb..c9a0b34a3 100644 --- a/src/main/java/net/snowflake/client/jdbc/SnowflakeDatabaseMetaData.java +++ b/src/main/java/net/snowflake/client/jdbc/SnowflakeDatabaseMetaData.java @@ -2571,6 +2571,132 @@ public ResultSet getTypeInfo() throws SQLException { statement); } + /** + * Function to return a list of streams + * + * @param originalCatalog catalog name + * @param originalSchemaPattern schema name pattern + * @param streamName stream name + * @return a result set + * @throws SQLException if any SQL error occurs. + */ + public ResultSet getStreams( + String originalCatalog, String originalSchemaPattern, String streamName) throws SQLException { + logger.debug( + "public ResultSet getStreams(String catalog={}, String schemaPattern={}" + + "String streamName={}", + originalCatalog, + originalSchemaPattern, + streamName); + raiseSQLExceptionIfConnectionIsClosed(); + Statement statement = connection.createStatement(); + + // apply session context when catalog is unspecified + SFPair resPair = applySessionContext(originalCatalog, originalSchemaPattern); + final String catalog = resPair.left; + final String schemaPattern = resPair.right; + + final Pattern compiledSchemaPattern = Wildcard.toRegexPattern(schemaPattern, true); + final Pattern compiledStreamNamePattern = Wildcard.toRegexPattern(streamName, true); + + String showCommand = "show streams"; + + if (streamName != null + && !streamName.isEmpty() + && !streamName.trim().equals("%") + && !streamName.trim().equals(".*")) { + showCommand += " like '" + streamName + "'"; + } + + if (catalog == null) { + showCommand += " in account"; + } else if (catalog.isEmpty()) { + return SnowflakeDatabaseMetaDataResultSet.getEmptyResultSet(GET_STREAMS, statement); + } else { + String catalogEscaped = escapeSqlQuotes(catalog); + if (schemaPattern == null || isSchemaNameWildcardPattern(schemaPattern)) { + showCommand += " in database \"" + catalogEscaped + "\""; + } else if (schemaPattern.isEmpty()) { + return SnowflakeDatabaseMetaDataResultSet.getEmptyResultSet(GET_STREAMS, statement); + } else { + String schemaUnescaped = unescapeChars(schemaPattern); + if (streamName == null || Wildcard.isWildcardPatternStr(streamName)) { + showCommand += " in schema \"" + catalogEscaped + "\".\"" + schemaUnescaped + "\""; + } else if (streamName.isEmpty()) { + return SnowflakeDatabaseMetaDataResultSet.getEmptyResultSet(GET_STREAMS, statement); + } else { + String streamNameUnescaped = unescapeChars(streamName); + showCommand += + " in table \"" + + catalogEscaped + + "\".\"" + + schemaUnescaped + + "\".\"" + + streamNameUnescaped + + "\""; + } + } + } + + logger.debug("sql command to get stream metadata: {}", showCommand); + + ResultSet resultSet = + executeAndReturnEmptyResultIfNotFound(statement, showCommand, GET_STREAMS); + sendInBandTelemetryMetadataMetrics( + resultSet, "getStreams", originalCatalog, originalSchemaPattern, streamName, "none"); + + return new SnowflakeDatabaseMetaDataQueryResultSet(GET_STREAMS, resultSet, statement) { + @Override + public boolean next() throws SQLException { + logger.debug("public boolean next()"); + incrementRow(); + + // iterate throw the show streams result until we find an entry + // that matches the stream name + while (showObjectResultSet.next()) { + String createdOn = showObjectResultSet.getString(1); + String name = showObjectResultSet.getString(2); + String databaseName = showObjectResultSet.getString(3); + String schemaName = showObjectResultSet.getString(4); + String owner = showObjectResultSet.getString(5); + String comment = showObjectResultSet.getString(6); + String tableName = showObjectResultSet.getString(7); + String sourceType = showObjectResultSet.getString(8); + String baseTables = showObjectResultSet.getString(9); + String type = showObjectResultSet.getString(10); + String stale = showObjectResultSet.getString(11); + String mode = showObjectResultSet.getString(12); + String staleAfter = showObjectResultSet.getString(13); + + if ((compiledStreamNamePattern == null + || compiledStreamNamePattern.matcher(streamName).matches()) + && (compiledSchemaPattern == null + || compiledSchemaPattern.matcher(schemaName).matches()) + && (compiledStreamNamePattern == null + || compiledStreamNamePattern.matcher(streamName).matches())) { + logger.debug("Found a matched column:" + tableName + "." + streamName); + nextRow[0] = createdOn; + nextRow[1] = name; + nextRow[2] = databaseName; + nextRow[3] = schemaName; + nextRow[4] = owner; + nextRow[5] = comment; + nextRow[6] = tableName; + nextRow[7] = sourceType; + nextRow[8] = baseTables; + nextRow[9] = type; + nextRow[10] = stale; + nextRow[11] = mode; + nextRow[12] = staleAfter; + return true; + } + } + close(); + return false; + } + }; + } + @Override public ResultSet getIndexInfo( String catalog, String schema, String table, boolean unique, boolean approximate)