From 214292958964b77a421adbd8613c2b65ca806962 Mon Sep 17 00:00:00 2001 From: fengyubiao Date: Tue, 29 Oct 2024 16:01:57 +0800 Subject: [PATCH] [fix] [broker] Fix race-condition causing repeated delete topic (#23522) (cherry picked from commit 7b80f019fa86cf9e154e7dfcd3fd3dc1d036cbba) (cherry picked from commit eddf395631811a731fe9c0284b44fd2f6efd2026) --- .../apache/bookkeeper/mledger/impl/MetaStoreImpl.java | 9 ++++++--- .../broker/service/persistent/PersistentTopic.java | 10 +++++++++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index d9269ec83b179..e47443e4e8f95 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -398,10 +398,13 @@ private static ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) { } private static MetaStoreException getException(Throwable t) { - if (t.getCause() instanceof MetadataStoreException.BadVersionException) { - return new ManagedLedgerException.BadVersionException(t.getMessage()); + Throwable actEx = FutureUtil.unwrapCompletionException(t); + if (actEx instanceof MetadataStoreException.BadVersionException badVersionException) { + return new ManagedLedgerException.BadVersionException(badVersionException); + } else if (actEx instanceof MetaStoreException metaStoreException){ + return metaStoreException; } else { - return new MetaStoreException(t); + return new MetaStoreException(actEx); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 5d0d837bf9ac4..773fc7cab4a54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1437,6 +1437,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, // Mark the progress of close to prevent close calling concurrently. this.closeFutures = new CloseFutures(new CompletableFuture(), new CompletableFuture()); + AtomicBoolean alreadyUnFenced = new AtomicBoolean(); CompletableFuture res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources() .getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> { CompletableFuture deleteFuture = new CompletableFuture<>(); @@ -1453,6 +1454,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, closeClientFuture.complete(null); }, getOrderedExecutor()).exceptionally(ex -> { log.error("[{}] Error closing clients", topic, ex); + alreadyUnFenced.set(true); unfenceTopicToResume(); closeClientFuture.completeExceptionally(ex); return null; @@ -1475,6 +1477,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, .whenComplete((v, ex) -> { if (ex != null) { log.error("[{}] Error deleting topic", topic, ex); + alreadyUnFenced.set(true); unfenceTopicToResume(); deleteFuture.completeExceptionally(ex); } else { @@ -1484,6 +1487,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { if (e != null) { log.error("[{}] Error deleting topic", topic, e); + alreadyUnFenced.set(true); unfenceTopicToResume(); deleteFuture.completeExceptionally(e); } else { @@ -1514,6 +1518,7 @@ public void deleteLedgerComplete(Object ctx) { } else { log.error("[{}] Error deleting topic", topic, exception); + alreadyUnFenced.set(true); unfenceTopicToResume(); deleteFuture.completeExceptionally( new PersistenceException(exception)); @@ -1526,6 +1531,7 @@ public void deleteLedgerComplete(Object ctx) { } }); }).exceptionally(ex->{ + alreadyUnFenced.set(true); unfenceTopicToResume(); deleteFuture.completeExceptionally( new TopicBusyException("Failed to close clients before deleting topic.", @@ -1537,7 +1543,9 @@ public void deleteLedgerComplete(Object ctx) { }).whenComplete((value, ex) -> { if (ex != null) { log.error("[{}] Error deleting topic", topic, ex); - unfenceTopicToResume(); + if (!alreadyUnFenced.get()) { + unfenceTopicToResume(); + } } });