Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Better chunk cleanup on StreamToBlinkTableAdapter.destroy, and a few listener.destroy fixes #6406

Merged
merged 7 commits into from
Nov 22, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -1340,8 +1340,10 @@ public void onUpdate(TableUpdate upstream) {
@Override
protected void destroy() {
super.destroy();
leftStampKeys.close();
leftStampValues.close();
getUpdateGraph().runWhenIdle(() -> {
leftStampKeys.close();
leftStampValues.close();
});
}
});

Expand Down Expand Up @@ -1522,8 +1524,10 @@ public void onUpdate(TableUpdate upstream) {
@Override
protected void destroy() {
super.destroy();
compactedRightStampKeys.close();
compactedRightStampValues.close();
getUpdateGraph().runWhenIdle(() -> {
compactedRightStampKeys.close();
compactedRightStampValues.close();
});
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,9 @@ private RowSet indexFromBuilder(int slotIndex) {
@Override
protected void destroy() {
super.destroy();
leftSsaFactory.close();
rightSsaFactory.close();
getUpdateGraph().runWhenIdle(() -> {
leftSsaFactory.close();
rightSsaFactory.close();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
import io.deephaven.engine.table.impl.sources.LongAsInstantColumnSource;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.table.impl.sources.SwitchColumnSource;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.engine.table.impl.sources.chunkcolumnsource.ChunkColumnSource;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.MultiException;
Expand All @@ -40,11 +40,13 @@
import java.lang.ref.WeakReference;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

Expand Down Expand Up @@ -271,6 +273,15 @@ private static void maybeClearChunkColumnSource(ColumnSource<?> cs) {
}
}

private synchronized void clearChunkColumnSources() {
SafeCloseable.closeAll(
Stream.of(bufferChunkSources, currentChunkSources, prevChunkSources)
.filter(Objects::nonNull)
.flatMap(Arrays::stream)
.map(ccs -> ccs::clear));
bufferChunkSources = currentChunkSources = prevChunkSources = null;
}

/**
* Return the {@link Table#BLINK_TABLE_ATTRIBUTE blink} {@link Table table} that this adapter is producing, and
* ensure that this StreamToBlinkTableAdapter no longer enforces strong reachability of the result. May return
Expand Down Expand Up @@ -306,6 +317,7 @@ public void close() {
.endl();
updateSourceRegistrar.removeSource(this);
streamPublisher.shutdown();
getUpdateGraph().runWhenIdle(this::clearChunkColumnSources);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
package io.deephaven.stream;

import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.ChunkPoolConstants;
import io.deephaven.chunk.util.pools.ChunkPoolReleaseTracking;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.Table;
Expand All @@ -18,6 +22,7 @@
import io.deephaven.engine.table.impl.SimpleListener;
import io.deephaven.chunk.*;
import io.deephaven.util.BooleanUtils;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.type.ArrayTypeUtils;
import junit.framework.TestCase;
import org.apache.commons.lang3.mutable.MutableBoolean;
Expand Down Expand Up @@ -446,6 +451,34 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) {
TestCase.assertTrue(listenerFailed.booleanValue());
}

@Test
public void testCleanup() {
final TableDefinition tableDefinition = TableDefinition.from(
List.of("O", "B", "S", "I", "L", "F", "D", "C"),
List.of(String.class, byte.class, short.class, int.class, long.class, float.class, double.class,
char.class));
final Table tableToAdd = emptyTable(ChunkPoolConstants.SMALLEST_POOLED_CHUNK_CAPACITY).updateView(
"O=Long.toString(ii)", "B=(byte)ii", "S=(short)ii", "I=(int)ii", "L=ii", "F=(float)ii",
"D=(double)ii", "C=(char)ii");
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
try (final SafeCloseable ignored = LivenessScopeStack.open(new LivenessScope(true), true)) {
final TablePublisher tablePublisher = TablePublisher.of("Test", tableDefinition, null, null);
// Add buffered chunks
tablePublisher.add(tableToAdd);
// Move buffered chunks to current
updateGraph.runWithinUnitTestCycle(() -> {
});
// Add more buffered chunks
tablePublisher.add(tableToAdd);
// Move current to previous, buffered to current
updateGraph.runWithinUnitTestCycle(() -> {
});
// Add even more buffered chunks
tablePublisher.add(tableToAdd);
}
ChunkPoolReleaseTracking.check();
}

private static class DummyStreamPublisher implements StreamPublisher {

private boolean fail;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.io.log.LogEntry;
import io.deephaven.util.annotations.FinalDefault;
import io.deephaven.util.function.ThrowingSupplier;
import io.deephaven.util.locks.AwareFunctionalLock;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -244,4 +245,24 @@ public LogOutput append(LogOutput output) {
}

// endregion refresh control

/**
* Run {@code task} immediately if this UpdateGraph is currently idle, else schedule {@code task} to run at a later
* time when it has become idle.
*
* @param task The task to run when idle
*/
@FinalDefault
default void runWhenIdle(@NotNull final Runnable task) {
if (clock().currentState() == LogicalClock.State.Idle) {
task.run();
} else {
addNotification(new TerminalNotification() {
@Override
public void run() {
task.run();
}
});
}
}
}