diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java index d9269ec83b179..e47443e4e8f95 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/MetaStoreImpl.java @@ -398,10 +398,13 @@ private static ManagedLedgerInfo updateMLInfoTimestamp(ManagedLedgerInfo info) { } private static MetaStoreException getException(Throwable t) { - if (t.getCause() instanceof MetadataStoreException.BadVersionException) { - return new ManagedLedgerException.BadVersionException(t.getMessage()); + Throwable actEx = FutureUtil.unwrapCompletionException(t); + if (actEx instanceof MetadataStoreException.BadVersionException badVersionException) { + return new ManagedLedgerException.BadVersionException(badVersionException); + } else if (actEx instanceof MetaStoreException metaStoreException){ + return metaStoreException; } else { - return new MetaStoreException(t); + return new MetaStoreException(actEx); } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 5d0d837bf9ac4..773fc7cab4a54 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -1437,6 +1437,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, // Mark the progress of close to prevent close calling concurrently. this.closeFutures = new CloseFutures(new CompletableFuture(), new CompletableFuture()); + AtomicBoolean alreadyUnFenced = new AtomicBoolean(); CompletableFuture res = getBrokerService().getPulsar().getPulsarResources().getNamespaceResources() .getPartitionedTopicResources().runWithMarkDeleteAsync(TopicName.get(topic), () -> { CompletableFuture deleteFuture = new CompletableFuture<>(); @@ -1453,6 +1454,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, closeClientFuture.complete(null); }, getOrderedExecutor()).exceptionally(ex -> { log.error("[{}] Error closing clients", topic, ex); + alreadyUnFenced.set(true); unfenceTopicToResume(); closeClientFuture.completeExceptionally(ex); return null; @@ -1475,6 +1477,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, .whenComplete((v, ex) -> { if (ex != null) { log.error("[{}] Error deleting topic", topic, ex); + alreadyUnFenced.set(true); unfenceTopicToResume(); deleteFuture.completeExceptionally(ex); } else { @@ -1484,6 +1487,7 @@ private CompletableFuture delete(boolean failIfHasSubscriptions, FutureUtil.waitForAll(subsDeleteFutures).whenComplete((f, e) -> { if (e != null) { log.error("[{}] Error deleting topic", topic, e); + alreadyUnFenced.set(true); unfenceTopicToResume(); deleteFuture.completeExceptionally(e); } else { @@ -1514,6 +1518,7 @@ public void deleteLedgerComplete(Object ctx) { } else { log.error("[{}] Error deleting topic", topic, exception); + alreadyUnFenced.set(true); unfenceTopicToResume(); deleteFuture.completeExceptionally( new PersistenceException(exception)); @@ -1526,6 +1531,7 @@ public void deleteLedgerComplete(Object ctx) { } }); }).exceptionally(ex->{ + alreadyUnFenced.set(true); unfenceTopicToResume(); deleteFuture.completeExceptionally( new TopicBusyException("Failed to close clients before deleting topic.", @@ -1537,7 +1543,9 @@ public void deleteLedgerComplete(Object ctx) { }).whenComplete((value, ex) -> { if (ex != null) { log.error("[{}] Error deleting topic", topic, ex); - unfenceTopicToResume(); + if (!alreadyUnFenced.get()) { + unfenceTopicToResume(); + } } });