Skip to content

Commit

Permalink
fix #3285: adding waiting for list contexts
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed Jul 6, 2021
1 parent 189adf4 commit 7c011b2
Show file tree
Hide file tree
Showing 21 changed files with 325 additions and 373 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,30 @@
package io.fabric8.kubernetes.client.dsl;

import io.fabric8.kubernetes.client.GracePeriodConfigurable;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import io.fabric8.kubernetes.client.PropagationPolicyConfigurable;
import io.fabric8.kubernetes.client.Watcher;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

public interface WatchListDeletable<T, L> extends Watchable<Watcher<T>>, Versionable<WatchAndWaitable<T>>, Listable<L>, Deletable,
GracePeriodConfigurable<Deletable>,
PropagationPolicyConfigurable<EditReplacePatchDeletable<T>>,
StatusUpdatable<T>,
Informable<T>
{

/**
* Wait for the entire list available at this context to satisfy the given predicate.
* @param condition the {@link Predicate} to test
* @param pollInterval the polling interval to use, a value less than 1 indicates that every event should be tested
* @param timeout after which a {@link KubernetesClientTimeoutException} is thrown
* @param timeUnit unit of the pollInterval and timeout
* @return the list of items after the condition is met
* @throws KubernetesClientTimeoutException if time runs out before the condition is met
*/
List<T> waitUntilListCondition(Predicate<List<T>> condition, long pollInterval, long timeout, TimeUnit timeUnit);

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
import io.fabric8.kubernetes.api.model.ObjectReference;
import io.fabric8.kubernetes.client.dsl.WritableOperation;
import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.builder.TypedVisitor;
import io.fabric8.kubernetes.api.builder.Visitor;
Expand Down Expand Up @@ -77,8 +75,9 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand All @@ -94,7 +93,6 @@ public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceLi
MixedOperation<T, L, R>,
Resource<T> {

private static final Logger LOG = LoggerFactory.getLogger(BaseOperation.class);
private static final String READ_ONLY_UPDATE_EXCEPTION_MESSAGE = "Cannot update read-only resources";
private static final String READ_ONLY_EDIT_EXCEPTION_MESSAGE = "Cannot edit read-only resources";

Expand Down Expand Up @@ -977,57 +975,85 @@ public T waitUntilReady(long amount, TimeUnit timeUnit) {

@Override
public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUnit) {
CompletableFuture<T> future = new CompletableFuture<>();
// tests the condition, trapping any exceptions
Consumer<T> tester = obj -> {
try {
if (condition.test(obj)) {
future.complete(obj);
}
} catch (Exception e) {
future.completeExceptionally(e);
}
};
// start an informer that supplies the tester with events and empty list handling
try (SharedIndexInformer<T> informer = this.createInformer(0, l -> {
if (l.getItems().isEmpty()) {
tester.accept(null);
}
}, new ResourceEventHandler<T>() {

@Override
public void onAdd(T obj) {
tester.accept(obj);
}

@Override
public void onUpdate(T oldObj, T newObj) {
tester.accept(newObj);
}

@Override
public void onDelete(T obj, boolean deletedFinalStateUnknown) {
tester.accept(null);
CompletableFuture<T> futureCondition = listCondition(l -> {
if (l.isEmpty()) {
return condition.test(null);
}
})) {
// prevent unnecessary watches
future.whenComplete((r,t) -> informer.stop());
informer.run();
return future.get(amount, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e.getCause());
} catch (ExecutionException e) {
throw KubernetesClientException.launderThrowable(e.getCause());
} catch (TimeoutException e) {
return condition.test(l.get(0));
}, 0, timeUnit).thenApply(l -> l.isEmpty() ? null : l.get(0));

if (!Utils.waitUntilReady(futureCondition, amount, timeUnit, true)) {
T i = getItem();
if (i != null) {
throw new KubernetesClientTimeoutException(i, amount, timeUnit);
}
throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit);
}
return futureCondition.getNow(null);
}

@Override
public List<T> waitUntilListCondition(Predicate<List<T>> condition, long pollInterval, long amount, TimeUnit timeUnit) {
CompletableFuture<List<T>> listCondition = listCondition(condition, pollInterval, timeUnit);
if (!Utils.waitUntilReady(listCondition, amount, timeUnit, true)) {
// it would be good to report the last state of the cache here
throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit);
}
return listCondition.getNow(null);
}

CompletableFuture<List<T>> listCondition(Predicate<List<T>> condition, long pollInterval, TimeUnit timeUnit) {
CompletableFuture<List<T>> future = new CompletableFuture<>();
AtomicReference<Runnable> tester = new AtomicReference<>();

// create an informer that supplies the tester with events and empty list handling
SharedIndexInformer<T> informer = this.createInformer(0, l -> {
if (l.getItems().isEmpty()) {
tester.get().run();
}
});

// prevent unnecessary watches and handle closure
future.whenComplete((r, t) -> informer.stop());

// use the cache to evaluate the list predicate, trapping any exceptions
Runnable test = () -> {
try {
// could skip if lastResourceVersion has not changed
List<T> list = informer.getStore().list();
if (condition.test(list)) {
future.complete(list);
}
} catch (Exception e) {
future.completeExceptionally(e);
}
};
tester.set(test);

// if there is a no poll, re-evaluate on every event
if (pollInterval <= 0) {
informer.addEventHandler(new ResourceEventHandler<T>() {
@Override
public void onAdd(T obj) {
test.run();
}
@Override
public void onDelete(T obj, boolean deletedFinalStateUnknown) {
test.run();
}
@Override
public void onUpdate(T oldObj, T newObj) {
test.run();
}
});
} else {
ScheduledFuture<?> pollingFuture = Utils.scheduleAtFixedRate(Utils.getCommonExecutorSerive(), test, pollInterval, pollInterval, timeUnit);
future.whenComplete((r, t) -> pollingFuture.cancel(true));
}
informer.run();
return future;
}

public void setType(Class<T> type) {
this.type = type;
}
Expand All @@ -1054,14 +1080,17 @@ public Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexer

@Override
public SharedIndexInformer<T> inform(ResourceEventHandler<T> handler, long resync) {
DefaultSharedIndexInformer<T, L> result = createInformer(resync, null, handler);
DefaultSharedIndexInformer<T, L> result = createInformer(resync, null);
if (handler != null) {
result.addEventHandler(handler);
}
// synchronous start list/watch must succeed in the calling thread
// initial add events will be processed in the calling thread as well
result.run();
return result;
}

private DefaultSharedIndexInformer<T, L> createInformer(long resync, Consumer<L> onList, ResourceEventHandler<T> handler) {
private DefaultSharedIndexInformer<T, L> createInformer(long resync, Consumer<L> onList) {
T i = getItem();
String name = (Utils.isNotNullOrEmpty(getName()) || i != null) ? checkName(i) : null;

Expand All @@ -1088,9 +1117,6 @@ public Watch watch(ListOptions params, String namespace, OperationContext contex
if (indexers != null) {
informer.addIndexers(indexers);
}
if (handler != null) {
informer.addEventHandler(handler);
}
return informer;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ private void cleanUp() {
}

public void waitUntilReady() {
if (!Utils.waitUntilReady(startedFuture, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) {
if (!Utils.waitUntilReady(startedFuture, config.getRequestTimeout(), TimeUnit.MILLISECONDS, true)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.warn("Log watch request has not been opened within: " + config.getRequestTimeout() + " millis.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public List<HasMetadata> waitUntilReady(final long amount, final TimeUnit timeUn
public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,
long amount,
TimeUnit timeUnit) {
List<HasMetadata> items = acceptVisitors(asHasMetadata(item, true), visitors);
ArrayList<HasMetadata> items = acceptVisitors(asHasMetadata(item, true), visitors);
if (items.isEmpty()) {
return Collections.emptyList();
}
Expand All @@ -115,10 +115,8 @@ public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,
final List<HasMetadata> results = new ArrayList<>();
final List<HasMetadata> itemsWithConditionNotMatched = new ArrayList<>();

// Iterate over the items because we don't know what kind of List it is.
// But the futures use an ArrayList, so accessing by index is efficient.
int i = 0;
for (final HasMetadata meta : items) {
for (int i = 0; i < items.size(); i++) {
final HasMetadata meta = items.get(i);
try {
CompletableFuture<HasMetadata> future = futures.get(i);
// just get each result as the timeout is enforced below
Expand All @@ -130,7 +128,6 @@ public List<HasMetadata> waitUntilCondition(Predicate<HasMetadata> condition,
Thread.currentThread().interrupt();
throw KubernetesClientException.launderThrowable(e);
}
++i;
}

if (!itemsWithConditionNotMatched.isEmpty()) {
Expand Down Expand Up @@ -293,8 +290,8 @@ public List<HasMetadata> get() {
}
}

private static List<HasMetadata> acceptVisitors(List<HasMetadata> list, List<Visitor> visitors) {
List<HasMetadata> result = new ArrayList<>();
private static ArrayList<HasMetadata> acceptVisitors(List<HasMetadata> list, List<Visitor> visitors) {
ArrayList<HasMetadata> result = new ArrayList<>();
for (HasMetadata item : list) {
ResourceHandler<HasMetadata, ?> h = handlerOf(item);
VisitableBuilder<HasMetadata, ?> builder = h.edit(item);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,13 @@
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
import io.fabric8.kubernetes.client.dsl.internal.RollingOperationContext;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.PodOperationUtil;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.OkHttpClient;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentList;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.KubernetesClientTimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,11 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;
Expand Down Expand Up @@ -263,54 +258,37 @@ public ImageEditReplacePatchable<Deployment> withTimeout(long timeout, TimeUnit
* Lets wait until there are enough Ready pods of the given Deployment
*/
private void waitUntilDeploymentIsScaled(final int count) {
final CompletableFuture<Void> scaledFuture = new CompletableFuture<>();
final AtomicReference<Integer> replicasRef = new AtomicReference<>(0);

final String name = checkName(getItem());
final String namespace = checkNamespace(getItem());

final Runnable deploymentPoller = () -> {
try {
Deployment deployment = get();
//If the deployment is gone, we shouldn't wait.
try {
waitUntilCondition(deployment -> {
if (deployment == null) {
if (count == 0) {
scaledFuture.complete(null);
return;
} else {
scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available."));
return;
return true;
}
throw new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.");
}

replicasRef.set(deployment.getStatus().getReplicas());
int currentReplicas = deployment.getStatus().getReplicas() != null ? deployment.getStatus().getReplicas() : 0;
long generation = deployment.getMetadata().getGeneration() != null ? deployment.getMetadata().getGeneration() : 0;
long observedGeneration = deployment.getStatus() != null && deployment.getStatus().getObservedGeneration() != null ? deployment.getStatus().getObservedGeneration() : -1;
if (observedGeneration >= generation && Objects.equals(deployment.getSpec().getReplicas(), currentReplicas)) {
scaledFuture.complete(null);
} else {
LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {} seconds so waiting...",
deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace);
}
} catch (Throwable t) {
LOG.error("Error while waiting for Deployment to be scaled.", t);
}
};

ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
try {
if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) {
LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.",
return true;
}
LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {} seconds so waiting...",
deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace);
return false;

}, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS);
LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.",
replicasRef.get(), count, name, namespace);
} else {
LOG.error("{}/{} pod(s) ready for Deployment: {} in namespace: {} after waiting for {} seconds so giving up",
} catch (KubernetesClientTimeoutException e) {
LOG.error("{}/{} pod(s) ready for Deployment: {} in namespace: {} after waiting for {} seconds so giving up",
replicasRef.get(), count, name, namespace, TimeUnit.MILLISECONDS.toSeconds(getConfig().getScaleTimeout()));
}
} finally {
poller.cancel(true);
executor.shutdown();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@
package io.fabric8.kubernetes.client.dsl.internal.apps.v1;

import okhttp3.OkHttpClient;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder;
import io.fabric8.kubernetes.api.model.apps.DeploymentList;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.dsl.Operation;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.dsl.WatchListDeletable;

class DeploymentRollingUpdater extends RollingUpdater<Deployment, DeploymentList> {

Expand Down Expand Up @@ -50,8 +52,8 @@ protected Deployment createClone(Deployment obj, String newName, String newDeplo
}

@Override
protected PodList listSelectedPods(Deployment obj) {
return listSelectedPods(obj.getSpec().getSelector());
protected WatchListDeletable<Pod, PodList> selectedPodLister(Deployment obj) {
return selectedPodLister(obj.getSpec().getSelector());
}

@Override
Expand Down
Loading

0 comments on commit 7c011b2

Please sign in to comment.