diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index 39c4456633bd..20b635d545a5 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -20,6 +20,7 @@ import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException; import io.confluent.ksql.GenericRow; import io.confluent.ksql.KsqlExecutionContext; +import io.confluent.ksql.exception.KsqlTopicAuthorizationException; import io.confluent.ksql.execution.expression.tree.Expression; import io.confluent.ksql.execution.expression.tree.Literal; import io.confluent.ksql.execution.expression.tree.NullLiteral; @@ -60,10 +61,14 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.acl.AclOperation; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.connect.data.Struct; +// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling public class InsertValuesExecutor { + // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling private static final Duration MAX_SEND_TIMEOUT = Duration.ofSeconds(5); @@ -163,6 +168,17 @@ public void execute( ); producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps()); + } catch (final TopicAuthorizationException e) { + // TopicAuthorizationException does not give much detailed information about why it failed, + // except which topics are denied. Here we just add the ACL to make the error message + // consistent with other authorization error messages. + final Exception rootCause = new KsqlTopicAuthorizationException( + AclOperation.WRITE, + e.unauthorizedTopics() + ); + + throw new KsqlException("Failed to insert values into stream/table: " + + insertValues.getTarget().getSuffix(), rootCause); } catch (final Exception e) { throw new KsqlException("Failed to insert values into stream/table: " + insertValues.getTarget().getSuffix(), e); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java index 733bb810af47..d538211ac52f 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java @@ -19,6 +19,7 @@ import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -63,6 +64,7 @@ import java.math.BigDecimal; import java.math.MathContext; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Optional; import java.util.Set; @@ -74,6 +76,7 @@ import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.connect.data.Schema; @@ -515,6 +518,30 @@ public void shouldThrowOnSerializingValueError() { executor.execute(statement, engine, serviceContext); } + @Test + public void shouldThrowOnTopicAuthorizationException() { + // Given: + final ConfiguredStatement statement = givenInsertValues( + allFieldNames(SCHEMA), + ImmutableList.of( + new LongLiteral(1L), + new StringLiteral("str"), + new StringLiteral("str"), + new LongLiteral(2L)) + ); + doThrow(new TopicAuthorizationException(Collections.singleton("t1"))) + .when(producer).send(any()); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectCause(hasMessage( + containsString("Authorization denied to Write on topic(s): [t1]")) + ); + + // When: + executor.execute(statement, engine, serviceContext); + } + @Test public void shouldThrowIfRowKeyAndKeyDoNotMatch() { // Given: