Skip to content

Commit 7c60708

Browse files
Patryk Jatczak authored and piotrczarnas committed
Merged PR 2742: setting id on columnSpec on column presence in INFORMATION_SCHEMA.KEY_COLUMN_USAGE
Related work items: #12163
2 parents 4bab44f + 5ff6bee commit 7c60708

File tree

2 files changed

+136
-1
lines changed

2 files changed

+136
-1
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright © 2021 DQOps (support@dqops.com)
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.dqops.postgresql.connection;
17+
18+
import com.dqops.connectors.ConnectionProvider;
19+
import com.dqops.connectors.ConnectionProviderRegistryObjectMother;
20+
import com.dqops.connectors.ProviderType;
21+
import com.dqops.connectors.SourceTableModel;
22+
import com.dqops.connectors.postgresql.PostgresqlConnectionSpecObjectMother;
23+
import com.dqops.connectors.postgresql.PostgresqlSourceConnection;
24+
import com.dqops.core.jobqueue.JobCancellationToken;
25+
import com.dqops.core.secrets.SecretValueLookupContext;
26+
import com.dqops.core.secrets.SecretValueProviderObjectMother;
27+
import com.dqops.metadata.sources.ConnectionSpec;
28+
import com.dqops.metadata.sources.TableSpec;
29+
import com.dqops.postgresql.BasePostgresqlIntegrationTest;
30+
import com.dqops.sampledata.IntegrationTestSampleDataObjectMother;
31+
import com.dqops.sampledata.SampleCsvFileNames;
32+
import com.dqops.sampledata.SampleTableMetadata;
33+
import com.dqops.sampledata.SampleTableMetadataObjectMother;
34+
import org.junit.jupiter.api.AfterEach;
35+
import org.junit.jupiter.api.Assertions;
36+
import org.junit.jupiter.api.BeforeEach;
37+
import org.junit.jupiter.api.Test;
38+
import org.springframework.boot.test.context.SpringBootTest;
39+
40+
import java.util.ArrayList;
41+
import java.util.List;
42+
43+
@SpringBootTest
44+
public class PostgresqlSourceConnectionIntegrationTests extends BasePostgresqlIntegrationTest {
45+
private PostgresqlSourceConnection sut;
46+
private ConnectionSpec connectionSpec;
47+
private SecretValueLookupContext secretValueLookupContext;
48+
49+
@BeforeEach
50+
void setUp() {
51+
ConnectionProvider connectionProvider = ConnectionProviderRegistryObjectMother.getConnectionProvider(ProviderType.postgresql);
52+
secretValueLookupContext = new SecretValueLookupContext(null);
53+
connectionSpec = PostgresqlConnectionSpecObjectMother.create().expandAndTrim(SecretValueProviderObjectMother.getInstance(), secretValueLookupContext);
54+
this.sut = (PostgresqlSourceConnection)connectionProvider.createConnection(connectionSpec, false, this.secretValueLookupContext);
55+
}
56+
57+
@AfterEach
58+
void tearDown() {
59+
this.sut.close(); // maybe it does nothing, but it should be called anyway as an example
60+
}
61+
62+
@Test
63+
void open_whenCalled_thenJustReturns() {
64+
this.sut.open(this.secretValueLookupContext);
65+
}
66+
67+
@Test
68+
void retrieveTableMetadata_tableHasUniqueColumn_columnHasSetIsIdFieldToTrue() {
69+
SampleTableMetadata sampleTableMetadata = SampleTableMetadataObjectMother.createSampleTableMetadataForCsvFile(SampleCsvFileNames.continuous_days_one_row_per_day, ProviderType.postgresql);
70+
IntegrationTestSampleDataObjectMother.ensureTableExists(sampleTableMetadata);
71+
72+
this.sut.open(this.secretValueLookupContext);
73+
74+
String tableName = sampleTableMetadata.getTableSpec().getPhysicalTableName().getTableName();
75+
76+
String alterTableQuery = String.format("ALTER TABLE %s ADD UNIQUE (id)", tableName);
77+
this.sut.executeCommand(alterTableQuery, JobCancellationToken.createDummyJobCancellationToken());
78+
79+
List<SourceTableModel> tables = this.sut.listTables("public", tableName, 300, secretValueLookupContext);
80+
ArrayList<String> tableNames = new ArrayList<>();
81+
tableNames.add(tables.get(0).getTableName().getTableName());
82+
83+
List<TableSpec> tableSpecs = this.sut.retrieveTableMetadata("public", tableName, 300, tableNames, null, null);
84+
85+
TableSpec tableSpec = tableSpecs.get(0);
86+
Assertions.assertTrue(tableSpec.getColumns().get("id").isId());
87+
Assertions.assertFalse(tableSpec.getColumns().get("date").isId());
88+
Assertions.assertFalse(tableSpec.getColumns().get("value").isId());
89+
}
90+
91+
}

dqops/src/main/java/com/dqops/connectors/AbstractSqlSourceConnection.java

+45-1
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import tech.tablesaw.columns.Column;
2828

2929
import java.util.*;
30-
import java.util.stream.Collectors;
3130

3231
/**
3332
* Base class for source connections that are using SQL. The derived providers can reuse the logic for querying the metadata using the INFORMATION_SCHEMA management views.
@@ -236,6 +235,19 @@ public List<TableSpec> retrieveTableMetadata(String schemaName,
236235
column.setName(column.name().toLowerCase(Locale.ROOT));
237236
}
238237

238+
HashMap<String, HashSet<String>> tableColumnMap = new HashMap<>();
239+
try {
240+
String keyColumnUsageSql = buildKeyColumnUsageSql(schemaName, tableNames);
241+
tech.tablesaw.api.Table keyColumnUsageResult = this.executeQuery(keyColumnUsageSql, JobCancellationToken.createDummyJobCancellationToken(), null, false);
242+
for (Row row : keyColumnUsageResult) {
243+
String tableName = row.getString("table_name");
244+
String columnName = row.getString("column_name");
245+
tableColumnMap.computeIfAbsent(tableName, k -> new HashSet<>()).add(columnName);
246+
}
247+
} catch (Exception ex) {
248+
// exception is swallowed
249+
}
250+
239251
HashMap<String, TableSpec> tablesByTableName = new LinkedHashMap<>();
240252

241253
for (Row colRow : tableResult) {
@@ -298,6 +310,10 @@ else if (tableResult.containsColumn("character_octet_length") &&
298310
columnType.setNullable(isNullable);
299311
columnSpec.setTypeSnapshot(columnType);
300312
tableSpec.getColumns().put(columnName, columnSpec);
313+
314+
if(tableColumnMap.containsKey(physicalTableName) && tableColumnMap.get(physicalTableName).contains(columnName)){
315+
columnSpec.setId(true);
316+
}
301317
}
302318

303319
return tableSpecs;
@@ -350,6 +366,34 @@ public String buildListColumnsSql(String schemaName, List<String> tableNames) {
350366
return sql;
351367
}
352368

369+
/**
370+
* Creates an SQL for listing columns from information_schema.key_column_usage for the given tables.
371+
* @param schemaName Schema name.
372+
* @param tableNames Table names to list.
373+
* @return SQL of the INFORMATION_SCHEMA query.
374+
*/
375+
public String buildKeyColumnUsageSql(String schemaName, List<String> tableNames) {
376+
StringBuilder KeyColumnUsageQueryBuilder = new StringBuilder();
377+
KeyColumnUsageQueryBuilder.append("SELECT TABLE_NAME, COLUMN_NAME\n");
378+
KeyColumnUsageQueryBuilder.append("FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE\n");
379+
KeyColumnUsageQueryBuilder.append("WHERE TABLE_SCHEMA = '").append(schemaName).append("'");
380+
if (tableNames != null && tableNames.size() > 0) {
381+
KeyColumnUsageQueryBuilder.append(" AND TABLE_NAME IN (");
382+
for (int i = 0; i < tableNames.size(); i++) {
383+
String tableName = tableNames.get(i);
384+
if (i > 0) {
385+
KeyColumnUsageQueryBuilder.append(",");
386+
}
387+
KeyColumnUsageQueryBuilder.append('\'');
388+
KeyColumnUsageQueryBuilder.append(tableName.replace("'", "''"));
389+
KeyColumnUsageQueryBuilder.append('\'');
390+
}
391+
KeyColumnUsageQueryBuilder.append(") ");
392+
}
393+
String KeyColumnUsageQuery = KeyColumnUsageQueryBuilder.toString();
394+
return KeyColumnUsageQuery;
395+
}
396+
353397
/**
354398
* Executes a provider specific SQL that returns a query. For example a SELECT statement or any other SQL text that also returns rows.
355399
*

0 commit comments

Comments
 (0)