diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/WatchListDeletable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/WatchListDeletable.java
index aef6b9181d5..d13b3355ee4 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/WatchListDeletable.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/WatchListDeletable.java
@@ -16,13 +16,30 @@
 package io.fabric8.kubernetes.client.dsl;
 
 import io.fabric8.kubernetes.client.GracePeriodConfigurable;
+import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
 import io.fabric8.kubernetes.client.PropagationPolicyConfigurable;
 import io.fabric8.kubernetes.client.Watcher;
 
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
 public interface WatchListDeletable<T, L> extends Watchable<Watch, Watcher<T>>, Versionable<WatchListDeletable<T, L>>,
   Listable<L>, Deletable, GracePeriodConfigurable<Deletable>, PropagationPolicyConfigurable<EditReplacePatchDeletable<T>>,
   StatusUpdatable<T>, Informable<T> {
+
+  /**
+   * Waits until the entire list of items available at this context satisfies the given predicate.
+   * @param condition the {@link Predicate} to test against the list
+   * @param pollInterval the polling interval to use; a value less than 1 indicates that every event should be tested
+   * @param timeout after which a {@link KubernetesClientTimeoutException} is thrown
+   * @param timeUnit the unit of the pollInterval and timeout
+   * @return the list of items after the condition is met
+   * @throws KubernetesClientTimeoutException if time runs out before the condition is met
+   */
+  List<T> waitUntilListCondition(Predicate<List<T>> condition, long pollInterval, long timeout, TimeUnit timeUnit);
+
 }
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java
index e660a0e087c..1a668c0a649 100755
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java
@@ -18,8 +18,6 @@
 import io.fabric8.kubernetes.api.model.ObjectReference;
 import io.fabric8.kubernetes.client.dsl.WritableOperation;
 import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import io.fabric8.kubernetes.api.builder.TypedVisitor;
 import io.fabric8.kubernetes.api.builder.Visitor;
@@ -77,8 +75,9 @@
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
@@ -94,7 +93,6 @@
 public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceList<T>, R extends Resource<T>>
   implements MixedOperation<T, L, R>, Resource<T> {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BaseOperation.class);
   private static final String READ_ONLY_UPDATE_EXCEPTION_MESSAGE = "Cannot update read-only resources";
   private static final String READ_ONLY_EDIT_EXCEPTION_MESSAGE = "Cannot edit read-only resources";
 
@@ -977,57 +975,85 @@ public T waitUntilReady(long amount, TimeUnit timeUnit) {
 
   @Override
   public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUnit) {
-    CompletableFuture<T> future = new CompletableFuture<>();
-    // tests the condition, trapping any exceptions
-    Consumer<T> tester = obj -> {
-      try {
-        if (condition.test(obj)) {
-          future.complete(obj);
-        }
-      } catch (Exception e) {
-        future.completeExceptionally(e);
-      }
-    };
-    // start an informer that supplies the tester with events and empty list handling
-    try (SharedIndexInformer<T> informer = this.createInformer(0, l -> {
-      if (l.getItems().isEmpty()) {
-        tester.accept(null);
-      }
-    }, new ResourceEventHandler<T>() {
-
-      @Override
-      public void onAdd(T obj) {
-        tester.accept(obj);
-      }
-
-      @Override
-      public void onUpdate(T oldObj, T newObj) {
-        tester.accept(newObj);
-      }
-
-      @Override
-      public void onDelete(T obj, boolean deletedFinalStateUnknown) {
-        tester.accept(null);
-      }
-    })) {
-      // prevent unnecessary watches
-      future.whenComplete((r,t) -> informer.stop());
-      informer.run();
-      return future.get(amount, timeUnit);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      throw KubernetesClientException.launderThrowable(e.getCause());
-    } catch (ExecutionException e) {
-      throw KubernetesClientException.launderThrowable(e.getCause());
-    } catch (TimeoutException e) {
+    CompletableFuture<T> futureCondition = listCondition(l -> {
+      if (l.isEmpty()) {
+        return condition.test(null);
+      }
+      return condition.test(l.get(0));
+    }, 0, timeUnit).thenApply(l -> l.isEmpty() ? null : l.get(0));
+
+    if (!Utils.waitUntilReady(futureCondition, amount, timeUnit, true)) {
       T i = getItem();
       if (i != null) {
         throw new KubernetesClientTimeoutException(i, amount, timeUnit);
       }
       throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit);
     }
+    return futureCondition.getNow(null);
+  }
+
+  @Override
+  public List<T> waitUntilListCondition(Predicate<List<T>> condition, long pollInterval, long amount, TimeUnit timeUnit) {
+    CompletableFuture<List<T>> listCondition = listCondition(condition, pollInterval, timeUnit);
+    if (!Utils.waitUntilReady(listCondition, amount, timeUnit, true)) {
+      // it would be good to report the last state of the cache here
+      throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit);
+    }
+    return listCondition.getNow(null);
+  }
+
+  CompletableFuture<List<T>> listCondition(Predicate<List<T>> condition, long pollInterval, TimeUnit timeUnit) {
+    CompletableFuture<List<T>> future = new CompletableFuture<>();
+    AtomicReference<Runnable> tester = new AtomicReference<>();
+
+    // create an informer that supplies the tester with events and empty list handling
+    SharedIndexInformer<T> informer = this.createInformer(0, l -> {
+      if (l.getItems().isEmpty()) {
+        tester.get().run();
+      }
+    });
+
+    // prevent unnecessary watches and handle closure
+    future.whenComplete((r, t) -> informer.stop());
+
+    // use the cache to evaluate the list predicate, trapping any exceptions
+    Runnable test = () -> {
+      try {
+        // could skip if lastResourceVersion has not changed
+        List<T> list = informer.getStore().list();
+        if (condition.test(list)) {
+          future.complete(list);
+        }
+      } catch (Exception e) {
+        future.completeExceptionally(e);
+      }
+    };
+    tester.set(test);
+
+    // if there is no poll interval, re-evaluate on every event
+    if (pollInterval <= 0) {
+      informer.addEventHandler(new ResourceEventHandler<T>() {
+        @Override
+        public void onAdd(T obj) {
+          test.run();
+        }
+        @Override
+        public void onDelete(T obj, boolean deletedFinalStateUnknown) {
+          test.run();
+        }
+        @Override
+        public void onUpdate(T oldObj, T newObj) {
+          test.run();
+        }
+      });
+    } else {
+      ScheduledFuture<?> pollingFuture = Utils.scheduleAtFixedRate(Utils.getCommonExecutorSerive(), test, pollInterval, pollInterval, timeUnit);
+      future.whenComplete((r, t) -> pollingFuture.cancel(true));
+    }
+    informer.run();
+    return future;
+  }
+
   public void setType(Class<T> type) {
     this.type = type;
   }
@@ -1054,14 +1080,17 @@ public Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexers) {
 
   @Override
   public SharedIndexInformer<T> inform(ResourceEventHandler<T> handler, long resync) {
-    DefaultSharedIndexInformer<T, L> result = createInformer(resync, null, handler);
+    DefaultSharedIndexInformer<T, L> result = createInformer(resync, null);
+    if (handler != null) {
+      result.addEventHandler(handler);
+    }
     // synchronous start list/watch must succeed in the calling thread
     // initial add events will be processed in the calling thread as well
     result.run();
     return result;
   }
 
-  private DefaultSharedIndexInformer<T, L> createInformer(long resync, Consumer<L> onList, ResourceEventHandler<T> handler) {
+  private DefaultSharedIndexInformer<T, L> createInformer(long resync, Consumer<L> onList) {
     T i = getItem();
     String name = (Utils.isNotNullOrEmpty(getName()) || i != null) ? checkName(i) : null;
 
@@ -1088,9 +1117,6 @@ public Watch watch(ListOptions params, String namespace, OperationContext context) {
     if (indexers != null) {
       informer.addIndexers(indexers);
     }
-    if (handler != null) {
-      informer.addEventHandler(handler);
-    }
     return informer;
   }
 }
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java
index ccddd1926ce..9898e01a907 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java
@@ -107,7 +107,7 @@ private void cleanUp() {
   }
 
   public void waitUntilReady() {
-    if (!Utils.waitUntilReady(startedFuture, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) {
+    if (!Utils.waitUntilReady(startedFuture, config.getRequestTimeout(), TimeUnit.MILLISECONDS, true)) {
       if (LOGGER.isDebugEnabled()) {
         LOGGER.warn("Log watch request has not been opened within: " + config.getRequestTimeout() + " millis.");
       }
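For orientation, here is a minimal sketch of how the informer-backed list wait introduced above can be driven from client code. The client construction, namespace, label selector, and target count are illustrative assumptions for the example, not part of this change; a pollInterval of 0 opts into per-event evaluation against the informer's cache:

    import io.fabric8.kubernetes.api.model.Pod;
    import io.fabric8.kubernetes.client.DefaultKubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;

    import java.util.List;
    import java.util.concurrent.TimeUnit;

    public class WaitForReadyPodsExample {

      static boolean isReady(Pod p) {
        return p.getStatus() != null && p.getStatus().getConditions().stream()
            .anyMatch(c -> "Ready".equals(c.getType()) && "True".equals(c.getStatus()));
      }

      public static void main(String[] args) {
        try (KubernetesClient client = new DefaultKubernetesClient()) {
          // re-evaluated on every informer event (pollInterval = 0), served from the cache
          List<Pod> matched = client.pods()
              .inNamespace("demo")             // illustrative namespace
              .withLabel("app", "example")     // illustrative selector
              .waitUntilListCondition(pods -> pods.stream()
                  .filter(WaitForReadyPodsExample::isReady)
                  .count() >= 3,               // illustrative target count
                  0, 60, TimeUnit.SECONDS);
          System.out.println("Matched " + matched.size() + " pods");
        } catch (KubernetesClientTimeoutException e) {
          System.err.println("Condition not met in time: " + e.getMessage());
        }
      }
    }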
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java
index d4a563bfb02..a5d39032112 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableListImpl.java
@@ -91,7 +91,7 @@ public List<HasMetadata> waitUntilReady(final long amount, final TimeUnit timeUnit) {
 
   public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition, long amount, TimeUnit timeUnit) {
-    List<HasMetadata> items = acceptVisitors(asHasMetadata(item, true), visitors);
+    ArrayList<HasMetadata> items = acceptVisitors(asHasMetadata(item, true), visitors);
     if (items.isEmpty()) {
       return Collections.emptyList();
     }
@@ -115,10 +115,8 @@ public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,
     final List<HasMetadata> results = new ArrayList<>();
     final List<HasMetadata> itemsWithConditionNotMatched = new ArrayList<>();
 
-    // Iterate over the items because we don't know what kind of List it is.
-    // But the futures use an ArrayList, so accessing by index is efficient.
-    int i = 0;
-    for (final HasMetadata meta : items) {
+    for (int i = 0; i < items.size(); i++) {
+      final HasMetadata meta = items.get(i);
       try {
         CompletableFuture<HasMetadata> future = futures.get(i);
         // just get each result as the timeout is enforced below
@@ -130,7 +128,6 @@ public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,
         Thread.currentThread().interrupt();
         throw KubernetesClientException.launderThrowable(e);
       }
-      ++i;
     }
 
     if (!itemsWithConditionNotMatched.isEmpty()) {
@@ -293,8 +290,8 @@ public List<HasMetadata> get() {
     }
   }
 
-  private static List<HasMetadata> acceptVisitors(List<HasMetadata> list, List<Visitor> visitors) {
-    List<HasMetadata> result = new ArrayList<>();
+  private static ArrayList<HasMetadata> acceptVisitors(List<HasMetadata> list, List<Visitor> visitors) {
+    ArrayList<HasMetadata> result = new ArrayList<>();
     for (HasMetadata item : list) {
       ResourceHandler<HasMetadata, ?> h = handlerOf(item);
       VisitableBuilder<HasMetadata, ?> builder = h.edit(item);
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java
index 601a7b7d6b9..1832515381d 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java
@@ -27,14 +27,13 @@
 import io.fabric8.kubernetes.client.dsl.base.PatchContext;
 import io.fabric8.kubernetes.client.dsl.internal.RollingOperationContext;
 import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
-import io.fabric8.kubernetes.client.utils.PodOperationUtil;
-import io.fabric8.kubernetes.client.utils.Utils;
 import okhttp3.OkHttpClient;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
 import io.fabric8.kubernetes.api.model.apps.DeploymentList;
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,11 +47,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.UnaryOperator;
@@ -263,24 +258,18 @@ public ImageEditReplacePatchable<Deployment> withTimeout(long timeout, TimeUnit timeUnit) {
    * Lets wait until there are enough Ready pods of the given Deployment
    */
   private void waitUntilDeploymentIsScaled(final int count) {
-    final CompletableFuture<Void> scaledFuture = new CompletableFuture<>();
     final AtomicReference<Integer> replicasRef = new AtomicReference<>(0);
 
     final String name = checkName(getItem());
     final String namespace = checkNamespace(getItem());
 
-    final Runnable deploymentPoller = () -> {
-      try {
-        Deployment deployment = get();
-        //If the deployment is gone, we shouldn't wait.
+    try {
+      waitUntilCondition(deployment -> {
+        //If the deployment is gone, we shouldn't wait.
         if (deployment == null) {
           if (count == 0) {
-            scaledFuture.complete(null);
-            return;
-          } else {
-            scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available."));
-            return;
+            return true;
           }
+          throw new IllegalStateException("Can't wait for Deployment: " + name + " in namespace: " + namespace + " to scale. Resource is no longer available.");
         }
 
         replicasRef.set(deployment.getStatus().getReplicas());
@@ -288,29 +277,18 @@ private void waitUntilDeploymentIsScaled(final int count) {
         long generation = deployment.getMetadata().getGeneration() != null ? deployment.getMetadata().getGeneration() : 0;
         long observedGeneration = deployment.getStatus() != null && deployment.getStatus().getObservedGeneration() != null ? deployment.getStatus().getObservedGeneration() : -1;
         if (observedGeneration >= generation && Objects.equals(deployment.getSpec().getReplicas(), currentReplicas)) {
-          scaledFuture.complete(null);
-        } else {
-          LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {} seconds so waiting...",
-            deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace);
-        }
-      } catch (Throwable t) {
-        LOG.error("Error while waiting for Deployment to be scaled.", t);
-      }
-    };
-
-    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
-    try {
-      if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) {
-        LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.",
+          return true;
+        }
+        LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {}, so waiting...",
+          deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace);
+        return false;
+      }, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS);
+      LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.",
         replicasRef.get(), count, name, namespace);
-      } else {
-        LOG.error("{}/{} pod(s) ready for Deployment: {} in namespace: {} after waiting for {} seconds so giving up",
+    } catch (KubernetesClientTimeoutException e) {
+      LOG.error("{}/{} pod(s) ready for Deployment: {} in namespace: {} after waiting for {} seconds so giving up",
         replicasRef.get(), count, name, namespace, TimeUnit.MILLISECONDS.toSeconds(getConfig().getScaleTimeout()));
-      }
-    } finally {
-      poller.cancel(true);
-      executor.shutdown();
     }
   }
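A detail worth noting about the rewrite above: the informer-based waitUntilCondition invokes the predicate with null when the resource is absent, so "scale to zero" can treat a missing resource as success. A hedged sketch of that predicate shape (the null handling mirrors the new code; the helper name is illustrative):

    import io.fabric8.kubernetes.api.model.apps.Deployment;

    import java.util.function.Predicate;

    public class ScalePredicates {
      // Succeeds when the deployment reaches the desired replica count;
      // treats a missing resource as success only when zero replicas are desired.
      static Predicate<Deployment> scaledTo(int count) {
        return deployment -> {
          if (deployment == null) {
            if (count == 0) {
              return true; // resource gone and none desired: done
            }
            throw new IllegalStateException("resource is no longer available");
          }
          Integer current = deployment.getStatus() == null ? null : deployment.getStatus().getReplicas();
          return current != null && current == count;
        };
      }
    }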
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentRollingUpdater.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentRollingUpdater.java
index f4aa30037e8..7da6b20d2a9 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentRollingUpdater.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentRollingUpdater.java
@@ -16,6 +16,7 @@
 package io.fabric8.kubernetes.client.dsl.internal.apps.v1;
 
 import okhttp3.OkHttpClient;
+import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.apps.Deployment;
 import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
@@ -23,6 +24,7 @@
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.dsl.Operation;
 import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
+import io.fabric8.kubernetes.client.dsl.WatchListDeletable;
 
 class DeploymentRollingUpdater extends RollingUpdater<Deployment, DeploymentList> {
@@ -50,8 +52,8 @@ protected Deployment createClone(Deployment obj, String newName, String newDeploymentHash) {
   }
 
   @Override
-  protected PodList listSelectedPods(Deployment obj) {
-    return listSelectedPods(obj.getSpec().getSelector());
+  protected WatchListDeletable<Pod, PodList> selectedPodLister(Deployment obj) {
+    return selectedPodLister(obj.getSpec().getSelector());
   }
 
   @Override
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/ReplicaSetRollingUpdater.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/ReplicaSetRollingUpdater.java
index e1ddcfd14f1..2489aba2ddd 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/ReplicaSetRollingUpdater.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/ReplicaSetRollingUpdater.java
@@ -15,6 +15,7 @@
  */
 package io.fabric8.kubernetes.client.dsl.internal.apps.v1;
 
+import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.apps.ReplicaSet;
 import io.fabric8.kubernetes.api.model.apps.ReplicaSetBuilder;
@@ -22,6 +23,7 @@
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.dsl.Operation;
 import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
+import io.fabric8.kubernetes.client.dsl.WatchListDeletable;
 import okhttp3.OkHttpClient;
 
 class ReplicaSetRollingUpdater extends RollingUpdater<ReplicaSet, ReplicaSetList> {
@@ -50,8 +52,8 @@ protected ReplicaSet createClone(ReplicaSet obj, String newName, String newDeploymentHash) {
   }
 
   @Override
-  protected PodList listSelectedPods(ReplicaSet obj) {
-    return listSelectedPods(obj.getSpec().getSelector());
+  protected WatchListDeletable<Pod, PodList> selectedPodLister(ReplicaSet obj) {
+    return selectedPodLister(obj.getSpec().getSelector());
   }
 
   @Override
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java
index 3e9b4a7e320..940d5c1302c 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java
@@ -19,21 +19,17 @@
 import io.fabric8.kubernetes.api.model.KubernetesResourceList;
 import io.fabric8.kubernetes.api.model.autoscaling.v1.Scale;
 import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
 import io.fabric8.kubernetes.client.dsl.Resource;
 import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
 import io.fabric8.kubernetes.client.dsl.base.HasMetadataOperation;
 import io.fabric8.kubernetes.client.dsl.base.PatchContext;
 import io.fabric8.kubernetes.client.dsl.base.PatchType;
 import io.fabric8.kubernetes.client.dsl.internal.RollingOperationContext;
-import io.fabric8.kubernetes.client.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.UnaryOperator;
@@ -74,8 +70,10 @@ public T scale(int count) {
 
   public T scale(int count, boolean wait) {
     T res = withReplicas(count);
     if (wait) {
-      waitUntilScaled(count);
-      res = getMandatory();
+      res = waitUntilScaled(count);
+      if (res == null) {
+        res = getMandatory();
+      }
     }
     return res;
   }
@@ -93,53 +91,37 @@ public Scale scale(Scale scaleParam) {
   /**
    * Let's wait until there are enough Ready pods.
    */
-  private void waitUntilScaled(final int count) {
-    final CompletableFuture<Void> scaledFuture = new CompletableFuture<>();
+  private T waitUntilScaled(final int count) {
     final AtomicReference<Integer> replicasRef = new AtomicReference<>(0);
 
     final String name = checkName(getItem());
     final String namespace = checkNamespace(getItem());
 
-    final Runnable tPoller = () -> {
-      try {
-        T t = get();
-        //If the resource is gone, we shouldn't wait.
-        if (t == null) {
-          if (count == 0) {
-            scaledFuture.complete(null);
-          } else {
-            scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for " + getType().getSimpleName() + ": " + name + " in namespace: " + namespace + " to scale. Resource is no longer available."));
-          }
-          return;
-        }
-        int currentReplicas = getCurrentReplicas(t);
-        int desiredReplicas = getDesiredReplicas(t);
-        replicasRef.set(currentReplicas);
-        long generation = t.getMetadata().getGeneration() != null ? t.getMetadata().getGeneration() : -1;
-        long observedGeneration = getObservedGeneration(t);
-        if (observedGeneration >= generation && Objects.equals(desiredReplicas, currentReplicas)) {
-          scaledFuture.complete(null);
-        }
-        Log.debug("Only {}/{} replicas scheduled for {}: {} in namespace: {} seconds so waiting...",
-          currentReplicas, desiredReplicas, t.getKind(), t.getMetadata().getName(), namespace);
-      } catch (Throwable t) {
-        Log.error("Error while waiting for resource to be scaled.", t);
-      }
-    };
-
-    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    ScheduledFuture poller = executor.scheduleWithFixedDelay(tPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
     try {
-      if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) {
-        Log.debug("{}/{} pod(s) ready for {}: {} in namespace: {}.",
-          replicasRef.get(), count, getType().getSimpleName(), name, namespace);
-      } else {
-        Log.error("{}/{} pod(s) ready for {}: {} in namespace: {} after waiting for {} seconds so giving up",
+      return waitUntilCondition(t -> {
+        //If the resource is gone, we shouldn't wait.
+        if (t == null) {
+          if (count == 0) {
+            return true;
+          }
+          throw new IllegalStateException("Can't wait for " + getType().getSimpleName() + ": " + name + " in namespace: " + namespace + " to scale. Resource is no longer available.");
+        }
+        int currentReplicas = getCurrentReplicas(t);
+        int desiredReplicas = getDesiredReplicas(t);
+        replicasRef.set(currentReplicas);
+        long generation = t.getMetadata().getGeneration() != null ? t.getMetadata().getGeneration() : -1;
+        long observedGeneration = getObservedGeneration(t);
+        if (observedGeneration >= generation && Objects.equals(desiredReplicas, currentReplicas)) {
+          return true;
+        }
+        Log.debug("Only {}/{} replicas scheduled for {}: {} in namespace: {}, so waiting...",
+          currentReplicas, desiredReplicas, t.getKind(), t.getMetadata().getName(), namespace);
+        return false;
+      }, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS);
+    } catch (KubernetesClientTimeoutException e) {
+      Log.error("{}/{} pod(s) ready for {}: {} in namespace: {} after waiting for {} seconds so giving up",
         replicasRef.get(), count, getType().getSimpleName(), name, namespace, TimeUnit.MILLISECONDS.toSeconds(getConfig().getScaleTimeout()));
-      }
-    } finally {
-      poller.cancel(true);
-      executor.shutdown();
+      return null;
     }
   }
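With the change above, scale(count, true) returns the resource instance observed when the condition was met, falling back to a direct fetch if the wait produced nothing. A minimal caller-side sketch; the namespace and resource name are illustrative:

    import io.fabric8.kubernetes.api.model.apps.ReplicaSet;
    import io.fabric8.kubernetes.client.DefaultKubernetesClient;
    import io.fabric8.kubernetes.client.KubernetesClient;

    public class ScaleAndWaitExample {
      public static void main(String[] args) {
        try (KubernetesClient client = new DefaultKubernetesClient()) {
          // blocks until the scale is observed, or the configured scale timeout
          // elapses (in which case the current state is fetched as a fallback)
          ReplicaSet rs = client.apps().replicaSets()
              .inNamespace("demo")   // illustrative
              .withName("repl1")     // illustrative
              .scale(5, true);
          System.out.println("Observed replicas: " + rs.getStatus().getReplicas());
        }
      }
    }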
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollingUpdater.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollingUpdater.java
index dd97abb5660..0ac24555710 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollingUpdater.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollingUpdater.java
@@ -17,6 +17,7 @@
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import io.fabric8.kubernetes.client.dsl.internal.core.v1.PodOperationsImpl;
+import io.fabric8.kubernetes.client.utils.Utils;
 import okhttp3.OkHttpClient;
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.api.model.LabelSelector;
@@ -26,9 +27,11 @@
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
 import io.fabric8.kubernetes.client.dsl.Operation;
 import io.fabric8.kubernetes.client.dsl.PodResource;
 import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
+import io.fabric8.kubernetes.client.dsl.WatchListDeletable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -39,12 +42,8 @@
 import java.time.format.DateTimeFormatter;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -80,7 +79,7 @@ public RollingUpdater(OkHttpClient client, Config config, String namespace, long rollingTimeoutMillis) {
 
   protected abstract T createClone(T obj, String newName, String newDeploymentHash);
 
-  protected abstract PodList listSelectedPods(T obj);
+  protected abstract WatchListDeletable<Pod, PodList> selectedPodLister(T obj);
 
   protected abstract T updateDeploymentKey(String name, String hash);
 
@@ -101,7 +100,7 @@ public T rollUpdate(T oldObj, T newObj) {
       String oldDeploymentHash = md5sum(oldObj);
 
       //Before we update the resource though we need to add the labels to the existing managed pods
-      PodList oldPods = listSelectedPods(oldObj);
+      PodList oldPods = selectedPodLister(oldObj).list();
 
       for (Pod pod : oldPods.getItems()) {
         try {
@@ -207,40 +206,27 @@ public static Map<String, String> requestPayLoadForRolloutRestart() {
    * Lets wait until there are enough Ready pods of the given RC
    */
   private void waitUntilPodsAreReady(final T obj, final String namespace, final int requiredPodCount) {
-    final CountDownLatch countDownLatch = new CountDownLatch(1);
     final AtomicInteger podCount = new AtomicInteger(0);
-
-    final Runnable readyPodsPoller = () -> {
-      PodList podList = listSelectedPods(obj);
-      int count = 0;
-      List<Pod> items = podList.getItems();
-      for (Pod item : items) {
-        for (PodCondition c : item.getStatus().getConditions()) {
-          if (c.getType().equals("Ready") && c.getStatus().equals("True")) {
-            count++;
+
+    ScheduledFuture<?> logger = Utils.scheduleAtFixedRate(Utils.getCommonExecutorSerive(),
+      () -> LOG.debug("Only {}/{} pod(s) ready for {}: {} in namespace: {}, so waiting...",
+        podCount.get(), requiredPodCount, obj.getKind(), obj.getMetadata().getName(), namespace),
+      0, loggingIntervalMillis, TimeUnit.MILLISECONDS);
+    try {
+      selectedPodLister(obj).waitUntilListCondition(items -> {
+        int count = 0;
+        for (Pod item : items) {
+          for (PodCondition c : item.getStatus().getConditions()) {
+            if (c.getType().equals("Ready") && c.getStatus().equals("True")) {
+              count++;
+            }
           }
         }
-      }
-      podCount.set(count);
-      if (count == requiredPodCount) {
-        countDownLatch.countDown();
-      }
-    };
-
-    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    ScheduledFuture poller = executor.scheduleWithFixedDelay(readyPodsPoller, 0, 1, TimeUnit.SECONDS);
-    ScheduledFuture logger = executor.scheduleWithFixedDelay(() -> LOG.debug("Only {}/{} pod(s) ready for {}: {} in namespace: {} seconds so waiting...",
-      podCount.get(), requiredPodCount, obj.getKind(), obj.getMetadata().getName(), namespace), 0, loggingIntervalMillis, TimeUnit.MILLISECONDS);
-    try {
-      countDownLatch.await(rollingTimeoutMillis, TimeUnit.MILLISECONDS);
-      executor.shutdown();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      poller.cancel(true);
-      logger.cancel(true);
-      executor.shutdown();
-      LOG.warn("Only {}/{} pod(s) ready for {}: {} in namespace: {} after waiting for {} seconds so giving up",
-        podCount.get(), requiredPodCount, obj.getKind(), obj.getMetadata().getName(), namespace, TimeUnit.MILLISECONDS.toSeconds(rollingTimeoutMillis));
+        podCount.set(count);
+        return count == requiredPodCount;
+      }, 1000, rollingTimeoutMillis, TimeUnit.MILLISECONDS);
+    } finally {
+      logger.cancel(true); // since we're using the common pool, we must cancel manually
     }
   }
 
@@ -249,34 +235,15 @@ private void waitUntilPodsAreReady(final T obj, final String namespace, final int requiredPodCount) {
    * Lets wait until the resource is actually deleted in the server
    */
   private void waitUntilDeleted(final String namespace, final String name) {
-    final CountDownLatch countDownLatch = new CountDownLatch(1);
-
-    final Runnable waitTillDeletedPoller = () -> {
-      try {
-        T res = resources().inNamespace(namespace).withName(name).get();
-        if (res == null) {
-          countDownLatch.countDown();
-        }
-      } catch (KubernetesClientException e) {
-        if (e.getCode() == 404) {
-          countDownLatch.countDown();
-        }
-      }
-    };
-
-    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    ScheduledFuture poller = executor.scheduleWithFixedDelay(waitTillDeletedPoller, 0, 5, TimeUnit.SECONDS);
-    ScheduledFuture logger = executor.scheduleWithFixedDelay(() -> LOG.debug("Found resource {}/{} not yet deleted on server, so waiting...", namespace, name), 0, loggingIntervalMillis, TimeUnit.MILLISECONDS);
+    ScheduledFuture<?> logger = Utils.scheduleAtFixedRate(Utils.getCommonExecutorSerive(),
+      () -> LOG.debug("Found resource {}/{} not yet deleted on server, so waiting...", namespace, name),
+      0, loggingIntervalMillis, TimeUnit.MILLISECONDS);
     try {
-      countDownLatch.await(DEFAULT_SERVER_GC_WAIT_TIMEOUT, TimeUnit.MILLISECONDS);
-      executor.shutdown();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      poller.cancel(true);
-      logger.cancel(true);
-      executor.shutdown();
-      LOG.warn("Still found deleted resource {} in namespace: {} after waiting for {} seconds so giving up",
-        name, namespace, TimeUnit.MILLISECONDS.toSeconds(DEFAULT_SERVER_GC_WAIT_TIMEOUT));
+      resources().inNamespace(namespace)
+        .withName(name)
+        .waitUntilCondition(Objects::isNull, DEFAULT_SERVER_GC_WAIT_TIMEOUT, TimeUnit.MILLISECONDS);
+    } finally {
+      logger.cancel(true); // since we're using the common pool, we must cancel manually
     }
   }
 
@@ -292,8 +259,8 @@ protected Operation<Pod, PodList, PodResource<Pod>> pods() {
     return new PodOperationsImpl(client, config);
   }
 
-  protected PodList listSelectedPods(LabelSelector selector) {
-    return pods().inNamespace(namespace).withLabelSelector(selector).list();
+  protected FilterWatchListDeletable<Pod, PodList> selectedPodLister(LabelSelector selector) {
+    return pods().inNamespace(namespace).withLabelSelector(selector);
   }
 }
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/StatefulSetRollingUpdater.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/StatefulSetRollingUpdater.java
index 4614fadec97..8ba93f440a3 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/StatefulSetRollingUpdater.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/StatefulSetRollingUpdater.java
@@ -15,6 +15,7 @@
  */
 package io.fabric8.kubernetes.client.dsl.internal.apps.v1;
 
+import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.apps.StatefulSet;
 import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder;
@@ -22,6 +23,7 @@
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.dsl.Operation;
 import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
+import io.fabric8.kubernetes.client.dsl.WatchListDeletable;
 import okhttp3.OkHttpClient;
 
 class StatefulSetRollingUpdater extends RollingUpdater<StatefulSet, StatefulSetList> {
@@ -50,8 +52,8 @@ protected StatefulSet createClone(StatefulSet obj, String newName, String newDeploymentHash) {
   }
 
   @Override
-  protected PodList listSelectedPods(StatefulSet obj) {
-    return listSelectedPods(obj.getSpec().getSelector());
+  protected WatchListDeletable<Pod, PodList> selectedPodLister(StatefulSet obj) {
+    return selectedPodLister(obj.getSpec().getSelector());
   }
 
   @Override
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/batch/v1/JobOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/batch/v1/JobOperationsImpl.java
index 43f263fde4b..58482e375f7 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/batch/v1/JobOperationsImpl.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/batch/v1/JobOperationsImpl.java
@@ -42,10 +42,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -126,41 +122,21 @@ public Job scale(int count, boolean wait) {
    * Lets wait until there are enough Ready pods of the given Job
    */
   private void waitUntilJobIsScaled() {
-    final CountDownLatch countDownLatch = new CountDownLatch(1);
-
     final AtomicReference<Job> atomicJob = new AtomicReference<>();
-    final Runnable jobPoller = () -> {
-      try {
-        Job job = getMandatory();
-        atomicJob.set(job);
-        Integer activeJobs = job.getStatus().getActive();
-        if (activeJobs == null) {
-          activeJobs = 0;
-        }
-        if (Objects.equals(job.getSpec().getParallelism(), activeJobs)) {
-          countDownLatch.countDown();
-        } else {
-          LOG.debug("Only {}/{} pods scheduled for Job: {} in namespace: {} seconds so waiting...",
-            job.getStatus().getActive(), job.getSpec().getParallelism(), job.getMetadata().getName(), namespace);
-        }
-      } catch (Throwable t) {
-        LOG.error("Error while waiting for Job to be scaled.", t);
+    waitUntilCondition(job -> {
+      atomicJob.set(job);
+      Integer activeJobs = job.getStatus().getActive();
+      if (activeJobs == null) {
+        activeJobs = 0;
       }
-    };
-
-    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    ScheduledFuture poller = executor.scheduleWithFixedDelay(jobPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
-    try {
-      countDownLatch.await(getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS);
-      executor.shutdown();
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      poller.cancel(true);
-      executor.shutdown();
-      LOG.error("Only {}/{} pod(s) ready for Job: {} in namespace: {} - giving up",
-        atomicJob.get().getStatus().getActive(), atomicJob.get().getSpec().getParallelism(), atomicJob.get().getMetadata().getName(), namespace);
-    }
+      if (Objects.equals(job.getSpec().getParallelism(), activeJobs)) {
+        return true;
+      }
+      LOG.debug("Only {}/{} pods scheduled for Job: {} in namespace: {}, so waiting...",
+        job.getStatus().getActive(), job.getSpec().getParallelism(), job.getMetadata().getName(), namespace);
+      return false;
+    }, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS);
   }
 
   public String getLog() {
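Note that RollingUpdater above opts into a 1-second poll (pollInterval = 1000) rather than per-event evaluation, while keeping a separate progress logger on the shared executor that must be cancelled in a finally block. A rough JDK-only sketch of that pattern, using a plain ScheduledExecutorService rather than the client's shared pool (all names and intervals here are illustrative):

    import java.util.concurrent.*;
    import java.util.concurrent.atomic.AtomicInteger;

    public class PollingWaitSketch {
      public static void main(String[] args) throws Exception {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(2);
        AtomicInteger readyCount = new AtomicInteger();
        CompletableFuture<Void> done = new CompletableFuture<>();

        // progress logger, cancelled in finally -- mirrors the diff's shared-pool usage
        ScheduledFuture<?> logger = pool.scheduleAtFixedRate(
            () -> System.out.println("Only " + readyCount.get() + "/3 ready, waiting..."),
            0, 500, TimeUnit.MILLISECONDS);
        // poller: re-evaluates the condition on a fixed interval instead of per event
        ScheduledFuture<?> poller = pool.scheduleAtFixedRate(() -> {
          if (readyCount.incrementAndGet() >= 3) { // stand-in for the real readiness check
            done.complete(null);
          }
        }, 0, 1, TimeUnit.SECONDS);
        try {
          done.get(10, TimeUnit.SECONDS);
        } finally {
          poller.cancel(true);
          logger.cancel(true);
          pool.shutdown();
        }
      }
    }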
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ReplicationControllerRollingUpdater.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ReplicationControllerRollingUpdater.java
index 5f40ff827f2..c36a1666da2 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ReplicationControllerRollingUpdater.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/core/v1/ReplicationControllerRollingUpdater.java
@@ -17,6 +17,7 @@
 
 import io.fabric8.kubernetes.client.dsl.internal.apps.v1.RollingUpdater;
 import okhttp3.OkHttpClient;
+import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.ReplicationController;
 import io.fabric8.kubernetes.api.model.ReplicationControllerBuilder;
@@ -24,6 +25,7 @@
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.dsl.Operation;
 import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
+import io.fabric8.kubernetes.client.dsl.WatchListDeletable;
 
 class ReplicationControllerRollingUpdater extends RollingUpdater<ReplicationController, ReplicationControllerList> {
@@ -50,8 +52,8 @@ protected ReplicationController createClone(ReplicationController obj, String newName, String newDeploymentHash) {
   }
 
   @Override
-  protected PodList listSelectedPods(ReplicationController obj) {
-    return pods().inNamespace(namespace).withLabels(obj.getSpec().getSelector()).list();
+  protected WatchListDeletable<Pod, PodList> selectedPodLister(ReplicationController obj) {
+    return pods().inNamespace(namespace).withLabels(obj.getSpec().getSelector());
   }
 
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java
index 6441035352e..4aeece97427 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java
@@ -27,6 +27,7 @@
 import io.fabric8.kubernetes.api.model.extensions.ReplicaSetList;
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
 import io.fabric8.kubernetes.client.dsl.ImageEditReplacePatchable;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
 import io.fabric8.kubernetes.client.dsl.Loggable;
@@ -38,7 +39,6 @@
 import io.fabric8.kubernetes.client.dsl.internal.apps.v1.RollableScalableResourceOperation;
 import io.fabric8.kubernetes.client.dsl.internal.apps.v1.RollingUpdater;
 import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
-import io.fabric8.kubernetes.client.utils.Utils;
 import okhttp3.OkHttpClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,11 +53,7 @@
 import java.util.Map;
 import java.util.Objects;
 import java.util.function.Consumer;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.UnaryOperator;
@@ -268,24 +264,19 @@ public ImageEditReplacePatchable<Deployment> withTimeout(long timeout, TimeUnit timeUnit) {
    * Lets wait until there are enough Ready pods of the given Deployment
    */
   private void waitUntilDeploymentIsScaled(final int count) {
-    final CompletableFuture<Void> scaledFuture = new CompletableFuture<>();
     final AtomicReference<Integer> replicasRef = new AtomicReference<>(0);
 
     final String name = checkName(getItem());
     final String namespace = checkNamespace(getItem());
 
-    final Runnable deploymentPoller = () -> {
-      try {
-        Deployment deployment = get();
-        //If the deployment is gone, we shouldn't wait.
+    try {
+      waitUntilCondition(deployment -> {
+        // If the deployment is gone, we shouldn't wait.
         if (deployment == null) {
           if (count == 0) {
-            scaledFuture.complete(null);
-            return;
-          } else {
-            scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available."));
-            return;
-          }
+            return true;
+          }
+          throw new IllegalStateException("Can't wait for Deployment: " + name + " in namespace: " + namespace + " to scale. Resource is no longer available.");
         }
 
         replicasRef.set(deployment.getStatus().getReplicas());
@@ -293,29 +284,17 @@ private void waitUntilDeploymentIsScaled(final int count) {
         long generation = deployment.getMetadata().getGeneration() != null ? deployment.getMetadata().getGeneration() : 0;
         long observedGeneration = deployment.getStatus() != null && deployment.getStatus().getObservedGeneration() != null ? deployment.getStatus().getObservedGeneration() : -1;
         if (observedGeneration >= generation && Objects.equals(deployment.getSpec().getReplicas(), currentReplicas)) {
-          scaledFuture.complete(null);
-        } else {
-          LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {} seconds so waiting...",
-            deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace);
+          return true;
         }
-      } catch (Exception t) {
-        LOG.error("Error while waiting for Deployment to be scaled.", t);
-      }
-    };
-
-    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
-    try {
-      if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) {
-        LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.",
+        LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {}, so waiting...",
+          deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace);
+        return false;
+      }, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS);
+      LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.",
         replicasRef.get(), count, name, namespace);
-      } else {
-        LOG.error("{}/{} pod(s) ready for Deployment: {} in namespace: {} after waiting for {} seconds so giving up",
+    } catch (KubernetesClientTimeoutException e) {
+      LOG.error("{}/{} pod(s) ready for Deployment: {} in namespace: {} after waiting for {} seconds so giving up",
         replicasRef.get(), count, name, namespace, TimeUnit.MILLISECONDS.toSeconds(getConfig().getScaleTimeout()));
-      }
-    } finally {
-      poller.cancel(true);
-      executor.shutdown();
     }
   }
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentRollingUpdater.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentRollingUpdater.java
index 5cd200c55ff..e7bea578bb0 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentRollingUpdater.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentRollingUpdater.java
@@ -15,6 +15,7 @@
  */
 package io.fabric8.kubernetes.client.dsl.internal.extensions.v1beta1;
 
+import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.extensions.Deployment;
 import io.fabric8.kubernetes.api.model.extensions.DeploymentBuilder;
@@ -22,6 +23,7 @@
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.dsl.Operation;
 import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
+import io.fabric8.kubernetes.client.dsl.WatchListDeletable;
 import io.fabric8.kubernetes.client.dsl.internal.apps.v1.RollingUpdater;
 import okhttp3.OkHttpClient;
 
@@ -51,9 +53,10 @@ protected Deployment createClone(Deployment obj, String newName, String newDeploymentHash) {
   }
 
   @Override
-  protected PodList listSelectedPods(Deployment obj) {
-    return listSelectedPods(obj.getSpec().getSelector());
+  protected WatchListDeletable<Pod, PodList> selectedPodLister(Deployment obj) {
+    return selectedPodLister(obj.getSpec().getSelector());
   }
+
   @Override
   protected Deployment updateDeploymentKey(String name, String hash) {
     Deployment old = resources().inNamespace(namespace).withName(name).get();
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetRollingUpdater.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetRollingUpdater.java
index 183911daaec..c5850e761ac 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetRollingUpdater.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/ReplicaSetRollingUpdater.java
@@ -15,6 +15,7 @@
  */
 package io.fabric8.kubernetes.client.dsl.internal.extensions.v1beta1;
 
+import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.extensions.ReplicaSet;
 import io.fabric8.kubernetes.api.model.extensions.ReplicaSetBuilder;
@@ -22,6 +23,7 @@
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.dsl.Operation;
 import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
+import io.fabric8.kubernetes.client.dsl.WatchListDeletable;
 import io.fabric8.kubernetes.client.dsl.internal.apps.v1.RollingUpdater;
 import okhttp3.OkHttpClient;
 
@@ -51,8 +53,8 @@ protected ReplicaSet createClone(ReplicaSet obj, String newName, String newDeploymentHash) {
   }
 
   @Override
-  protected PodList listSelectedPods(ReplicaSet obj) {
-    return listSelectedPods(obj.getSpec().getSelector());
+  protected WatchListDeletable<Pod, PodList> selectedPodLister(ReplicaSet obj) {
+    return selectedPodLister(obj.getSpec().getSelector());
   }
 
   @Override
diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java
index 3acb87c655b..219342c773e 100644
--- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java
+++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java
@@ -147,10 +147,11 @@ public static String join(final Object[] array, final char separator) {
    * @param future The communication channel.
    * @param amount The amount of time to wait.
    * @param timeUnit The time unit.
+   * @param cancel true if the future should be cancelled when this method returns
    *
    * @return a boolean value indicating resource is ready or not.
    */
-  public static boolean waitUntilReady(Future<?> future, long amount, TimeUnit timeUnit) {
+  public static boolean waitUntilReady(Future<?> future, long amount, TimeUnit timeUnit, boolean cancel) {
     try {
       future.get(amount, timeUnit);
       return true;
@@ -165,14 +166,19 @@ public static boolean waitUntilReady(Future<?> future, long amount, TimeUnit timeUnit) {
       throw KubernetesClientException.launderThrowable(t);
     } catch (Exception e) {
       throw KubernetesClientException.launderThrowable(e);
+    } finally {
+      if (cancel) {
+        future.cancel(true);
+      }
     }
   }
 
   /**
-   * Similar to {@link #waitUntilReady(Future, long, TimeUnit)}, but will always throw an exception if not ready
+   * Similar to {@link #waitUntilReady(Future, long, TimeUnit, boolean)}, but will always throw an exception if not ready.
+   * The future will be canceled as a side-effect of this call.
    */
   public static void waitUntilReadyOrFail(Future<?> future, long amount, TimeUnit timeUnit) {
-    if (!waitUntilReady(future, amount, timeUnit)) {
+    if (!waitUntilReady(future, amount, timeUnit, true)) {
       throw new KubernetesClientException("not ready after " + amount + " " + timeUnit);
     }
   }
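The new cancel flag matters because the informer-backed futures now hold resources (a watch) until they complete or are cancelled. A JDK-only sketch of the semantics; this is an illustration of the contract, not the fabric8 implementation itself:

    import java.util.concurrent.*;

    public class WaitUntilReadySketch {
      // Waits up to the given time; optionally cancels the future on the way out,
      // which releases whatever the future's producer holds (e.g. a watch).
      static boolean waitUntilReady(Future<?> future, long amount, TimeUnit unit, boolean cancel) {
        try {
          future.get(amount, unit);
          return true;
        } catch (TimeoutException e) {
          return false;
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        } catch (ExecutionException e) {
          throw new RuntimeException(e.getCause());
        } finally {
          if (cancel) {
            future.cancel(true); // no-op if already complete
          }
        }
      }

      public static void main(String[] args) {
        CompletableFuture<Void> never = new CompletableFuture<>();
        System.out.println(waitUntilReady(never, 100, TimeUnit.MILLISECONDS, true)); // false
        System.out.println(never.isCancelled()); // true: the pending work was released
      }
    }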
diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ReplicaSetTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ReplicaSetTest.java
index 3c1ecc35c2c..bb376182962 100644
--- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ReplicaSetTest.java
+++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ReplicaSetTest.java
@@ -17,6 +17,7 @@
 package io.fabric8.kubernetes.client.mock;
 
 import io.fabric8.kubernetes.api.model.KubernetesListBuilder;
+import io.fabric8.kubernetes.api.model.ListMetaBuilder;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.PodListBuilder;
@@ -183,18 +184,27 @@ void testScaleAndWait() {
       .endStatus()
       .build()).once();
 
-    server.expect().withPath("/apis/apps/v1/namespaces/test/replicasets/repl1").andReturn(200, new ReplicaSetBuilder()
-      .withNewMetadata()
-      .withName("repl1")
-      .withResourceVersion("1")
-      .endMetadata()
-      .withNewSpec()
-      .withReplicas(5)
-      .endSpec()
-      .withNewStatus()
-      .withReplicas(5)
-      .endStatus()
-      .build()).always();
+    ReplicaSet scaled = new ReplicaSetBuilder()
+      .withNewMetadata()
+      .withName("repl1")
+      .withResourceVersion("1")
+      .endMetadata()
+      .withNewSpec()
+      .withReplicas(5)
+      .endSpec()
+      .withNewStatus()
+      .withReplicas(5)
+      .endStatus()
+      .build();
+    // patch
+    server.expect().withPath("/apis/apps/v1/namespaces/test/replicasets/repl1").andReturn(200, scaled).once();
+
+    // list for waiting
+    server.expect()
+      .withPath("/apis/apps/v1/namespaces/test/replicasets?fieldSelector=metadata.name%3Drepl1&watch=false")
+      .andReturn(200,
+        new ReplicaSetListBuilder().withItems(scaled).withMetadata(new ListMetaBuilder().build()).build())
+      .always();
 
     ReplicaSet repl = client.apps().replicaSets().withName("repl1").scale(5, true);
     assertNotNull(repl);
diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ReplicationControllerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ReplicationControllerTest.java
index d77803dbf2f..5d6ca7f8553 100644
--- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ReplicationControllerTest.java
+++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ReplicationControllerTest.java
@@ -17,6 +17,7 @@
 package io.fabric8.kubernetes.client.mock;
 
 import io.fabric8.kubernetes.api.model.KubernetesListBuilder;
+import io.fabric8.kubernetes.api.model.ListMetaBuilder;
 import io.fabric8.kubernetes.api.model.PodBuilder;
 import io.fabric8.kubernetes.api.model.PodList;
 import io.fabric8.kubernetes.api.model.PodListBuilder;
@@ -183,18 +184,26 @@ void testScaleAndWait() {
       .endStatus()
       .build()).once();
 
-    server.expect().withPath("/api/v1/namespaces/test/replicationcontrollers/repl1").andReturn(200, new ReplicationControllerBuilder()
-      .withNewMetadata()
-      .withName("repl1")
-      .withResourceVersion("1")
-      .endMetadata()
-      .withNewSpec()
-      .withReplicas(5)
-      .endSpec()
-      .withNewStatus()
-      .withReplicas(5)
-      .endStatus()
-      .build()).always();
+    ReplicationController scaled = new ReplicationControllerBuilder()
+      .withNewMetadata()
+      .withName("repl1")
+      .withResourceVersion("1")
+      .endMetadata()
+      .withNewSpec()
+      .withReplicas(5)
+      .endSpec()
+      .withNewStatus()
+      .withReplicas(5)
+      .endStatus()
+      .build();
+    server.expect().withPath("/api/v1/namespaces/test/replicationcontrollers/repl1").andReturn(200, scaled).once();
+
+    // list for waiting
+    server.expect()
+      .withPath("/api/v1/namespaces/test/replicationcontrollers?fieldSelector=metadata.name%3Drepl1&watch=false")
+      .andReturn(200,
+        new ReplicationControllerListBuilder().withItems(scaled).withMetadata(new ListMetaBuilder().build()).build())
+      .always();
 
     ReplicationController repl = client.replicationControllers().withName("repl1").scale(5, true);
     assertNotNull(repl);
diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java
index 76845069877..751aa9de2c5 100644
--- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java
+++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceListTest.java
@@ -276,8 +276,8 @@ void testAllFailedWaitUntilCondition() {
       .anyMatch(c -> "True".equals(c.getStatus()));
 
     // The pods are never ready if you request them directly.
-    server.expect().get().withPath("/api/v1/namespaces/ns1/pods/pod1").andReturn(HTTP_OK, noReady1).once();
-    server.expect().get().withPath("/api/v1/namespaces/ns1/pods/pod2").andReturn(HTTP_OK, noReady2).once();
+    ResourceTest.list(server, noReady1);
+    ResourceTest.list(server, noReady2);
 
     Status gone = new StatusBuilder()
       .withCode(HTTP_GONE)
diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/StatefulSetTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/StatefulSetTest.java
index 5019f7d7993..92fc04ddb27 100644
--- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/StatefulSetTest.java
+++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/StatefulSetTest.java
@@ -17,6 +17,7 @@
 package io.fabric8.kubernetes.client.mock;
 
 import io.fabric8.kubernetes.api.model.KubernetesListBuilder;
+import io.fabric8.kubernetes.api.model.ListMetaBuilder;
 import io.fabric8.kubernetes.api.model.OwnerReferenceBuilder;
 import io.fabric8.kubernetes.api.model.Pod;
 import io.fabric8.kubernetes.api.model.PodBuilder;
@@ -199,18 +200,26 @@ public void testScaleAndWait() {
       .endStatus()
       .build()).once();
 
-    server.expect().withPath("/apis/apps/v1/namespaces/test/statefulsets/repl1").andReturn(200, new StatefulSetBuilder()
-      .withNewMetadata()
-      .withName("repl1")
-      .withResourceVersion("1")
-      .endMetadata()
-      .withNewSpec()
-      .withReplicas(5)
-      .endSpec()
-      .withNewStatus()
-      .withReplicas(5)
-      .endStatus()
-      .build()).always();
+    StatefulSet scaled = new StatefulSetBuilder()
+      .withNewMetadata()
+      .withName("repl1")
+      .withResourceVersion("1")
+      .endMetadata()
+      .withNewSpec()
+      .withReplicas(5)
+      .endSpec()
+      .withNewStatus()
+      .withReplicas(5)
+      .endStatus()
+      .build();
+    server.expect().withPath("/apis/apps/v1/namespaces/test/statefulsets/repl1").andReturn(200, scaled).once();
+
+    // list for waiting
+    server.expect()
+      .withPath("/apis/apps/v1/namespaces/test/statefulsets?fieldSelector=metadata.name%3Drepl1&watch=false")
+      .andReturn(200,
+        new StatefulSetListBuilder().withItems(scaled).withMetadata(new ListMetaBuilder().build()).build())
+      .always();
 
     StatefulSet repl = client.apps().statefulSets().withName("repl1").scale(5, true);
     assertNotNull(repl);
diff --git a/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java b/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java
index 8b4dab6e465..3e86dd9fbab 100644
--- a/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java
+++ b/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java
@@ -20,6 +20,7 @@
 import io.fabric8.kubernetes.api.model.autoscaling.v1.Scale;
 import io.fabric8.kubernetes.client.Config;
 import io.fabric8.kubernetes.client.KubernetesClientException;
+import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
 import io.fabric8.kubernetes.client.dsl.LogWatch;
 import io.fabric8.kubernetes.client.dsl.Loggable;
 import io.fabric8.kubernetes.client.dsl.PodResource;
@@ -29,7 +30,6 @@
 import io.fabric8.kubernetes.client.dsl.internal.RollingOperationContext;
 import io.fabric8.kubernetes.client.utils.PodOperationUtil;
 import io.fabric8.kubernetes.client.utils.URLUtils;
-import io.fabric8.kubernetes.client.utils.Utils;
 import io.fabric8.openshift.api.model.DeploymentConfig;
 import io.fabric8.openshift.api.model.DeploymentConfigBuilder;
 import io.fabric8.openshift.api.model.DeploymentConfigList;
@@ -51,10 +51,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
@@ -172,52 +168,39 @@ public Scale scale(Scale scale) {
    * Lets wait until there are enough Ready pods of the given Deployment
    */
   private void waitUntilDeploymentConfigIsScaled(final int count) {
-    final CompletableFuture<Void> scaledFuture = new CompletableFuture<>();
     final AtomicReference<Integer> replicasRef = new AtomicReference<>(0);
 
     final String name = checkName(getItem());
     final String namespace = checkNamespace(getItem());
 
-    final Runnable deploymentPoller = () -> {
-      try {
-        DeploymentConfig deploymentConfig = get();
+    try {
+      waitUntilCondition(deploymentConfig -> {
         //If the rs is gone, we shouldn't wait.
         if (deploymentConfig == null) {
           if (count == 0) {
-            scaledFuture.complete(null);
-            return;
-          } else {
-            scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for DeploymentConfig: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available."));
-            return;
+            return true;
           }
+          throw new IllegalStateException("Can't wait for DeploymentConfig: " + name + " in namespace: "
+            + namespace + " to scale. Resource is no longer available.");
         }
 
         replicasRef.set(deploymentConfig.getStatus().getReplicas());
         int currentReplicas = deploymentConfig.getStatus().getReplicas() != null ? deploymentConfig.getStatus().getReplicas() : 0;
-        if (deploymentConfig.getStatus().getObservedGeneration() >= deploymentConfig.getMetadata().getGeneration() && Objects.equals(deploymentConfig.getSpec().getReplicas(), currentReplicas)) {
-          scaledFuture.complete(null);
-        } else {
-          LOG.debug("Only {}/{} pods scheduled for DeploymentConfig: {} in namespace: {} seconds so waiting...",
-            deploymentConfig.getStatus().getReplicas(), deploymentConfig.getSpec().getReplicas(), deploymentConfig.getMetadata().getName(), namespace);
-        }
-      } catch (Throwable t) {
-        LOG.error("Error while waiting for Deployment to be scaled.", t);
-      }
-    };
-
-    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
-    ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
-    try {
-      if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) {
-        LOG.debug("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {}.",
-          replicasRef.get(), count, name, namespace);
-      } else {
-        LOG.error("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {} after waiting for {} seconds so giving up",
-          replicasRef.get(), count, name, namespace, TimeUnit.MILLISECONDS.toSeconds(getConfig().getScaleTimeout()));
+        if (deploymentConfig.getStatus().getObservedGeneration() >= deploymentConfig.getMetadata().getGeneration()
+          && Objects.equals(deploymentConfig.getSpec().getReplicas(), currentReplicas)) {
+          return true;
         }
-    } finally {
-      poller.cancel(true);
-      executor.shutdown();
-    }
+        LOG.debug("Only {}/{} pods scheduled for DeploymentConfig: {} in namespace: {}, so waiting...",
+          deploymentConfig.getStatus().getReplicas(), deploymentConfig.getSpec().getReplicas(),
+          deploymentConfig.getMetadata().getName(), namespace);
+        return false;
+      }, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS);
+      LOG.debug("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {}.",
+        replicasRef.get(), count, name, namespace);
+    } catch (KubernetesClientTimeoutException e) {
+      LOG.error("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {} after waiting for {} seconds so giving up",
+        replicasRef.get(), count, name, namespace,
+        TimeUnit.MILLISECONDS.toSeconds(getConfig().getScaleTimeout()));
+    }
   }
 
   @Override