diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java index a4f5209d43ce..d231f2b40cc6 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/BaseJdbcClient.java @@ -250,7 +250,8 @@ public JdbcTableHandle getTableHandle(ConnectorSession session, PreparedQuery pr // The query is opaque, so we don't know referenced tables Optional.empty(), 0, - Optional.empty()); + Optional.empty(), + ImmutableList.of()); } catch (SQLException e) { throw new TrinoException(JDBC_ERROR, "Failed to get table handle for prepared query. " + firstNonNull(e.getMessage(), e), e); @@ -1328,6 +1329,7 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) checkArgument(handle.isNamedRelation(), "Unable to delete from synthetic table: %s", handle); checkArgument(handle.getLimit().isEmpty(), "Unable to delete when limit is set: %s", handle); checkArgument(handle.getSortOrder().isEmpty(), "Unable to delete when sort order is set: %s", handle); + checkArgument(handle.getUpdateAssignments().isEmpty(), "Unable to delete when update assignments are set: %s", handle); verify(handle.getAuthorization().isEmpty(), "Unexpected authorization is required for table: %s".formatted(handle)); try (Connection connection = connectionFactory.openConnection(session)) { verify(connection.getAutoCommit()); @@ -1347,6 +1349,33 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) } } + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + checkArgument(handle.isNamedRelation(), "Unable to update from synthetic table: %s", handle); + checkArgument(handle.getLimit().isEmpty(), "Unable to update when limit is set: %s", handle); + checkArgument(handle.getSortOrder().isEmpty(), "Unable to update when sort order is set: %s", handle); + checkArgument(!handle.getUpdateAssignments().isEmpty(), "Unable to update when update assignments are not set: %s", handle); + verify(handle.getAuthorization().isEmpty(), "Unexpected authorization is required for table: %s".formatted(handle)); + try (Connection connection = connectionFactory.openConnection(session)) { + verify(connection.getAutoCommit()); + PreparedQuery preparedQuery = queryBuilder.prepareUpdateQuery( + this, + session, + connection, + handle.getRequiredNamedRelation(), + handle.getConstraint(), + getAdditionalPredicate(handle.getConstraintExpressions(), Optional.empty()), + handle.getUpdateAssignments()); + try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery, Optional.empty())) { + return OptionalLong.of(preparedStatement.executeUpdate()); + } + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } + @Override public void truncateTable(ConnectorSession session, JdbcTableHandle handle) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java index 63f4f6da2e06..67bd6194c933 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/CachingJdbcClient.java @@ -602,6 +602,14 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) return deletedRowsCount; } + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + OptionalLong updatedRowsCount = delegate.update(session, handle); + onDataChanged(handle.getRequiredNamedRelation().getSchemaTableName()); + return updatedRowsCount; + } + @Override public void truncateTable(ConnectorSession session, JdbcTableHandle handle) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java index f0a06840519c..3ae7da923f1b 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultJdbcMetadata.java @@ -45,6 +45,7 @@ import io.trino.spi.connector.LimitApplicationResult; import io.trino.spi.connector.ProjectionApplicationResult; import io.trino.spi.connector.RetryMode; +import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SchemaTablePrefix; import io.trino.spi.connector.SortItem; @@ -96,6 +97,7 @@ import static io.trino.plugin.jdbc.JdbcWriteSessionProperties.isNonTransactionalInsert; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.connector.RetryMode.NO_RETRIES; +import static io.trino.spi.connector.RowChangeParadigm.CHANGE_ONLY_UPDATED_COLUMNS; import static io.trino.spi.type.BigintType.BIGINT; import static java.lang.Math.max; import static java.util.Objects.requireNonNull; @@ -247,7 +249,8 @@ public Optional> applyFilter(C handle.getColumns(), handle.getOtherReferencedTables(), handle.getNextSyntheticColumnId(), - handle.getAuthorization()); + handle.getAuthorization(), + handle.getUpdateAssignments()); return Optional.of( remainingExpression.isPresent() @@ -269,7 +272,8 @@ private JdbcTableHandle flushAttributesAsQuery(ConnectorSession session, JdbcTab Optional.of(columns), handle.getAllReferencedTables(), handle.getNextSyntheticColumnId(), - handle.getAuthorization()); + handle.getAuthorization(), + handle.getUpdateAssignments()); } @Override @@ -315,7 +319,8 @@ public Optional> applyProjecti Optional.of(newColumns), handle.getOtherReferencedTables(), handle.getNextSyntheticColumnId(), - handle.getAuthorization()), + handle.getAuthorization(), + handle.getUpdateAssignments()), projections, assignments.entrySet().stream() .map(assignment -> new Assignment( @@ -425,7 +430,8 @@ public Optional> applyAggrega Optional.of(newColumnsList), handle.getAllReferencedTables(), nextSyntheticColumnId, - handle.getAuthorization()); + handle.getAuthorization(), + handle.getUpdateAssignments()); return Optional.of(new AggregationApplicationResult<>(handle, projections.build(), resultAssignments.build(), ImmutableMap.of(), precalculateStatisticsForPushdown)); } @@ -516,7 +522,8 @@ public Optional> applyJoin( .addAll(rightReferencedTables) .build())), nextSyntheticColumnId, - leftHandle.getAuthorization()), + leftHandle.getAuthorization(), + leftHandle.getUpdateAssignments()), ImmutableMap.copyOf(newLeftColumns), ImmutableMap.copyOf(newRightColumns), precalculateStatisticsForPushdown)); @@ -578,7 +585,8 @@ public Optional> applyLimit(Connect handle.getColumns(), handle.getOtherReferencedTables(), handle.getNextSyntheticColumnId(), - handle.getAuthorization()); + handle.getAuthorization(), + handle.getUpdateAssignments()); return Optional.of(new LimitApplicationResult<>(handle, jdbcClient.isLimitGuaranteed(session), precalculateStatisticsForPushdown)); } @@ -632,7 +640,8 @@ public Optional> applyTopN( handle.getColumns(), handle.getOtherReferencedTables(), handle.getNextSyntheticColumnId(), - handle.getAuthorization()); + handle.getAuthorization(), + handle.getUpdateAssignments()); return Optional.of(new TopNApplicationResult<>(sortedTableHandle, jdbcClient.isTopNGuaranteed(session), precalculateStatisticsForPushdown)); } @@ -901,6 +910,24 @@ public OptionalLong executeDelete(ConnectorSession session, ConnectorTableHandle return jdbcClient.delete(session, (JdbcTableHandle) handle); } + @Override + public Optional applyUpdate(ConnectorSession session, ConnectorTableHandle handle, Map assignments) + { + return Optional.of(((JdbcTableHandle) handle).withAssignments(assignments)); + } + + @Override + public RowChangeParadigm getRowChangeParadigm(ConnectorSession session, ConnectorTableHandle tableHandle) + { + return CHANGE_ONLY_UPDATED_COLUMNS; + } + + @Override + public OptionalLong executeUpdate(ConnectorSession session, ConnectorTableHandle handle) + { + return jdbcClient.update(session, (JdbcTableHandle) handle); + } + @Override public void truncateTable(ConnectorSession session, ConnectorTableHandle tableHandle) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultQueryBuilder.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultQueryBuilder.java index 4977bc7c6463..dfd3b0674329 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultQueryBuilder.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DefaultQueryBuilder.java @@ -176,6 +176,57 @@ public PreparedQuery prepareDeleteQuery( return new PreparedQuery(sql, accumulator.build()); } + @Override + public PreparedQuery prepareUpdateQuery( + JdbcClient client, + ConnectorSession session, + Connection connection, + JdbcNamedRelationHandle baseRelation, + TupleDomain tupleDomain, + Optional additionalPredicate, + List assignments) + { + ImmutableList.Builder accumulator = ImmutableList.builder(); + + String sql = "UPDATE " + getRelation(client, baseRelation.getRemoteTableName()) + " SET "; + + assignments.forEach(entry -> { + JdbcColumnHandle columnHandle = entry.column(); + accumulator.add( + new QueryParameter( + columnHandle.getJdbcTypeHandle(), + columnHandle.getColumnType(), + entry.queryParameter().getValue())); + }); + + sql += assignments.stream() + .map(JdbcAssignmentItem::column) + .map(columnHandle -> { + String bindExpression = getWriteFunction( + client, + session, + connection, + columnHandle.getJdbcTypeHandle(), + columnHandle.getColumnType()) + .getBindExpression(); + return client.quoted(columnHandle.getColumnName()) + " = " + bindExpression; + }) + .collect(joining(", ")); + + ImmutableList.Builder conjuncts = ImmutableList.builder(); + + toConjuncts(client, session, connection, tupleDomain, conjuncts, accumulator::add); + additionalPredicate.ifPresent(predicate -> { + conjuncts.add(predicate.expression()); + accumulator.addAll(predicate.parameters()); + }); + List clauses = conjuncts.build(); + if (!clauses.isEmpty()) { + sql += " WHERE " + Joiner.on(" AND ").join(clauses); + } + return new PreparedQuery(sql, accumulator.build()); + } + @Override public PreparedStatement prepareStatement( JdbcClient client, diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java index d182814982d5..0e70ec7beb94 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/ForwardingJdbcClient.java @@ -433,6 +433,12 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) return delegate().delete(session, handle); } + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + return delegate().update(session, handle); + } + @Override public void truncateTable(ConnectorSession session, JdbcTableHandle handle) { diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcAssignmentItem.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcAssignmentItem.java new file mode 100644 index 000000000000..fd46ffbd778f --- /dev/null +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcAssignmentItem.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.jdbc; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import static java.util.Objects.requireNonNull; + +public record JdbcAssignmentItem(@JsonProperty("column") JdbcColumnHandle column, @JsonProperty("queryParameter") QueryParameter queryParameter) +{ + public JdbcAssignmentItem + { + requireNonNull(column, "column is null"); + requireNonNull(queryParameter, "queryParameter is null"); + } +} diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java index 02d92c3c9506..9cb1e5ed7b43 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcClient.java @@ -236,5 +236,7 @@ default Optional getTableScanRedirection(Con void truncateTable(ConnectorSession session, JdbcTableHandle handle); + OptionalLong update(ConnectorSession session, JdbcTableHandle handle); + OptionalInt getMaxWriteParallelism(ConnectorSession session); } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java index 3360bd49bdb9..c48c737cbd8a 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/JdbcTableHandle.java @@ -21,9 +21,11 @@ import io.trino.plugin.jdbc.expression.ParameterizedExpression; import io.trino.spi.connector.ColumnHandle; import io.trino.spi.connector.SchemaTableName; +import io.trino.spi.expression.Constant; import io.trino.spi.predicate.TupleDomain; import java.util.List; +import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.OptionalLong; @@ -31,6 +33,7 @@ import java.util.stream.Collectors; import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; import static java.util.Objects.requireNonNull; public final class JdbcTableHandle @@ -59,6 +62,7 @@ public final class JdbcTableHandle private final int nextSyntheticColumnId; private final Optional authorization; + private final List updateAssignments; public JdbcTableHandle(SchemaTableName schemaTableName, RemoteTableName remoteTableName, Optional comment) { @@ -71,7 +75,8 @@ public JdbcTableHandle(SchemaTableName schemaTableName, RemoteTableName remoteTa Optional.empty(), Optional.of(ImmutableSet.of()), 0, - Optional.empty()); + Optional.empty(), + ImmutableList.of()); } @JsonCreator @@ -84,7 +89,8 @@ public JdbcTableHandle( @JsonProperty("columns") Optional> columns, @JsonProperty("otherReferencedTables") Optional> otherReferencedTables, @JsonProperty("nextSyntheticColumnId") int nextSyntheticColumnId, - @JsonProperty("authorization") Optional authorization) + @JsonProperty("authorization") Optional authorization, + @JsonProperty("updateAssignments") List updateAssignments) { this.relationHandle = requireNonNull(relationHandle, "relationHandle is null"); this.constraint = requireNonNull(constraint, "constraint is null"); @@ -96,6 +102,7 @@ public JdbcTableHandle( this.otherReferencedTables = otherReferencedTables.map(ImmutableSet::copyOf); this.nextSyntheticColumnId = nextSyntheticColumnId; this.authorization = requireNonNull(authorization, "authorization is null"); + this.updateAssignments = requireNonNull(updateAssignments, "updateAssignments is null"); } public JdbcTableHandle intersectedWithConstraint(TupleDomain newConstraint) @@ -109,7 +116,28 @@ public JdbcTableHandle intersectedWithConstraint(TupleDomain newCo columns, otherReferencedTables, nextSyntheticColumnId, - authorization); + authorization, + updateAssignments); + } + + public JdbcTableHandle withAssignments(Map assignments) + { + return new JdbcTableHandle( + relationHandle, + constraint, + constraintExpressions, + sortOrder, + limit, + columns, + otherReferencedTables, + nextSyntheticColumnId, + authorization, + assignments.entrySet() + .stream() + .map(e -> { + return new JdbcAssignmentItem((JdbcColumnHandle) e.getKey(), new QueryParameter(e.getValue().getType(), Optional.ofNullable(e.getValue().getValue()))); + }) + .collect(toImmutableList())); } public JdbcNamedRelationHandle asPlainTable() @@ -168,6 +196,12 @@ public Optional> getOtherReferencedTables() return otherReferencedTables; } + @JsonProperty + public List getUpdateAssignments() + { + return updateAssignments; + } + /** * Remote tables referenced by the query. {@link Optional#empty()} when unknown. */ @@ -236,13 +270,14 @@ public boolean equals(Object obj) Objects.equals(this.limit, o.limit) && Objects.equals(this.columns, o.columns) && this.nextSyntheticColumnId == o.nextSyntheticColumnId && - Objects.equals(this.authorization, o.authorization); + Objects.equals(this.authorization, o.authorization) && + Objects.equals(this.updateAssignments, o.updateAssignments); } @Override public int hashCode() { - return Objects.hash(relationHandle, constraint, constraintExpressions, sortOrder, limit, columns, nextSyntheticColumnId, authorization); + return Objects.hash(relationHandle, constraint, constraintExpressions, sortOrder, limit, columns, nextSyntheticColumnId, authorization, updateAssignments); } @Override @@ -267,6 +302,10 @@ else if (!constraint.isAll()) { limit.ifPresent(value -> builder.append(" limit=").append(value)); columns.ifPresent(value -> builder.append(" columns=").append(value)); authorization.ifPresent(value -> builder.append(" authorization=").append(value)); + if (!updateAssignments.isEmpty()) { + builder.append(" updateAssignments="); + updateAssignments.forEach(builder::append); + } return builder.toString(); } } diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/QueryBuilder.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/QueryBuilder.java index 1a71fd4aca4d..52cb58e80cb5 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/QueryBuilder.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/QueryBuilder.java @@ -60,6 +60,15 @@ PreparedQuery prepareDeleteQuery( TupleDomain tupleDomain, Optional additionalPredicate); + PreparedQuery prepareUpdateQuery( + JdbcClient client, + ConnectorSession session, + Connection connection, + JdbcNamedRelationHandle baseRelation, + TupleDomain tupleDomain, + Optional additionalPredicate, + List assignments); + PreparedStatement prepareStatement( JdbcClient client, ConnectorSession session, diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/JdbcClientStats.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/JdbcClientStats.java index 1ed0947a3de2..01e90fc63527 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/JdbcClientStats.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/JdbcClientStats.java @@ -64,6 +64,7 @@ public final class JdbcClientStats private final JdbcApiStats convertPredicate = new JdbcApiStats(); private final JdbcApiStats getTableScanRedirection = new JdbcApiStats(); private final JdbcApiStats delete = new JdbcApiStats(); + private final JdbcApiStats update = new JdbcApiStats(); private final JdbcApiStats truncateTable = new JdbcApiStats(); @Managed @@ -388,6 +389,13 @@ public JdbcApiStats getDelete() return delete; } + @Managed + @Nested + public JdbcApiStats getUpdate() + { + return update; + } + @Managed @Nested public JdbcApiStats getTruncateTable() diff --git a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java index 9ea18c34916b..ae1e01d029e9 100644 --- a/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java +++ b/plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/jmx/StatisticsAwareJdbcClient.java @@ -453,6 +453,12 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) return stats.getDelete().wrap(() -> delegate().delete(session, handle)); } + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + return stats.getUpdate().wrap(() -> delegate().update(session, handle)); + } + @Override public void truncateTable(ConnectorSession session, JdbcTableHandle handle) { diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorSmokeTest.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorSmokeTest.java index fcae6d144aca..ddc45ee861b0 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorSmokeTest.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorSmokeTest.java @@ -23,7 +23,10 @@ public abstract class BaseJdbcConnectorSmokeTest protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) { switch (connectorBehavior) { - case SUPPORTS_UPDATE: // not supported by any JDBC connector + case SUPPORTS_UPDATE: + return true; + case SUPPORTS_ROW_LEVEL_UPDATE: + return false; case SUPPORTS_MERGE: // not supported by any JDBC connector return false; diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java index 6b379de08f5a..c3210b7b617c 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/BaseJdbcConnectorTest.java @@ -101,14 +101,18 @@ import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_LIMIT_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_MERGE; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_NATIVE_QUERY; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_NOT_NULL_CONSTRAINT; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_ARITHMETIC_EXPRESSION_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN_WITH_LIKE; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_EQUALITY; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ROW_LEVEL_DELETE; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ROW_LEVEL_UPDATE; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_ROW_TYPE; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_TOPN_PUSHDOWN; import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_TOPN_PUSHDOWN_WITH_VARCHAR; +import static io.trino.testing.TestingConnectorBehavior.SUPPORTS_UPDATE; import static io.trino.testing.TestingNames.randomNameSuffix; import static io.trino.testing.assertions.Assert.assertEventually; import static java.lang.String.format; @@ -135,11 +139,12 @@ public void afterClass() protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) { return switch (connectorBehavior) { + case SUPPORTS_UPDATE -> true; case SUPPORTS_CREATE_MATERIALIZED_VIEW, SUPPORTS_CREATE_VIEW, SUPPORTS_MERGE, SUPPORTS_PREDICATE_EXPRESSION_PUSHDOWN, - SUPPORTS_UPDATE -> false; + SUPPORTS_ROW_LEVEL_UPDATE -> false; // Dynamic filters can be pushed down only if predicate push down is supported. // It is possible for a connector to have predicate push down support but not push down dynamic filters. // TODO default SUPPORTS_DYNAMIC_FILTER_PUSHDOWN to SUPPORTS_PREDICATE_PUSHDOWN @@ -1516,6 +1521,170 @@ protected TestView createSleepingView(Duration minimalSleepDuration) throw new UnsupportedOperationException(); } + @Override + public void testUpdateNotNullColumn() + { + // we don't support metadata update for null expressions yet, remove override as soon as support will be added + if (hasBehavior(SUPPORTS_ROW_LEVEL_UPDATE)) { + super.testUpdateNotNullColumn(); + return; + } + + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE)); + + if (!hasBehavior(SUPPORTS_NOT_NULL_CONSTRAINT)) { + assertQueryFails( + "CREATE TABLE not_null_constraint (not_null_col INTEGER NOT NULL)", + format("line 1:35: Catalog '%s' does not support non-null column for column name 'not_null_col'", getSession().getCatalog().orElseThrow())); + return; + } + + try (TestTable table = new TestTable(getQueryRunner()::execute, "update_not_null", "(nullable_col INTEGER, not_null_col INTEGER NOT NULL)")) { + assertUpdate(format("INSERT INTO %s (nullable_col, not_null_col) VALUES (1, 10)", table.getName()), 1); + assertQuery("SELECT * FROM " + table.getName(), "VALUES (1, 10)"); + assertQueryFails("UPDATE " + table.getName() + " SET not_null_col = NULL WHERE nullable_col = 1", MODIFYING_ROWS_MESSAGE); + assertQueryFails("UPDATE " + table.getName() + " SET not_null_col = TRY(5/0) where nullable_col = 1", MODIFYING_ROWS_MESSAGE); + } + } + + @Override + public void testUpdateRowType() + { + // we don't support metadata update for expressions yet, remove override as soon as support will be added + if (hasBehavior(SUPPORTS_ROW_LEVEL_UPDATE)) { + super.testUpdateRowType(); + return; + } + + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_UPDATE) && hasBehavior(SUPPORTS_ROW_TYPE)); + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_with_predicates_on_row_types", "(int_t INT, row_t ROW(f1 INT, f2 INT))")) { + String tableName = table.getName(); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, ROW(2, 3)), (11, ROW(12, 13)), (21, ROW(22, 23))", 3); + assertQueryFails("UPDATE " + tableName + " SET int_t = int_t - 1 WHERE row_t.f2 = 3", MODIFYING_ROWS_MESSAGE); + } + } + + @Override + public void testUpdateRowConcurrently() + throws Exception + { + // we don't support metadata update for expressions yet, remove override as soon as support will be added + if (hasBehavior(SUPPORTS_ROW_LEVEL_UPDATE)) { + super.testUpdateRowConcurrently(); + return; + } + + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_UPDATE)); + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_row", "(a INT, b INT, c INT)", ImmutableList.of("1, 2, 3"))) { + assertQueryFails("UPDATE " + table.getName() + " SET a = a + 1", MODIFYING_ROWS_MESSAGE); + } + } + + @Override + public void testUpdateAllValues() + { + // we don't support metadata update for update all, remove override as soon as support will be added + if (hasBehavior(SUPPORTS_ROW_LEVEL_UPDATE)) { + super.testUpdateAllValues(); + return; + } + + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_UPDATE)); + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_all", "(a INT, b INT, c INT)", ImmutableList.of("1, 2, 3"))) { + assertQueryFails("UPDATE " + table.getName() + " SET a = 1, b = 1, c = 2", MODIFYING_ROWS_MESSAGE); + } + } + + @Override + public void testUpdateWithPredicates() + { + // we don't support metadata update for expressions yet, remove override as soon as support will be added + // TODO add more test cases to basic test + if (hasBehavior(SUPPORTS_ROW_LEVEL_UPDATE)) { + super.testUpdateWithPredicates(); + return; + } + + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_UPDATE)); + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_row_predicates", "(a INT, b INT, c INT)")) { + String tableName = table.getName(); + assertUpdate("INSERT INTO " + tableName + " VALUES (1, 2, 3), (11, 12, 13), (21, 22, 23)", 3); + assertUpdate("UPDATE " + tableName + " SET a = 5 WHERE c = 3", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (5, 2, 3), (11, 12, 13), (21, 22, 23)"); + + assertUpdate("UPDATE " + tableName + " SET c = 6 WHERE a = 11", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (5, 2, 3), (11, 12, 6), (21, 22, 23)"); + + assertUpdate("UPDATE " + tableName + " SET b = 44 WHERE b = 22", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (5, 2, 3), (11, 12, 6), (21, 44, 23)"); + + assertUpdate("UPDATE " + tableName + " SET b = 45 WHERE a > 5", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (5, 2, 3), (11, 45, 6), (21, 45, 23)"); + + assertUpdate("UPDATE " + tableName + " SET b = 46 WHERE a < 21", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (5, 46, 3), (11, 46, 6), (21, 45, 23)"); + + assertUpdate("UPDATE " + tableName + " SET b = 47 WHERE a != 11", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (5, 47, 3), (11, 46, 6), (21, 47, 23)"); + + assertUpdate("UPDATE " + tableName + " SET b = 48 WHERE a IN (5, 11)", 2); + assertQuery("SELECT * FROM " + tableName, "VALUES (5, 48, 3), (11, 48, 6), (21, 47, 23)"); + + assertUpdate("UPDATE " + tableName + " SET b = 49 WHERE a NOT IN (5, 11)", 1); + assertQuery("SELECT * FROM " + tableName, "VALUES (5, 48, 3), (11, 48, 6), (21, 49, 23)"); + + assertQueryFails("UPDATE " + tableName + " SET b = b + 3 WHERE a NOT IN (5, 11)", MODIFYING_ROWS_MESSAGE); + } + } + + @Test + public void testConstantUpdateWithVarcharEqualityPredicates() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_UPDATE)); + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_varchar", "(col1 INT, col2 varchar(1))", ImmutableList.of("1, 'a'", "2, 'A'"))) { + if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_EQUALITY)) { + assertQueryFails("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 = 'A'", MODIFYING_ROWS_MESSAGE); + return; + } + assertUpdate("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 = 'A'", 1); + assertQuery("SELECT * FROM " + table.getName(), "VALUES (1, 'a'), (20, 'A')"); + } + } + + @Test + public void testConstantUpdateWithVarcharInequalityPredicates() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_UPDATE)); + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_varchar", "(col1 INT, col2 varchar(1))", ImmutableList.of("1, 'a'", "2, 'A'"))) { + if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY)) { + assertQueryFails("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 != 'A'", MODIFYING_ROWS_MESSAGE); + return; + } + + assertUpdate("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 != 'A'", 1); + assertQuery("SELECT * FROM " + table.getName(), "VALUES (20, 'a'), (2, 'A')"); + } + } + + @Test + public void testConstantUpdateWithVarcharGreaterAndLowerPredicate() + { + skipTestUnless(hasBehavior(SUPPORTS_CREATE_TABLE) && hasBehavior(SUPPORTS_UPDATE)); + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_varchar", "(col1 INT, col2 varchar(1))", ImmutableList.of("1, 'a'", "2, 'A'"))) { + if (!hasBehavior(SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY)) { + assertQueryFails("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 > 'A'", MODIFYING_ROWS_MESSAGE); + assertQueryFails("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 < 'A'", MODIFYING_ROWS_MESSAGE); + return; + } + + assertUpdate("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 > 'A'", 1); + assertQuery("SELECT * FROM " + table.getName(), "VALUES (20, 'a'), (2, 'A')"); + + assertUpdate("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 < 'a'", 1); + assertQuery("SELECT * FROM " + table.getName(), "VALUES (20, 'a'), (20, 'A')"); + } + } + @Test public void testDeleteWithBigintEqualityPredicate() { diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java index b0c158560c5f..49a0711f9992 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestCachingJdbcClient.java @@ -652,7 +652,8 @@ public void testCacheGetTableStatisticsWithQueryRelationHandle() Optional.empty(), Optional.of(Set.of(new SchemaTableName(schema, "first"))), 0, - Optional.empty()); + Optional.empty(), + ImmutableList.of()); // load assertStatisticsCacheStats(cachingJdbcClient).loads(1).misses(1).afterRunning(() -> { diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcConnectionCreation.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcConnectionCreation.java index a31a026eb105..663fa1585802 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcConnectionCreation.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcConnectionCreation.java @@ -81,7 +81,7 @@ public Object[][] testCases() {"CREATE TABLE copy_of_nation AS SELECT * FROM nation", 6, Optional.empty()}, {"INSERT INTO copy_of_nation SELECT * FROM nation", 6, Optional.empty()}, {"DELETE FROM copy_of_nation WHERE nationkey = 3", 1, Optional.empty()}, - {"UPDATE copy_of_nation SET name = 'POLAND' WHERE nationkey = 1", 1, Optional.of(MODIFYING_ROWS_MESSAGE)}, + {"UPDATE copy_of_nation SET name = 'POLAND' WHERE nationkey = 1", 1, Optional.empty()}, {"MERGE INTO copy_of_nation n USING region r ON r.regionkey= n.regionkey WHEN MATCHED THEN DELETE", 1, Optional.of(MODIFYING_ROWS_MESSAGE)}, {"DROP TABLE copy_of_nation", 1, Optional.empty()}, {"SHOW SCHEMAS", 1, Optional.empty()}, diff --git a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSetProvider.java b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSetProvider.java index fa9f7c9b3c5a..25bdd51871e7 100644 --- a/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSetProvider.java +++ b/plugin/trino-base-jdbc/src/test/java/io/trino/plugin/jdbc/TestJdbcRecordSetProvider.java @@ -201,7 +201,8 @@ private RecordCursor getCursor(JdbcTableHandle jdbcTableHandle, List toColumnMapping(ConnectorSession session, Connection connection, JdbcTypeHandle typeHandle) { diff --git a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/BaseClickHouseConnectorSmokeTest.java b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/BaseClickHouseConnectorSmokeTest.java index d484ac9cb9a0..34d9b25f59c9 100644 --- a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/BaseClickHouseConnectorSmokeTest.java +++ b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/BaseClickHouseConnectorSmokeTest.java @@ -26,6 +26,7 @@ public abstract class BaseClickHouseConnectorSmokeTest protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) { switch (connectorBehavior) { + case SUPPORTS_UPDATE: case SUPPORTS_DELETE: return false; case SUPPORTS_TRUNCATE: diff --git a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java index edbd500eee4f..fe7d59daeda6 100644 --- a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java +++ b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseConnectorTest.java @@ -78,7 +78,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) SUPPORTS_PREDICATE_PUSHDOWN_WITH_VARCHAR_INEQUALITY, SUPPORTS_ROW_TYPE, SUPPORTS_SET_COLUMN_TYPE, - SUPPORTS_TOPN_PUSHDOWN -> false; + SUPPORTS_TOPN_PUSHDOWN, + SUPPORTS_UPDATE -> false; default -> super.hasBehavior(connectorBehavior); }; } diff --git a/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java b/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java index 5f7a1865f91b..b9dc5592764f 100644 --- a/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java +++ b/plugin/trino-druid/src/main/java/io/trino/plugin/druid/DruidJdbcClient.java @@ -415,7 +415,8 @@ private JdbcTableHandle prepareTableHandleForQuery(JdbcTableHandle table) table.getColumns(), table.getOtherReferencedTables(), table.getNextSyntheticColumnId(), - table.getAuthorization()); + table.getAuthorization(), + table.getUpdateAssignments()); } return table; @@ -462,6 +463,12 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) throw new TrinoException(NOT_SUPPORTED, MODIFYING_ROWS_MESSAGE); } + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + throw new TrinoException(NOT_SUPPORTED, MODIFYING_ROWS_MESSAGE); + } + @Override public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata) { diff --git a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java index 2e0643b2b6d9..0b2df1b0df1d 100644 --- a/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java +++ b/plugin/trino-druid/src/test/java/io/trino/plugin/druid/BaseDruidConnectorTest.java @@ -81,7 +81,8 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) SUPPORTS_RENAME_TABLE, SUPPORTS_ROW_TYPE, SUPPORTS_SET_COLUMN_TYPE, - SUPPORTS_TOPN_PUSHDOWN -> false; + SUPPORTS_TOPN_PUSHDOWN, + SUPPORTS_UPDATE -> false; default -> super.hasBehavior(connectorBehavior); }; } diff --git a/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/BaseMySqlFailureRecoveryTest.java b/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/BaseMySqlFailureRecoveryTest.java index 21a44959c612..8ed01bb00bec 100644 --- a/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/BaseMySqlFailureRecoveryTest.java +++ b/plugin/trino-mysql/src/test/java/io/trino/plugin/mysql/BaseMySqlFailureRecoveryTest.java @@ -19,11 +19,14 @@ import io.trino.plugin.jdbc.BaseJdbcFailureRecoveryTest; import io.trino.testing.QueryRunner; import io.trino.tpch.TpchTable; +import org.testng.SkipException; import java.util.List; import java.util.Map; +import java.util.Optional; import static io.trino.plugin.mysql.MySqlQueryRunner.createMySqlQueryRunner; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public abstract class BaseMySqlFailureRecoveryTest extends BaseJdbcFailureRecoveryTest @@ -52,4 +55,26 @@ protected QueryRunner createQueryRunner( "exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager")); }); } + + @Override + protected void testUpdateWithSubquery() + { + assertThatThrownBy(super::testUpdateWithSubquery).hasMessageContaining("Unexpected Join over for-update table scan"); + throw new SkipException("skipped"); + } + + @Override + protected void testUpdate() + { + // This simple update on JDBC ends up as a very simple, single-fragment, coordinator-only plan, + // which has no ability to recover from errors. This test simply verifies that's still the case. + Optional setupQuery = Optional.of("CREATE TABLE AS SELECT * FROM orders"); + String testQuery = "UPDATE
SET shippriority = 101 WHERE custkey = 1"; + Optional cleanupQuery = Optional.of("DROP TABLE
"); + + assertThatQuery(testQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .isCoordinatorOnly(); + } } diff --git a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/BaseOracleFailureRecoveryTest.java b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/BaseOracleFailureRecoveryTest.java index be0c7ae0ff13..a41f88860013 100644 --- a/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/BaseOracleFailureRecoveryTest.java +++ b/plugin/trino-oracle/src/test/java/io/trino/plugin/oracle/BaseOracleFailureRecoveryTest.java @@ -20,14 +20,17 @@ import io.trino.testing.QueryRunner; import io.trino.testng.services.Flaky; import io.trino.tpch.TpchTable; +import org.testng.SkipException; import org.testng.annotations.Test; import java.util.List; import java.util.Map; +import java.util.Optional; import static io.trino.plugin.oracle.OracleQueryRunner.createOracleQueryRunner; import static io.trino.plugin.oracle.TestingOracleServer.TEST_PASS; import static io.trino.plugin.oracle.TestingOracleServer.TEST_USER; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public abstract class BaseOracleFailureRecoveryTest extends BaseJdbcFailureRecoveryTest @@ -69,4 +72,26 @@ public void testParallel(Runnable runnable) { super.testParallel(runnable); } + + @Override + protected void testUpdateWithSubquery() + { + assertThatThrownBy(super::testUpdateWithSubquery).hasMessageContaining("Unexpected Join over for-update table scan"); + throw new SkipException("skipped"); + } + + @Override + protected void testUpdate() + { + // This simple update on JDBC ends up as a very simple, single-fragment, coordinator-only plan, + // which has no ability to recover from errors. This test simply verifies that's still the case. + Optional setupQuery = Optional.of("CREATE TABLE
AS SELECT * FROM orders"); + String testQuery = "UPDATE
SET shippriority = 101 WHERE custkey = 1"; + Optional cleanupQuery = Optional.of("DROP TABLE
"); + + assertThatQuery(testQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .isCoordinatorOnly(); + } } diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClient.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClient.java index b65a51aeeb25..9a381e6b1043 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClient.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixClient.java @@ -117,6 +117,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.Set; import java.util.StringJoiner; import java.util.function.BiFunction; @@ -174,6 +175,7 @@ import static io.trino.plugin.phoenix5.TypeUtils.toBoxedArray; import static io.trino.spi.StandardErrorCode.ALREADY_EXISTS; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.connector.ConnectorMetadata.MODIFYING_ROWS_MESSAGE; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateType.DATE; @@ -359,6 +361,12 @@ public boolean isTopNGuaranteed(ConnectorSession session) return false; } + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + throw new TrinoException(NOT_SUPPORTED, MODIFYING_ROWS_MESSAGE); + } + @Override protected Optional> limitFunction() { @@ -944,7 +952,8 @@ public JdbcTableHandle updatedScanColumnTable(ConnectorSession session, Connecto Optional.of(getUpdatedScanColumnHandles(session, tableHandle, scanColumnHandles, mergeRowIdColumnHandle)), tableHandle.getOtherReferencedTables(), tableHandle.getNextSyntheticColumnId(), - tableHandle.getAuthorization()); + tableHandle.getAuthorization(), + tableHandle.getUpdateAssignments()); } private List getUpdatedScanColumnHandles(ConnectorSession session, JdbcTableHandle tableHandle, List scanColumnHandles, JdbcColumnHandle mergeRowIdColumnHandle) diff --git a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java index 03d9d2334318..c3be6ecaaadc 100644 --- a/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java +++ b/plugin/trino-phoenix5/src/main/java/io/trino/plugin/phoenix5/PhoenixMetadata.java @@ -45,6 +45,7 @@ import io.trino.spi.connector.RowChangeParadigm; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SortingProperty; +import io.trino.spi.expression.Constant; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.security.TrinoPrincipal; import io.trino.spi.statistics.ComputedStatistics; @@ -186,6 +187,13 @@ private String toRemoteSchemaName(ConnectorSession session, String schemaName) } } + @Override + public Optional applyUpdate(ConnectorSession session, ConnectorTableHandle handle, Map assignments) + { + // Phoenix support row level update, so we should reject this path, earlier than in JDBC client + return Optional.empty(); + } + @Override public void createTable(ConnectorSession session, ConnectorTableMetadata tableMetadata, boolean ignoreExisting) { diff --git a/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java b/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java index 65083dd9342f..2c5e1dcab91c 100644 --- a/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java +++ b/plugin/trino-phoenix5/src/test/java/io/trino/plugin/phoenix5/TestPhoenixConnectorTest.java @@ -85,6 +85,7 @@ protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) return switch (connectorBehavior) { case SUPPORTS_MERGE, SUPPORTS_PREDICATE_ARITHMETIC_EXPRESSION_PUSHDOWN, + SUPPORTS_ROW_LEVEL_UPDATE, SUPPORTS_UPDATE -> true; case SUPPORTS_ADD_COLUMN_WITH_COMMENT, SUPPORTS_AGGREGATION_PUSHDOWN, diff --git a/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClient.java b/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClient.java index 3925b52642e4..ed0673f02aba 100644 --- a/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClient.java +++ b/plugin/trino-postgresql/src/main/java/io/trino/plugin/postgresql/PostgreSqlClient.java @@ -866,6 +866,7 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) checkArgument(handle.isNamedRelation(), "Unable to delete from synthetic table: %s", handle); checkArgument(handle.getLimit().isEmpty(), "Unable to delete when limit is set: %s", handle); checkArgument(handle.getSortOrder().isEmpty(), "Unable to delete when sort order is set: %s", handle); + checkArgument(handle.getUpdateAssignments().isEmpty(), "Unable to delete when update assignments are set: %s", handle); try (Connection connection = connectionFactory.openConnection(session)) { verify(connection.getAutoCommit()); PreparedQuery preparedQuery = queryBuilder.prepareDeleteQuery( @@ -887,6 +888,35 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) } } + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + checkArgument(handle.isNamedRelation(), "Unable to update from synthetic table: %s", handle); + checkArgument(handle.getLimit().isEmpty(), "Unable to update when limit is set: %s", handle); + checkArgument(handle.getSortOrder().isEmpty(), "Unable to update when sort order is set: %s", handle); + checkArgument(!handle.getUpdateAssignments().isEmpty(), "Unable to update when update assignments are not set: %s", handle); + try (Connection connection = connectionFactory.openConnection(session)) { + verify(connection.getAutoCommit()); + PreparedQuery preparedQuery = queryBuilder.prepareUpdateQuery( + this, + session, + connection, + handle.getRequiredNamedRelation(), + handle.getConstraint(), + getAdditionalPredicate(handle.getConstraintExpressions(), Optional.empty()), + handle.getUpdateAssignments()); + try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery, Optional.empty())) { + int affectedRows = preparedStatement.executeUpdate(); + // In getPreparedStatement we set autocommit to false so here we need an explicit commit + connection.commit(); + return OptionalLong.of(affectedRows); + } + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } + @Override public TableStatistics getTableStatistics(ConnectorSession session, JdbcTableHandle handle) { diff --git a/plugin/trino-postgresql/src/test/java/io/trino/plugin/postgresql/BasePostgresFailureRecoveryTest.java b/plugin/trino-postgresql/src/test/java/io/trino/plugin/postgresql/BasePostgresFailureRecoveryTest.java index 82864489a332..8e83d29cfae3 100644 --- a/plugin/trino-postgresql/src/test/java/io/trino/plugin/postgresql/BasePostgresFailureRecoveryTest.java +++ b/plugin/trino-postgresql/src/test/java/io/trino/plugin/postgresql/BasePostgresFailureRecoveryTest.java @@ -19,11 +19,14 @@ import io.trino.plugin.jdbc.BaseJdbcFailureRecoveryTest; import io.trino.testing.QueryRunner; import io.trino.tpch.TpchTable; +import org.testng.SkipException; import java.util.List; import java.util.Map; +import java.util.Optional; import static io.trino.plugin.postgresql.PostgreSqlQueryRunner.createPostgreSqlQueryRunner; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; public abstract class BasePostgresFailureRecoveryTest extends BaseJdbcFailureRecoveryTest @@ -52,4 +55,26 @@ protected QueryRunner createQueryRunner( "exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager")); }); } + + @Override + protected void testUpdateWithSubquery() + { + assertThatThrownBy(super::testUpdateWithSubquery).hasMessageContaining("Unexpected Join over for-update table scan"); + throw new SkipException("skipped"); + } + + @Override + protected void testUpdate() + { + // This simple update on JDBC ends up as a very simple, single-fragment, coordinator-only plan, + // which has no ability to recover from errors. This test simply verifies that's still the case. + Optional setupQuery = Optional.of("CREATE TABLE
AS SELECT * FROM orders"); + String testQuery = "UPDATE
SET shippriority = 101 WHERE custkey = 1"; + Optional cleanupQuery = Optional.of("DROP TABLE
"); + + assertThatQuery(testQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .isCoordinatorOnly(); + } } diff --git a/plugin/trino-postgresql/src/test/java/io/trino/plugin/postgresql/TestPostgreSqlJdbcConnectionCreation.java b/plugin/trino-postgresql/src/test/java/io/trino/plugin/postgresql/TestPostgreSqlJdbcConnectionCreation.java index fb0803c9b185..fc5c1b9739b9 100644 --- a/plugin/trino-postgresql/src/test/java/io/trino/plugin/postgresql/TestPostgreSqlJdbcConnectionCreation.java +++ b/plugin/trino-postgresql/src/test/java/io/trino/plugin/postgresql/TestPostgreSqlJdbcConnectionCreation.java @@ -92,7 +92,7 @@ public Object[][] testCases() {"CREATE TABLE copy_of_nation AS SELECT * FROM nation", 6, Optional.empty()}, {"INSERT INTO copy_of_nation SELECT * FROM nation", 6, Optional.empty()}, {"DELETE FROM copy_of_nation WHERE nationkey = 3", 1, Optional.empty()}, - {"UPDATE copy_of_nation SET name = 'POLAND' WHERE nationkey = 1", 1, Optional.of(MODIFYING_ROWS_MESSAGE)}, + {"UPDATE copy_of_nation SET name = 'POLAND' WHERE nationkey = 1", 1, Optional.empty()}, {"MERGE INTO copy_of_nation n USING region r ON r.regionkey= n.regionkey WHEN MATCHED THEN DELETE", 1, Optional.of(MODIFYING_ROWS_MESSAGE)}, {"DROP TABLE copy_of_nation", 1, Optional.empty()}, {"SHOW SCHEMAS", 1, Optional.empty()}, diff --git a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClient.java b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClient.java index 469779500bdf..67184ff6604c 100644 --- a/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClient.java +++ b/plugin/trino-redshift/src/main/java/io/trino/plugin/redshift/RedshiftClient.java @@ -462,6 +462,7 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) checkArgument(handle.isNamedRelation(), "Unable to delete from synthetic table: %s", handle); checkArgument(handle.getLimit().isEmpty(), "Unable to delete when limit is set: %s", handle); checkArgument(handle.getSortOrder().isEmpty(), "Unable to delete when sort order is set: %s", handle); + checkArgument(handle.getUpdateAssignments().isEmpty(), "Unable to delete when update assignments are set: %s", handle); try (Connection connection = connectionFactory.openConnection(session)) { verify(connection.getAutoCommit()); PreparedQuery preparedQuery = queryBuilder.prepareDeleteQuery(this, session, connection, handle.getRequiredNamedRelation(), handle.getConstraint(), Optional.empty()); @@ -477,6 +478,35 @@ public OptionalLong delete(ConnectorSession session, JdbcTableHandle handle) } } + @Override + public OptionalLong update(ConnectorSession session, JdbcTableHandle handle) + { + checkArgument(handle.isNamedRelation(), "Unable to update from synthetic table: %s", handle); + checkArgument(handle.getLimit().isEmpty(), "Unable to update when limit is set: %s", handle); + checkArgument(handle.getSortOrder().isEmpty(), "Unable to update when sort order is set: %s", handle); + checkArgument(!handle.getUpdateAssignments().isEmpty(), "Unable to update when update assignments are not set: %s", handle); + try (Connection connection = connectionFactory.openConnection(session)) { + verify(connection.getAutoCommit()); + PreparedQuery preparedQuery = queryBuilder.prepareUpdateQuery( + this, + session, + connection, + handle.getRequiredNamedRelation(), + handle.getConstraint(), + getAdditionalPredicate(handle.getConstraintExpressions(), Optional.empty()), + handle.getUpdateAssignments()); + try (PreparedStatement preparedStatement = queryBuilder.prepareStatement(this, session, connection, preparedQuery, Optional.empty())) { + int affectedRows = preparedStatement.executeUpdate(); + // connection.getAutoCommit() == true is not enough to make UPDATE effective and explicit commit is required + connection.commit(); + return OptionalLong.of(affectedRows); + } + } + catch (SQLException e) { + throw new TrinoException(JDBC_ERROR, e); + } + } + @Override protected void addColumn(ConnectorSession session, Connection connection, RemoteTableName table, ColumnMetadata column) throws SQLException diff --git a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/BaseRedshiftFailureRecoveryTest.java b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/BaseRedshiftFailureRecoveryTest.java index 5a28c861c13e..9b27092d7f12 100644 --- a/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/BaseRedshiftFailureRecoveryTest.java +++ b/plugin/trino-redshift/src/test/java/io/trino/plugin/redshift/BaseRedshiftFailureRecoveryTest.java @@ -19,11 +19,14 @@ import io.trino.plugin.jdbc.BaseJdbcFailureRecoveryTest; import io.trino.testing.QueryRunner; import io.trino.tpch.TpchTable; +import org.testng.SkipException; import java.util.List; import java.util.Map; +import java.util.Optional; import static io.trino.plugin.redshift.RedshiftQueryRunner.createRedshiftQueryRunner; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy; public abstract class BaseRedshiftFailureRecoveryTest extends BaseJdbcFailureRecoveryTest @@ -51,4 +54,26 @@ protected QueryRunner createQueryRunner( "exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager")); }); } + + @Override + protected void testUpdateWithSubquery() + { + assertThatThrownBy(super::testUpdateWithSubquery).hasMessageContaining("Unexpected Join over for-update table scan"); + throw new SkipException("skipped"); + } + + @Override + protected void testUpdate() + { + // This simple update on JDBC ends up as a very simple, single-fragment, coordinator-only plan, + // which has no ability to recover from errors. This test simply verifies that's still the case. + Optional setupQuery = Optional.of("CREATE TABLE
AS SELECT * FROM orders"); + String testQuery = "UPDATE
SET shippriority = 101 WHERE custkey = 1"; + Optional cleanupQuery = Optional.of("DROP TABLE
"); + + assertThatQuery(testQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .isCoordinatorOnly(); + } } diff --git a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java index 2efdff76eeee..aedbd809daef 100644 --- a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java +++ b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerConnectorTest.java @@ -852,6 +852,17 @@ THEN INSERT(id) VALUES(SOURCE.id) } } + @Test + @Override + public void testConstantUpdateWithVarcharInequalityPredicates() + { + // Sql Server supports push down predicate for not equal operator + try (TestTable table = new TestTable(getQueryRunner()::execute, "test_update_varchar", "(col1 INT, col2 varchar(1))", ImmutableList.of("1, 'a'", "2, 'A'"))) { + assertUpdate("UPDATE " + table.getName() + " SET col1 = 20 WHERE col2 != 'A'", 1); + assertQuery("SELECT * FROM " + table.getName(), "VALUES (20, 'a'), (2, 'A')"); + } + } + private TestProcedure createTestingProcedure(String baseQuery) { return createTestingProcedure("", baseQuery); diff --git a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerFailureRecoveryTest.java b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerFailureRecoveryTest.java index 7e5214bb05d8..11c5d85614bd 100644 --- a/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerFailureRecoveryTest.java +++ b/plugin/trino-sqlserver/src/test/java/io/trino/plugin/sqlserver/BaseSqlServerFailureRecoveryTest.java @@ -19,11 +19,14 @@ import io.trino.plugin.jdbc.BaseJdbcFailureRecoveryTest; import io.trino.testing.QueryRunner; import io.trino.tpch.TpchTable; +import org.testng.SkipException; import java.util.List; import java.util.Map; +import java.util.Optional; import static io.trino.plugin.sqlserver.SqlServerQueryRunner.createSqlServerQueryRunner; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public abstract class BaseSqlServerFailureRecoveryTest extends BaseJdbcFailureRecoveryTest @@ -52,4 +55,26 @@ protected QueryRunner createQueryRunner( "exchange.base-directories", System.getProperty("java.io.tmpdir") + "/trino-local-file-system-exchange-manager")); }); } + + @Override + protected void testUpdateWithSubquery() + { + assertThatThrownBy(super::testUpdateWithSubquery).hasMessageContaining("Unexpected Join over for-update table scan"); + throw new SkipException("skipped"); + } + + @Override + protected void testUpdate() + { + // This simple update on JDBC ends up as a very simple, single-fragment, coordinator-only plan, + // which has no ability to recover from errors. This test simply verifies that's still the case. + Optional setupQuery = Optional.of("CREATE TABLE
AS SELECT * FROM orders"); + String testQuery = "UPDATE
SET shippriority = 101 WHERE custkey = 1"; + Optional cleanupQuery = Optional.of("DROP TABLE
"); + + assertThatQuery(testQuery) + .withSetupQuery(setupQuery) + .withCleanupQuery(cleanupQuery) + .isCoordinatorOnly(); + } }