diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
index 1e391636815..d24044717f3 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/controller/TopicsController.java
@@ -22,6 +22,7 @@
 import com.provectus.kafka.ui.model.TopicCreationDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
+import com.provectus.kafka.ui.model.TopicProducerStateDTO;
 import com.provectus.kafka.ui.model.TopicUpdateDTO;
 import com.provectus.kafka.ui.model.TopicsResponseDTO;
 import com.provectus.kafka.ui.model.rbac.AccessContext;
@@ -327,6 +328,34 @@ public Mono<ResponseEntity<TopicAnalysisDTO>> getTopicAnalysis(String clusterNam
         .doOnEach(sig -> audit(context, sig));
   }
 
+  @Override
+  public Mono<ResponseEntity<Flux<TopicProducerStateDTO>>> getActiveProducerStates(String clusterName,
+                                                                                   String topicName,
+                                                                                   ServerWebExchange exchange) {
+    var context = AccessContext.builder()
+        .cluster(clusterName)
+        .topic(topicName)
+        .topicActions(VIEW)
+        .operationName("getActiveProducerStates")
+        .build();
+
+    Comparator<TopicProducerStateDTO> ordering =
+        Comparator.comparingInt(TopicProducerStateDTO::getPartition)
+            .thenComparing(Comparator.comparing(TopicProducerStateDTO::getProducerId).reversed());
+
+    Flux<TopicProducerStateDTO> states = topicsService.getActiveProducersState(getCluster(clusterName), topicName)
+        .flatMapMany(statesMap ->
+            Flux.fromStream(
+                statesMap.entrySet().stream()
+                    .flatMap(e -> e.getValue().stream().map(p -> clusterMapper.map(e.getKey().partition(), p)))
+                    .sorted(ordering)));
+
+    return validateAccess(context)
+        .thenReturn(states)
+        .map(ResponseEntity::ok)
+        .doOnEach(sig -> audit(context, sig));
+  }
+
   private Comparator<InternalTopic> getComparatorForTopic(
       TopicColumnsToSortDTO orderBy) {
     var defaultComparator = Comparator.comparing(InternalTopic::getName);
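Note on the ordering in the controller above: the per-partition map returned by TopicsService is flattened into a single Flux, sorted by partition (ascending) and, within a partition, by producerId (descending). A minimal, self-contained sketch of that comparator, using a hypothetical stand-in record instead of the generated TopicProducerStateDTO:

```java
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for the generated TopicProducerStateDTO; used only to illustrate the ordering.
record ProducerStateRow(int partition, long producerId) {}

public class OrderingSketch {
  public static void main(String[] args) {
    // Same ordering as in the controller: partition ascending, then producerId descending.
    Comparator<ProducerStateRow> ordering =
        Comparator.comparingInt(ProducerStateRow::partition)
            .thenComparing(Comparator.comparing(ProducerStateRow::producerId).reversed());

    List<ProducerStateRow> sorted = List.of(
            new ProducerStateRow(1, 1001L),
            new ProducerStateRow(0, 1000L),
            new ProducerStateRow(0, 2000L))
        .stream()
        .sorted(ordering)
        .toList();

    // Expected order: (0, 2000), (0, 1000), (1, 1001).
    sorted.forEach(System.out::println);
  }
}
```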
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
index 9317ac7b9c3..7fb7bc5c48e 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/mapper/ClusterMapper.java
@@ -30,10 +30,12 @@
 import com.provectus.kafka.ui.model.TopicConfigDTO;
 import com.provectus.kafka.ui.model.TopicDTO;
 import com.provectus.kafka.ui.model.TopicDetailsDTO;
+import com.provectus.kafka.ui.model.TopicProducerStateDTO;
 import com.provectus.kafka.ui.service.metrics.RawMetric;
 import java.util.List;
 import java.util.Map;
 import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.admin.ProducerState;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclOperation;
@@ -117,6 +119,17 @@ default BrokerDiskUsageDTO map(Integer id, InternalBrokerDiskUsage internalBroke
     return brokerDiskUsage;
   }
 
+  default TopicProducerStateDTO map(int partition, ProducerState state) {
+    return new TopicProducerStateDTO()
+        .partition(partition)
+        .producerId(state.producerId())
+        .producerEpoch(state.producerEpoch())
+        .lastSequence(state.lastSequence())
+        .lastTimestampMs(state.lastTimestamp())
+        .coordinatorEpoch(state.coordinatorEpoch().stream().boxed().findAny().orElse(null))
+        .currentTransactionStartOffset(state.currentTransactionStartOffset().stream().boxed().findAny().orElse(null));
+  }
+
   static KafkaAclDTO.OperationEnum mapAclOperation(AclOperation operation) {
     return switch (operation) {
       case ALL -> KafkaAclDTO.OperationEnum.ALL;
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
index 9de908efa7f..6defb074237 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/ReactiveAdminClient.java
@@ -31,6 +31,7 @@
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import lombok.AccessLevel;
@@ -55,6 +56,7 @@
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.ProducerState;
 import org.apache.kafka.clients.admin.RecordsToDelete;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -658,6 +660,21 @@ public Mono<Void> alterReplicaLogDirs(Map<TopicPartitionReplica, String> replica
     return toMono(client.alterReplicaLogDirs(replicaAssignment).all());
   }
 
+  // returns tp -> list of active producer's states (if any)
+  public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(String topic) {
+    return describeTopic(topic)
+        .map(td -> client.describeProducers(
+                IntStream.range(0, td.partitions().size())
+                    .mapToObj(i -> new TopicPartition(topic, i))
+                    .toList()
+            ).all()
+        )
+        .flatMap(ReactiveAdminClient::toMono)
+        .map(map -> map.entrySet().stream()
+            .filter(e -> !e.getValue().activeProducers().isEmpty()) // skipping partitions without producers
+            .collect(toMap(Map.Entry::getKey, e -> e.getValue().activeProducers())));
+  }
+
   private Mono<Void> incrementalAlterConfig(String topicName,
                                             List<ConfigEntry> currentConfigs,
                                             Map<String, String> newConfigs) {
diff --git a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
index 3ddcd6f82e0..8e976906d56 100644
--- a/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
+++ b/kafka-ui-api/src/main/java/com/provectus/kafka/ui/service/TopicsService.java
@@ -39,6 +39,7 @@
 import org.apache.kafka.clients.admin.NewPartitionReassignment;
 import org.apache.kafka.clients.admin.NewPartitions;
 import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.ProducerState;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
@@ -459,6 +460,11 @@ public Mono<List<InternalTopic>> getTopicsForPagination(KafkaCluster cluster) {
     );
   }
 
+  public Mono<Map<TopicPartition, List<ProducerState>>> getActiveProducersState(KafkaCluster cluster, String topic) {
+    return adminClientService.get(cluster)
+        .flatMap(ac -> ac.getActiveProducersState(topic));
+  }
+
   private Mono<List<String>> filterExisting(KafkaCluster cluster, Collection<String> topics) {
     return adminClientService.get(cluster)
         .flatMap(ac -> ac.listTopics(true))
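The reactive lookup above is a thin wrapper around Kafka's Admin#describeProducers API (available since Kafka 2.8). For reference, a blocking sketch of the same lookup with a plain AdminClient; the bootstrap servers and topic name are placeholders, and kafka-clients 3.1+ is assumed for allTopicNames():

```java
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.IntStream;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeProducersResult;
import org.apache.kafka.clients.admin.ProducerState;
import org.apache.kafka.common.TopicPartition;

public class DescribeProducersSketch {
  public static void main(String[] args) throws Exception {
    // "localhost:9092" and "my-topic" are placeholders for this sketch.
    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

    try (AdminClient admin = AdminClient.create(props)) {
      String topic = "my-topic";

      // Resolve the partition count, then ask for producer state on every partition,
      // mirroring what ReactiveAdminClient#getActiveProducersState does reactively.
      int partitions = admin.describeTopics(List.of(topic)).allTopicNames().get()
          .get(topic).partitions().size();

      List<TopicPartition> tps = IntStream.range(0, partitions)
          .mapToObj(i -> new TopicPartition(topic, i))
          .toList();

      DescribeProducersResult result = admin.describeProducers(tps);
      Map<TopicPartition, DescribeProducersResult.PartitionProducerState> byPartition = result.all().get();

      byPartition.forEach((tp, state) -> {
        List<ProducerState> active = state.activeProducers();
        if (!active.isEmpty()) { // same filter as the PR: skip partitions without active producers
          System.out.println(tp + " -> " + active);
        }
      });
    }
  }
}
```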
diff --git a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
index 306c3fd2ddd..ae51d31568f 100644
--- a/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
+++ b/kafka-ui-contract/src/main/resources/swagger/kafka-ui-api.yaml
@@ -763,6 +763,33 @@ paths:
         404:
           description: Not found
 
+  /api/clusters/{clusterName}/topics/{topicName}/activeproducers:
+    get:
+      tags:
+        - Topics
+      summary: get producer states for topic
+      operationId: getActiveProducerStates
+      parameters:
+        - name: clusterName
+          in: path
+          required: true
+          schema:
+            type: string
+        - name: topicName
+          in: path
+          required: true
+          schema:
+            type: string
+      responses:
+        200:
+          description: OK
+          content:
+            application/json:
+              schema:
+                type: array
+                items:
+                  $ref: '#/components/schemas/TopicProducerState'
+
   /api/clusters/{clusterName}/topics/{topicName}/consumer-groups:
     get:
       tags:
@@ -2619,6 +2646,31 @@ components:
         - PROTOBUF
        - UNKNOWN
 
+    TopicProducerState:
+      type: object
+      properties:
+        partition:
+          type: integer
+          format: int32
+        producerId:
+          type: integer
+          format: int64
+        producerEpoch:
+          type: integer
+          format: int32
+        lastSequence:
+          type: integer
+          format: int32
+        lastTimestampMs:
+          type: integer
+          format: int64
+        coordinatorEpoch:
+          type: integer
+          format: int32
+        currentTransactionStartOffset:
+          type: integer
+          format: int64
+
     ConsumerGroup:
       discriminator:
         propertyName: inherit
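Once the contract above is implemented, the new endpoint can be exercised directly. A minimal client sketch using java.net.http; the host, cluster name and topic name are placeholders:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ActiveProducersRequestSketch {
  public static void main(String[] args) throws Exception {
    // Host, cluster name and topic name are placeholders; adjust to your deployment.
    URI uri = URI.create("http://localhost:8080/api/clusters/local/topics/my-topic/activeproducers");

    HttpResponse<String> response = HttpClient.newHttpClient()
        .send(HttpRequest.newBuilder(uri).GET().build(), HttpResponse.BodyHandlers.ofString());

    // Expected: 200 OK with a JSON array of TopicProducerState objects (one per active producer).
    System.out.println(response.statusCode());
    System.out.println(response.body());
  }
}
```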