Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: DH-18174: Delay reading from parquet file when creating table and column location #6606

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ interface Listener extends BasicTableDataListener {

/**
 * Get the {@link ColumnLocation} for the specified column under this table location.
 *
 * @param name The column name
 * @return The ColumnLocation for the defined column under this table location. The exact same ColumnLocation object
 *         should be returned for the same column name.
 */
@NotNull
ColumnLocation getColumnLocation(@NotNull CharSequence name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,16 @@ public LivenessReferent asLivenessReferent() {
// TableLocationState implementation
// ------------------------------------------------------------------------------------------------------------------

/**
 * No-op by default; can be overridden by subclasses to lazily initialize state on first access.
 * <p>
 * The expectation for static locations that override this is to call {@link #handleUpdateInternal(RowSet, long)}
 * instead of {@link #handleUpdate(RowSet, long)}, and {@link #handleUpdateInternal(TableLocationState)} instead of
 * {@link #handleUpdate(TableLocationState)} from inside {@link #initializeState()}. Otherwise, the initialization
 * logic will recurse infinitely, because the public handleUpdate methods call initializeState() first.
 */
protected void initializeState() {}

@Override
@NotNull
public final Object getStateLock() {
Expand All @@ -95,16 +105,19 @@ public final Object getStateLock() {

@Override
public final RowSet getRowSet() {
// Trigger deferred initialization (default no-op) before reading the backing state.
initializeState();
return state.getRowSet();
}

@Override
public final long getSize() {
// Trigger deferred initialization (default no-op) before reading the backing state.
initializeState();
return state.getSize();
}

@Override
public final long getLastModifiedTimeMillis() {
// Trigger deferred initialization (default no-op) before reading the backing state.
initializeState();
return state.getLastModifiedTimeMillis();
}

Expand Down Expand Up @@ -137,6 +150,11 @@ protected final void deliverInitialSnapshot(@NotNull final Listener listener) {
* @param lastModifiedTimeMillis The new lastModificationTimeMillis
*/
// Public entry point: ensures any lazily-initialized state is loaded before applying the update.
public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeMillis) {
initializeState();
handleUpdateInternal(rowSet, lastModifiedTimeMillis);
}

// Applies the update WITHOUT calling initializeState(); intended to be called from initializeState()
// overrides to avoid infinite recursion (see the initializeState() javadoc).
protected final void handleUpdateInternal(final RowSet rowSet, final long lastModifiedTimeMillis) {
// Only notify when values actually changed and this location supports subscriptions.
if (state.setValues(rowSet, lastModifiedTimeMillis) && supportsSubscriptions()) {
deliverUpdateNotification();
}
Expand All @@ -149,6 +167,11 @@ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeM
* @param source The source to copy state values from
*/
// Public entry point: ensures any lazily-initialized state is loaded before copying from source.
public void handleUpdate(@NotNull final TableLocationState source) {
initializeState();
handleUpdateInternal(source);
}

// Copies state WITHOUT calling initializeState(); intended to be called from initializeState()
// overrides to avoid infinite recursion (see the initializeState() javadoc).
protected final void handleUpdateInternal(@NotNull final TableLocationState source) {
// Only notify when the copy actually changed values and subscriptions are supported.
if (source.copyStateValuesTo(state) && supportsSubscriptions()) {
deliverUpdateNotification();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,6 @@ private class IncludedTableLocationEntry implements Comparable<IncludedTableLoca
// New regions indices are assigned in order of insertion, starting from 0 with no re-use of removed indices.
// If this logic changes, the `getTableAttributes()` logic needs to be updated.
private final int regionIndex = nextRegionIndex++;
private final List<ColumnLocationState<?>> columnLocationStates = new ArrayList<>();

/**
* RowSet in the region's space, not the table's space.
Expand Down Expand Up @@ -631,13 +630,10 @@ private void processInitial(final RowSetBuilderSequential addedRowSetBuilder, fi
.appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));

for (final ColumnDefinition<?> columnDefinition : columnDefinitions) {
// noinspection unchecked,rawtypes
final ColumnLocationState<?> state = new ColumnLocationState(
columnDefinition,
columnSources.get(columnDefinition.getName()),
location.getColumnLocation(columnDefinition.getName()));
columnLocationStates.add(state);
state.regionAllocated(regionIndex);
final RegionedColumnSource<?> regionedColumnSource = columnSources.get(columnDefinition.getName());
final ColumnLocation columnLocation = location.getColumnLocation(columnDefinition.getName());
Assert.eq(regionIndex, "regionIndex", regionedColumnSource.addRegion(columnDefinition, columnLocation),
"regionedColumnSource.addRegion((definition, location)");
}

rowSetAtLastUpdate = initialRowSet;
Expand Down Expand Up @@ -710,7 +706,7 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {
}

private void invalidate() {
// NOTE(review): the next two lines appear to be the pre-/post-change variants of the same call,
// interleaved by the diff view (columnLocationStates was removed in favor of iterating
// columnSources directly) — only the columnSources form should remain in the merged code. Verify.
columnLocationStates.forEach(cls -> cls.source.invalidateRegion(regionIndex));
columnSources.values().forEach(source -> source.invalidateRegion(regionIndex));
}

@Override
Expand All @@ -734,30 +730,6 @@ public ImmutableTableLocationKey getKey(
}
};

/**
 * Batches up a column definition, its regioned source, and its location for ease of use.
 * Implements grouping maintenance when a region is allocated for the location.
 *
 * @param <T> The column's data type
 */
private static class ColumnLocationState<T> {

    // All three are effectively one logical tuple; kept as separate final fields.
    protected final ColumnDefinition<T> definition;
    protected final RegionedColumnSource<T> source;
    protected final ColumnLocation location;

    private ColumnLocationState(
            ColumnDefinition<T> definition,
            RegionedColumnSource<T> source,
            ColumnLocation location) {
        this.definition = definition;
        this.source = source;
        this.location = location;
    }

    /**
     * Register this location with the column source, asserting that the source hands back the
     * expected region index.
     *
     * @param regionIndex The region index expected to be allocated by the source
     */
    private void regionAllocated(final int regionIndex) {
        // Fixed assert message: previously read "source.addRegion((definition, location)" with a
        // doubled open parenthesis.
        Assert.eq(regionIndex, "regionIndex", source.addRegion(definition, location),
                "source.addRegion(definition, location)");
    }
}

public Map<String, Object> getTableAttributes(
@NotNull TableUpdateMode tableUpdateMode,
@NotNull TableUpdateMode tableLocationUpdateMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.math.BigInteger;
import java.time.Instant;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.function.LongFunction;
Expand All @@ -58,48 +59,66 @@ final class ParquetColumnLocation<ATTR extends Values> extends AbstractColumnLoc
private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance()
.getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192);

private final String columnName;
private final String parquetColumnName;

private volatile boolean readersInitialized;

// Access to following variables must be guarded by initializeReaders()
// -----------------------------------------------------------------------
/**
* Factory object needed for deferred initialization of the remaining fields. We delay initializing this field
* itself till we need to read the column data.
*/
private ColumnChunkReader[] columnChunkReaders;

/**
* Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to
* ensure visibility of the derived fields.
* Whether the column location actually exists.
*/
private volatile ColumnChunkReader[] columnChunkReaders;
private boolean exists;
// -----------------------------------------------------------------------

// We should consider moving this to column level if needed. Column-location level likely allows more parallelism.
private volatile PageCache<ATTR> pageCache;
private volatile boolean pagesInitialized;

// Access to following variables must be guarded by initializePages()
// -----------------------------------------------------------------------
private ColumnChunkPageStore<ATTR>[] pageStores;
private Supplier<Chunk<ATTR>>[] dictionaryChunkSuppliers;
private ColumnChunkPageStore<DictionaryKeys>[] dictionaryKeysPageStores;
// -----------------------------------------------------------------------

/**
* Construct a new {@link ParquetColumnLocation} for the specified {@link ParquetTableLocation} and column name.
*
* @param tableLocation The table location enclosing this column location
* @param parquetColumnName The Parquet file column name
* @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location
*/
ParquetColumnLocation(
@NotNull final ParquetTableLocation tableLocation,
@NotNull final String columnName,
@NotNull final String parquetColumnName,
@Nullable final ColumnChunkReader[] columnChunkReaders) {
@NotNull final String parquetColumnName) {
super(tableLocation, columnName);
this.columnName = columnName;
this.parquetColumnName = parquetColumnName;
this.columnChunkReaders = columnChunkReaders;
this.readersInitialized = false;
this.pagesInitialized = false;
}

private PageCache<ATTR> ensurePageCache() {
PageCache<ATTR> localPageCache;
if ((localPageCache = pageCache) != null) {
return localPageCache;
private void initializeReaders() {
if (readersInitialized) {
return;
}

synchronized (this) {
if ((localPageCache = pageCache) != null) {
return localPageCache;
if (readersInitialized) {
return;
}
return pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);
final List<String> columnPath = tl().getColumnPath(columnName, parquetColumnName);
final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders())
.map(rgr -> rgr.getColumnChunk(columnName, columnPath))
.toArray(ColumnChunkReader[]::new);
exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
this.columnChunkReaders = exists ? columnChunkReaders : null;
readersInitialized = true;
}
}

Expand All @@ -114,10 +133,8 @@ public String getImplementationName() {

@Override
public boolean exists() {
// If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to
// see a non-null
// pageStores array
return columnChunkReaders != null || pageStores != null;
initializeReaders();
return exists;
}

private ParquetTableLocation tl() {
Expand Down Expand Up @@ -258,9 +275,9 @@ private <TYPE> ColumnRegionObject<TYPE, ATTR> makeSingleColumnRegionObject(
* @return The page stores
*/
@NotNull
public ColumnChunkPageStore<ATTR>[] getPageStores(
private ColumnChunkPageStore<ATTR>[] getPageStores(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
initializePages(columnDefinition);
return pageStores;
}

Expand All @@ -270,9 +287,9 @@ public ColumnChunkPageStore<ATTR>[] getPageStores(
* @param columnDefinition The {@link ColumnDefinition} used to lookup type information
* @return The dictionary values chunk suppliers, or null if none exist
*/
public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
private Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
initializePages(columnDefinition);
return dictionaryChunkSuppliers;
}

Expand All @@ -285,30 +302,35 @@ public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
*/
private ColumnChunkPageStore<DictionaryKeys>[] getDictionaryKeysPageStores(
@NotNull final ColumnDefinition<?> columnDefinition) {
fetchValues(columnDefinition);
initializePages(columnDefinition);
return dictionaryKeysPageStores;
}

@SuppressWarnings("unchecked")
private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
if (columnChunkReaders == null) {
private void initializePages(@NotNull final ColumnDefinition<?> columnDefinition) {
if (pagesInitialized) {
return;
}
synchronized (this) {
if (columnChunkReaders == null) {
if (pagesInitialized) {
return;
}

initializeReaders();
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
final int pageStoreCount = columnChunkReaders.length;
pageStores = new ColumnChunkPageStore[pageStoreCount];
dictionaryChunkSuppliers = new Supplier[pageStoreCount];
dictionaryKeysPageStores = new ColumnChunkPageStore[pageStoreCount];

// We should consider moving this page-cache to column level if needed.
// Column-location level likely allows more parallelism.
final PageCache<ATTR> pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);

for (int psi = 0; psi < pageStoreCount; ++psi) {
final ColumnChunkReader columnChunkReader = columnChunkReaders[psi];
try {
final ColumnChunkPageStore.CreatorResult<ATTR> creatorResult =
ColumnChunkPageStore.create(
ensurePageCache(),
pageCache,
columnChunkReader,
tl().getRegionParameters().regionMask,
makeToPage(tl().getColumnTypes().get(parquetColumnName),
Expand All @@ -325,6 +347,7 @@ private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
}

columnChunkReaders = null;
pagesInitialized = true;
}
}

Expand Down
Loading