From cb52c0648d1549c67ff8b3df8204567dcde6a489 Mon Sep 17 00:00:00 2001 From: Roman Zabaluev Date: Mon, 10 Feb 2025 14:27:39 +0400 Subject: [PATCH 1/6] Update FUNDING.yml Add open collective --- .github/FUNDING.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/FUNDING.yml b/.github/FUNDING.yml index d8a0e0375..e450aaedd 100644 --- a/.github/FUNDING.yml +++ b/.github/FUNDING.yml @@ -1 +1,2 @@ github: [kafbat] +open_collective: kafka-ui From 844fbc908c9c40ba5bf5e7a646f543f6a8986c0a Mon Sep 17 00:00:00 2001 From: SIX Douglas Date: Mon, 10 Feb 2025 11:45:26 +0100 Subject: [PATCH 2/6] BE: Clean some Sonar issues (#832) --- api/pom.xml | 2 +- .../io/kafbat/ui/exception/CelException.java | 5 +- .../GlobalErrorWebExceptionHandler.java | 16 +++--- .../ui/model/rbac/provider/Provider.java | 8 +-- .../kafbat/ui/serdes/builtin/Base64Serde.java | 2 +- .../io/kafbat/ui/serdes/builtin/HexSerde.java | 2 +- .../ui/serdes/builtin/ProtobufFileSerde.java | 4 +- .../kafbat/ui/service/audit/AuditRecord.java | 14 +++--- .../integration/odd/ConnectorInfo.java | 16 +++--- .../odd/schema/JsonSchemaExtractor.java | 30 ++++------- .../kafbat/ui/service/ksql/KsqlGrammar.java | 11 ++-- .../service/metrics/JmxMetricsFormatter.java | 8 +-- .../jsonschema/AvroJsonSchemaConverter.java | 3 +- .../kafbat/ui/KafkaConnectServiceTests.java | 50 +++++++++---------- .../io/kafbat/ui/KafkaConsumerGroupTests.java | 2 +- .../java/io/kafbat/ui/KafkaConsumerTests.java | 36 ++++++------- .../io/kafbat/ui/KafkaTopicCreateTests.java | 4 +- .../java/io/kafbat/ui/ReadOnlyModeTests.java | 10 ++-- .../kafbat/ui/SchemaRegistryServiceTests.java | 14 +++--- .../AzureEntraLoginCallbackHandlerTest.java | 18 +++---- .../azure/AzureEntraOAuthBearerTokenTest.java | 2 +- .../ApplicationConfigControllerTest.java | 2 +- .../kafbat/ui/emitter/MessageFiltersTest.java | 2 +- .../ui/serdes/builtin/HexSerdeTest.java | 2 +- .../serdes/builtin/ProtobufRawSerdeTest.java | 2 +- .../service/ApplicationInfoServiceTest.java | 4 +- .../java/io/kafbat/ui/service/ConfigTest.java | 6 +-- .../io/kafbat/ui/service/LogDirsTest.java | 22 ++++---- .../ui/service/OffsetsResetServiceTest.java | 2 +- .../service/SchemaRegistryPaginationTest.java | 4 +- .../kafbat/ui/service/SendAndReadTests.java | 22 ++++---- .../service/TopicsServicePaginationTest.java | 18 +++---- .../service/audit/AuditIntegrationTest.java | 2 +- .../ui/service/masking/DataMaskingTest.java | 11 ++-- 34 files changed, 172 insertions(+), 184 deletions(-) diff --git a/api/pom.xml b/api/pom.xml index bbeb9dff8..dd2c3a378 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -329,7 +329,7 @@ org.apache.maven.plugins maven-surefire-plugin - @{argLine} --illegal-access=permit + @{argLine} diff --git a/api/src/main/java/io/kafbat/ui/exception/CelException.java b/api/src/main/java/io/kafbat/ui/exception/CelException.java index e904368f6..d71d0da20 100644 --- a/api/src/main/java/io/kafbat/ui/exception/CelException.java +++ b/api/src/main/java/io/kafbat/ui/exception/CelException.java @@ -1,7 +1,10 @@ package io.kafbat.ui.exception; +import lombok.Getter; + +@Getter public class CelException extends CustomBaseException { - private String celOriginalExpression; + private final String celOriginalExpression; public CelException(String celOriginalExpression, String errorMessage) { super("CEL error. Original expression: %s. Error message: %s".formatted(celOriginalExpression, errorMessage)); diff --git a/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java b/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java index 61236f801..e6c0c76a5 100644 --- a/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java +++ b/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java @@ -54,18 +54,18 @@ private Mono renderErrorResponse(ServerRequest request) { Throwable throwable = getError(request); // validation and params binding errors - if (throwable instanceof WebExchangeBindException) { - return render((WebExchangeBindException) throwable, request); + if (throwable instanceof WebExchangeBindException webExchangeBindException) { + return render(webExchangeBindException, request); } // requests mapping & access errors - if (throwable instanceof ResponseStatusException) { - return render((ResponseStatusException) throwable, request); + if (throwable instanceof ResponseStatusException responseStatusException) { + return render(responseStatusException, request); } // custom exceptions - if (throwable instanceof CustomBaseException) { - return render((CustomBaseException) throwable, request); + if (throwable instanceof CustomBaseException customBaseException) { + return render(customBaseException, request); } return renderDefault(throwable, request); @@ -151,9 +151,7 @@ private String requestId(ServerRequest request) { } private Consumer headers(ServerRequest request) { - return (HttpHeaders headers) -> { - CorsGlobalConfiguration.fillCorsHeader(headers, request.exchange().getRequest()); - }; + return (HttpHeaders headers) -> CorsGlobalConfiguration.fillCorsHeader(headers, request.exchange().getRequest()); } private BigDecimal currentTimestamp() { diff --git a/api/src/main/java/io/kafbat/ui/model/rbac/provider/Provider.java b/api/src/main/java/io/kafbat/ui/model/rbac/provider/Provider.java index 94b1cd039..3fbae2423 100644 --- a/api/src/main/java/io/kafbat/ui/model/rbac/provider/Provider.java +++ b/api/src/main/java/io/kafbat/ui/model/rbac/provider/Provider.java @@ -21,11 +21,11 @@ public static Provider fromString(String name) { } public static class Name { - public static String GOOGLE = "google"; - public static String GITHUB = "github"; - public static String COGNITO = "cognito"; + public static final String GOOGLE = "google"; + public static final String GITHUB = "github"; + public static final String COGNITO = "cognito"; - public static String OAUTH = "oauth"; + public static final String OAUTH = "oauth"; } } diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/Base64Serde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/Base64Serde.java index 02e56ff22..515354695 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/Base64Serde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/Base64Serde.java @@ -40,7 +40,7 @@ public Serde.Serializer serializer(String topic, Serde.Target type) { return inputString -> { inputString = inputString.trim(); // it is actually a hack to provide ability to sent empty array as a key/value - if (inputString.length() == 0) { + if (inputString.isEmpty()) { return new byte[] {}; } return decoder.decode(inputString); diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/HexSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/HexSerde.java index a3c958a06..ab7f66ebb 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/HexSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/HexSerde.java @@ -62,7 +62,7 @@ public Serializer serializer(String topic, Target type) { return input -> { input = input.trim(); // it is a hack to provide ability to sent empty array as a key/value - if (input.length() == 0) { + if (input.isEmpty()) { return new byte[] {}; } return HexFormat.of().parseHex(prepareInputForParse(input)); diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java index 2c0939c03..20ea47a06 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java @@ -383,9 +383,9 @@ private ProtoFile loadKnownProtoFile(String path, Descriptors.FileDescriptor fil String protoFileString = null; // know type file contains either message or enum if (!fileDescriptor.getMessageTypes().isEmpty()) { - protoFileString = new ProtobufSchema(fileDescriptor.getMessageTypes().get(0)).canonicalString(); + protoFileString = new ProtobufSchema(fileDescriptor.getMessageTypes().getFirst()).canonicalString(); } else if (!fileDescriptor.getEnumTypes().isEmpty()) { - protoFileString = new ProtobufSchema(fileDescriptor.getEnumTypes().get(0)).canonicalString(); + protoFileString = new ProtobufSchema(fileDescriptor.getEnumTypes().getFirst()).canonicalString(); } else { throw new IllegalStateException(); } diff --git a/api/src/main/java/io/kafbat/ui/service/audit/AuditRecord.java b/api/src/main/java/io/kafbat/ui/service/audit/AuditRecord.java index d7ef659bf..2bb69c20f 100644 --- a/api/src/main/java/io/kafbat/ui/service/audit/AuditRecord.java +++ b/api/src/main/java/io/kafbat/ui/service/audit/AuditRecord.java @@ -53,14 +53,12 @@ static OperationResult successful() { } static OperationResult error(Throwable th) { - OperationError err = OperationError.UNRECOGNIZED_ERROR; - if (th instanceof AccessDeniedException) { - err = OperationError.ACCESS_DENIED; - } else if (th instanceof ValidationException) { - err = OperationError.VALIDATION_ERROR; - } else if (th instanceof CustomBaseException) { - err = OperationError.EXECUTION_ERROR; - } + OperationError err = switch (th) { + case AccessDeniedException ignored -> OperationError.ACCESS_DENIED; + case ValidationException ignored -> OperationError.VALIDATION_ERROR; + case CustomBaseException ignored -> OperationError.EXECUTION_ERROR; + case null, default -> OperationError.UNRECOGNIZED_ERROR; + }; return new OperationResult(false, err); } diff --git a/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorInfo.java b/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorInfo.java index c1703f0b1..186c4a8a1 100644 --- a/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorInfo.java +++ b/api/src/main/java/io/kafbat/ui/service/integration/odd/ConnectorInfo.java @@ -5,7 +5,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.Function; +import java.util.function.UnaryOperator; import java.util.stream.Stream; import javax.annotation.Nullable; import org.apache.commons.collections.CollectionUtils; @@ -22,7 +22,7 @@ static ConnectorInfo extract(String className, ConnectorTypeDTO type, Map config, List topicsFromApi, // can be empty for old Connect API versions - Function topicOddrnBuilder) { + UnaryOperator topicOddrnBuilder) { return switch (className) { case "org.apache.kafka.connect.file.FileStreamSinkConnector", "org.apache.kafka.connect.file.FileStreamSourceConnector", @@ -43,7 +43,7 @@ static ConnectorInfo extract(String className, private static ConnectorInfo extractFileIoConnector(ConnectorTypeDTO type, List topics, Map config, - Function topicOddrnBuilder) { + UnaryOperator topicOddrnBuilder) { return new ConnectorInfo( extractInputs(type, topics, config, topicOddrnBuilder), extractOutputs(type, topics, config, topicOddrnBuilder) @@ -53,7 +53,7 @@ private static ConnectorInfo extractFileIoConnector(ConnectorTypeDTO type, private static ConnectorInfo extractJdbcSink(ConnectorTypeDTO type, List topics, Map config, - Function topicOddrnBuilder) { + UnaryOperator topicOddrnBuilder) { String tableNameFormat = (String) config.getOrDefault("table.name.format", "${topic}"); List targetTables = extractTopicNamesBestEffort(topics, config) .map(topic -> tableNameFormat.replace("${kafka}", topic)) @@ -106,7 +106,7 @@ private static ConnectorInfo extractDebeziumMysql(Map config) { private static ConnectorInfo extractS3Sink(ConnectorTypeDTO type, List topics, Map config, - Function topicOrrdnBuilder) { + UnaryOperator topicOrrdnBuilder) { String bucketName = (String) config.get("s3.bucket.name"); String topicsDir = (String) config.getOrDefault("topics.dir", "topics"); String directoryDelim = (String) config.getOrDefault("directory.delim", "/"); @@ -122,7 +122,7 @@ private static ConnectorInfo extractS3Sink(ConnectorTypeDTO type, private static List extractInputs(ConnectorTypeDTO type, List topicsFromApi, Map config, - Function topicOrrdnBuilder) { + UnaryOperator topicOrrdnBuilder) { return type == ConnectorTypeDTO.SINK ? extractTopicsOddrns(config, topicsFromApi, topicOrrdnBuilder) : List.of(); @@ -131,7 +131,7 @@ private static List extractInputs(ConnectorTypeDTO type, private static List extractOutputs(ConnectorTypeDTO type, List topicsFromApi, Map config, - Function topicOrrdnBuilder) { + UnaryOperator topicOrrdnBuilder) { return type == ConnectorTypeDTO.SOURCE ? extractTopicsOddrns(config, topicsFromApi, topicOrrdnBuilder) : List.of(); @@ -158,7 +158,7 @@ private static Stream extractTopicNamesBestEffort( private static List extractTopicsOddrns(Map config, List topicsFromApi, - Function topicOrrdnBuilder) { + UnaryOperator topicOrrdnBuilder) { return extractTopicNamesBestEffort(topicsFromApi, config) .map(topicOrrdnBuilder) .toList(); diff --git a/api/src/main/java/io/kafbat/ui/service/integration/odd/schema/JsonSchemaExtractor.java b/api/src/main/java/io/kafbat/ui/service/integration/odd/schema/JsonSchemaExtractor.java index 328fcda26..aadf93d7c 100644 --- a/api/src/main/java/io/kafbat/ui/service/integration/odd/schema/JsonSchemaExtractor.java +++ b/api/src/main/java/io/kafbat/ui/service/integration/odd/schema/JsonSchemaExtractor.java @@ -286,25 +286,17 @@ private static DataSetField createDataSetField(Schema schema, } private static DataSetFieldType.TypeEnum mapType(Schema type) { - if (type instanceof NumberSchema) { - return DataSetFieldType.TypeEnum.NUMBER; - } - if (type instanceof StringSchema) { - return DataSetFieldType.TypeEnum.STRING; - } - if (type instanceof BooleanSchema || type instanceof TrueSchema || type instanceof FalseSchema) { - return DataSetFieldType.TypeEnum.BOOLEAN; - } - if (type instanceof ObjectSchema) { - return DataSetFieldType.TypeEnum.STRUCT; - } - if (type instanceof ReferenceSchema s) { - return mapType(s.getReferredSchema()); - } - if (type instanceof CombinedSchema) { - return DataSetFieldType.TypeEnum.UNION; - } - return DataSetFieldType.TypeEnum.UNKNOWN; + return switch (type) { + case NumberSchema ignored -> DataSetFieldType.TypeEnum.NUMBER; + case StringSchema ignored -> DataSetFieldType.TypeEnum.STRING; + case BooleanSchema ignored -> DataSetFieldType.TypeEnum.BOOLEAN; + case TrueSchema ignored -> DataSetFieldType.TypeEnum.BOOLEAN; + case FalseSchema ignored -> DataSetFieldType.TypeEnum.BOOLEAN; + case ObjectSchema ignored -> DataSetFieldType.TypeEnum.STRUCT; + case ReferenceSchema referenceSchema -> mapType(referenceSchema.getReferredSchema()); + case CombinedSchema ignored -> DataSetFieldType.TypeEnum.UNION; + default -> DataSetFieldType.TypeEnum.UNKNOWN; + }; } } diff --git a/api/src/main/java/io/kafbat/ui/service/ksql/KsqlGrammar.java b/api/src/main/java/io/kafbat/ui/service/ksql/KsqlGrammar.java index 3243841eb..1068ac193 100644 --- a/api/src/main/java/io/kafbat/ui/service/ksql/KsqlGrammar.java +++ b/api/src/main/java/io/kafbat/ui/service/ksql/KsqlGrammar.java @@ -74,13 +74,10 @@ public static CaseInsensitiveStream from(CharStream stream) { @Override public int LA(final int i) { final int result = stream.LA(i); - switch (result) { - case 0: - case IntStream.EOF: - return result; - default: - return Character.toUpperCase(result); - } + return switch (result) { + case 0, IntStream.EOF -> result; + default -> Character.toUpperCase(result); + }; } }; } diff --git a/api/src/main/java/io/kafbat/ui/service/metrics/JmxMetricsFormatter.java b/api/src/main/java/io/kafbat/ui/service/metrics/JmxMetricsFormatter.java index e4eeed278..37323c7dd 100644 --- a/api/src/main/java/io/kafbat/ui/service/metrics/JmxMetricsFormatter.java +++ b/api/src/main/java/io/kafbat/ui/service/metrics/JmxMetricsFormatter.java @@ -51,10 +51,10 @@ private static Optional convertNumericValue(Object value) { return Optional.empty(); } try { - if (value instanceof Long) { - return Optional.of(new BigDecimal((Long) value)); - } else if (value instanceof Integer) { - return Optional.of(new BigDecimal((Integer) value)); + if (value instanceof Long longValue) { + return Optional.of(new BigDecimal(longValue)); + } else if (value instanceof Integer integerValue) { + return Optional.of(new BigDecimal(integerValue)); } return Optional.of(new BigDecimal(value.toString())); } catch (NumberFormatException nfe) { diff --git a/api/src/main/java/io/kafbat/ui/util/jsonschema/AvroJsonSchemaConverter.java b/api/src/main/java/io/kafbat/ui/util/jsonschema/AvroJsonSchemaConverter.java index b5640eb06..4756c36b0 100644 --- a/api/src/main/java/io/kafbat/ui/util/jsonschema/AvroJsonSchemaConverter.java +++ b/api/src/main/java/io/kafbat/ui/util/jsonschema/AvroJsonSchemaConverter.java @@ -153,12 +153,11 @@ private JsonType convertType(Schema schema) { case INT, LONG -> new SimpleJsonType(JsonType.Type.INTEGER); case MAP, RECORD -> new SimpleJsonType(JsonType.Type.OBJECT); case ENUM -> new EnumJsonType(schema.getEnumSymbols()); - case BYTES, STRING -> new SimpleJsonType(JsonType.Type.STRING); case NULL -> new SimpleJsonType(JsonType.Type.NULL); case ARRAY -> new SimpleJsonType(JsonType.Type.ARRAY); case FIXED, FLOAT, DOUBLE -> new SimpleJsonType(JsonType.Type.NUMBER); case BOOLEAN -> new SimpleJsonType(JsonType.Type.BOOLEAN); - default -> new SimpleJsonType(JsonType.Type.STRING); + default -> new SimpleJsonType(JsonType.Type.STRING); // BYTES, STRING and the remaining possibilities }; } } diff --git a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java index c5fbb14b4..82d7a3041 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaConnectServiceTests.java @@ -24,12 +24,10 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.ParameterizedTypeReference; -import org.springframework.http.HttpStatus; -import org.springframework.test.web.reactive.server.ExchangeResult; import org.springframework.test.web.reactive.server.WebTestClient; @Slf4j -public class KafkaConnectServiceTests extends AbstractIntegrationTest { +class KafkaConnectServiceTests extends AbstractIntegrationTest { private final String connectName = "kafka-connect"; private final String connectorName = UUID.randomUUID().toString(); private final Map config = Map.of( @@ -46,7 +44,7 @@ public class KafkaConnectServiceTests extends AbstractIntegrationTest { @BeforeEach - public void setUp() { + void setUp() { webTestClient.post() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName) @@ -64,7 +62,7 @@ public void setUp() { } @AfterEach - public void tearDown() { + void tearDown() { webTestClient.delete() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", LOCAL, connectName, connectorName) @@ -73,7 +71,7 @@ public void tearDown() { } @Test - public void shouldListAllConnectors() { + void shouldListAllConnectors() { webTestClient.get() .uri("/api/clusters/{clusterName}/connectors", LOCAL) .exchange() @@ -84,7 +82,7 @@ public void shouldListAllConnectors() { } @Test - public void shouldFilterByNameConnectors() { + void shouldFilterByNameConnectors() { webTestClient.get() .uri( "/api/clusters/{clusterName}/connectors?search={search}", @@ -98,7 +96,7 @@ public void shouldFilterByNameConnectors() { } @Test - public void shouldFilterByStatusConnectors() { + void shouldFilterByStatusConnectors() { webTestClient.get() .uri( "/api/clusters/{clusterName}/connectors?search={search}", @@ -112,7 +110,7 @@ public void shouldFilterByStatusConnectors() { } @Test - public void shouldFilterByTypeConnectors() { + void shouldFilterByTypeConnectors() { webTestClient.get() .uri( "/api/clusters/{clusterName}/connectors?search={search}", @@ -126,7 +124,7 @@ public void shouldFilterByTypeConnectors() { } @Test - public void shouldNotFilterConnectors() { + void shouldNotFilterConnectors() { webTestClient.get() .uri( "/api/clusters/{clusterName}/connectors?search={search}", @@ -140,7 +138,7 @@ public void shouldNotFilterConnectors() { } @Test - public void shouldListConnectors() { + void shouldListConnectors() { webTestClient.get() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName) .exchange() @@ -150,7 +148,7 @@ public void shouldListConnectors() { } @Test - public void shouldReturnNotFoundForNonExistingCluster() { + void shouldReturnNotFoundForNonExistingCluster() { webTestClient.get() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", "nonExistingCluster", connectName) @@ -159,7 +157,7 @@ public void shouldReturnNotFoundForNonExistingCluster() { } @Test - public void shouldReturnNotFoundForNonExistingConnectName() { + void shouldReturnNotFoundForNonExistingConnectName() { webTestClient.get() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, "nonExistingConnect") @@ -168,7 +166,7 @@ public void shouldReturnNotFoundForNonExistingConnectName() { } @Test - public void shouldRetrieveConnector() { + void shouldRetrieveConnector() { ConnectorDTO expected = new ConnectorDTO() .connect(connectName) .status(new ConnectorStatusDTO() @@ -190,7 +188,7 @@ public void shouldRetrieveConnector() { } @Test - public void shouldUpdateConfig() { + void shouldUpdateConfig() { webTestClient.put() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName) @@ -221,7 +219,7 @@ public void shouldUpdateConfig() { } @Test - public void shouldReturn400WhenConnectReturns400ForInvalidConfigCreate() { + void shouldReturn400WhenConnectReturns400ForInvalidConfigCreate() { var connectorName = UUID.randomUUID().toString(); webTestClient.post() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName) @@ -247,7 +245,7 @@ public void shouldReturn400WhenConnectReturns400ForInvalidConfigCreate() { } @Test - public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() { + void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() { var connectorName = UUID.randomUUID().toString(); webTestClient.post() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName) @@ -272,7 +270,7 @@ public void shouldReturn400WhenConnectReturns500ForInvalidConfigCreate() { @Test @SuppressWarnings("checkstyle:LineLength") - public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() { + void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() { webTestClient.put() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName) @@ -312,7 +310,7 @@ public void shouldReturn400WhenConnectReturns400ForInvalidConfigUpdate() { } @Test - public void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() { + void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() { webTestClient.put() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}/config", LOCAL, connectName, connectorName) @@ -341,7 +339,7 @@ public void shouldReturn400WhenConnectReturns500ForInvalidConfigUpdate() { } @Test - public void shouldRetrieveConnectorPlugins() { + void shouldRetrieveConnectorPlugins() { webTestClient.get() .uri("/api/clusters/{clusterName}/connects/{connectName}/plugins", LOCAL, connectName) .exchange() @@ -351,7 +349,7 @@ public void shouldRetrieveConnectorPlugins() { } @Test - public void shouldSuccessfullyValidateConnectorPluginConfiguration() { + void shouldSuccessfullyValidateConnectorPluginConfiguration() { var pluginName = "FileStreamSinkConnector"; var path = "/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate"; @@ -372,7 +370,7 @@ public void shouldSuccessfullyValidateConnectorPluginConfiguration() { } @Test - public void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() { + void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() { var pluginName = "FileStreamSinkConnector"; var path = "/api/clusters/{clusterName}/connects/{connectName}/plugins/{pluginName}/config/validate"; @@ -398,13 +396,13 @@ public void shouldValidateAndReturnErrorsOfConnectorPluginConfiguration() { .findFirst().orElseThrow(); assertEquals( "Invalid value 0 for configuration tasks.max: Value must be at least 1", - error.get(0) + error.getFirst() ); }); } @Test - public void shouldReturn400WhenTryingToCreateConnectorWithExistingName() { + void shouldReturn400WhenTryingToCreateConnectorWithExistingName() { webTestClient.post() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors", LOCAL, connectName) .bodyValue(new NewConnectorDTO() @@ -422,7 +420,7 @@ public void shouldReturn400WhenTryingToCreateConnectorWithExistingName() { } @Test - public void shouldResetConnectorWhenInStoppedState() { + void shouldResetConnectorWhenInStoppedState() { webTestClient.get() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", @@ -455,7 +453,7 @@ public void shouldResetConnectorWhenInStoppedState() { } @Test - public void shouldReturn400WhenResettingConnectorInRunningState() { + void shouldReturn400WhenResettingConnectorInRunningState() { webTestClient.get() .uri("/api/clusters/{clusterName}/connects/{connectName}/connectors/{connectorName}", diff --git a/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java b/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java index b1bf4baa7..c23ea5fb0 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaConsumerGroupTests.java @@ -27,7 +27,7 @@ import reactor.core.publisher.Mono; @Slf4j -public class KafkaConsumerGroupTests extends AbstractIntegrationTest { +class KafkaConsumerGroupTests extends AbstractIntegrationTest { @Autowired WebTestClient webTestClient; diff --git a/api/src/test/java/io/kafbat/ui/KafkaConsumerTests.java b/api/src/test/java/io/kafbat/ui/KafkaConsumerTests.java index afe5f2d87..97ab381e6 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaConsumerTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaConsumerTests.java @@ -25,14 +25,14 @@ import reactor.core.publisher.Mono; @Slf4j -public class KafkaConsumerTests extends AbstractIntegrationTest { +class KafkaConsumerTests extends AbstractIntegrationTest { @Autowired private WebTestClient webTestClient; @Test - public void shouldDeleteRecords() { + void shouldDeleteRecords() { var topicName = UUID.randomUUID().toString(); webTestClient.post() .uri("/api/clusters/{clusterName}/topics", LOCAL) @@ -97,7 +97,7 @@ public void shouldDeleteRecords() { } @Test - public void shouldIncreasePartitionsUpTo10() { + void shouldIncreasePartitionsUpTo10() { var topicName = UUID.randomUUID().toString(); webTestClient.post() .uri("/api/clusters/{clusterName}/topics", LOCAL) @@ -144,7 +144,7 @@ public void shouldIncreasePartitionsUpTo10() { } @Test - public void shouldReturn404ForNonExistingTopic() { + void shouldReturn404ForNonExistingTopic() { var topicName = UUID.randomUUID().toString(); webTestClient.delete() @@ -161,7 +161,7 @@ public void shouldReturn404ForNonExistingTopic() { } @Test - public void shouldReturnConfigsForBroker() { + void shouldReturnConfigsForBroker() { List configs = webTestClient.get() .uri("/api/clusters/{clusterName}/brokers/{id}/configs", LOCAL, @@ -175,15 +175,16 @@ public void shouldReturnConfigsForBroker() { Assertions.assertNotNull(configs); Assertions.assertFalse(configs.isEmpty()); - Assertions.assertNotNull(configs.get(0).getName()); - Assertions.assertNotNull(configs.get(0).getIsReadOnly()); - Assertions.assertNotNull(configs.get(0).getIsSensitive()); - Assertions.assertNotNull(configs.get(0).getSource()); - Assertions.assertNotNull(configs.get(0).getSynonyms()); + BrokerConfigDTO brokerConfigDto = configs.getFirst(); + Assertions.assertNotNull(brokerConfigDto.getName()); + Assertions.assertNotNull(brokerConfigDto.getIsReadOnly()); + Assertions.assertNotNull(brokerConfigDto.getIsSensitive()); + Assertions.assertNotNull(brokerConfigDto.getSource()); + Assertions.assertNotNull(brokerConfigDto.getSynonyms()); } @Test - public void shouldReturn404ForNonExistingBroker() { + void shouldReturn404ForNonExistingBroker() { webTestClient.get() .uri("/api/clusters/{clusterName}/brokers/{id}/configs", LOCAL, @@ -194,7 +195,7 @@ public void shouldReturn404ForNonExistingBroker() { } @Test - public void shouldRetrieveTopicConfig() { + void shouldRetrieveTopicConfig() { var topicName = UUID.randomUUID().toString(); webTestClient.post() @@ -220,10 +221,11 @@ public void shouldRetrieveTopicConfig() { Assertions.assertNotNull(configs); Assertions.assertFalse(configs.isEmpty()); - Assertions.assertNotNull(configs.get(0).getName()); - Assertions.assertNotNull(configs.get(0).getIsReadOnly()); - Assertions.assertNotNull(configs.get(0).getIsSensitive()); - Assertions.assertNotNull(configs.get(0).getSource()); - Assertions.assertNotNull(configs.get(0).getSynonyms()); + TopicConfigDTO topicConfigDto = configs.getFirst(); + Assertions.assertNotNull(topicConfigDto.getName()); + Assertions.assertNotNull(topicConfigDto.getIsReadOnly()); + Assertions.assertNotNull(topicConfigDto.getIsSensitive()); + Assertions.assertNotNull(topicConfigDto.getSource()); + Assertions.assertNotNull(topicConfigDto.getSynonyms()); } } diff --git a/api/src/test/java/io/kafbat/ui/KafkaTopicCreateTests.java b/api/src/test/java/io/kafbat/ui/KafkaTopicCreateTests.java index fcc5a96fb..f4c5ca0b8 100644 --- a/api/src/test/java/io/kafbat/ui/KafkaTopicCreateTests.java +++ b/api/src/test/java/io/kafbat/ui/KafkaTopicCreateTests.java @@ -7,13 +7,13 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.web.reactive.server.WebTestClient; -public class KafkaTopicCreateTests extends AbstractIntegrationTest { +class KafkaTopicCreateTests extends AbstractIntegrationTest { @Autowired private WebTestClient webTestClient; private TopicCreationDTO topicCreation; @BeforeEach - public void setUpBefore() { + void setUpBefore() { this.topicCreation = new TopicCreationDTO() .replicationFactor(1) .partitions(3) diff --git a/api/src/test/java/io/kafbat/ui/ReadOnlyModeTests.java b/api/src/test/java/io/kafbat/ui/ReadOnlyModeTests.java index e6525b14a..df59c6c35 100644 --- a/api/src/test/java/io/kafbat/ui/ReadOnlyModeTests.java +++ b/api/src/test/java/io/kafbat/ui/ReadOnlyModeTests.java @@ -9,13 +9,13 @@ import org.springframework.http.HttpStatus; import org.springframework.test.web.reactive.server.WebTestClient; -public class ReadOnlyModeTests extends AbstractIntegrationTest { +class ReadOnlyModeTests extends AbstractIntegrationTest { @Autowired private WebTestClient webTestClient; @Test - public void shouldCreateTopicForNonReadonlyCluster() { + void shouldCreateTopicForNonReadonlyCluster() { var topicName = UUID.randomUUID().toString(); webTestClient.post() .uri("/api/clusters/{clusterName}/topics", LOCAL) @@ -31,7 +31,7 @@ public void shouldCreateTopicForNonReadonlyCluster() { } @Test - public void shouldNotCreateTopicForReadonlyCluster() { + void shouldNotCreateTopicForReadonlyCluster() { var topicName = UUID.randomUUID().toString(); webTestClient.post() .uri("/api/clusters/{clusterName}/topics", SECOND_LOCAL) @@ -47,7 +47,7 @@ public void shouldNotCreateTopicForReadonlyCluster() { } @Test - public void shouldUpdateTopicForNonReadonlyCluster() { + void shouldUpdateTopicForNonReadonlyCluster() { var topicName = UUID.randomUUID().toString(); webTestClient.post() .uri("/api/clusters/{clusterName}/topics", LOCAL) @@ -73,7 +73,7 @@ public void shouldUpdateTopicForNonReadonlyCluster() { } @Test - public void shouldNotUpdateTopicForReadonlyCluster() { + void shouldNotUpdateTopicForReadonlyCluster() { var topicName = UUID.randomUUID().toString(); webTestClient.patch() .uri("/api/clusters/{clusterName}/topics/{topicName}", SECOND_LOCAL, topicName) diff --git a/api/src/test/java/io/kafbat/ui/SchemaRegistryServiceTests.java b/api/src/test/java/io/kafbat/ui/SchemaRegistryServiceTests.java index 2653b7bac..a63a61b60 100644 --- a/api/src/test/java/io/kafbat/ui/SchemaRegistryServiceTests.java +++ b/api/src/test/java/io/kafbat/ui/SchemaRegistryServiceTests.java @@ -32,12 +32,12 @@ class SchemaRegistryServiceTests extends AbstractIntegrationTest { String subject; @BeforeEach - public void setUpBefore() { + void setUpBefore() { this.subject = UUID.randomUUID().toString(); } @Test - public void should404WhenGetAllSchemasForUnknownCluster() { + void should404WhenGetAllSchemasForUnknownCluster() { webTestClient .get() .uri("/api/clusters/unknown-cluster/schemas") @@ -46,7 +46,7 @@ public void should404WhenGetAllSchemasForUnknownCluster() { } @Test - public void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() { + void shouldReturn404WhenGetLatestSchemaByNonExistingSubject() { String unknownSchema = "unknown-schema"; webTestClient .get() @@ -244,7 +244,7 @@ void shouldCreateNewProtobufSchemaWithRefs() { } @Test - public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() { + void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() { webTestClient .get() .uri("/api/clusters/{clusterName}/schemas/compatibility", LOCAL) @@ -260,7 +260,7 @@ public void shouldReturnBackwardAsGlobalCompatibilityLevelByDefault() { } @Test - public void shouldReturnNotEmptyResponseWhenGetAllSchemas() { + void shouldReturnNotEmptyResponseWhenGetAllSchemas() { createNewSubjectAndAssert(subject); webTestClient @@ -287,7 +287,7 @@ public void shouldReturnNotEmptyResponseWhenGetAllSchemas() { } @Test - public void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() { + void shouldOkWhenCreateNewSchemaThenGetAndUpdateItsCompatibilityLevel() { createNewSubjectAndAssert(subject); //Get the created schema and check its items @@ -366,7 +366,7 @@ private void assertSchemaWhenGetLatest( List responseBody = listEntityExchangeResult.getResponseBody(); Assertions.assertNotNull(responseBody); Assertions.assertEquals(1, responseBody.size()); - SchemaSubjectDTO actualSchema = responseBody.get(0); + SchemaSubjectDTO actualSchema = responseBody.getFirst(); Assertions.assertNotNull(actualSchema); Assertions.assertEquals(subject, actualSchema.getSubject()); Assertions.assertEquals("\"string\"", actualSchema.getSchema()); diff --git a/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandlerTest.java b/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandlerTest.java index 16fff06b9..701f7cf3d 100644 --- a/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandlerTest.java +++ b/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraLoginCallbackHandlerTest.java @@ -32,7 +32,7 @@ import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) -public class AzureEntraLoginCallbackHandlerTest { +class AzureEntraLoginCallbackHandlerTest { // These are not real tokens. It was generated using fake values with an invalid signature, // so it is safe to store here. @@ -67,13 +67,13 @@ public class AzureEntraLoginCallbackHandlerTest { private AzureEntraLoginCallbackHandler azureEntraLoginCallbackHandler; @BeforeEach - public void beforeEach() { + void beforeEach() { azureEntraLoginCallbackHandler = new AzureEntraLoginCallbackHandler(); azureEntraLoginCallbackHandler.setTokenCredential(tokenCredential); } @Test - public void shouldProvideTokenToCallbackWithSuccessfulTokenRequest() throws UnsupportedCallbackException { + void shouldProvideTokenToCallbackWithSuccessfulTokenRequest() throws UnsupportedCallbackException { Map configs = Map.of("bootstrap.servers", List.of("test-eh.servicebus.windows.net:9093")); when(tokenCredential.getToken(any(TokenRequestContext.class))).thenReturn(Mono.just(accessToken)); @@ -105,7 +105,7 @@ public void shouldProvideTokenToCallbackWithSuccessfulTokenRequest() throws Unsu } @Test - public void shouldProvideErrorToCallbackWithTokenError() throws UnsupportedCallbackException { + void shouldProvideErrorToCallbackWithTokenError() throws UnsupportedCallbackException { Map configs = Map.of("bootstrap.servers", List.of("test-eh.servicebus.windows.net:9093")); when(tokenCredential.getToken(any(TokenRequestContext.class))) @@ -124,13 +124,13 @@ public void shouldProvideErrorToCallbackWithTokenError() throws UnsupportedCallb } @Test - public void shouldThrowExceptionWithNullBootstrapServers() { + void shouldThrowExceptionWithNullBootstrapServers() { assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure( Map.of(), null, null)); } @Test - public void shouldThrowExceptionWithMultipleBootstrapServers() { + void shouldThrowExceptionWithMultipleBootstrapServers() { Map configs = Map.of("bootstrap.servers", List.of("server1", "server2")); assertThrows(IllegalArgumentException.class, () -> azureEntraLoginCallbackHandler.configure( @@ -138,18 +138,18 @@ public void shouldThrowExceptionWithMultipleBootstrapServers() { } @Test - public void shouldThrowExceptionWithUnsupportedCallback() { + void shouldThrowExceptionWithUnsupportedCallback() { assertThrows(UnsupportedCallbackException.class, () -> azureEntraLoginCallbackHandler.handle( new Callback[] {mock(Callback.class)})); } @Test - public void shouldDoNothingOnClose() { + void shouldDoNothingOnClose() { azureEntraLoginCallbackHandler.close(); } @Test - public void shouldSupportDefaultConstructor() { + void shouldSupportDefaultConstructor() { new AzureEntraLoginCallbackHandler(); } } diff --git a/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerTokenTest.java b/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerTokenTest.java index 6072de470..74dcfd077 100644 --- a/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerTokenTest.java +++ b/api/src/test/java/io/kafbat/ui/config/auth/azure/AzureEntraOAuthBearerTokenTest.java @@ -12,7 +12,7 @@ import org.apache.kafka.common.errors.SaslAuthenticationException; import org.junit.jupiter.api.Test; -public class AzureEntraOAuthBearerTokenTest { +class AzureEntraOAuthBearerTokenTest { // These are not real tokens. It was generated using fake values with an invalid signature, // so it is safe to store here. diff --git a/api/src/test/java/io/kafbat/ui/controller/ApplicationConfigControllerTest.java b/api/src/test/java/io/kafbat/ui/controller/ApplicationConfigControllerTest.java index c95a34d26..9659b4ea6 100644 --- a/api/src/test/java/io/kafbat/ui/controller/ApplicationConfigControllerTest.java +++ b/api/src/test/java/io/kafbat/ui/controller/ApplicationConfigControllerTest.java @@ -20,7 +20,7 @@ class ApplicationConfigControllerTest extends AbstractIntegrationTest { private WebTestClient webTestClient; @Test - public void testUpload() throws IOException { + void testUpload() throws IOException { var fileToUpload = new ClassPathResource("/fileForUploadTest.txt", this.getClass()); UploadedFileInfoDTO result = webTestClient diff --git a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java index c66251b25..7aafea5ab 100644 --- a/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java +++ b/api/src/test/java/io/kafbat/ui/emitter/MessageFiltersTest.java @@ -199,7 +199,7 @@ void filterSpeedIsAtLeast5kPerSec() { long took = System.currentTimeMillis() - before; assertThat(took).isLessThan(1000); - assertThat(matched).isGreaterThan(0); + assertThat(matched).isPositive(); } } diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/HexSerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/HexSerdeTest.java index a13690c1e..54cb63321 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/builtin/HexSerdeTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/HexSerdeTest.java @@ -11,7 +11,7 @@ import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.EnumSource; -public class HexSerdeTest { +class HexSerdeTest { private static final byte[] TEST_BYTES = "hello world".getBytes(); private static final String TEST_BYTES_HEX_ENCODED = "68 65 6C 6C 6F 20 77 6F 72 6C 64"; diff --git a/api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufRawSerdeTest.java b/api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufRawSerdeTest.java index ba88b2d37..8dd65123c 100644 --- a/api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufRawSerdeTest.java +++ b/api/src/test/java/io/kafbat/ui/serdes/builtin/ProtobufRawSerdeTest.java @@ -52,7 +52,7 @@ void deserializeSimpleMessage() { void deserializeEmptyMessage() { var deserialized = serde.deserializer(DUMMY_TOPIC, Serde.Target.VALUE) .deserialize(null, new byte[0]); - assertThat(deserialized.getResult()).isEqualTo(""); + assertThat(deserialized.getResult()).isEmpty(); } @Test diff --git a/api/src/test/java/io/kafbat/ui/service/ApplicationInfoServiceTest.java b/api/src/test/java/io/kafbat/ui/service/ApplicationInfoServiceTest.java index 64e6ed743..a4cae637b 100644 --- a/api/src/test/java/io/kafbat/ui/service/ApplicationInfoServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/ApplicationInfoServiceTest.java @@ -6,12 +6,12 @@ import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; -public class ApplicationInfoServiceTest extends AbstractIntegrationTest { +class ApplicationInfoServiceTest extends AbstractIntegrationTest { @Autowired private ApplicationInfoService service; @Test - public void testCustomGithubReleaseInfoTimeout() { + void testCustomGithubReleaseInfoTimeout() { assertEquals(100, service.githubReleaseInfo().getGithubApiMaxWaitTime()); } } diff --git a/api/src/test/java/io/kafbat/ui/service/ConfigTest.java b/api/src/test/java/io/kafbat/ui/service/ConfigTest.java index 4df63e476..756bd5c91 100644 --- a/api/src/test/java/io/kafbat/ui/service/ConfigTest.java +++ b/api/src/test/java/io/kafbat/ui/service/ConfigTest.java @@ -17,7 +17,7 @@ import org.springframework.test.web.reactive.server.WebTestClient; import org.testcontainers.shaded.org.awaitility.Awaitility; -public class ConfigTest extends AbstractIntegrationTest { +class ConfigTest extends AbstractIntegrationTest { @Autowired private WebTestClient webTestClient; @@ -35,7 +35,7 @@ void waitUntilStatsInitialized() { } @Test - public void testAlterConfig() { + void testAlterConfig() { String name = "background.threads"; Optional bc = getConfig(name); @@ -65,7 +65,7 @@ public void testAlterConfig() { } @Test - public void testAlterReadonlyConfig() { + void testAlterReadonlyConfig() { String name = "log.dirs"; webTestClient.put() diff --git a/api/src/test/java/io/kafbat/ui/service/LogDirsTest.java b/api/src/test/java/io/kafbat/ui/service/LogDirsTest.java index 3914f35a1..718f2f49c 100644 --- a/api/src/test/java/io/kafbat/ui/service/LogDirsTest.java +++ b/api/src/test/java/io/kafbat/ui/service/LogDirsTest.java @@ -15,13 +15,13 @@ import org.springframework.core.ParameterizedTypeReference; import org.springframework.test.web.reactive.server.WebTestClient; -public class LogDirsTest extends AbstractIntegrationTest { +class LogDirsTest extends AbstractIntegrationTest { @Autowired private WebTestClient webTestClient; @Test - public void testAllBrokers() { + void testAllBrokers() { List dirs = webTestClient.get() .uri("/api/clusters/{clusterName}/brokers/logdirs", LOCAL) .exchange() @@ -31,7 +31,7 @@ public void testAllBrokers() { .getResponseBody(); assertThat(dirs).hasSize(1); - BrokersLogdirsDTO dir = dirs.get(0); + BrokersLogdirsDTO dir = dirs.getFirst(); assertThat(dir.getName()).isEqualTo("/var/lib/kafka/data"); assertThat(dir.getTopics().stream().anyMatch(t -> t.getName().equals("__consumer_offsets"))) .isTrue(); @@ -41,12 +41,12 @@ public void testAllBrokers() { .findAny().orElseThrow(); assertThat(topic.getPartitions()).hasSize(1); - assertThat(topic.getPartitions().get(0).getBroker()).isEqualTo(1); - assertThat(topic.getPartitions().get(0).getSize()).isPositive(); + assertThat(topic.getPartitions().getFirst().getBroker()).isEqualTo(1); + assertThat(topic.getPartitions().getFirst().getSize()).isPositive(); } @Test - public void testOneBrokers() { + void testOneBrokers() { List dirs = webTestClient.get() .uri("/api/clusters/{clusterName}/brokers/logdirs?broker=1", LOCAL) .exchange() @@ -56,7 +56,7 @@ public void testOneBrokers() { .getResponseBody(); assertThat(dirs).hasSize(1); - BrokersLogdirsDTO dir = dirs.get(0); + BrokersLogdirsDTO dir = dirs.getFirst(); assertThat(dir.getName()).isEqualTo("/var/lib/kafka/data"); assertThat(dir.getTopics().stream().anyMatch(t -> t.getName().equals("__consumer_offsets"))) .isTrue(); @@ -66,12 +66,12 @@ public void testOneBrokers() { .findAny().orElseThrow(); assertThat(topic.getPartitions()).hasSize(1); - assertThat(topic.getPartitions().get(0).getBroker()).isEqualTo(1); - assertThat(topic.getPartitions().get(0).getSize()).isPositive(); + assertThat(topic.getPartitions().getFirst().getBroker()).isEqualTo(1); + assertThat(topic.getPartitions().getFirst().getSize()).isPositive(); } @Test - public void testWrongBrokers() { + void testWrongBrokers() { List dirs = webTestClient.get() .uri("/api/clusters/{clusterName}/brokers/logdirs?broker=2", LOCAL) .exchange() @@ -84,7 +84,7 @@ public void testWrongBrokers() { } @Test - public void testChangeDirToWrongDir() { + void testChangeDirToWrongDir() { ErrorResponseDTO dirs = webTestClient.patch() .uri("/api/clusters/{clusterName}/brokers/{id}/logdirs", LOCAL, 1) .bodyValue(Map.of( diff --git a/api/src/test/java/io/kafbat/ui/service/OffsetsResetServiceTest.java b/api/src/test/java/io/kafbat/ui/service/OffsetsResetServiceTest.java index 2485c7140..8c873980e 100644 --- a/api/src/test/java/io/kafbat/ui/service/OffsetsResetServiceTest.java +++ b/api/src/test/java/io/kafbat/ui/service/OffsetsResetServiceTest.java @@ -33,7 +33,7 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; -public class OffsetsResetServiceTest extends AbstractIntegrationTest { +class OffsetsResetServiceTest extends AbstractIntegrationTest { private static final int PARTITIONS = 5; diff --git a/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java b/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java index 4f61351c2..43cb29382 100644 --- a/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java +++ b/api/src/test/java/io/kafbat/ui/service/SchemaRegistryPaginationTest.java @@ -22,7 +22,7 @@ import org.mockito.Mockito; import reactor.core.publisher.Mono; -public class SchemaRegistryPaginationTest { +class SchemaRegistryPaginationTest { private static final String LOCAL_KAFKA_CLUSTER_NAME = "local"; @@ -127,7 +127,7 @@ void shouldCalculateCorrectPageCountForNonDivisiblePageSize() { assertThat(schemas.getBody()).isNotNull(); assertThat(schemas.getBody().getPageCount()).isEqualTo(4); assertThat(schemas.getBody().getSchemas()).hasSize(1); - assertThat(schemas.getBody().getSchemas().get(0).getSubject()).isEqualTo("subject99"); + assertThat(schemas.getBody().getSchemas().getFirst().getSubject()).isEqualTo("subject99"); } @SuppressWarnings("unchecked") diff --git a/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java b/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java index 009a6f67b..9e0164540 100644 --- a/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java +++ b/api/src/test/java/io/kafbat/ui/service/SendAndReadTests.java @@ -32,7 +32,7 @@ import org.springframework.beans.factory.annotation.Autowired; import reactor.test.StepVerifier; -public class SendAndReadTests extends AbstractIntegrationTest { +class SendAndReadTests extends AbstractIntegrationTest { private static final AvroSchema AVRO_SCHEMA_1 = new AvroSchema( "{" @@ -81,14 +81,16 @@ public class SendAndReadTests extends AbstractIntegrationTest { private static final String AVRO_SCHEMA_2_JSON_RECORD = "{ \"f1\": 111, \"f2\": \"testStr\" }"; private static final ProtobufSchema PROTOBUF_SCHEMA = new ProtobufSchema( - "syntax = \"proto3\";\n" - + "package io.kafbat;\n" - + "\n" - + "message TestProtoRecord {\n" - + " string f1 = 1;\n" - + " int32 f2 = 2;\n" - + "}\n" - + "\n" + """ + syntax = "proto3"; + package io.kafbat; + + message TestProtoRecord { + string f1 = 1; + int32 f2 = 2; + } + + """ ); private static final String PROTOBUF_SCHEMA_JSON_RECORD @@ -528,7 +530,7 @@ public void doAssert(Consumer msgAssert) { .blockLast(Duration.ofSeconds(5000)); assertThat(polled).isNotNull(); - assertThat(polled.getPartition()).isEqualTo(0); + assertThat(polled.getPartition()).isZero(); assertThat(polled.getOffset()).isNotNull(); msgAssert.accept(polled); } finally { diff --git a/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java b/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java index 9091ebb3d..08b211b40 100644 --- a/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java +++ b/api/src/test/java/io/kafbat/ui/service/TopicsServicePaginationTest.java @@ -66,7 +66,7 @@ private void init(Map topicsInCache) { } @Test - public void shouldListFirst25Topics() { + void shouldListFirst25Topics() { init( IntStream.rangeClosed(1, 100).boxed() .map(Objects::toString) @@ -93,7 +93,7 @@ private KafkaCluster buildKafkaCluster(String clusterName) { } @Test - public void shouldListFirst25TopicsSortedByNameDescendingOrder() { + void shouldListFirst25TopicsSortedByNameDescendingOrder() { var internalTopics = IntStream.rangeClosed(1, 100).boxed() .map(Objects::toString) .map(name -> new TopicDescription(name, false, List.of())) @@ -119,7 +119,7 @@ public void shouldListFirst25TopicsSortedByNameDescendingOrder() { } @Test - public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() { + void shouldCalculateCorrectPageCountForNonDivisiblePageSize() { init( IntStream.rangeClosed(1, 100).boxed() .map(Objects::toString) @@ -134,11 +134,11 @@ public void shouldCalculateCorrectPageCountForNonDivisiblePageSize() { assertThat(topics.getBody().getPageCount()).isEqualTo(4); assertThat(topics.getBody().getTopics()).hasSize(1); - assertThat(topics.getBody().getTopics().get(0).getName()).isEqualTo("99"); + assertThat(topics.getBody().getTopics().getFirst().getName()).isEqualTo("99"); } @Test - public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() { + void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() { init( IntStream.rangeClosed(1, 100).boxed() .map(Objects::toString) @@ -157,7 +157,7 @@ public void shouldCorrectlyHandleNonPositivePageNumberAndPageSize() { } @Test - public void shouldListBotInternalAndNonInternalTopics() { + void shouldListBotInternalAndNonInternalTopics() { init( IntStream.rangeClosed(1, 100).boxed() .map(Objects::toString) @@ -177,7 +177,7 @@ public void shouldListBotInternalAndNonInternalTopics() { } @Test - public void shouldListOnlyNonInternalTopics() { + void shouldListOnlyNonInternalTopics() { init( IntStream.rangeClosed(1, 100).boxed() @@ -198,7 +198,7 @@ public void shouldListOnlyNonInternalTopics() { } @Test - public void shouldListOnlyTopicsContainingOne() { + void shouldListOnlyTopicsContainingOne() { init( IntStream.rangeClosed(1, 100).boxed() @@ -219,7 +219,7 @@ public void shouldListOnlyTopicsContainingOne() { } @Test - public void shouldListTopicsOrderedByPartitionsCount() { + void shouldListTopicsOrderedByPartitionsCount() { Map internalTopics = IntStream.rangeClosed(1, 100).boxed() .map(i -> new TopicDescription(UUID.randomUUID().toString(), false, IntStream.range(0, i) diff --git a/api/src/test/java/io/kafbat/ui/service/audit/AuditIntegrationTest.java b/api/src/test/java/io/kafbat/ui/service/audit/AuditIntegrationTest.java index 8f3221289..ebff4bcd6 100644 --- a/api/src/test/java/io/kafbat/ui/service/audit/AuditIntegrationTest.java +++ b/api/src/test/java/io/kafbat/ui/service/audit/AuditIntegrationTest.java @@ -22,7 +22,7 @@ import org.springframework.test.web.reactive.server.WebTestClient; import org.testcontainers.shaded.org.awaitility.Awaitility; -public class AuditIntegrationTest extends AbstractIntegrationTest { +class AuditIntegrationTest extends AbstractIntegrationTest { @Autowired private WebTestClient webTestClient; diff --git a/api/src/test/java/io/kafbat/ui/service/masking/DataMaskingTest.java b/api/src/test/java/io/kafbat/ui/service/masking/DataMaskingTest.java index 8cc2e5c9d..cd0a6c02e 100644 --- a/api/src/test/java/io/kafbat/ui/service/masking/DataMaskingTest.java +++ b/api/src/test/java/io/kafbat/ui/service/masking/DataMaskingTest.java @@ -1,6 +1,5 @@ package io.kafbat.ui.service.masking; -import static org.mockito.Mockito.eq; import static org.mockito.Mockito.reset; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; @@ -57,14 +56,14 @@ void appliesMasksToJsonContainerArgsBasedOnTopicPatterns(String jsonObjOrArr) { var parsedJson = (ContainerNode) new JsonMapper().readTree(jsonObjOrArr); masking.getMaskingFunction(TOPIC, Serde.Target.KEY).apply(jsonObjOrArr); - verify(policy1).applyToJsonContainer(eq(parsedJson)); + verify(policy1).applyToJsonContainer(parsedJson); verifyNoInteractions(policy2, policy3); reset(policy1, policy2, policy3); masking.getMaskingFunction(TOPIC, Serde.Target.VALUE).apply(jsonObjOrArr); - verify(policy2).applyToJsonContainer(eq(parsedJson)); - verify(policy3).applyToJsonContainer(eq(policy2.applyToJsonContainer(parsedJson))); + verify(policy2).applyToJsonContainer(parsedJson); + verify(policy3).applyToJsonContainer(policy2.applyToJsonContainer(parsedJson)); verifyNoInteractions(policy1); } @@ -76,13 +75,13 @@ void appliesMasksToJsonContainerArgsBasedOnTopicPatterns(String jsonObjOrArr) { }) void appliesFirstFoundMaskToStringArgsBasedOnTopicPatterns(String nonJsonObjOrArrString) { masking.getMaskingFunction(TOPIC, Serde.Target.KEY).apply(nonJsonObjOrArrString); - verify(policy1).applyToString(eq(nonJsonObjOrArrString)); + verify(policy1).applyToString(nonJsonObjOrArrString); verifyNoInteractions(policy2, policy3); reset(policy1, policy2, policy3); masking.getMaskingFunction(TOPIC, Serde.Target.VALUE).apply(nonJsonObjOrArrString); - verify(policy2).applyToString(eq(nonJsonObjOrArrString)); + verify(policy2).applyToString(nonJsonObjOrArrString); verifyNoInteractions(policy1, policy3); } From eaeb4a41dbd3233ac65aa5123d800d0d14db0c5f Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Tue, 11 Feb 2025 19:02:46 +0500 Subject: [PATCH 3/6] BE: Chore: Cleanup api module (#815) --- .../ui/client/RetryingKafkaConnectClient.java | 4 +- .../config/auth/BasicAuthSecurityConfig.java | 4 - .../logout/CognitoLogoutSuccessHandler.java | 2 +- .../io/kafbat/ui/emitter/OffsetsInfo.java | 2 +- .../kafbat/ui/emitter/ResultSizeLimiter.java | 23 ----- .../exception/ConnectNotFoundException.java | 13 --- .../exception/DuplicateEntityException.java | 13 --- .../io/kafbat/ui/exception/ErrorCode.java | 6 -- .../GlobalErrorWebExceptionHandler.java | 3 +- ...afkaConnectConflictResponseException.java} | 5 +- .../ui/exception/KsqlDbNotFoundException.java | 13 --- .../SchemaFailedToDeleteException.java | 13 --- .../UnprocessableEntityException.java | 14 --- .../kafbat/ui/mapper/ConsumerGroupMapper.java | 26 ++--- .../ui/mapper/DescribeLogDirsMapper.java | 5 +- .../io/kafbat/ui/serdes/SerdeInstance.java | 2 +- .../ui/serdes/builtin/ProtobufFileSerde.java | 2 +- .../ui/service/KafkaConnectService.java | 2 - .../ui/service/ReactiveAdminClient.java | 6 -- .../ui/service/SchemaRegistryService.java | 2 +- .../io/kafbat/ui/service/TopicsService.java | 12 +-- .../io/kafbat/ui/service/acl/AclsService.java | 4 +- .../kafbat/ui/service/ksql/KsqlApiClient.java | 2 +- .../service/ksql/response/ResponseParser.java | 97 ++++++++----------- .../ui/service/metrics/MetricsCollector.java | 9 +- .../kafbat/ui/util/EmptyRedirectStrategy.java | 22 +---- .../ui/util/KafkaServicesValidation.java | 5 +- .../io/kafbat/ui/util/ReactiveFailover.java | 4 +- .../util/jsonschema/JsonAvroConversion.java | 42 +++----- .../kafbat/ui/util/jsonschema/JsonSchema.java | 9 -- 30 files changed, 89 insertions(+), 277 deletions(-) delete mode 100644 api/src/main/java/io/kafbat/ui/emitter/ResultSizeLimiter.java delete mode 100644 api/src/main/java/io/kafbat/ui/exception/ConnectNotFoundException.java delete mode 100644 api/src/main/java/io/kafbat/ui/exception/DuplicateEntityException.java rename api/src/main/java/io/kafbat/ui/exception/{KafkaConnectConflictReponseException.java => KafkaConnectConflictResponseException.java} (67%) delete mode 100644 api/src/main/java/io/kafbat/ui/exception/KsqlDbNotFoundException.java delete mode 100644 api/src/main/java/io/kafbat/ui/exception/SchemaFailedToDeleteException.java delete mode 100644 api/src/main/java/io/kafbat/ui/exception/UnprocessableEntityException.java diff --git a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java index df2da3e55..cdf5bce14 100644 --- a/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java +++ b/api/src/main/java/io/kafbat/ui/client/RetryingKafkaConnectClient.java @@ -12,7 +12,7 @@ import io.kafbat.ui.connect.model.ConnectorTopics; import io.kafbat.ui.connect.model.NewConnector; import io.kafbat.ui.connect.model.TaskStatus; -import io.kafbat.ui.exception.KafkaConnectConflictReponseException; +import io.kafbat.ui.exception.KafkaConnectConflictResponseException; import io.kafbat.ui.exception.ValidationException; import io.kafbat.ui.util.WebClientConfigurator; import jakarta.validation.constraints.NotNull; @@ -48,7 +48,7 @@ private static Retry conflictCodeRetry() { .fixedDelay(MAX_RETRIES, RETRIES_DELAY) .filter(e -> e instanceof WebClientResponseException.Conflict) .onRetryExhaustedThrow((spec, signal) -> - new KafkaConnectConflictReponseException( + new KafkaConnectConflictResponseException( (WebClientResponseException.Conflict) signal.failure())); } diff --git a/api/src/main/java/io/kafbat/ui/config/auth/BasicAuthSecurityConfig.java b/api/src/main/java/io/kafbat/ui/config/auth/BasicAuthSecurityConfig.java index db8ef8153..788c33bdd 100644 --- a/api/src/main/java/io/kafbat/ui/config/auth/BasicAuthSecurityConfig.java +++ b/api/src/main/java/io/kafbat/ui/config/auth/BasicAuthSecurityConfig.java @@ -1,8 +1,6 @@ package io.kafbat.ui.config.auth; -import io.kafbat.ui.util.EmptyRedirectStrategy; import io.kafbat.ui.util.StaticFileWebFilter; -import java.net.URI; import lombok.extern.slf4j.Slf4j; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -12,8 +10,6 @@ import org.springframework.security.config.web.server.SecurityWebFiltersOrder; import org.springframework.security.config.web.server.ServerHttpSecurity; import org.springframework.security.web.server.SecurityWebFilterChain; -import org.springframework.security.web.server.authentication.RedirectServerAuthenticationSuccessHandler; -import org.springframework.security.web.server.authentication.logout.RedirectServerLogoutSuccessHandler; import org.springframework.security.web.server.util.matcher.ServerWebExchangeMatchers; @Configuration diff --git a/api/src/main/java/io/kafbat/ui/config/auth/logout/CognitoLogoutSuccessHandler.java b/api/src/main/java/io/kafbat/ui/config/auth/logout/CognitoLogoutSuccessHandler.java index d98ea22af..e58f51ab3 100644 --- a/api/src/main/java/io/kafbat/ui/config/auth/logout/CognitoLogoutSuccessHandler.java +++ b/api/src/main/java/io/kafbat/ui/config/auth/logout/CognitoLogoutSuccessHandler.java @@ -40,7 +40,7 @@ public Mono handle(WebFilterExchange exchange, Authentication authenticati requestUri.getPath(), requestUri.getQuery()); final UriComponents baseUrl = UriComponentsBuilder - .fromHttpUrl(fullUrl) + .fromUriString(fullUrl) .replacePath("/") .replaceQuery(null) .fragment(null) diff --git a/api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java b/api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java index a361834d0..7f34e1708 100644 --- a/api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java +++ b/api/src/main/java/io/kafbat/ui/emitter/OffsetsInfo.java @@ -53,7 +53,7 @@ private Map firstOffsetsForPolling(Consumer consumer Collection partitions) { try { // we try to use offsetsForTimes() to find earliest offsets, since for - // some topics (like compacted) beginningOffsets() ruturning 0 offsets + // some topics (like compacted) beginningOffsets() returning 0 offsets // even when effectively first offset can be very high var offsets = consumer.offsetsForTimes( partitions.stream().collect(Collectors.toMap(p -> p, p -> 0L)) diff --git a/api/src/main/java/io/kafbat/ui/emitter/ResultSizeLimiter.java b/api/src/main/java/io/kafbat/ui/emitter/ResultSizeLimiter.java deleted file mode 100644 index 3e0ec2a43..000000000 --- a/api/src/main/java/io/kafbat/ui/emitter/ResultSizeLimiter.java +++ /dev/null @@ -1,23 +0,0 @@ -package io.kafbat.ui.emitter; - -import io.kafbat.ui.model.TopicMessageEventDTO; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; - -public class ResultSizeLimiter implements Predicate { - private final AtomicInteger processed = new AtomicInteger(); - private final int limit; - - public ResultSizeLimiter(int limit) { - this.limit = limit; - } - - @Override - public boolean test(TopicMessageEventDTO event) { - if (event.getType().equals(TopicMessageEventDTO.TypeEnum.MESSAGE)) { - final int i = processed.incrementAndGet(); - return i <= limit; - } - return true; - } -} diff --git a/api/src/main/java/io/kafbat/ui/exception/ConnectNotFoundException.java b/api/src/main/java/io/kafbat/ui/exception/ConnectNotFoundException.java deleted file mode 100644 index 5978c2e93..000000000 --- a/api/src/main/java/io/kafbat/ui/exception/ConnectNotFoundException.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.kafbat.ui.exception; - -public class ConnectNotFoundException extends CustomBaseException { - - public ConnectNotFoundException() { - super("Connect not found"); - } - - @Override - public ErrorCode getErrorCode() { - return ErrorCode.CONNECT_NOT_FOUND; - } -} diff --git a/api/src/main/java/io/kafbat/ui/exception/DuplicateEntityException.java b/api/src/main/java/io/kafbat/ui/exception/DuplicateEntityException.java deleted file mode 100644 index 23ba0c5af..000000000 --- a/api/src/main/java/io/kafbat/ui/exception/DuplicateEntityException.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.kafbat.ui.exception; - -public class DuplicateEntityException extends CustomBaseException { - - public DuplicateEntityException(String message) { - super(message); - } - - @Override - public ErrorCode getErrorCode() { - return ErrorCode.DUPLICATED_ENTITY; - } -} diff --git a/api/src/main/java/io/kafbat/ui/exception/ErrorCode.java b/api/src/main/java/io/kafbat/ui/exception/ErrorCode.java index 6d4a732e3..32cf5c5c8 100644 --- a/api/src/main/java/io/kafbat/ui/exception/ErrorCode.java +++ b/api/src/main/java/io/kafbat/ui/exception/ErrorCode.java @@ -4,11 +4,8 @@ import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; - public enum ErrorCode { - FORBIDDEN(403, HttpStatus.FORBIDDEN), - UNEXPECTED(5000, HttpStatus.INTERNAL_SERVER_ERROR), KSQL_API_ERROR(5001, HttpStatus.INTERNAL_SERVER_ERROR), BINDING_FAIL(4001, HttpStatus.BAD_REQUEST), @@ -16,13 +13,10 @@ public enum ErrorCode { VALIDATION_FAIL(4002, HttpStatus.BAD_REQUEST), READ_ONLY_MODE_ENABLE(4003, HttpStatus.METHOD_NOT_ALLOWED), CONNECT_CONFLICT_RESPONSE(4004, HttpStatus.CONFLICT), - DUPLICATED_ENTITY(4005, HttpStatus.CONFLICT), UNPROCESSABLE_ENTITY(4006, HttpStatus.UNPROCESSABLE_ENTITY), CLUSTER_NOT_FOUND(4007, HttpStatus.NOT_FOUND), TOPIC_NOT_FOUND(4008, HttpStatus.NOT_FOUND), SCHEMA_NOT_FOUND(4009, HttpStatus.NOT_FOUND), - CONNECT_NOT_FOUND(4010, HttpStatus.NOT_FOUND), - KSQLDB_NOT_FOUND(4011, HttpStatus.NOT_FOUND), DIR_NOT_FOUND(4012, HttpStatus.BAD_REQUEST), TOPIC_OR_PARTITION_NOT_FOUND(4013, HttpStatus.BAD_REQUEST), INVALID_REQUEST(4014, HttpStatus.BAD_REQUEST), diff --git a/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java b/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java index e6c0c76a5..482ced492 100644 --- a/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java +++ b/api/src/main/java/io/kafbat/ui/exception/GlobalErrorWebExceptionHandler.java @@ -102,8 +102,7 @@ private Mono render(CustomBaseException baseException, ServerReq private Mono render(WebExchangeBindException exception, ServerRequest request) { Map> fieldErrorsMap = exception.getFieldErrors().stream() - .collect(Collectors - .toMap(FieldError::getField, f -> Set.of(extractFieldErrorMsg(f)), Sets::union)); + .collect(Collectors.toMap(FieldError::getField, f -> Set.of(extractFieldErrorMsg(f)), Sets::union)); var fieldsErrors = fieldErrorsMap.entrySet().stream() .map(e -> { diff --git a/api/src/main/java/io/kafbat/ui/exception/KafkaConnectConflictReponseException.java b/api/src/main/java/io/kafbat/ui/exception/KafkaConnectConflictResponseException.java similarity index 67% rename from api/src/main/java/io/kafbat/ui/exception/KafkaConnectConflictReponseException.java rename to api/src/main/java/io/kafbat/ui/exception/KafkaConnectConflictResponseException.java index 48376d1ac..ad356b7e5 100644 --- a/api/src/main/java/io/kafbat/ui/exception/KafkaConnectConflictReponseException.java +++ b/api/src/main/java/io/kafbat/ui/exception/KafkaConnectConflictResponseException.java @@ -1,11 +1,10 @@ package io.kafbat.ui.exception; - import org.springframework.web.reactive.function.client.WebClientResponseException; -public class KafkaConnectConflictReponseException extends CustomBaseException { +public class KafkaConnectConflictResponseException extends CustomBaseException { - public KafkaConnectConflictReponseException(WebClientResponseException.Conflict e) { + public KafkaConnectConflictResponseException(WebClientResponseException.Conflict e) { super("Kafka Connect responded with 409 (Conflict) code. Response body: " + e.getResponseBodyAsString()); } diff --git a/api/src/main/java/io/kafbat/ui/exception/KsqlDbNotFoundException.java b/api/src/main/java/io/kafbat/ui/exception/KsqlDbNotFoundException.java deleted file mode 100644 index 255ccec80..000000000 --- a/api/src/main/java/io/kafbat/ui/exception/KsqlDbNotFoundException.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.kafbat.ui.exception; - -public class KsqlDbNotFoundException extends CustomBaseException { - - public KsqlDbNotFoundException() { - super("KSQL DB not found"); - } - - @Override - public ErrorCode getErrorCode() { - return ErrorCode.KSQLDB_NOT_FOUND; - } -} diff --git a/api/src/main/java/io/kafbat/ui/exception/SchemaFailedToDeleteException.java b/api/src/main/java/io/kafbat/ui/exception/SchemaFailedToDeleteException.java deleted file mode 100644 index 05ba55c70..000000000 --- a/api/src/main/java/io/kafbat/ui/exception/SchemaFailedToDeleteException.java +++ /dev/null @@ -1,13 +0,0 @@ -package io.kafbat.ui.exception; - -public class SchemaFailedToDeleteException extends CustomBaseException { - - public SchemaFailedToDeleteException(String schemaName) { - super(String.format("Unable to delete schema with name %s", schemaName)); - } - - @Override - public ErrorCode getErrorCode() { - return ErrorCode.SCHEMA_NOT_DELETED; - } -} diff --git a/api/src/main/java/io/kafbat/ui/exception/UnprocessableEntityException.java b/api/src/main/java/io/kafbat/ui/exception/UnprocessableEntityException.java deleted file mode 100644 index fcd9e41fd..000000000 --- a/api/src/main/java/io/kafbat/ui/exception/UnprocessableEntityException.java +++ /dev/null @@ -1,14 +0,0 @@ -package io.kafbat.ui.exception; - - -public class UnprocessableEntityException extends CustomBaseException { - - public UnprocessableEntityException(String message) { - super(message); - } - - @Override - public ErrorCode getErrorCode() { - return ErrorCode.UNPROCESSABLE_ENTITY; - } -} diff --git a/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java b/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java index 72b3d65b4..800eab757 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/ConsumerGroupMapper.java @@ -97,23 +97,15 @@ private static BrokerDTO mapCoordinator(Node node) { return new BrokerDTO().host(node.host()).id(node.id()).port(node.port()); } - private static ConsumerGroupStateDTO mapConsumerGroupState( - org.apache.kafka.common.ConsumerGroupState state) { - switch (state) { - case DEAD: - return ConsumerGroupStateDTO.DEAD; - case EMPTY: - return ConsumerGroupStateDTO.EMPTY; - case STABLE: - return ConsumerGroupStateDTO.STABLE; - case PREPARING_REBALANCE: - return ConsumerGroupStateDTO.PREPARING_REBALANCE; - case COMPLETING_REBALANCE: - return ConsumerGroupStateDTO.COMPLETING_REBALANCE; - default: - return ConsumerGroupStateDTO.UNKNOWN; - } + private static ConsumerGroupStateDTO mapConsumerGroupState(org.apache.kafka.common.ConsumerGroupState state) { + return switch (state) { + case DEAD -> ConsumerGroupStateDTO.DEAD; + case EMPTY -> ConsumerGroupStateDTO.EMPTY; + case STABLE -> ConsumerGroupStateDTO.STABLE; + case PREPARING_REBALANCE -> ConsumerGroupStateDTO.PREPARING_REBALANCE; + case COMPLETING_REBALANCE -> ConsumerGroupStateDTO.COMPLETING_REBALANCE; + default -> ConsumerGroupStateDTO.UNKNOWN; + }; } - } diff --git a/api/src/main/java/io/kafbat/ui/mapper/DescribeLogDirsMapper.java b/api/src/main/java/io/kafbat/ui/mapper/DescribeLogDirsMapper.java index 8a4e44e5c..bccd3a66b 100644 --- a/api/src/main/java/io/kafbat/ui/mapper/DescribeLogDirsMapper.java +++ b/api/src/main/java/io/kafbat/ui/mapper/DescribeLogDirsMapper.java @@ -42,7 +42,7 @@ private BrokersLogdirsDTO toBrokerLogDirs(Integer broker, String dirName, private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name, List> partitions) { + DescribeLogDirsResponse.ReplicaInfo>> partitions) { BrokerTopicLogdirsDTO topic = new BrokerTopicLogdirsDTO(); topic.setName(name); topic.setPartitions( @@ -54,8 +54,7 @@ private BrokerTopicLogdirsDTO toTopicLogDirs(Integer broker, String name, } private BrokerTopicPartitionLogdirDTO topicPartitionLogDir(Integer broker, Integer partition, - DescribeLogDirsResponse.ReplicaInfo - replicaInfo) { + DescribeLogDirsResponse.ReplicaInfo replicaInfo) { BrokerTopicPartitionLogdirDTO logDir = new BrokerTopicPartitionLogdirDTO(); logDir.setBroker(broker); logDir.setPartition(partition); diff --git a/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java b/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java index 7c1826257..b0fdc9834 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java +++ b/api/src/main/java/io/kafbat/ui/serdes/SerdeInstance.java @@ -97,7 +97,7 @@ public void close() { try { serde.close(); } catch (Exception e) { - log.error("Error closing serde " + name, e); + log.error("Error closing serde {}", name, e); } return null; }); diff --git a/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java index 20ea47a06..618c711b1 100644 --- a/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java +++ b/api/src/main/java/io/kafbat/ui/serdes/builtin/ProtobufFileSerde.java @@ -380,7 +380,7 @@ private Map knownProtoFiles() { } private ProtoFile loadKnownProtoFile(String path, Descriptors.FileDescriptor fileDescriptor) { - String protoFileString = null; + String protoFileString; // know type file contains either message or enum if (!fileDescriptor.getMessageTypes().isEmpty()) { protoFileString = new ProtobufSchema(fileDescriptor.getMessageTypes().getFirst()).canonicalString(); diff --git a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java index 31e4268a0..92bfc260b 100644 --- a/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java +++ b/api/src/main/java/io/kafbat/ui/service/KafkaConnectService.java @@ -1,6 +1,5 @@ package io.kafbat.ui.service; -import com.fasterxml.jackson.databind.ObjectMapper; import io.kafbat.ui.connect.api.KafkaConnectClientApi; import io.kafbat.ui.connect.model.ConnectorStatus; import io.kafbat.ui.connect.model.ConnectorStatusConnector; @@ -44,7 +43,6 @@ public class KafkaConnectService { private final ClusterMapper clusterMapper; private final KafkaConnectMapper kafkaConnectMapper; - private final ObjectMapper objectMapper; private final KafkaConfigSanitizer kafkaConfigSanitizer; public Flux getConnects(KafkaCluster cluster) { diff --git a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java index 651f6d531..6aea290c3 100644 --- a/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ReactiveAdminClient.java @@ -389,12 +389,6 @@ static Mono> toMonoWithExceptionFilter(Map> v ); } - public Mono>> describeLogDirs() { - return describeCluster() - .map(d -> d.getNodes().stream().map(Node::id).collect(toList())) - .flatMap(this::describeLogDirs); - } - public Mono>> describeLogDirs( Collection brokerIds) { return toMono(client.describeLogDirs(brokerIds).all()) diff --git a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java index 1bac22235..c725a787e 100644 --- a/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java +++ b/api/src/main/java/io/kafbat/ui/service/SchemaRegistryService.java @@ -63,7 +63,7 @@ public Mono> getAllSubjectNames(KafkaCluster cluster) { @SneakyThrows private List parseSubjectListString(String subjectNamesStr) { //workaround for https://github.com/spring-projects/spring-framework/issues/24734 - return new JsonMapper().readValue(subjectNamesStr, new TypeReference>() { + return new JsonMapper().readValue(subjectNamesStr, new TypeReference<>() { }); } diff --git a/api/src/main/java/io/kafbat/ui/service/TopicsService.java b/api/src/main/java/io/kafbat/ui/service/TopicsService.java index 015a86838..95ad7bc5a 100644 --- a/api/src/main/java/io/kafbat/ui/service/TopicsService.java +++ b/api/src/main/java/io/kafbat/ui/service/TopicsService.java @@ -97,7 +97,7 @@ private Mono loadTopic(KafkaCluster c, String topicName) { /** * After creation topic can be invisible via API for some time. - * To workaround this, we retyring topic loading until it becomes visible. + * To workaround this, we're retrying topic loading until it becomes visible. */ private Mono loadTopicAfterCreation(KafkaCluster c, String topicName) { return loadTopic(c, topicName) @@ -137,8 +137,7 @@ private List createList(List orderedNames, .collect(toList()); } - private Mono getPartitionOffsets(Map - descriptionsMap, + private Mono getPartitionOffsets(Map descriptionsMap, ReactiveAdminClient ac) { var descriptions = descriptionsMap.values(); return ac.listOffsets(descriptions, OffsetSpec.earliest()) @@ -225,8 +224,7 @@ private Mono updateTopic(KafkaCluster cluster, .then(loadTopic(cluster, topicName))); } - public Mono updateTopic(KafkaCluster cl, String topicName, - Mono topicUpdate) { + public Mono updateTopic(KafkaCluster cl, String topicName, Mono topicUpdate) { return topicUpdate .flatMap(t -> updateTopic(cl, topicName, t)); } @@ -298,7 +296,7 @@ private Map> getPartitionsRea var brokers = brokersUsage.entrySet().stream() .sorted(Map.Entry.comparingByValue()) .map(Map.Entry::getKey) - .collect(toList()); + .toList(); // Iterate brokers and try to add them in assignment // while partition replicas count != requested replication factor @@ -326,7 +324,7 @@ private Map> getPartitionsRea var brokersUsageList = brokersUsage.entrySet().stream() .sorted(Map.Entry.comparingByValue(Comparator.reverseOrder())) .map(Map.Entry::getKey) - .collect(toList()); + .toList(); // Iterate brokers and try to remove them from assignment // while partition replicas count != requested replication factor diff --git a/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java b/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java index b3877a336..30078d435 100644 --- a/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java +++ b/api/src/main/java/io/kafbat/ui/service/acl/AclsService.java @@ -112,13 +112,13 @@ private void logAclSyncPlan(KafkaCluster cluster, Set toBeAdded, Set if (!toBeAdded.isEmpty()) { log.info("ACLs to be added ({}): ", toBeAdded.size()); for (AclBinding aclBinding : toBeAdded) { - log.info(" " + AclCsv.createAclString(aclBinding)); + log.info(" {}", AclCsv.createAclString(aclBinding)); } } if (!toBeDeleted.isEmpty()) { log.info("ACLs to be deleted ({}): ", toBeDeleted.size()); for (AclBinding aclBinding : toBeDeleted) { - log.info(" " + AclCsv.createAclString(aclBinding)); + log.info(" {}", AclCsv.createAclString(aclBinding)); } } } diff --git a/api/src/main/java/io/kafbat/ui/service/ksql/KsqlApiClient.java b/api/src/main/java/io/kafbat/ui/service/ksql/KsqlApiClient.java index 90192eb2d..13daee2bc 100644 --- a/api/src/main/java/io/kafbat/ui/service/ksql/KsqlApiClient.java +++ b/api/src/main/java/io/kafbat/ui/service/ksql/KsqlApiClient.java @@ -176,7 +176,7 @@ public Flux execute(String ksql, Map streamPr if (statements.size() > 1) { return errorTableFlux("Only single statement supported now"); } - if (statements.size() == 0) { + if (statements.isEmpty()) { return errorTableFlux("No valid ksql statement found"); } if (isUnsupportedStatementType(statements.get(0))) { diff --git a/api/src/main/java/io/kafbat/ui/service/ksql/response/ResponseParser.java b/api/src/main/java/io/kafbat/ui/service/ksql/response/ResponseParser.java index f353ea578..a5d54f369 100644 --- a/api/src/main/java/io/kafbat/ui/service/ksql/response/ResponseParser.java +++ b/api/src/main/java/io/kafbat/ui/service/ksql/response/ResponseParser.java @@ -99,64 +99,45 @@ public static List parseStatementResponse(JsonN .orElse("unknown"); // messages structure can be inferred from https://github.com/confluentinc/ksql/blob/master/ksqldb-rest-model/src/main/java/io/confluent/ksql/rest/entity/KsqlEntity.java - switch (type) { - case "currentStatus": - return parseObject( - "Status", - List.of("status", "message"), - jsonNode.get("commandStatus") - ); - case "properties": - return parseProperties(jsonNode); - case "queries": - return parseArray("Queries", "queries", jsonNode); - case "sourceDescription": - return parseObjectDynamically("Source Description", jsonNode.get("sourceDescription")); - case "queryDescription": - return parseObjectDynamically("Queries Description", jsonNode.get("queryDescription")); - case "topicDescription": - return parseObject( - "Topic Description", - List.of("name", "kafkaTopic", "format", "schemaString"), - jsonNode - ); - case "streams": - return parseArray("Streams", "streams", jsonNode); - case "tables": - return parseArray("Tables", "tables", jsonNode); - case "kafka_topics": - return parseArray("Topics", "topics", jsonNode); - case "kafka_topics_extended": - return parseArray("Topics extended", "topics", jsonNode); - case "executionPlan": - return parseObject("Execution plan", List.of("executionPlanText"), jsonNode); - case "source_descriptions": - return parseArray("Source descriptions", "sourceDescriptions", jsonNode); - case "query_descriptions": - return parseArray("Queries", "queryDescriptions", jsonNode); - case "describe_function": - return parseObject("Function description", - List.of("name", "author", "version", "description", "functions", "path", "type"), - jsonNode - ); - case "function_names": - return parseArray("Function Names", "functions", jsonNode); - case "connector_info": - return parseObjectDynamically("Connector Info", jsonNode.get("info")); - case "drop_connector": - return parseObject("Dropped connector", List.of("connectorName"), jsonNode); - case "connector_list": - return parseArray("Connectors", "connectors", jsonNode); - case "connector_plugins_list": - return parseArray("Connector Plugins", "connectorPlugins", jsonNode); - case "connector_description": - return parseObject("Connector Description", - List.of("connectorClass", "status", "sources", "topics"), - jsonNode - ); - default: - return parseUnknownResponse(jsonNode); - } + return switch (type) { + case "currentStatus" -> parseObject( + "Status", + List.of("status", "message"), + jsonNode.get("commandStatus") + ); + case "properties" -> parseProperties(jsonNode); + case "queries" -> parseArray("Queries", "queries", jsonNode); + case "sourceDescription" -> parseObjectDynamically("Source Description", jsonNode.get("sourceDescription")); + case "queryDescription" -> parseObjectDynamically("Queries Description", jsonNode.get("queryDescription")); + case "topicDescription" -> parseObject( + "Topic Description", + List.of("name", "kafkaTopic", "format", "schemaString"), + jsonNode + ); + case "streams" -> parseArray("Streams", "streams", jsonNode); + case "tables" -> parseArray("Tables", "tables", jsonNode); + case "kafka_topics" -> parseArray("Topics", "topics", jsonNode); + case "kafka_topics_extended" -> parseArray("Topics extended", "topics", jsonNode); + case "executionPlan" -> parseObject("Execution plan", List.of("executionPlanText"), jsonNode); + case "source_descriptions" -> parseArray("Source descriptions", "sourceDescriptions", jsonNode); + case "query_descriptions" -> parseArray("Queries", "queryDescriptions", jsonNode); + case "describe_function" -> parseObject( + "Function description", + List.of("name", "author", "version", "description", "functions", "path", "type"), + jsonNode + ); + case "function_names" -> parseArray("Function Names", "functions", jsonNode); + case "connector_info" -> parseObjectDynamically("Connector Info", jsonNode.get("info")); + case "drop_connector" -> parseObject("Dropped connector", List.of("connectorName"), jsonNode); + case "connector_list" -> parseArray("Connectors", "connectors", jsonNode); + case "connector_plugins_list" -> parseArray("Connector Plugins", "connectorPlugins", jsonNode); + case "connector_description" -> parseObject( + "Connector Description", + List.of("connectorClass", "status", "sources", "topics"), + jsonNode + ); + default -> parseUnknownResponse(jsonNode); + }; } private static List parseObjectDynamically( diff --git a/api/src/main/java/io/kafbat/ui/service/metrics/MetricsCollector.java b/api/src/main/java/io/kafbat/ui/service/metrics/MetricsCollector.java index b124feb8d..e9a08e8cb 100644 --- a/api/src/main/java/io/kafbat/ui/service/metrics/MetricsCollector.java +++ b/api/src/main/java/io/kafbat/ui/service/metrics/MetricsCollector.java @@ -28,7 +28,7 @@ public Mono getBrokerMetrics(KafkaCluster cluster, Collection nod return Flux.fromIterable(nodes) .flatMap(n -> getMetrics(cluster, n).map(lst -> Tuples.of(n, lst))) .collectMap(Tuple2::getT1, Tuple2::getT2) - .map(nodeMetrics -> collectMetrics(cluster, nodeMetrics)) + .map(this::collectMetrics) .defaultIfEmpty(Metrics.empty()); } @@ -45,20 +45,19 @@ private Mono> getMetrics(KafkaCluster kafkaCluster, Node node) { return metricFlux.collectList(); } - public Metrics collectMetrics(KafkaCluster cluster, Map> perBrokerMetrics) { + public Metrics collectMetrics(Map> perBrokerMetrics) { Metrics.MetricsBuilder builder = Metrics.builder() .perBrokerMetrics( perBrokerMetrics.entrySet() .stream() .collect(Collectors.toMap(e -> e.getKey().id(), Map.Entry::getValue))); - populateWellknowMetrics(cluster, perBrokerMetrics) - .apply(builder); + populateWellknowMetrics(perBrokerMetrics).apply(builder); return builder.build(); } - private WellKnownMetrics populateWellknowMetrics(KafkaCluster cluster, Map> perBrokerMetrics) { + private WellKnownMetrics populateWellknowMetrics(Map> perBrokerMetrics) { WellKnownMetrics wellKnownMetrics = new WellKnownMetrics(); perBrokerMetrics.forEach((node, metrics) -> metrics.forEach(metric -> diff --git a/api/src/main/java/io/kafbat/ui/util/EmptyRedirectStrategy.java b/api/src/main/java/io/kafbat/ui/util/EmptyRedirectStrategy.java index c6f80a113..9ebbba7f8 100644 --- a/api/src/main/java/io/kafbat/ui/util/EmptyRedirectStrategy.java +++ b/api/src/main/java/io/kafbat/ui/util/EmptyRedirectStrategy.java @@ -10,27 +10,18 @@ public class EmptyRedirectStrategy implements ServerRedirectStrategy { - private HttpStatus httpStatus = HttpStatus.FOUND; - - private boolean contextRelative = true; - public Mono sendRedirect(ServerWebExchange exchange, URI location) { Assert.notNull(exchange, "exchange cannot be null"); Assert.notNull(location, "location cannot be null"); return Mono.fromRunnable(() -> { ServerHttpResponse response = exchange.getResponse(); - response.setStatusCode(this.httpStatus); + response.setStatusCode(HttpStatus.FOUND); response.getHeaders().setLocation(createLocation(exchange, location)); }); } private URI createLocation(ServerWebExchange exchange, URI location) { - if (!this.contextRelative) { - return location; - } - - String url = location.getPath().isEmpty() ? "/" - : location.toASCIIString(); + String url = location.getPath().isEmpty() ? "/" : location.toASCIIString(); if (url.startsWith("/")) { String context = exchange.getRequest().getPath().contextPath().value(); @@ -38,13 +29,4 @@ private URI createLocation(ServerWebExchange exchange, URI location) { } return location; } - - public void setHttpStatus(HttpStatus httpStatus) { - Assert.notNull(httpStatus, "httpStatus cannot be null"); - this.httpStatus = httpStatus; - } - - public void setContextRelative(boolean contextRelative) { - this.contextRelative = contextRelative; - } } diff --git a/api/src/main/java/io/kafbat/ui/util/KafkaServicesValidation.java b/api/src/main/java/io/kafbat/ui/util/KafkaServicesValidation.java index 397fa3839..019a33543 100644 --- a/api/src/main/java/io/kafbat/ui/util/KafkaServicesValidation.java +++ b/api/src/main/java/io/kafbat/ui/util/KafkaServicesValidation.java @@ -62,8 +62,7 @@ public static Optional validateTruststore(TruststoreConfig truststoreCon public static Mono validateClusterConnection(String bootstrapServers, Properties clusterProps, - @Nullable - TruststoreConfig ssl) { + @Nullable TruststoreConfig ssl) { Properties properties = new Properties(); KafkaClientSslPropertiesUtil.addKafkaSslProperties(ssl, properties); properties.putAll(clusterProps); @@ -73,7 +72,7 @@ public static Mono validateClusterConnection(S properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5_000); properties.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 5_000); properties.put(AdminClientConfig.CLIENT_ID_CONFIG, "kui-admin-client-validation-" + System.currentTimeMillis()); - AdminClient adminClient = null; + AdminClient adminClient; try { adminClient = AdminClient.create(properties); } catch (Exception e) { diff --git a/api/src/main/java/io/kafbat/ui/util/ReactiveFailover.java b/api/src/main/java/io/kafbat/ui/util/ReactiveFailover.java index b46384d2e..872e9ddf9 100644 --- a/api/src/main/java/io/kafbat/ui/util/ReactiveFailover.java +++ b/api/src/main/java/io/kafbat/ui/util/ReactiveFailover.java @@ -59,8 +59,8 @@ public static ReactiveFailover create(List args, } private ReactiveFailover(List> publishers, - Predicate failoverExceptionsPredicate, - String noAvailablePublishersMsg) { + Predicate failoverExceptionsPredicate, + String noAvailablePublishersMsg) { Preconditions.checkArgument(!publishers.isEmpty()); this.publishers = publishers; this.failoverExceptionsPredicate = failoverExceptionsPredicate; diff --git a/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java b/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java index 52b6913f6..de23d40cd 100644 --- a/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java +++ b/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonAvroConversion.java @@ -49,7 +49,7 @@ public class JsonAvroConversion { // converts json into Object that is expected input for KafkaAvroSerializer // (with AVRO_USE_LOGICAL_TYPE_CONVERTERS flat enabled!) public static Object convertJsonToAvro(String jsonString, Schema avroSchema) { - JsonNode rootNode = null; + JsonNode rootNode; try { rootNode = MAPPER.readTree(jsonString); } catch (JsonProcessingException e) { @@ -221,9 +221,7 @@ public static JsonNode convertAvroToJson(Object obj, Schema avroSchema) { list.forEach(e -> node.add(convertAvroToJson(e, avroSchema.getElementType()))); yield node; } - case ENUM -> { - yield new TextNode(obj.toString()); - } + case ENUM -> new TextNode(obj.toString()); case UNION -> { ObjectNode node = MAPPER.createObjectNode(); int unionIdx = AvroData.getGenericData().resolveUnion(avroSchema, obj); @@ -343,9 +341,7 @@ enum LogicalTypeConversion { assertJsonType(node, JsonNodeType.STRING); return java.util.UUID.fromString(node.asText()); }, - (obj, schema) -> { - return new TextNode(obj.toString()); - }, + (obj, schema) -> new TextNode(obj.toString()), new SimpleFieldSchema( new SimpleJsonType( JsonType.Type.STRING, @@ -363,9 +359,7 @@ enum LogicalTypeConversion { "node '%s' can't be converted to decimal logical type" .formatted(node)); }, - (obj, schema) -> { - return new DecimalNode((BigDecimal) obj); - }, + (obj, schema) -> new DecimalNode((BigDecimal) obj), new SimpleFieldSchema(new SimpleJsonType(JsonType.Type.NUMBER)) ), @@ -381,9 +375,7 @@ enum LogicalTypeConversion { .formatted(node)); } }, - (obj, schema) -> { - return new TextNode(obj.toString()); - }, + (obj, schema) -> new TextNode(obj.toString()), new SimpleFieldSchema( new SimpleJsonType( JsonType.Type.STRING, @@ -402,9 +394,7 @@ enum LogicalTypeConversion { .formatted(node)); } }, - (obj, schema) -> { - return new TextNode(obj.toString()); - }, + (obj, schema) -> new TextNode(obj.toString()), new SimpleFieldSchema( new SimpleJsonType( JsonType.Type.STRING, @@ -423,9 +413,7 @@ enum LogicalTypeConversion { .formatted(node)); } }, - (obj, schema) -> { - return new TextNode(obj.toString()); - }, + (obj, schema) -> new TextNode(obj.toString()), new SimpleFieldSchema( new SimpleJsonType( JsonType.Type.STRING, @@ -444,9 +432,7 @@ enum LogicalTypeConversion { .formatted(node)); } }, - (obj, schema) -> { - return new TextNode(obj.toString()); - }, + (obj, schema) -> new TextNode(obj.toString()), new SimpleFieldSchema( new SimpleJsonType( JsonType.Type.STRING, @@ -469,9 +455,7 @@ enum LogicalTypeConversion { .formatted(node)); } }, - (obj, schema) -> { - return new TextNode(obj.toString()); - }, + (obj, schema) -> new TextNode(obj.toString()), new SimpleFieldSchema( new SimpleJsonType( JsonType.Type.STRING, @@ -487,9 +471,7 @@ enum LogicalTypeConversion { Instant instant = (Instant) TIMESTAMP_MILLIS.jsonToAvroConversion.apply(node, schema); return LocalDateTime.ofInstant(instant, ZoneOffset.UTC); }, - (obj, schema) -> { - return new TextNode(obj.toString()); - }, + (obj, schema) -> new TextNode(obj.toString()), new SimpleFieldSchema( new SimpleJsonType( JsonType.Type.STRING, @@ -504,9 +486,7 @@ enum LogicalTypeConversion { Instant instant = (Instant) TIMESTAMP_MICROS.jsonToAvroConversion.apply(node, schema); return LocalDateTime.ofInstant(instant, ZoneOffset.UTC); }, - (obj, schema) -> { - return new TextNode(obj.toString()); - }, + (obj, schema) -> new TextNode(obj.toString()), new SimpleFieldSchema( new SimpleJsonType( JsonType.Type.STRING, diff --git a/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonSchema.java b/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonSchema.java index 090010dac..491acebc7 100644 --- a/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonSchema.java +++ b/api/src/main/java/io/kafbat/ui/util/jsonschema/JsonSchema.java @@ -9,7 +9,6 @@ import java.util.stream.Collectors; import lombok.Builder; import lombok.Data; -import lombok.SneakyThrows; import reactor.util.function.Tuple2; import reactor.util.function.Tuples; @@ -59,12 +58,4 @@ public String toJson() { } return objectNode.toString(); } - - @SneakyThrows - public static JsonSchema stringSchema() { - return JsonSchema.builder() - .id(new URI("http://unknown.unknown")) - .type(new SimpleJsonType(JsonType.Type.STRING)) - .build(); - } } From 6a22b4a868d014bc8095b50498ba1d57e4bc646f Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Tue, 11 Feb 2025 19:11:20 +0500 Subject: [PATCH 4/6] BE: RBAC: Add integration tests for AD auth (#726) --- .../ui/ActiveDirectoryIntegrationTest.java | 120 ++++++++++++++++++ .../container/ActiveDirectoryContainer.java | 79 ++++++++++++ .../test/resources/application-rbac-ad.yml | 23 ++++ 3 files changed, 222 insertions(+) create mode 100644 api/src/test/java/io/kafbat/ui/ActiveDirectoryIntegrationTest.java create mode 100644 api/src/test/java/io/kafbat/ui/container/ActiveDirectoryContainer.java create mode 100644 api/src/test/resources/application-rbac-ad.yml diff --git a/api/src/test/java/io/kafbat/ui/ActiveDirectoryIntegrationTest.java b/api/src/test/java/io/kafbat/ui/ActiveDirectoryIntegrationTest.java new file mode 100644 index 000000000..80c3abe33 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/ActiveDirectoryIntegrationTest.java @@ -0,0 +1,120 @@ +package io.kafbat.ui; + +import static io.kafbat.ui.AbstractIntegrationTest.LOCAL; +import static io.kafbat.ui.container.ActiveDirectoryContainer.DOMAIN; +import static io.kafbat.ui.container.ActiveDirectoryContainer.EMPTY_PERMISSIONS_USER; +import static io.kafbat.ui.container.ActiveDirectoryContainer.FIRST_USER_WITH_GROUP; +import static io.kafbat.ui.container.ActiveDirectoryContainer.PASSWORD; +import static io.kafbat.ui.container.ActiveDirectoryContainer.SECOND_USER_WITH_GROUP; +import static io.kafbat.ui.container.ActiveDirectoryContainer.USER_WITHOUT_GROUP; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import io.kafbat.ui.container.ActiveDirectoryContainer; +import io.kafbat.ui.model.AuthenticationInfoDTO; +import io.kafbat.ui.model.ResourceTypeDTO; +import io.kafbat.ui.model.UserPermissionDTO; +import java.util.List; +import java.util.Objects; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.reactive.AutoConfigureWebTestClient; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.context.ApplicationContextInitializer; +import org.springframework.context.ConfigurableApplicationContext; +import org.springframework.http.MediaType; +import org.springframework.test.context.ActiveProfiles; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.web.reactive.server.WebTestClient; +import org.springframework.web.reactive.function.BodyInserters; + +@SpringBootTest +@ActiveProfiles("rbac-ad") +@AutoConfigureWebTestClient(timeout = "60000") +@ContextConfiguration(initializers = {ActiveDirectoryIntegrationTest.Initializer.class}) +public class ActiveDirectoryIntegrationTest { + private static final String SESSION = "SESSION"; + + private static final ActiveDirectoryContainer ACTIVE_DIRECTORY = new ActiveDirectoryContainer(); + + @Autowired + private WebTestClient webTestClient; + + @BeforeAll + public static void setup() { + ACTIVE_DIRECTORY.start(); + } + + @AfterAll + public static void shutdown() { + ACTIVE_DIRECTORY.stop(); + } + + @Test + public void testUserPermissions() { + AuthenticationInfoDTO info = authenticationInfo(FIRST_USER_WITH_GROUP); + + assertNotNull(info); + assertTrue(info.getRbacEnabled()); + + List permissions = info.getUserInfo().getPermissions(); + + assertFalse(permissions.isEmpty()); + assertTrue(permissions.stream().anyMatch(permission -> + permission.getClusters().contains(LOCAL) && permission.getResource() == ResourceTypeDTO.TOPIC)); + assertEquals(permissions, authenticationInfo(SECOND_USER_WITH_GROUP).getUserInfo().getPermissions()); + assertEquals(permissions, authenticationInfo(USER_WITHOUT_GROUP).getUserInfo().getPermissions()); + } + + @Test + public void testEmptyPermissions() { + assertTrue(Objects.requireNonNull(authenticationInfo(EMPTY_PERMISSIONS_USER)) + .getUserInfo() + .getPermissions() + .isEmpty() + ); + } + + private String session(String name) { + return Objects.requireNonNull( + webTestClient + .post() + .uri("/login") + .contentType(MediaType.APPLICATION_FORM_URLENCODED) + .body(BodyInserters.fromFormData("username", name).with("password", PASSWORD)) + .exchange() + .expectStatus() + .isFound() + .returnResult(String.class) + .getResponseCookies() + .getFirst(SESSION)) + .getValue(); + } + + private AuthenticationInfoDTO authenticationInfo(String name) { + return webTestClient + .get() + .uri("/api/authorization") + .cookie(SESSION, session(name)) + .exchange() + .expectStatus() + .isOk() + .returnResult(AuthenticationInfoDTO.class) + .getResponseBody() + .blockFirst(); + } + + public static class Initializer implements ApplicationContextInitializer { + @Override + public void initialize(@NotNull ConfigurableApplicationContext context) { + System.setProperty("spring.ldap.urls", ACTIVE_DIRECTORY.getLdapUrl()); + System.setProperty("oauth2.ldap.activeDirectory", "true"); + System.setProperty("oauth2.ldap.activeDirectory.domain", DOMAIN); + } + } +} diff --git a/api/src/test/java/io/kafbat/ui/container/ActiveDirectoryContainer.java b/api/src/test/java/io/kafbat/ui/container/ActiveDirectoryContainer.java new file mode 100644 index 000000000..55bc3a186 --- /dev/null +++ b/api/src/test/java/io/kafbat/ui/container/ActiveDirectoryContainer.java @@ -0,0 +1,79 @@ +package io.kafbat.ui.container; + +import com.github.dockerjava.api.command.InspectContainerResponse; +import java.io.IOException; +import lombok.extern.slf4j.Slf4j; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.utility.DockerImageName; + +@Slf4j +public class ActiveDirectoryContainer extends GenericContainer { + public static final String DOMAIN = "corp.kafbat.io"; + public static final String PASSWORD = "StrongPassword123"; + public static final String FIRST_USER_WITH_GROUP = "JohnDoe"; + public static final String SECOND_USER_WITH_GROUP = "JohnWick"; + public static final String USER_WITHOUT_GROUP = "JackSmith"; + public static final String EMPTY_PERMISSIONS_USER = "JohnJames"; + + private static final String DOMAIN_DC = "dc=corp,dc=kafbat,dc=io"; + private static final String GROUP = "group"; + private static final String FIRST_GROUP = "firstGroup"; + private static final String SECOND_GROUP = "secondGroup"; + private static final String DOMAIN_EMAIL = "kafbat.io"; + private static final String SAMBA_TOOL = "samba-tool"; + private static final int LDAP_PORT = 389; + private static final DockerImageName IMAGE_NAME = DockerImageName.parse("nowsci/samba-domain:latest"); + + public ActiveDirectoryContainer() { + super(IMAGE_NAME); + + withExposedPorts(LDAP_PORT); + + withEnv("DOMAIN", DOMAIN); + withEnv("DOMAIN_DC", DOMAIN_DC); + withEnv("DOMAIN_EMAIL", DOMAIN_EMAIL); + withEnv("DOMAINPASS", PASSWORD); + withEnv("NOCOMPLEXITY", "true"); + withEnv("INSECURELDAP", "true"); + + withPrivilegedMode(true); + } + + protected void containerIsStarted(InspectContainerResponse containerInfo) { + createUser(EMPTY_PERMISSIONS_USER); + createUser(USER_WITHOUT_GROUP); + createUser(FIRST_USER_WITH_GROUP); + createUser(SECOND_USER_WITH_GROUP); + + exec(SAMBA_TOOL, GROUP, "add", FIRST_GROUP); + exec(SAMBA_TOOL, GROUP, "add", SECOND_GROUP); + exec(SAMBA_TOOL, GROUP, "addmembers", FIRST_GROUP, FIRST_USER_WITH_GROUP); + exec(SAMBA_TOOL, GROUP, "addmembers", SECOND_GROUP, SECOND_USER_WITH_GROUP); + } + + public String getLdapUrl() { + return String.format("ldap://%s:%s", getHost(), getMappedPort(LDAP_PORT)); + } + + private void createUser(String name) { + exec(SAMBA_TOOL, "user", "create", name, PASSWORD, "--mail-address", name + '@' + DOMAIN_EMAIL); + exec(SAMBA_TOOL, "user", "setexpiry", name, "--noexpiry"); + } + + private void exec(String... cmd) { + ExecResult result; + try { + result = execInContainer(cmd); + } catch (IOException | InterruptedException e) { + throw new RuntimeException(e); + } + + if (result.getStdout() != null && !result.getStdout().isEmpty()) { + log.info("Output: {}", result.getStdout()); + } + + if (result.getExitCode() != 0) { + throw new IllegalStateException(result.toString()); + } + } +} diff --git a/api/src/test/resources/application-rbac-ad.yml b/api/src/test/resources/application-rbac-ad.yml new file mode 100644 index 000000000..3b97d185f --- /dev/null +++ b/api/src/test/resources/application-rbac-ad.yml @@ -0,0 +1,23 @@ +auth: + type: LDAP +rbac: + roles: + - name: "roleName" + clusters: + - local + subjects: + - provider: ldap_ad + type: group + value: firstGroup + - provider: ldap_ad + type: group + value: secondGroup + - provider: ldap_ad + type: user + value: JackSmith + permissions: + - resource: applicationconfig + actions: all + - resource: topic + value: ".*" + actions: all From 2601a9a8461b12f2ab13873a4694be4b21be1d79 Mon Sep 17 00:00:00 2001 From: Dmitry Werner Date: Wed, 12 Feb 2025 12:29:51 +0500 Subject: [PATCH 5/6] BE: Chore: Remove unused classes (#838) --- .../condition/ActiveDirectoryCondition.java | 21 -------- .../io/kafbat/ui/model/BrokerMetrics.java | 11 ---- .../ui/model/InternalClusterMetrics.java | 54 ------------------- 3 files changed, 86 deletions(-) delete mode 100644 api/src/main/java/io/kafbat/ui/config/auth/condition/ActiveDirectoryCondition.java delete mode 100644 api/src/main/java/io/kafbat/ui/model/BrokerMetrics.java delete mode 100644 api/src/main/java/io/kafbat/ui/model/InternalClusterMetrics.java diff --git a/api/src/main/java/io/kafbat/ui/config/auth/condition/ActiveDirectoryCondition.java b/api/src/main/java/io/kafbat/ui/config/auth/condition/ActiveDirectoryCondition.java deleted file mode 100644 index 944eff0d3..000000000 --- a/api/src/main/java/io/kafbat/ui/config/auth/condition/ActiveDirectoryCondition.java +++ /dev/null @@ -1,21 +0,0 @@ -package io.kafbat.ui.config.auth.condition; - -import org.springframework.boot.autoconfigure.condition.AllNestedConditions; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; - -public class ActiveDirectoryCondition extends AllNestedConditions { - - public ActiveDirectoryCondition() { - super(ConfigurationPhase.PARSE_CONFIGURATION); - } - - @ConditionalOnProperty(value = "auth.type", havingValue = "LDAP") - public static class OnAuthType { - - } - - @ConditionalOnProperty(value = "${oauth2.ldap.activeDirectory}:false", havingValue = "true", matchIfMissing = false) - public static class OnActiveDirectory { - - } -} diff --git a/api/src/main/java/io/kafbat/ui/model/BrokerMetrics.java b/api/src/main/java/io/kafbat/ui/model/BrokerMetrics.java deleted file mode 100644 index dbd57c9c1..000000000 --- a/api/src/main/java/io/kafbat/ui/model/BrokerMetrics.java +++ /dev/null @@ -1,11 +0,0 @@ -package io.kafbat.ui.model; - -import java.util.List; -import lombok.Builder; -import lombok.Data; - -@Data -@Builder(toBuilder = true) -public class BrokerMetrics { - private final List metrics; -} diff --git a/api/src/main/java/io/kafbat/ui/model/InternalClusterMetrics.java b/api/src/main/java/io/kafbat/ui/model/InternalClusterMetrics.java deleted file mode 100644 index 6c04fadeb..000000000 --- a/api/src/main/java/io/kafbat/ui/model/InternalClusterMetrics.java +++ /dev/null @@ -1,54 +0,0 @@ -package io.kafbat.ui.model; - -import java.math.BigDecimal; -import java.util.List; -import java.util.Map; -import javax.annotation.Nullable; -import lombok.Builder; -import lombok.Data; - -@Data -@Builder(toBuilder = true) -public class InternalClusterMetrics { - - public static InternalClusterMetrics empty() { - return InternalClusterMetrics.builder() - .brokers(List.of()) - .topics(Map.of()) - .status(ServerStatusDTO.OFFLINE) - .internalBrokerMetrics(Map.of()) - .metrics(List.of()) - .version("unknown") - .build(); - } - - private final String version; - - private final ServerStatusDTO status; - private final Throwable lastKafkaException; - - private final int brokerCount; - private final int activeControllers; - private final List brokers; - - private final int topicCount; - private final Map topics; - - // partitions stats - private final int underReplicatedPartitionCount; - private final int onlinePartitionCount; - private final int offlinePartitionCount; - private final int inSyncReplicasCount; - private final int outOfSyncReplicasCount; - - // log dir stats - @Nullable // will be null if log dir collection disabled - private final Map internalBrokerDiskUsage; - - // metrics from metrics collector - private final BigDecimal bytesInPerSec; - private final BigDecimal bytesOutPerSec; - private final Map internalBrokerMetrics; - private final List metrics; - -} From a05709fe6241896dbf4af2879800e44572ce93be Mon Sep 17 00:00:00 2001 From: Renat Kalimulin <103274228+Nilumilak@users.noreply.github.com> Date: Wed, 12 Feb 2025 11:43:09 +0300 Subject: [PATCH 6/6] FE: Topics: Save field previews into local storage (#449) --- .../Topics/Topic/Messages/MessagesTable.tsx | 61 ++++++++++++++++--- .../Messages/__test__/MessagesTable.spec.tsx | 40 ++++++++++++ frontend/src/lib/hooks/useLocalStorage.ts | 9 ++- 3 files changed, 98 insertions(+), 12 deletions(-) diff --git a/frontend/src/components/Topics/Topic/Messages/MessagesTable.tsx b/frontend/src/components/Topics/Topic/Messages/MessagesTable.tsx index 813cabcfa..1751a0e56 100644 --- a/frontend/src/components/Topics/Topic/Messages/MessagesTable.tsx +++ b/frontend/src/components/Topics/Topic/Messages/MessagesTable.tsx @@ -2,31 +2,78 @@ import PageLoader from 'components/common/PageLoader/PageLoader'; import { Table } from 'components/common/table/Table/Table.styled'; import TableHeaderCell from 'components/common/table/TableHeaderCell/TableHeaderCell'; import { TopicMessage } from 'generated-sources'; -import React, { useState } from 'react'; +import React, { useCallback, useEffect, useState } from 'react'; import { Button } from 'components/common/Button/Button'; import * as S from 'components/common/NewTable/Table.styled'; import { usePaginateTopics, useIsLiveMode } from 'lib/hooks/useMessagesFilters'; import { useMessageFiltersStore } from 'lib/hooks/useMessageFiltersStore'; +import useAppParams from 'lib/hooks/useAppParams'; +import { RouteParamsClusterTopic } from 'lib/paths'; +import { useLocalStorage } from 'lib/hooks/useLocalStorage'; -import PreviewModal from './PreviewModal'; import Message, { PreviewFilter } from './Message'; +import PreviewModal from './PreviewModal'; export interface MessagesTableProps { messages: TopicMessage[]; isFetching: boolean; } +interface MessagePreviewProps { + [key: string]: { + keyFilters: PreviewFilter[]; + contentFilters: PreviewFilter[]; + }; +} + const MessagesTable: React.FC = ({ messages, isFetching, }) => { const paginate = usePaginateTopics(); - const [previewFor, setPreviewFor] = useState(null); - + const [previewFor, setPreviewFor] = useState<'key' | 'content' | null>(null); const [keyFilters, setKeyFilters] = useState([]); const [contentFilters, setContentFilters] = useState([]); const nextCursor = useMessageFiltersStore((state) => state.nextCursor); const isLive = useIsLiveMode(); + const { topicName } = useAppParams(); + const [messagesPreview, setMessagesPreview] = + useLocalStorage('message-preview', { + [topicName]: { + keyFilters: [], + contentFilters: [], + }, + }); + + useEffect(() => { + setKeyFilters(messagesPreview[topicName]?.keyFilters || []); + setContentFilters(messagesPreview[topicName]?.contentFilters || []); + }, []); + + const setFilters = useCallback( + (payload: PreviewFilter[]) => { + if (previewFor === 'key') { + setKeyFilters(payload); + setMessagesPreview({ + ...messagesPreview, + [topicName]: { + ...messagesPreview[topicName], + keyFilters: payload, + }, + }); + } else { + setContentFilters(payload); + setMessagesPreview({ + ...messagesPreview, + [topicName]: { + ...messagesPreview[topicName], + contentFilters: payload, + }, + }); + } + }, + [previewFor, messagesPreview, topicName] + ); return (
@@ -34,11 +81,7 @@ const MessagesTable: React.FC = ({ setPreviewFor(null)} - setFilters={(payload: PreviewFilter[]) => - previewFor === 'key' - ? setKeyFilters(payload) - : setContentFilters(payload) - } + setFilters={setFilters} /> )} diff --git a/frontend/src/components/Topics/Topic/Messages/__test__/MessagesTable.spec.tsx b/frontend/src/components/Topics/Topic/Messages/__test__/MessagesTable.spec.tsx index 808dde9e4..77f31cebc 100644 --- a/frontend/src/components/Topics/Topic/Messages/__test__/MessagesTable.spec.tsx +++ b/frontend/src/components/Topics/Topic/Messages/__test__/MessagesTable.spec.tsx @@ -7,6 +7,8 @@ import MessagesTable, { } from 'components/Topics/Topic/Messages/MessagesTable'; import { TopicMessage, TopicMessageTimestampTypeEnum } from 'generated-sources'; import { useIsLiveMode } from 'lib/hooks/useMessagesFilters'; +import useAppParams from 'lib/hooks/useAppParams'; +import { LOCAL_STORAGE_KEY_PREFIX } from 'lib/constants'; export const topicMessagePayload: TopicMessage = { partition: 29, @@ -33,8 +35,16 @@ jest.mock('lib/hooks/useMessagesFilters', () => ({ usePaginateTopics: jest.fn(), })); +jest.mock('lib/hooks/useAppParams', () => ({ + __esModule: true, + default: jest.fn(), +})); + describe('MessagesTable', () => { const renderComponent = (props?: Partial) => { + (useAppParams as jest.Mock).mockImplementation(() => ({ + topicName: 'testTopic', + })); return render( ); @@ -99,4 +109,34 @@ describe('MessagesTable', () => { } }); }); + + describe('should save messages preview into localstorage', () => { + beforeEach(() => { + renderComponent({ messages: mockTopicsMessages, isFetching: false }); + }); + + it('should save messages preview into localstorage', async () => { + const previewButtons = screen.getAllByText('Preview'); + await userEvent.click(previewButtons[0]); + await userEvent.type(screen.getByPlaceholderText('Field'), 'test1'); + await userEvent.type(screen.getByPlaceholderText('Json Path'), 'test2'); + await userEvent.click(screen.getByText('Save')); + await userEvent.click(previewButtons[1]); + await userEvent.type(screen.getByPlaceholderText('Field'), 'test3'); + await userEvent.type(screen.getByPlaceholderText('Json Path'), 'test4'); + await userEvent.click(screen.getByText('Save')); + expect( + global.localStorage.getItem( + `${LOCAL_STORAGE_KEY_PREFIX}-message-preview` + ) + ).toEqual( + JSON.stringify({ + testTopic: { + keyFilters: [{ field: 'test1', path: 'test2' }], + contentFilters: [{ field: 'test3', path: 'test4' }], + }, + }) + ); + }); + }); }); diff --git a/frontend/src/lib/hooks/useLocalStorage.ts b/frontend/src/lib/hooks/useLocalStorage.ts index d8945620d..65215fd2c 100644 --- a/frontend/src/lib/hooks/useLocalStorage.ts +++ b/frontend/src/lib/hooks/useLocalStorage.ts @@ -1,9 +1,12 @@ import { LOCAL_STORAGE_KEY_PREFIX } from 'lib/constants'; -import { useState, useEffect } from 'react'; +import { useState, useEffect, Dispatch, SetStateAction } from 'react'; -export const useLocalStorage = (featureKey: string, defaultValue: string) => { +export const useLocalStorage = ( + featureKey: string, + defaultValue: T +): [T, Dispatch>] => { const key = `${LOCAL_STORAGE_KEY_PREFIX}-${featureKey}`; - const [value, setValue] = useState(() => { + const [value, setValue] = useState(() => { const saved = localStorage.getItem(key); if (saved !== null) {