Skip to content

Commit

Permalink
fix: fix auth error message with insert values command (#3257)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena authored Aug 22, 2019
1 parent 4b94f22 commit abe410a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
import io.confluent.ksql.execution.expression.tree.Expression;
import io.confluent.ksql.execution.expression.tree.Literal;
import io.confluent.ksql.execution.expression.tree.NullLiteral;
Expand Down Expand Up @@ -60,10 +61,14 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.connect.data.Struct;

// CHECKSTYLE_RULES.OFF: ClassDataAbstractionCoupling
public class InsertValuesExecutor {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private static final Duration MAX_SEND_TIMEOUT = Duration.ofSeconds(5);

Expand Down Expand Up @@ -163,6 +168,17 @@ public void execute(
);

producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps());
} catch (final TopicAuthorizationException e) {
// TopicAuthorizationException does not give much detailed information about why it failed,
// except which topics are denied. Here we just add the ACL to make the error message
// consistent with other authorization error messages.
final Exception rootCause = new KsqlTopicAuthorizationException(
AclOperation.WRITE,
e.unauthorizedTopics()
);

throw new KsqlException("Failed to insert values into stream/table: "
+ insertValues.getTarget().getSuffix(), rootCause);
} catch (final Exception e) {
throw new KsqlException("Failed to insert values into stream/table: "
+ insertValues.getTarget().getSuffix(), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static org.junit.internal.matchers.ThrowableMessageMatcher.hasMessage;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -63,6 +64,7 @@
import java.math.BigDecimal;
import java.math.MathContext;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -74,6 +76,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Schema;
Expand Down Expand Up @@ -515,6 +518,30 @@ public void shouldThrowOnSerializingValueError() {
executor.execute(statement, engine, serviceContext);
}

@Test
public void shouldThrowOnTopicAuthorizationException() {
// Given:
final ConfiguredStatement<InsertValues> statement = givenInsertValues(
allFieldNames(SCHEMA),
ImmutableList.of(
new LongLiteral(1L),
new StringLiteral("str"),
new StringLiteral("str"),
new LongLiteral(2L))
);
doThrow(new TopicAuthorizationException(Collections.singleton("t1")))
.when(producer).send(any());

// Expect:
expectedException.expect(KsqlException.class);
expectedException.expectCause(hasMessage(
containsString("Authorization denied to Write on topic(s): [t1]"))
);

// When:
executor.execute(statement, engine, serviceContext);
}

@Test
public void shouldThrowIfRowKeyAndKeyDoNotMatch() {
// Given:
Expand Down

0 comments on commit abe410a

Please sign in to comment.