Skip to content

Commit

Permalink
COH-28677 Build: Add logging to the Bedrock cluster predicates
Browse files Browse the repository at this point in the history
(merge main -> ce/main 104186)
RQ job.9.20231024124338.1640

[git-p4: depot-paths = "//dev/coherence-ce/main/": change = 104227]
  • Loading branch information
thegridman committed Oct 24, 2023
1 parent a7da789 commit 0033991
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 73 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,34 @@

import com.oracle.bedrock.Option;
import com.oracle.bedrock.OptionsByType;
import com.oracle.bedrock.deferred.DeferredHelper;
import com.oracle.bedrock.deferred.DeferredPredicate;
import com.oracle.bedrock.deferred.PermanentlyUnavailableException;
import com.oracle.bedrock.options.Decoration;
import com.oracle.bedrock.options.Decorations;
import com.oracle.bedrock.runtime.AbstractAssembly;
import com.oracle.bedrock.runtime.Assembly;
import com.oracle.bedrock.runtime.coherence.callables.GetAutoStartServiceNames;
import com.oracle.bedrock.runtime.coherence.callables.GetServiceStatus;
import com.oracle.bedrock.runtime.coherence.callables.IsCoherenceRunning;
import com.oracle.bedrock.runtime.coherence.callables.IsReady;
import com.oracle.bedrock.runtime.coherence.callables.IsSafe;
import com.oracle.bedrock.runtime.coherence.callables.IsServiceStorageEnabled;
import com.oracle.bedrock.runtime.concurrent.options.Caching;
import com.oracle.bedrock.runtime.concurrent.runnable.ThreadDump;
import com.oracle.bedrock.runtime.options.StabilityPredicate;
import com.oracle.bedrock.util.Trilean;
import com.tangosol.net.Coherence;
import com.tangosol.net.NamedCache;
import com.tangosol.util.UID;
import com.tangosol.util.function.Remote;

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import static com.oracle.bedrock.deferred.DeferredHelper.ensure;
Expand Down Expand Up @@ -200,6 +209,47 @@ protected void onRelaunched(
onChanged(optionsByType);
}

@Override
@SuppressWarnings({"rawtypes", "unchecked"})
protected void onChanged(OptionsByType options) {
try
{
StabilityPredicate<Assembly<?>> stabilityPredicate = options.getOrDefault(StabilityPredicate.class, null);
if (stabilityPredicate != null)
{
DeferredPredicate<?> deferredPredicate = new DeferredPredicate(this, stabilityPredicate.get());
DeferredHelper.ensure(DeferredHelper.eventually(deferredPredicate), com.oracle.bedrock.predicate.Predicates.is(true), options.asArray());
}
}
catch (PermanentlyUnavailableException e)
{
CoherenceClusterMember[] aMember = this.applications.toArray(CoherenceClusterMember[]::new);
CompletableFuture[] aFuture = new CompletableFuture[aMember.length];

for (int i = 0; i < aMember.length; i++)
{
try
{
aFuture[i] = aMember[i].submit(ThreadDump.toStdErr());
}
catch (Exception ignored)
{
// ignored
}
}

try
{
CompletableFuture.allOf(aFuture).get(2, TimeUnit.MINUTES);
}
catch (Exception ex)
{
System.err.println("Caught exception waiting for thread dumps to complete " + ex.getMessage());
}
throw e;
}
}


/**
* Useful {@link Predicate}s for a {@link CoherenceCluster}.
Expand All @@ -214,8 +264,79 @@ public interface Predicates
*/
static Predicate<CoherenceCluster> autoStartServicesSafe()
{
return (cluster) -> {
return IsAutoStartServicesSafePredicate.INSTANCE;
}

/**
* A {@link Predicate} to determine if {@link CoherenceCluster}
* is running.
*
* @return a {@link Predicate}
*/
static Predicate<CoherenceCluster> isCoherenceRunning()
{
return new IsCoherenceRunningPredicate(Set.of(Coherence.DEFAULT_NAME));
}

/**
* A {@link Predicate} to determine if all health checks in the
* {@link CoherenceCluster} are ready.
*
* @return a {@link Predicate}
*/
static Predicate<CoherenceCluster> isReady()
{
return IsReadyPredicate.INSTANCE;
}

/**
* A {@link Predicate} to determine if all health checks in the
* {@link CoherenceCluster} are ready.
*
* @param sHealthCheck the name of an additional health check to verify
*
* @return a {@link Predicate}
*/
static Predicate<CoherenceCluster> isReady(String sHealthCheck)
{
return isCoherenceRunning(Set.of(Coherence.DEFAULT_NAME));
}

/**
* A {@link Predicate} to determine if {@link CoherenceCluster}
* is running.
*
* @param asName the names of the Coherence instances to verify
*
* @return a {@link Predicate}
*/
static Predicate<CoherenceCluster> isCoherenceRunning(String... asName)
{
return isCoherenceRunning(Set.of(asName));
}

/**
* A {@link Predicate} to determine if {@link CoherenceCluster}
* is running.
*
* @param setName the names of the Coherence instances to verify
*
* @return a {@link Predicate}
*/
static Predicate<CoherenceCluster> isCoherenceRunning(Set<String> setName)
{
return new IsCoherenceRunningPredicate(setName);
}
}

// ----- IsAutoStartServicesSafePredicate -------------------------------

static class IsAutoStartServicesSafePredicate
implements Remote.Predicate<CoherenceCluster>
{
@Override
public boolean test(CoherenceCluster cluster)
{
// determine the number of each auto start service is defined by the cluster
HashMap<String, Integer> serviceCountMap = new HashMap<>();

Expand Down Expand Up @@ -276,98 +397,79 @@ else if (count == 1)
}

return true;
};
}

/**
* A {@link Predicate} to determine if {@link CoherenceCluster}
* is running.
*
* @return a {@link Predicate}
*/
static Predicate<CoherenceCluster> isCoherenceRunning()
@Override
public String toString()
{
return (cluster) ->
{
for (CoherenceClusterMember member : cluster)
{
if (!member.invoke(new IsCoherenceRunning()))
{
return false;
}
}
return true;
};
return "IsReadyPredicate";
}

/**
* A {@link Predicate} to determine if all health checks in the
* {@link CoherenceCluster} are ready.
*
* @return a {@link Predicate}
*/
static Predicate<CoherenceCluster> isReady()
// ----- data members -----------------------------------------------

static final IsAutoStartServicesSafePredicate INSTANCE = new IsAutoStartServicesSafePredicate();
}

// ----- IsReadyPredicate -----------------------------------------------

static class IsReadyPredicate
implements Remote.Predicate<CoherenceCluster>
{
@Override
public boolean test(CoherenceCluster cluster)
{
return (cluster) ->
for (CoherenceClusterMember member : cluster)
{
for (CoherenceClusterMember member : cluster)
if (!member.invoke(IsReady.INSTANCE))
{
if (!member.invoke(IsReady.INSTANCE))
{
return false;
}
return false;
}
return true;
};
}
return true;
}

/**
* A {@link Predicate} to determine if all health checks in the
* {@link CoherenceCluster} are ready.
*
* @param sHealthCheck the name of an additional health check to verify
*
* @return a {@link Predicate}
*/
static Predicate<CoherenceCluster> isReady(String sHealthCheck)
@Override
public String toString()
{
return isCoherenceRunning(Set.of(Coherence.DEFAULT_NAME));
return "IsReadyPredicate";
}

/**
* A {@link Predicate} to determine if {@link CoherenceCluster}
* is running.
*
* @param asName the names of the Coherence instances to verify
*
* @return a {@link Predicate}
*/
static Predicate<CoherenceCluster> isCoherenceRunning(String... asName)
// ----- data members -----------------------------------------------

static final IsReadyPredicate INSTANCE = new IsReadyPredicate();
}

// ----- IsCoherenceRunningPredicate ------------------------------------

static class IsCoherenceRunningPredicate
implements Remote.Predicate<CoherenceCluster>
{
public IsCoherenceRunningPredicate(Set<String> setNames)
{
return isCoherenceRunning(Set.of(asName));
f_setNames = setNames;
}

/**
* A {@link Predicate} to determine if {@link CoherenceCluster}
* is running.
*
* @param setName the names of the Coherence instances to verify
*
* @return a {@link Predicate}
*/
static Predicate<CoherenceCluster> isCoherenceRunning(Set<String> setName)
@Override
public boolean test(CoherenceCluster cluster)
{
return (cluster) ->
for (CoherenceClusterMember member : cluster)
{
for (CoherenceClusterMember member : cluster)
if (!member.invoke(new IsCoherenceRunning(f_setNames)))
{
if (!member.invoke(new IsCoherenceRunning(setName)))
{
return false;
}
return false;
}
return true;
};
}
return true;
}

@Override
public String toString()
{
return "IsCoherenceRunningPredicate(coherence=" + f_setNames + ")";
}

// ----- data members -----------------------------------------------

private final Set<String> f_setNames;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ internal.util.processor.CacheProcessors.Null=com.tangosol.internal.util.processo
internal.util.processor.CacheProcessors.GetOrDefault=com.tangosol.internal.util.processor.CacheProcessors$GetOrDefault
internal.util.processor.CacheProcessors.Put=com.tangosol.internal.util.processor.CacheProcessors$Put
internal.util.processor.CacheProcessors.PutAll=com.tangosol.internal.util.processor.CacheProcessors$PutAll
internal.util.processor.CacheProcessors.PutAll=com.tangosol.internal.util.processor.CacheProcessors$PutAllWithExpiry
internal.util.processor.CacheProcessors.PutAllPutAllWithExpiry=com.tangosol.internal.util.processor.CacheProcessors$PutAllWithExpiry
internal.util.processor.CacheProcessors.PutIfAbsent=com.tangosol.internal.util.processor.CacheProcessors$PutIfAbsent
internal.util.processor.CacheProcessors.Remove=com.tangosol.internal.util.processor.CacheProcessors$Remove
internal.util.processor.CacheProcessors.RemoveBlind=com.tangosol.internal.util.processor.CacheProcessors$RemoveBlind
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ class path, or module path, of the JVM.
<local-storage system-property="coherence.distributed.localstorage">true</local-storage>
<partition-count system-property="coherence.distributed.partitions">257</partition-count>
<autostart system-property="coherence.topic.enabled">true</autostart>
<channel-count system-property="coherence.topic.channel.count"/>
<high-units>{topic-high-units-bytes 0B}</high-units>
<reconnect-wait system-property="coherence.topic.reconnect.wait"/>
<reconnect-timeout system-property="coherence.topic.reconnect.timeout"/>
Expand Down

0 comments on commit 0033991

Please sign in to comment.