diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocation.java
index 63ab9e03d1b..6b9d06c7243 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocation.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/TableLocation.java
@@ -127,7 +127,8 @@ interface Listener extends BasicTableDataListener {
 
     /**
      * @param name The column name
-     * @return The ColumnLocation for the defined column under this table location
+     * @return The ColumnLocation for the defined column under this table location. The same ColumnLocation object
+     *         should be returned for repeated calls with the same column name.
      */
     @NotNull
     ColumnLocation getColumnLocation(@NotNull CharSequence name);
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java
index e1c0fed476b..a51eb640cc3 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/locations/impl/AbstractTableLocation.java
@@ -87,6 +87,16 @@ public LivenessReferent asLivenessReferent() {
     // TableLocationState implementation
     // ------------------------------------------------------------------------------------------------------------------
 
+    /**
+     * No-op by default; can be overridden by subclasses to initialize state on first access.
+     * <p>
+     * The expectation for static locations that override this is to call {@link #handleUpdateInternal(RowSet, long)}
+     * instead of {@link #handleUpdate(RowSet, long)}, and {@link #handleUpdateInternal(TableLocationState)} instead of
+     * {@link #handleUpdate(TableLocationState)} from inside {@link #initializeState()}. Otherwise, the initialization
+     * logic will recurse infinitely.
+     */
+    protected void initializeState() {}
+
     @Override
     @NotNull
     public final Object getStateLock() {
@@ -95,16 +105,19 @@ public final Object getStateLock() {
 
     @Override
     public final RowSet getRowSet() {
+        initializeState();
         return state.getRowSet();
     }
 
     @Override
     public final long getSize() {
+        initializeState();
         return state.getSize();
     }
 
     @Override
     public final long getLastModifiedTimeMillis() {
+        initializeState();
         return state.getLastModifiedTimeMillis();
     }
 
@@ -137,6 +150,11 @@ protected final void deliverInitialSnapshot(@NotNull final Listener listener) {
      * @param lastModifiedTimeMillis The new lastModifiedTimeMillis
      */
     public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeMillis) {
+        initializeState();
+        handleUpdateInternal(rowSet, lastModifiedTimeMillis);
+    }
+
+    protected final void handleUpdateInternal(final RowSet rowSet, final long lastModifiedTimeMillis) {
         if (state.setValues(rowSet, lastModifiedTimeMillis) && supportsSubscriptions()) {
             deliverUpdateNotification();
         }
@@ -149,6 +167,11 @@ public final void handleUpdate(final RowSet rowSet, final long lastModifiedTimeM
      * @param source The source to copy state values from
      */
     public void handleUpdate(@NotNull final TableLocationState source) {
+        initializeState();
+        handleUpdateInternal(source);
+    }
+
+    protected final void handleUpdateInternal(@NotNull final TableLocationState source) {
         if (source.copyStateValuesTo(state) && supportsSubscriptions()) {
             deliverUpdateNotification();
         }
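Note: the initializeState() contract above is subtle. A static location's override must deliver its first snapshot via handleUpdateInternal(...), because the public handleUpdate(...) entry points themselves call initializeState() and would recurse. A minimal sketch of a conforming subclass follows (hypothetical class and helper names, not part of this change):

    // Hypothetical static location honoring the initializeState() contract;
    // other abstract methods omitted for brevity.
    final class LazyStaticTableLocation extends AbstractTableLocation {
        private volatile boolean stateInitialized;

        LazyStaticTableLocation(@NotNull final TableKey tableKey, @NotNull final TableLocationKey locationKey) {
            super(tableKey, locationKey, false); // static location: no subscriptions
        }

        @Override
        protected void initializeState() {
            if (stateInitialized) {
                return; // fast path once initialized
            }
            synchronized (this) {
                if (stateInitialized) {
                    return;
                }
                final RowSet rows = readRowSetFromDisk(); // hypothetical expensive I/O
                // Calling handleUpdate(rows, ...) here would re-enter initializeState();
                // the *Internal variant publishes state without recursing.
                handleUpdateInternal(rows, TableLocationState.NULL_TIME);
                stateInitialized = true;
            }
        }
    }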
diff --git a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java
index f33e85514ec..85fec72346f 100644
--- a/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java
+++ b/engine/table/src/main/java/io/deephaven/engine/table/impl/sources/regioned/RegionedColumnSourceManager.java
@@ -603,7 +603,9 @@ private class IncludedTableLocationEntry implements Comparable<IncludedTableLoc
 
-        private final List<ColumnLocationState<?>> columnLocationStates = new ArrayList<>();
+
+        // Collection of column sources for which we have added a region, useful for invalidating them together
+        private final Collection<RegionedColumnSource<?>> regionedColumnSources = new ArrayList<>();
 
         /**
          * RowSet in the region's space, not the table's space.
@@ -631,13 +633,11 @@ private void processInitial(final RowSetBuilderSequential addedRowSetBuilder, fi
                     .appendRange(regionFirstKey + subRegionFirstKey, regionFirstKey + subRegionLastKey));
 
             for (final ColumnDefinition<?> columnDefinition : columnDefinitions) {
-                // noinspection unchecked,rawtypes
-                final ColumnLocationState state = new ColumnLocationState(
-                        columnDefinition,
-                        columnSources.get(columnDefinition.getName()),
-                        location.getColumnLocation(columnDefinition.getName()));
-                columnLocationStates.add(state);
-                state.regionAllocated(regionIndex);
+                final RegionedColumnSource<?> regionedColumnSource = columnSources.get(columnDefinition.getName());
+                final ColumnLocation columnLocation = location.getColumnLocation(columnDefinition.getName());
+                Assert.eq(regionIndex, "regionIndex", regionedColumnSource.addRegion(columnDefinition, columnLocation),
+                        "regionedColumnSource.addRegion(columnDefinition, columnLocation)");
+                regionedColumnSources.add(regionedColumnSource);
             }
 
             rowSetAtLastUpdate = initialRowSet;
@@ -710,7 +710,7 @@ private boolean pollUpdates(final RowSetBuilderSequential addedRowSetBuilder) {
         }
 
         private void invalidate() {
-            columnLocationStates.forEach(cls -> cls.source.invalidateRegion(regionIndex));
+            regionedColumnSources.forEach(source -> source.invalidateRegion(regionIndex));
         }
 
         @Override
@@ -734,30 +734,6 @@ public ImmutableTableLocationKey getKey(
         }
     };
 
-    /**
-     * Batches up a definition, source, and location for ease of use. Implements grouping maintenance.
-     */
-    private static class ColumnLocationState<T> {
-
-        protected final ColumnDefinition<T> definition;
-        protected final RegionedColumnSource<T> source;
-        protected final ColumnLocation location;
-
-        private ColumnLocationState(
-                ColumnDefinition<T> definition,
-                RegionedColumnSource<T> source,
-                ColumnLocation location) {
-            this.definition = definition;
-            this.source = source;
-            this.location = location;
-        }
-
-        private void regionAllocated(final int regionIndex) {
-            Assert.eq(regionIndex, "regionIndex", source.addRegion(definition, location),
-                    "source.addRegion((definition, location)");
-        }
-    }
-
     public Map<String, Object> getTableAttributes(
             @NotNull TableUpdateMode tableUpdateMode,
             @NotNull TableUpdateMode tableLocationUpdateMode) {
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java
index 4ddebb33685..c085730ba26 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetColumnLocation.java
@@ -39,6 +39,8 @@
 import java.math.BigInteger;
 import java.time.Instant;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 import java.util.Optional;
 import java.util.function.Function;
 import java.util.function.LongFunction;
@@ -58,48 +60,68 @@ final class ParquetColumnLocation<ATTR extends Any> extends AbstractColumnLoc
     private static final int MAX_PAGE_CACHE_SIZE = Configuration.getInstance()
             .getIntegerForClassWithDefault(ParquetColumnLocation.class, "maxPageCacheSize", 8192);
 
+    private final String columnName;
     private final String parquetColumnName;
+
+
+    private volatile boolean readersInitialized;
+
+    // Access to the following variables must be guarded by initializeReaders()
+    // -----------------------------------------------------------------------
     /**
-     * Factory object needed for deferred initialization of the remaining fields. Reference serves as a barrier to
-     * ensure visibility of the derived fields.
+     * Factory object needed for deferred initialization of the remaining fields. We delay initializing this field
+     * itself until we need to read the column data.
      */
-    private volatile ColumnChunkReader[] columnChunkReaders;
+    private ColumnChunkReader[] columnChunkReaders;
 
-    // We should consider moving this to column level if needed. Column-location level likely allows more parallelism.
-    private volatile PageCache<ATTR> pageCache;
+    /**
+     * Whether the column location actually exists.
+     */
+    private boolean exists;
+    // -----------------------------------------------------------------------
 
+    private volatile boolean pagesInitialized;
+    // Access to the following variables must be guarded by initializePages()
+    // -----------------------------------------------------------------------
     private ColumnChunkPageStore<ATTR>[] pageStores;
     private Supplier<Chunk<ATTR>>[] dictionaryChunkSuppliers;
     private ColumnChunkPageStore<DictionaryKeys>[] dictionaryKeysPageStores;
+    // -----------------------------------------------------------------------
 
     /**
      * Construct a new {@link ParquetColumnLocation} for the specified {@link ParquetTableLocation} and column name.
      *
      * @param tableLocation The table location enclosing this column location
      * @param parquetColumnName The Parquet file column name
-     * @param columnChunkReaders The {@link ColumnChunkReader column chunk readers} for this location
      */
     ParquetColumnLocation(
             @NotNull final ParquetTableLocation tableLocation,
             @NotNull final String columnName,
-            @NotNull final String parquetColumnName,
-            @Nullable final ColumnChunkReader[] columnChunkReaders) {
+            @NotNull final String parquetColumnName) {
         super(tableLocation, columnName);
+        this.columnName = columnName;
         this.parquetColumnName = parquetColumnName;
-        this.columnChunkReaders = columnChunkReaders;
+        this.readersInitialized = false;
+        this.pagesInitialized = false;
     }
 
-    private PageCache<ATTR> ensurePageCache() {
-        PageCache<ATTR> localPageCache;
-        if ((localPageCache = pageCache) != null) {
-            return localPageCache;
+    private void initializeReaders() {
+        if (readersInitialized) {
+            return;
         }
         synchronized (this) {
-            if ((localPageCache = pageCache) != null) {
-                return localPageCache;
+            if (readersInitialized) {
+                return;
             }
-            return pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);
+            final String[] columnPath = tl().getParquetColumnNameToPath().get(parquetColumnName);
+            final List<String> nameList =
+                    columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
+            final ColumnChunkReader[] columnChunkReaders = Arrays.stream(tl().getRowGroupReaders())
+                    .map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new);
+            exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
+            this.columnChunkReaders = exists ? columnChunkReaders : null;
+            readersInitialized = true;
         }
     }
 
@@ -114,10 +136,8 @@ public String getImplementationName() {
 
     @Override
     public boolean exists() {
-        // If we see a null columnChunkReaders array, either we don't exist or we are guaranteed to
-        // see a non-null
-        // pageStores array
-        return columnChunkReaders != null || pageStores != null;
+        initializeReaders();
+        return exists;
     }
 
     private ParquetTableLocation tl() {
@@ -258,9 +278,9 @@ private ColumnRegionObject makeSingleColumnRegionObject(
      * @return The page stores
      */
     @NotNull
-    public ColumnChunkPageStore<ATTR>[] getPageStores(
+    private ColumnChunkPageStore<ATTR>[] getPageStores(
            @NotNull final ColumnDefinition<?> columnDefinition) {
-        fetchValues(columnDefinition);
+        initializePages(columnDefinition);
         return pageStores;
     }
 
@@ -270,9 +290,9 @@ public ColumnChunkPageStore<ATTR>[] getPageStores(
      * @param columnDefinition The {@link ColumnDefinition} used to lookup type information
      * @return The dictionary values chunk suppliers, or null if none exist
      */
-    public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
+    private Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
            @NotNull final ColumnDefinition<?> columnDefinition) {
-        fetchValues(columnDefinition);
+        initializePages(columnDefinition);
         return dictionaryChunkSuppliers;
     }
 
@@ -285,30 +305,35 @@ public Supplier<Chunk<ATTR>>[] getDictionaryChunkSuppliers(
      */
     private ColumnChunkPageStore<DictionaryKeys>[] getDictionaryKeysPageStores(
            @NotNull final ColumnDefinition<?> columnDefinition) {
-        fetchValues(columnDefinition);
+        initializePages(columnDefinition);
         return dictionaryKeysPageStores;
     }
 
     @SuppressWarnings("unchecked")
-    private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
-        if (columnChunkReaders == null) {
+    private void initializePages(@NotNull final ColumnDefinition<?> columnDefinition) {
+        if (pagesInitialized) {
             return;
         }
         synchronized (this) {
-            if (columnChunkReaders == null) {
+            if (pagesInitialized) {
                 return;
             }
-
+            initializeReaders();
             final int pageStoreCount = columnChunkReaders.length;
             pageStores = new ColumnChunkPageStore[pageStoreCount];
             dictionaryChunkSuppliers = new Supplier[pageStoreCount];
             dictionaryKeysPageStores = new ColumnChunkPageStore[pageStoreCount];
+
+            // We should consider moving this page-cache to column level if needed.
+            // Column-location level likely allows more parallelism.
+            final PageCache<ATTR> pageCache = new PageCache<>(INITIAL_PAGE_CACHE_SIZE, MAX_PAGE_CACHE_SIZE);
+
            for (int psi = 0; psi < pageStoreCount; ++psi) {
                 final ColumnChunkReader columnChunkReader = columnChunkReaders[psi];
                 try {
                     final ColumnChunkPageStore.CreatorResult<ATTR> creatorResult = ColumnChunkPageStore.create(
-                            ensurePageCache(),
+                            pageCache,
                             columnChunkReader,
                             tl().getRegionParameters().regionMask,
                             makeToPage(tl().getColumnTypes().get(parquetColumnName),
@@ -325,6 +350,7 @@ private void fetchValues(@NotNull final ColumnDefinition<?> columnDefinition) {
             }
 
             columnChunkReaders = null;
+            pagesInitialized = true;
         }
     }
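Note: ParquetColumnLocation above and ParquetTableLocation below now share the same deferred-initialization idiom — a volatile boolean checked before and after entering the monitor. The guarded fields themselves can stay non-volatile because every write to them happens-before the volatile write of the flag, and readers always check the flag first. A minimal standalone sketch of the idiom (hypothetical names, illustration only):

    // Double-checked locking on a volatile flag.
    final class LazyHolder {
        private volatile boolean initialized;
        private Object expensive; // guarded by initialize(); safe to be non-volatile

        Object get() {
            initialize();
            return expensive;
        }

        private void initialize() {
            if (initialized) {
                return; // fast path: no locking after publication
            }
            synchronized (this) {
                if (initialized) {
                    return; // another thread won the race
                }
                expensive = new Object(); // stand-in for reading file metadata
                initialized = true; // volatile write last, publishing the fields above
            }
        }
    }

Note also that initializePages(...) calls initializeReaders() while holding the monitor; both synchronize on this, which is safe because Java monitors are reentrant.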
diff --git a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java
index 7c8f47636e5..ee2fe802710 100644
--- a/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java
+++ b/extensions/parquet/table/src/main/java/io/deephaven/parquet/table/location/ParquetTableLocation.java
@@ -22,7 +22,6 @@
 import io.deephaven.engine.table.impl.select.SourceColumn;
 import io.deephaven.engine.table.impl.sources.regioned.RegionedColumnSource;
 import io.deephaven.engine.table.impl.sources.regioned.RegionedPageStore;
-import io.deephaven.parquet.base.ColumnChunkReader;
 import io.deephaven.parquet.base.ParquetFileReader;
 import io.deephaven.parquet.base.RowGroupReader;
 import io.deephaven.parquet.table.ParquetInstructions;
@@ -33,7 +32,6 @@
 import io.deephaven.parquet.table.metadata.GroupingColumnInfo;
 import io.deephaven.parquet.table.metadata.SortColumnInfo;
 import io.deephaven.parquet.table.metadata.TableInfo;
-import io.deephaven.util.channel.SeekableChannelsProvider;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.format.RowGroup;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
@@ -57,74 +55,93 @@ public class ParquetTableLocation extends AbstractTableLocation {
     private static final String IMPLEMENTATION_NAME = ParquetColumnLocation.class.getSimpleName();
 
     private final ParquetInstructions readInstructions;
-    private final ParquetFileReader parquetFileReader;
-    private final int[] rowGroupIndices;
-    private final RowGroup[] rowGroups;
-    private final RegionedPageStore.Parameters regionParameters;
-    private final Map<String, String[]> parquetColumnNameToPath;
 
+    private volatile boolean isInitialized;
 
-    private final TableInfo tableInfo;
-    private final Map<String, GroupingColumnInfo> groupingColumns;
-    private final List<DataIndexInfo> dataIndexes;
-    private final Map<String, ColumnTypeInfo> columnTypes;
-    private final List<SortColumn> sortingColumns;
+    // Access to all the following variables must be guarded by initialize()
+    // -----------------------------------------------------------------------
+    private ParquetFileReader parquetFileReader;
 
-    private final String version;
+    private RegionedPageStore.Parameters regionParameters;
+    private Map<String, String[]> parquetColumnNameToPath;
 
-    private volatile RowGroupReader[] rowGroupReaders;
+    private TableInfo tableInfo;
+    private Map<String, GroupingColumnInfo> groupingColumns;
+    private Map<String, ColumnTypeInfo> columnTypes;
+    private List<SortColumn> sortingColumns;
+
+    private RowGroupReader[] rowGroupReaders;
+    // -----------------------------------------------------------------------
 
     public ParquetTableLocation(@NotNull final TableKey tableKey,
             @NotNull final ParquetTableLocationKey tableLocationKey,
             @NotNull final ParquetInstructions readInstructions) {
         super(tableKey, tableLocationKey, false);
         this.readInstructions = readInstructions;
-        final ParquetMetadata parquetMetadata;
-        // noinspection SynchronizationOnLocalVariableOrMethodParameter
-        synchronized (tableLocationKey) {
-            // Following methods are internally synchronized, we synchronize them together here to minimize lock/unlock
-            // calls
-            parquetFileReader = tableLocationKey.getFileReader();
-            parquetMetadata = tableLocationKey.getMetadata();
-            rowGroupIndices = tableLocationKey.getRowGroupIndices();
+        this.isInitialized = false;
+    }
+
+    private void initialize() {
+        if (isInitialized) {
+            return;
         }
+        synchronized (this) {
+            if (isInitialized) {
+                return;
+            }
+            final ParquetMetadata parquetMetadata;
+            final ParquetTableLocationKey tableLocationKey = getParquetKey();
+            final int[] rowGroupIndices;
+            synchronized (tableLocationKey) {
+                // The following methods are internally synchronized; we synchronize them together here to minimize
+                // lock/unlock calls
+                parquetFileReader = tableLocationKey.getFileReader();
+                parquetMetadata = tableLocationKey.getMetadata();
+                rowGroupIndices = tableLocationKey.getRowGroupIndices();
+            }
 
-        final int rowGroupCount = rowGroupIndices.length;
-        rowGroups = IntStream.of(rowGroupIndices)
-                .mapToObj(rgi -> parquetFileReader.fileMetaData.getRow_groups().get(rgi))
-                .sorted(Comparator.comparingInt(RowGroup::getOrdinal))
-                .toArray(RowGroup[]::new);
-        final long maxRowCount = Arrays.stream(rowGroups).mapToLong(RowGroup::getNum_rows).max().orElse(0L);
-        regionParameters = new RegionedPageStore.Parameters(
-                RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, rowGroupCount, maxRowCount);
-
-        parquetColumnNameToPath = new HashMap<>();
-        for (final ColumnDescriptor column : parquetFileReader.getSchema().getColumns()) {
-            final String[] path = column.getPath();
-            if (path.length > 1) {
-                parquetColumnNameToPath.put(path[0], path);
+            final int rowGroupCount = rowGroupIndices.length;
+            final RowGroup[] rowGroups = IntStream.of(rowGroupIndices)
+                    .mapToObj(rgi -> parquetFileReader.fileMetaData.getRow_groups().get(rgi))
+                    .sorted(Comparator.comparingInt(RowGroup::getOrdinal))
+                    .toArray(RowGroup[]::new);
+            final long maxRowCount = Arrays.stream(rowGroups).mapToLong(RowGroup::getNum_rows).max().orElse(0L);
+            regionParameters = new RegionedPageStore.Parameters(
+                    RegionedColumnSource.ROW_KEY_TO_SUB_REGION_ROW_INDEX_MASK, rowGroupCount, maxRowCount);
+
+            parquetColumnNameToPath = new HashMap<>();
+            for (final ColumnDescriptor column : parquetFileReader.getSchema().getColumns()) {
+                final String[] path = column.getPath();
+                if (path.length > 1) {
+                    parquetColumnNameToPath.put(path[0], path);
+                }
             }
-        }
 
-        // TODO (https://github.com/deephaven/deephaven-core/issues/958):
-        // When/if we support _metadata files for Deephaven-written Parquet tables, we may need to revise this
-        // in order to read *this* file's metadata, rather than inheriting file metadata from the _metadata file.
-        // Obvious issues included data index table paths, codecs, etc.
-        // Presumably, we could store per-file instances of the metadata in the _metadata file's map.
-        tableInfo = ParquetSchemaReader
-                .parseMetadata(parquetMetadata.getFileMetaData().getKeyValueMetaData())
-                .orElse(TableInfo.builder().build());
-        version = tableInfo.version();
-        groupingColumns = tableInfo.groupingColumnMap();
-        dataIndexes = tableInfo.dataIndexes();
-        columnTypes = tableInfo.columnTypeMap();
-        sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns());
-
-        if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) {
-            // We do not have the last modified time for non-file URIs
-            handleUpdate(computeIndex(), TableLocationState.NULL_TIME);
-        } else {
-            handleUpdate(computeIndex(), new File(tableLocationKey.getURI()).lastModified());
+            // TODO (https://github.com/deephaven/deephaven-core/issues/958):
+            // When/if we support _metadata files for Deephaven-written Parquet tables, we may need to revise this
+            // in order to read *this* file's metadata, rather than inheriting file metadata from the _metadata file.
+            // Obvious issues included data index table paths, codecs, etc.
+            // Presumably, we could store per-file instances of the metadata in the _metadata file's map.
+            tableInfo = ParquetSchemaReader
+                    .parseMetadata(parquetMetadata.getFileMetaData().getKeyValueMetaData())
+                    .orElse(TableInfo.builder().build());
+            groupingColumns = tableInfo.groupingColumnMap();
+            columnTypes = tableInfo.columnTypeMap();
+            sortingColumns = SortColumnInfo.sortColumns(tableInfo.sortingColumns());
+
+            rowGroupReaders = IntStream.of(rowGroupIndices)
+                    .mapToObj(idx -> parquetFileReader.getRowGroup(idx, tableInfo.version()))
+                    .sorted(Comparator.comparingInt(rgr -> rgr.getRowGroup().getOrdinal()))
+                    .toArray(RowGroupReader[]::new);
+
+            if (!FILE_URI_SCHEME.equals(tableLocationKey.getURI().getScheme())) {
+                // We do not have the last modified time for non-file URIs
+                handleUpdateInternal(computeIndex(rowGroups), TableLocationState.NULL_TIME);
+            } else {
+                handleUpdateInternal(computeIndex(rowGroups), new File(tableLocationKey.getURI()).lastModified());
+            }
+
+            isInitialized = true;
         }
     }
@@ -144,55 +161,47 @@ ParquetInstructions getReadInstructions() {
         return readInstructions;
     }
 
-    SeekableChannelsProvider getChannelProvider() {
-        return parquetFileReader.getChannelsProvider();
-    }
-
     RegionedPageStore.Parameters getRegionParameters() {
+        initialize();
         return regionParameters;
     }
 
     public Map<String, ColumnTypeInfo> getColumnTypes() {
+        initialize();
         return columnTypes;
     }
 
-    private RowGroupReader[] getRowGroupReaders() {
-        RowGroupReader[] local;
-        if ((local = rowGroupReaders) != null) {
-            return local;
-        }
-        synchronized (this) {
-            if ((local = rowGroupReaders) != null) {
-                return local;
-            }
-            return rowGroupReaders = IntStream.of(rowGroupIndices)
-                    .mapToObj(idx -> parquetFileReader.getRowGroup(idx, version))
-                    .sorted(Comparator.comparingInt(rgr -> rgr.getRowGroup().getOrdinal()))
-                    .toArray(RowGroupReader[]::new);
-        }
+    RowGroupReader[] getRowGroupReaders() {
+        initialize();
+        return rowGroupReaders;
     }
 
     @Override
     @NotNull
     public List<SortColumn> getSortedColumns() {
+        initialize();
         return sortingColumns;
     }
 
+    @NotNull
+    Map<String, String[]> getParquetColumnNameToPath() {
+        initialize();
+        return parquetColumnNameToPath;
+    }
+
+    @Override
+    protected final void initializeState() {
+        initialize();
+    }
+
     @Override
     @NotNull
     protected ColumnLocation makeColumnLocation(@NotNull final String columnName) {
         final String parquetColumnName = readInstructions.getParquetColumnNameFromColumnNameOrDefault(columnName);
-        final String[] columnPath = parquetColumnNameToPath.get(parquetColumnName);
-        final List<String> nameList =
-                columnPath == null ? Collections.singletonList(parquetColumnName) : Arrays.asList(columnPath);
-        final ColumnChunkReader[] columnChunkReaders = Arrays.stream(getRowGroupReaders())
-                .map(rgr -> rgr.getColumnChunk(columnName, nameList)).toArray(ColumnChunkReader[]::new);
-        final boolean exists = Arrays.stream(columnChunkReaders).anyMatch(ccr -> ccr != null && ccr.numRows() > 0);
-        return new ParquetColumnLocation<>(this, columnName, parquetColumnName,
-                exists ? columnChunkReaders : null);
+        return new ParquetColumnLocation<>(this, columnName, parquetColumnName);
     }
 
-    private RowSet computeIndex() {
+    private RowSet computeIndex(@NotNull final RowGroup[] rowGroups) {
         final RowSetBuilderSequential sequentialBuilder = RowSetFactory.builderSequential();
 
         for (int rgi = 0; rgi < rowGroups.length; ++rgi) {
@@ -211,12 +220,16 @@ private RowSet computeIndex() {
     @Override
     @NotNull
     public List<String[]> getDataIndexColumns() {
-        if (dataIndexes.isEmpty() && groupingColumns.isEmpty()) {
+        initialize();
+        if (tableInfo.dataIndexes().isEmpty() && groupingColumns.isEmpty()) {
             return List.of();
         }
-        final List<String[]> dataIndexColumns = new ArrayList<>(dataIndexes.size() + groupingColumns.size());
+        final List<String[]> dataIndexColumns =
+                new ArrayList<>(tableInfo.dataIndexes().size() + groupingColumns.size());
         // Add the data indexes to the list
-        dataIndexes.stream().map(di -> di.columns().toArray(String[]::new)).forEach(dataIndexColumns::add);
+        tableInfo.dataIndexes().stream()
+                .map(di -> di.columns().toArray(String[]::new))
+                .forEach(dataIndexColumns::add);
         // Add grouping columns to the list
         groupingColumns.keySet().stream().map(colName -> new String[] {colName}).forEach(dataIndexColumns::add);
         return dataIndexColumns;
@@ -224,20 +237,18 @@ public List<String[]> getDataIndexColumns() {
 
     @Override
     public boolean hasDataIndex(@NotNull final String... columns) {
+        initialize();
         // Check if the column name matches any of the grouping columns
         if (columns.length == 1 && groupingColumns.containsKey(columns[0])) {
             // Validate the index file exists (without loading and parsing it)
-            final IndexFileMetadata metadata = getIndexFileMetadata(getParquetKey().getURI(), tableInfo, columns);
+            final IndexFileMetadata metadata = getIndexFileMetadata(getParquetKey().getURI(), columns);
             return metadata != null && parquetFileExists(metadata.fileURI);
         }
         // Check if the column names match any of the data indexes
-        for (final DataIndexInfo dataIndex : dataIndexes) {
+        for (final DataIndexInfo dataIndex : tableInfo.dataIndexes()) {
             if (dataIndex.matchesColumns(columns)) {
                 // Validate the index file exists (without loading and parsing it)
-                final IndexFileMetadata metadata = getIndexFileMetadata(
-                        getParquetKey().getURI(),
-                        tableInfo,
-                        columns);
+                final IndexFileMetadata metadata = getIndexFileMetadata(getParquetKey().getURI(), columns);
                 return metadata != null && parquetFileExists(metadata.fileURI);
             }
         }
@@ -252,10 +263,8 @@ private static boolean parquetFileExists(@NotNull final URI fileURI) {
     @Override
     @Nullable
     public BasicDataIndex loadDataIndex(@NotNull final String... columns) {
-        if (tableInfo == null) {
-            return null;
-        }
-        final IndexFileMetadata indexFileMetaData = getIndexFileMetadata(getParquetKey().getURI(), tableInfo, columns);
+        initialize();
+        final IndexFileMetadata indexFileMetaData = getIndexFileMetadata(getParquetKey().getURI(), columns);
         if (indexFileMetaData == null) {
             throw new TableDataException(
                     String.format(
@@ -301,13 +310,12 @@ private static URI makeRelativeURI(@NotNull final URI parentFileURI, @NotNull fi
         }
     }
 
-    private static IndexFileMetadata getIndexFileMetadata(
+    private IndexFileMetadata getIndexFileMetadata(
             @NotNull final URI parentFileURI,
-            @NotNull final TableInfo info,
             @NotNull final String... keyColumnNames) {
         if (keyColumnNames.length == 1) {
             // If there's only one key column, there might be (legacy) grouping info
-            final GroupingColumnInfo groupingColumnInfo = info.groupingColumnMap().get(keyColumnNames[0]);
+            final GroupingColumnInfo groupingColumnInfo = groupingColumns.get(keyColumnNames[0]);
             if (groupingColumnInfo != null) {
                 return new IndexFileMetadata(
                         makeRelativeURI(parentFileURI, groupingColumnInfo.groupingTablePath()),
@@ -318,7 +326,7 @@ private static IndexFileMetadata getIndexFileMetadata(
 
         // Either there are more than 1 key columns, or there was no grouping info, so let's see if there was a
         // DataIndex.
-        final DataIndexInfo dataIndexInfo = info.dataIndexes().stream()
+        final DataIndexInfo dataIndexInfo = tableInfo.dataIndexes().stream()
                 .filter(item -> item.matchesColumns(keyColumnNames))
                 .findFirst()
                 .orElse(null);
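Note: the observable effect of the deferral, sketched as hypothetical usage (file path assumed): constructing a location and touching its key no longer performs I/O, so a missing file only fails on first data access — exactly what the new test below exercises.

    final ParquetTableLocationKey key = new ParquetTableLocationKey(
            new File("/tmp/missing.parquet").toURI(), 0, null, ParquetInstructions.EMPTY);
    final ParquetTableLocation location = new ParquetTableLocation(
            StandaloneTableKey.getInstance(), key, ParquetInstructions.EMPTY);
    location.refresh(); // still no file I/O; safe even though the file is absent
    try {
        location.getRowSet(); // first state access runs initialize() and opens the file
    } catch (final UncheckedIOException e) {
        // expected for the missing file: "makeHandle encountered exception"
    }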
diff --git a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
index 530be1c5d6b..f4bc2d3ffcf 100644
--- a/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
+++ b/extensions/parquet/table/src/test/java/io/deephaven/parquet/table/ParquetTableReadWriteTest.java
@@ -23,6 +23,7 @@
 import io.deephaven.engine.primitive.iterator.CloseablePrimitiveIteratorOfInt;
 import io.deephaven.engine.primitive.iterator.CloseablePrimitiveIteratorOfLong;
 import io.deephaven.engine.primitive.iterator.CloseablePrimitiveIteratorOfShort;
+import io.deephaven.engine.rowset.impl.TrackingWritableRowSetImpl;
 import io.deephaven.engine.table.ColumnDefinition;
 import io.deephaven.engine.table.ColumnSource;
 import io.deephaven.engine.table.PartitionedTable;
@@ -33,6 +34,7 @@
 import io.deephaven.engine.table.impl.QueryTable;
 import io.deephaven.engine.table.impl.dataindex.DataIndexUtils;
 import io.deephaven.engine.table.impl.indexer.DataIndexer;
+import io.deephaven.engine.table.impl.locations.ColumnLocation;
 import io.deephaven.engine.table.impl.locations.impl.StandaloneTableKey;
 import io.deephaven.engine.table.impl.select.FormulaEvaluationException;
 import io.deephaven.engine.table.impl.select.FunctionalColumn;
@@ -86,6 +88,7 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
+import java.io.UncheckedIOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.net.URI;
@@ -104,6 +107,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.Objects;
+import java.util.function.Consumer;
 import java.util.function.DoubleConsumer;
 import java.util.function.Function;
 import java.util.function.IntConsumer;
@@ -132,6 +136,7 @@
 import static io.deephaven.parquet.table.ParquetTools.writeTables;
 import static io.deephaven.util.QueryConstants.*;
 import static org.junit.Assert.*;
+import static org.junit.Assert.assertNotNull;
 
 @Category(OutOfBandTest.class)
 public final class ParquetTableReadWriteTest {
@@ -3651,6 +3656,111 @@ public void overflowingCodecsTest() {
         assertEquals(columnMetadata.getEncodingStats().getNumDataPagesEncodedAs(Encoding.PLAIN), 2);
     }
 
+    private static void verifyMakeHandleException(final Runnable throwingRunnable) {
+        try {
+            throwingRunnable.run();
+            fail("Expected UncheckedIOException");
+        } catch (final UncheckedIOException e) {
+            assertTrue(e.getMessage().contains("makeHandle encountered exception"));
+        }
+    }
+
+    private static void makeNewTableLocationAndVerifyNoException(
+            final Consumer<ParquetTableLocation> parquetTableLocationConsumer) {
+        final File dest = new File(rootFile, "real.parquet");
+        final Table table = TableTools.emptyTable(5).update("A=(int)i", "B=(long)i", "C=(double)i");
+        DataIndexer.getOrCreateDataIndex(table, "A");
+        writeTable(table, dest.getPath());
+
+        final ParquetTableLocationKey newTableLocationKey =
+                new ParquetTableLocationKey(dest.toURI(), 0, null, ParquetInstructions.EMPTY);
+        final ParquetTableLocation newTableLocation =
+                new ParquetTableLocation(StandaloneTableKey.getInstance(), newTableLocationKey, EMPTY);
+
+        // The following operations should not throw exceptions
+        parquetTableLocationConsumer.accept(newTableLocation);
+        dest.delete();
+    }
+
+    @Test
+    public void testTableLocationReading() {
+        // Make a new ParquetTableLocation for a non-existent parquet file
+        final File nonExistentParquetFile = new File(rootFile, "non-existent.parquet");
+        assertFalse(nonExistentParquetFile.exists());
+        final ParquetTableLocationKey nonExistentTableLocationKey =
+                new ParquetTableLocationKey(nonExistentParquetFile.toURI(), 0, null, ParquetInstructions.EMPTY);
+        final ParquetTableLocation nonExistentTableLocation =
+                new ParquetTableLocation(StandaloneTableKey.getInstance(), nonExistentTableLocationKey, EMPTY);
+
+        // Ensure operations don't touch the file or throw exceptions
+        assertEquals(nonExistentTableLocation.getTableKey(), StandaloneTableKey.getInstance());
+        assertEquals(nonExistentTableLocation.getKey(), nonExistentTableLocationKey);
+        assertNotNull(nonExistentTableLocation.toString());
+        assertNotNull(nonExistentTableLocation.asLivenessReferent());
+        assertNotNull(nonExistentTableLocation.getStateLock());
+        nonExistentTableLocation.refresh();
+
+        // Verify that we can get a column location for a non-existent column
+        final ColumnLocation nonExistentColumnLocation = nonExistentTableLocation.getColumnLocation("A");
+        assertNotNull(nonExistentColumnLocation);
+        assertEquals("A", nonExistentColumnLocation.getName());
+        assertEquals(nonExistentTableLocation, nonExistentColumnLocation.getTableLocation());
+        assertNotNull(nonExistentColumnLocation.toString());
+        assertNotNull(nonExistentColumnLocation.getImplementationName());
+
+        // Verify that all the following operations will fail when the file does not exist and pass when it does
+        // APIs from TableLocation
+        verifyMakeHandleException(nonExistentTableLocation::getDataIndexColumns);
+        makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getDataIndexColumns);
+
+        verifyMakeHandleException(nonExistentTableLocation::getSortedColumns);
+        makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getSortedColumns);
+
+        verifyMakeHandleException(nonExistentTableLocation::getColumnTypes);
+        makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getColumnTypes);
+
+        verifyMakeHandleException(nonExistentTableLocation::hasDataIndex);
+        makeNewTableLocationAndVerifyNoException(ParquetTableLocation::hasDataIndex);
+
+        // Assuming here there will be an index on column "A"
+        verifyMakeHandleException(nonExistentTableLocation::getDataIndex);
+        makeNewTableLocationAndVerifyNoException(tableLocation -> tableLocation.getDataIndex("A"));
+
+        verifyMakeHandleException(nonExistentTableLocation::loadDataIndex);
+        makeNewTableLocationAndVerifyNoException(tableLocation -> tableLocation.loadDataIndex("A"));
+
+        // APIs from TableLocationState
+        verifyMakeHandleException(nonExistentTableLocation::getRowSet);
+        makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getRowSet);
+
+        verifyMakeHandleException(nonExistentTableLocation::getSize);
+        makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getSize);
+
+        verifyMakeHandleException(nonExistentTableLocation::getLastModifiedTimeMillis);
+        makeNewTableLocationAndVerifyNoException(ParquetTableLocation::getLastModifiedTimeMillis);
+
+        verifyMakeHandleException(() -> nonExistentTableLocation.handleUpdate(new TrackingWritableRowSetImpl(), 0));
+
+        // APIs from ColumnLocation
+        verifyMakeHandleException(nonExistentColumnLocation::exists);
+        verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionChar(
+                ColumnDefinition.fromGenericType("A", char.class, Character.class)));
+        verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionByte(
+                ColumnDefinition.fromGenericType("A", byte.class, Byte.class)));
+        verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionShort(
+                ColumnDefinition.fromGenericType("A", short.class, Short.class)));
+        verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionInt(
+                ColumnDefinition.fromGenericType("A", int.class, Integer.class)));
+        verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionLong(
+                ColumnDefinition.fromGenericType("A", long.class, Long.class)));
+        verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionFloat(
+                ColumnDefinition.fromGenericType("A", float.class, Float.class)));
+        verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionDouble(
+                ColumnDefinition.fromGenericType("A", double.class, Double.class)));
+        verifyMakeHandleException(() -> nonExistentColumnLocation.makeColumnRegionObject(
+                ColumnDefinition.fromGenericType("A", String.class, String.class)));
+    }
+
     @Test
     public void readWriteStatisticsTest() {
         // Test simple structured table.
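Note: a possible follow-on assertion (hypothetical, not part of this change) that would tie the test back to the strengthened TableLocation.getColumnLocation() contract — repeated lookups for the same column name must return the same instance:

    final ColumnLocation first = nonExistentTableLocation.getColumnLocation("A");
    final ColumnLocation second = nonExistentTableLocation.getColumnLocation("A");
    assertSame(first, second); // same ColumnLocation object for the same column name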