Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into 5996-rst-stream-can…
Browse files Browse the repository at this point in the history
…cel-2
  • Loading branch information
niloc132 committed Nov 23, 2024
2 parents 69c75f4 + 7b47794 commit f3d353a
Show file tree
Hide file tree
Showing 14 changed files with 90 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1340,8 +1340,10 @@ public void onUpdate(TableUpdate upstream) {
@Override
protected void destroy() {
super.destroy();
leftStampKeys.close();
leftStampValues.close();
getUpdateGraph().runWhenIdle(() -> {
leftStampKeys.close();
leftStampValues.close();
});
}
});

Expand Down Expand Up @@ -1522,8 +1524,10 @@ public void onUpdate(TableUpdate upstream) {
@Override
protected void destroy() {
super.destroy();
compactedRightStampKeys.close();
compactedRightStampValues.close();
getUpdateGraph().runWhenIdle(() -> {
compactedRightStampKeys.close();
compactedRightStampValues.close();
});
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -791,7 +791,9 @@ private RowSet indexFromBuilder(int slotIndex) {
@Override
protected void destroy() {
super.destroy();
leftSsaFactory.close();
rightSsaFactory.close();
getUpdateGraph().runWhenIdle(() -> {
leftSsaFactory.close();
rightSsaFactory.close();
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ public void addChunk(@NotNull final WritableChunk<? extends Values> chunk) {
}

@Override
public void clear() {
public synchronized void clear() {
totalSize = 0;
data.forEach(SafeCloseable::close);
data.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,16 @@
import io.deephaven.engine.table.impl.sources.LongAsInstantColumnSource;
import io.deephaven.engine.table.impl.sources.NullValueColumnSource;
import io.deephaven.engine.table.impl.sources.SwitchColumnSource;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.engine.table.impl.QueryTable;
import io.deephaven.chunk.ChunkType;
import io.deephaven.chunk.WritableChunk;
import io.deephaven.engine.table.impl.sources.chunkcolumnsource.ChunkColumnSource;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.rowset.TrackingWritableRowSet;
import io.deephaven.engine.updategraph.NotificationQueue;
import io.deephaven.engine.updategraph.UpdateGraph;
import io.deephaven.engine.updategraph.UpdateSourceRegistrar;
import io.deephaven.internal.log.LoggerFactory;
import io.deephaven.io.logger.Logger;
import io.deephaven.util.MultiException;
Expand All @@ -40,11 +40,13 @@
import java.lang.ref.WeakReference;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

Expand Down Expand Up @@ -271,6 +273,15 @@ private static void maybeClearChunkColumnSource(ColumnSource<?> cs) {
}
}

private synchronized void clearChunkColumnSources() {
SafeCloseable.closeAll(
Stream.of(bufferChunkSources, currentChunkSources, prevChunkSources)
.filter(Objects::nonNull)
.flatMap(Arrays::stream)
.map(ccs -> ccs::clear));
bufferChunkSources = currentChunkSources = prevChunkSources = null;
}

/**
* Return the {@link Table#BLINK_TABLE_ATTRIBUTE blink} {@link Table table} that this adapter is producing, and
* ensure that this StreamToBlinkTableAdapter no longer enforces strong reachability of the result. May return
Expand Down Expand Up @@ -306,6 +317,7 @@ public void close() {
.endl();
updateSourceRegistrar.removeSource(this);
streamPublisher.shutdown();
getUpdateGraph().runWhenIdle(this::clearChunkColumnSources);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@
package io.deephaven.stream;

import io.deephaven.chunk.attributes.Values;
import io.deephaven.chunk.util.pools.ChunkPoolConstants;
import io.deephaven.chunk.util.pools.ChunkPoolReleaseTracking;
import io.deephaven.engine.context.ExecutionContext;
import io.deephaven.engine.liveness.LivenessScope;
import io.deephaven.engine.liveness.LivenessScopeStack;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.rowset.RowSetShiftData;
import io.deephaven.engine.table.Table;
Expand All @@ -18,6 +22,7 @@
import io.deephaven.engine.table.impl.SimpleListener;
import io.deephaven.chunk.*;
import io.deephaven.util.BooleanUtils;
import io.deephaven.util.SafeCloseable;
import io.deephaven.util.type.ArrayTypeUtils;
import junit.framework.TestCase;
import org.apache.commons.lang3.mutable.MutableBoolean;
Expand Down Expand Up @@ -446,6 +451,34 @@ public void onFailureInternal(Throwable originalException, Entry sourceEntry) {
TestCase.assertTrue(listenerFailed.booleanValue());
}

@Test
public void testCleanup() {
final TableDefinition tableDefinition = TableDefinition.from(
List.of("O", "B", "S", "I", "L", "F", "D", "C"),
List.of(String.class, byte.class, short.class, int.class, long.class, float.class, double.class,
char.class));
final Table tableToAdd = emptyTable(ChunkPoolConstants.SMALLEST_POOLED_CHUNK_CAPACITY).updateView(
"O=Long.toString(ii)", "B=(byte)ii", "S=(short)ii", "I=(int)ii", "L=ii", "F=(float)ii",
"D=(double)ii", "C=(char)ii");
final ControlledUpdateGraph updateGraph = ExecutionContext.getContext().getUpdateGraph().cast();
try (final SafeCloseable ignored = LivenessScopeStack.open(new LivenessScope(true), true)) {
final TablePublisher tablePublisher = TablePublisher.of("Test", tableDefinition, null, null);
// Add buffered chunks
tablePublisher.add(tableToAdd);
// Move buffered chunks to current
updateGraph.runWithinUnitTestCycle(() -> {
});
// Add more buffered chunks
tablePublisher.add(tableToAdd);
// Move current to previous, buffered to current
updateGraph.runWithinUnitTestCycle(() -> {
});
// Add even more buffered chunks
tablePublisher.add(tableToAdd);
}
ChunkPoolReleaseTracking.check();
}

private static class DummyStreamPublisher implements StreamPublisher {

private boolean fail;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.deephaven.base.log.LogOutput;
import io.deephaven.base.verify.Assert;
import io.deephaven.io.log.LogEntry;
import io.deephaven.util.annotations.FinalDefault;
import io.deephaven.util.function.ThrowingSupplier;
import io.deephaven.util.locks.AwareFunctionalLock;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -244,4 +245,24 @@ public LogOutput append(LogOutput output) {
}

// endregion refresh control

/**
* Run {@code task} immediately if this UpdateGraph is currently idle, else schedule {@code task} to run at a later
* time when it has become idle.
*
* @param task The task to run when idle
*/
@FinalDefault
default void runWhenIdle(@NotNull final Runnable task) {
if (clock().currentState() == LogicalClock.State.Idle) {
task.run();
} else {
addNotification(new TerminalNotification() {
@Override
public void run() {
task.run();
}
});
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,9 @@ public boolean isFinestEnabled() {
transportState.runOnTransportThread(
() -> {
transportState.complete();
// asyncContext.complete();
asyncContext.complete();
log.fine("call completed");
});
// Jetty specific fix: When AsyncContext.complete() is called, Jetty sends a RST_STREAM with
// "cancel" error to the client, while other containers send "no error" in this case. Calling
// close() instead on the output stream still sends the RST_STREAM, but with "no error". Note
// that this does the opposite in at least Tomcat, so we're not going to upstream this change.
// See https://github.com/deephaven/deephaven-core/issues/6400
outputStream.close();
};
this.isReady = () -> outputStream.isReady();
}
Expand Down

0 comments on commit f3d353a

Please sign in to comment.