From cf19b92255fc71b96560b97d6d305318a86b5576 Mon Sep 17 00:00:00 2001 From: Naveen Mahadevuni Date: Thu, 23 Jan 2025 20:16:03 +0530 Subject: [PATCH] Invalidate ORC metadata cache based on file modification time (#24346) --- .../presto/hive/HiveClientModule.java | 16 ++--- .../hive/orc/OrcPageSourceFactoryUtils.java | 3 +- .../presto/iceberg/IcebergCommonModule.java | 16 ++--- .../iceberg/IcebergPageSourceProvider.java | 4 +- .../presto/orc/AbstractOrcRecordReader.java | 6 +- .../orc/CachingStripeMetadataSource.java | 71 ++++++++++++------- .../orc/DwrfAwareStripeMetadataSource.java | 15 ++-- .../presto/orc/OrcBatchRecordReader.java | 6 +- .../com/facebook/presto/orc/OrcReader.java | 57 ++++++++++++--- .../presto/orc/OrcSelectiveRecordReader.java | 6 +- .../orc/StorageStripeMetadataSource.java | 7 +- .../presto/orc/StripeMetadataSource.java | 53 +++++++++++++- .../com/facebook/presto/orc/StripeReader.java | 11 +-- .../orc/cache/CachingOrcFileTailSource.java | 22 ++++-- .../presto/orc/cache/OrcFileTailSource.java | 2 +- .../orc/cache/StorageOrcFileTailSource.java | 4 +- .../presto/orc/metadata/OrcFileTail.java | 10 ++- .../presto/orc/AbstractTestOrcReader.java | 33 ++++++--- .../com/facebook/presto/orc/OrcTester.java | 12 ++-- .../presto/orc/TestColumnStatistics.java | 3 +- .../presto/orc/TestOrcFileIntrospection.java | 3 +- .../orc/TestOrcReaderDwrfStripeCaching.java | 3 +- .../TestOrcRecordReaderDwrfStripeCaching.java | 6 +- .../orc/TestStorageOrcFileTailSource.java | 9 +-- .../facebook/presto/orc/TestStreamLayout.java | 3 +- 25 files changed, 274 insertions(+), 107 deletions(-) diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java index 23cd40cb39594..7413c05c85071 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/HiveClientModule.java @@ -86,12 +86,10 @@ import com.google.inject.Scopes; import com.google.inject.TypeLiteral; import com.google.inject.multibindings.Multibinder; -import io.airlift.slice.Slice; import org.weakref.jmx.MBeanExporter; import javax.inject.Singleton; -import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; @@ -101,6 +99,8 @@ import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder; import static com.facebook.airlift.json.smile.SmileCodecBinder.smileCodecBinder; import static com.facebook.drift.codec.guice.ThriftCodecBinder.thriftCodecBinder; +import static com.facebook.presto.orc.StripeMetadataSource.CacheableRowGroupIndices; +import static com.facebook.presto.orc.StripeMetadataSource.CacheableSlice; import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator; import static com.google.inject.multibindings.Multibinder.newSetBinder; import static java.lang.Math.toIntExact; @@ -320,15 +320,15 @@ public StripeMetadataSourceFactory createStripeMetadataSourceFactory(OrcCacheCon { StripeMetadataSource stripeMetadataSource = new StorageStripeMetadataSource(); if (orcCacheConfig.isStripeMetadataCacheEnabled()) { - Cache footerCache = CacheBuilder.newBuilder() + Cache footerCache = CacheBuilder.newBuilder() .maximumWeight(orcCacheConfig.getStripeFooterCacheSize().toBytes()) - .weigher((id, footer) -> toIntExact(((Slice) footer).getRetainedSize())) + .weigher((id, footer) -> toIntExact(((CacheableSlice) footer).getSlice().getRetainedSize())) .expireAfterAccess(orcCacheConfig.getStripeFooterCacheTtlSinceLastAccess().toMillis(), MILLISECONDS) .recordStats() .build(); - Cache streamCache = CacheBuilder.newBuilder() + Cache streamCache = CacheBuilder.newBuilder() .maximumWeight(orcCacheConfig.getStripeStreamCacheSize().toBytes()) - .weigher((id, stream) -> toIntExact(((Slice) stream).getRetainedSize())) + .weigher((id, stream) -> toIntExact(((CacheableSlice) stream).getSlice().getRetainedSize())) .expireAfterAccess(orcCacheConfig.getStripeStreamCacheTtlSinceLastAccess().toMillis(), MILLISECONDS) .recordStats() .build(); @@ -337,11 +337,11 @@ public StripeMetadataSourceFactory createStripeMetadataSourceFactory(OrcCacheCon exporter.export(generatedNameOf(CacheStatsMBean.class, connectorId + "_StripeFooter"), footerCacheStatsMBean); exporter.export(generatedNameOf(CacheStatsMBean.class, connectorId + "_StripeStream"), streamCacheStatsMBean); - Optional>> rowGroupIndexCache = Optional.empty(); + Optional> rowGroupIndexCache = Optional.empty(); if (orcCacheConfig.isRowGroupIndexCacheEnabled()) { rowGroupIndexCache = Optional.of(CacheBuilder.newBuilder() .maximumWeight(orcCacheConfig.getRowGroupIndexCacheSize().toBytes()) - .weigher((id, rowGroupIndices) -> toIntExact(((List) rowGroupIndices).stream().mapToLong(RowGroupIndex::getRetainedSizeInBytes).sum())) + .weigher((id, rowGroupIndices) -> toIntExact(((CacheableRowGroupIndices) rowGroupIndices).getRowGroupIndices().stream().mapToLong(RowGroupIndex::getRetainedSizeInBytes).sum())) .expireAfterAccess(orcCacheConfig.getStripeStreamCacheTtlSinceLastAccess().toMillis(), MILLISECONDS) .recordStats() .build()); diff --git a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactoryUtils.java b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactoryUtils.java index 1e3bae934eb12..5051cf97ca8ab 100644 --- a/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactoryUtils.java +++ b/presto-hive/src/main/java/com/facebook/presto/hive/orc/OrcPageSourceFactoryUtils.java @@ -104,7 +104,8 @@ public static OrcReader getOrcReader(OrcEncoding orcEncoding, List footerCache = CacheBuilder.newBuilder() + Cache footerCache = CacheBuilder.newBuilder() .maximumWeight(orcCacheConfig.getStripeFooterCacheSize().toBytes()) - .weigher((id, footer) -> toIntExact(((Slice) footer).getRetainedSize())) + .weigher((id, footer) -> toIntExact(((CacheableSlice) footer).getSlice().getRetainedSize())) .expireAfterAccess(orcCacheConfig.getStripeFooterCacheTtlSinceLastAccess().toMillis(), MILLISECONDS) .recordStats() .build(); - Cache streamCache = CacheBuilder.newBuilder() + Cache streamCache = CacheBuilder.newBuilder() .maximumWeight(orcCacheConfig.getStripeStreamCacheSize().toBytes()) - .weigher((id, stream) -> toIntExact(((Slice) stream).getRetainedSize())) + .weigher((id, stream) -> toIntExact(((CacheableSlice) stream).getSlice().getRetainedSize())) .expireAfterAccess(orcCacheConfig.getStripeStreamCacheTtlSinceLastAccess().toMillis(), MILLISECONDS) .recordStats() .build(); @@ -262,11 +262,11 @@ public StripeMetadataSourceFactory createStripeMetadataSourceFactory(OrcCacheCon exporter.export(generatedNameOf(CacheStatsMBean.class, connectorId + "_StripeFooter"), footerCacheStatsMBean); exporter.export(generatedNameOf(CacheStatsMBean.class, connectorId + "_StripeStream"), streamCacheStatsMBean); - Optional>> rowGroupIndexCache = Optional.empty(); + Optional> rowGroupIndexCache = Optional.empty(); if (orcCacheConfig.isRowGroupIndexCacheEnabled()) { rowGroupIndexCache = Optional.of(CacheBuilder.newBuilder() .maximumWeight(orcCacheConfig.getRowGroupIndexCacheSize().toBytes()) - .weigher((id, rowGroupIndices) -> toIntExact(((List) rowGroupIndices).stream().mapToLong(RowGroupIndex::getRetainedSizeInBytes).sum())) + .weigher((id, rowGroupIndices) -> toIntExact(((CacheableRowGroupIndices) rowGroupIndices).getRowGroupIndices().stream().mapToLong(RowGroupIndex::getRetainedSizeInBytes).sum())) .expireAfterAccess(orcCacheConfig.getStripeStreamCacheTtlSinceLastAccess().toMillis(), MILLISECONDS) .recordStats() .build()); diff --git a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java index 7d0aab867c02c..0dd5b46f15eb6 100644 --- a/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java +++ b/presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergPageSourceProvider.java @@ -156,6 +156,7 @@ import static com.facebook.presto.memory.context.AggregatedMemoryContext.newSimpleAggregatedMemoryContext; import static com.facebook.presto.orc.OrcEncoding.ORC; import static com.facebook.presto.orc.OrcReader.INITIAL_BATCH_SIZE; +import static com.facebook.presto.orc.OrcReader.MODIFICATION_TIME_NOT_SET; import static com.facebook.presto.parquet.ParquetTypeUtils.getColumnIO; import static com.facebook.presto.parquet.ParquetTypeUtils.getDescriptors; import static com.facebook.presto.parquet.ParquetTypeUtils.getParquetTypeByName; @@ -505,7 +506,8 @@ private static ConnectorPageSourceWithRowPositions createBatchOrcPageSource( isCacheable, dwrfEncryptionProvider, dwrfKeyProvider, - runtimeStats); + runtimeStats, + MODIFICATION_TIME_NOT_SET); List physicalColumnHandles = new ArrayList<>(regularColumns.size()); ImmutableMap.Builder includedColumns = ImmutableMap.builder(); diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/AbstractOrcRecordReader.java b/presto-orc/src/main/java/com/facebook/presto/orc/AbstractOrcRecordReader.java index bfcae98d8f7ef..0d0a07ada9b96 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/AbstractOrcRecordReader.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/AbstractOrcRecordReader.java @@ -155,7 +155,8 @@ public AbstractOrcRecordReader( StripeMetadataSource stripeMetadataSource, boolean cacheable, RuntimeStats runtimeStats, - Optional fileIntrospector) + Optional fileIntrospector, + long fileModificationTime) { requireNonNull(includedColumns, "includedColumns is null"); requireNonNull(predicate, "predicate is null"); @@ -262,7 +263,8 @@ public AbstractOrcRecordReader( cacheable, this.dwrfEncryptionGroupMap, runtimeStats, - fileIntrospector); + fileIntrospector, + fileModificationTime); this.streamReaders = requireNonNull(streamReaders, "streamReaders is null"); for (int columnId = 0; columnId < root.getFieldCount(); columnId++) { diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/CachingStripeMetadataSource.java b/presto-orc/src/main/java/com/facebook/presto/orc/CachingStripeMetadataSource.java index 1aede72f6c803..50c88f8873b04 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/CachingStripeMetadataSource.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/CachingStripeMetadataSource.java @@ -34,7 +34,6 @@ import java.util.Map; import java.util.Map.Entry; import java.util.Optional; -import java.util.concurrent.ExecutionException; import static com.facebook.presto.common.RuntimeUnit.BYTE; import static com.facebook.presto.common.RuntimeUnit.NONE; @@ -48,11 +47,11 @@ public class CachingStripeMetadataSource implements StripeMetadataSource { private final StripeMetadataSource delegate; - private final Cache footerSliceCache; - private final Cache stripeStreamCache; - private final Optional>> rowGroupIndexCache; + private final Cache footerSliceCache; + private final Cache stripeStreamCache; + private final Optional> rowGroupIndexCache; - public CachingStripeMetadataSource(StripeMetadataSource delegate, Cache footerSliceCache, Cache stripeStreamCache, Optional>> rowGroupIndexCache) + public CachingStripeMetadataSource(StripeMetadataSource delegate, Cache footerSliceCache, Cache stripeStreamCache, Optional> rowGroupIndexCache) { this.delegate = requireNonNull(delegate, "delegate is null"); this.footerSliceCache = requireNonNull(footerSliceCache, "footerSliceCache is null"); @@ -61,39 +60,56 @@ public CachingStripeMetadataSource(StripeMetadataSource delegate, Cache delegate.getStripeFooterSlice(orcDataSource, stripeId, footerOffset, footerLength, cacheable)); + cacheableSlice = new CacheableSlice(delegate.getStripeFooterSlice(orcDataSource, stripeId, footerOffset, footerLength, cacheable, fileModificationTime), fileModificationTime); + footerSliceCache.put(stripeId, cacheableSlice); + return cacheableSlice.getSlice(); } - catch (ExecutionException | UncheckedExecutionException e) { + catch (UncheckedExecutionException e) { throwIfInstanceOf(e.getCause(), IOException.class); throw new IOException("Unexpected error in stripe footer reading after footerSliceCache miss", e.getCause()); } } @Override - public Map getInputs(OrcDataSource orcDataSource, StripeId stripeId, Map diskRanges, boolean cacheable) + public Map getInputs(OrcDataSource orcDataSource, StripeId stripeId, Map diskRanges, boolean cacheable, long fileModificationTime) throws IOException { if (!cacheable) { - return delegate.getInputs(orcDataSource, stripeId, diskRanges, cacheable); + return delegate.getInputs(orcDataSource, stripeId, diskRanges, cacheable, fileModificationTime); } // Fetch existing stream slice from cache ImmutableMap.Builder inputsBuilder = ImmutableMap.builder(); ImmutableMap.Builder uncachedDiskRangesBuilder = ImmutableMap.builder(); for (Entry entry : diskRanges.entrySet()) { + StripeStreamId stripeStreamId = new StripeStreamId(stripeId, entry.getKey()); if (isCachedStream(entry.getKey().getStreamKind())) { - Slice streamSlice = stripeStreamCache.getIfPresent(new StripeStreamId(stripeId, entry.getKey())); - if (streamSlice != null) { - inputsBuilder.put(entry.getKey(), new OrcDataSourceInput(new BasicSliceInput(streamSlice), streamSlice.length())); + CacheableSlice streamSlice = stripeStreamCache.getIfPresent(stripeStreamId); + if (streamSlice != null && streamSlice.getFileModificationTime() == fileModificationTime) { + inputsBuilder.put(entry.getKey(), new OrcDataSourceInput(new BasicSliceInput(streamSlice.getSlice()), streamSlice.getSlice().length())); } else { + if (streamSlice != null) { + stripeStreamCache.invalidate(stripeStreamId); + // This get call is to increment the miss count for invalidated entries so the stats are recorded correctly. + stripeStreamCache.getIfPresent(stripeStreamId); + } uncachedDiskRangesBuilder.put(entry); } } @@ -103,12 +119,12 @@ public Map getInputs(OrcDataSource orcDataSource, } // read ranges and update cache - Map uncachedInputs = delegate.getInputs(orcDataSource, stripeId, uncachedDiskRangesBuilder.build(), cacheable); + Map uncachedInputs = delegate.getInputs(orcDataSource, stripeId, uncachedDiskRangesBuilder.build(), cacheable, fileModificationTime); for (Entry entry : uncachedInputs.entrySet()) { if (isCachedStream(entry.getKey().getStreamKind())) { // We need to rewind the input after eagerly reading the slice. Slice streamSlice = Slices.wrappedBuffer(entry.getValue().getInput().readSlice(toIntExact(entry.getValue().getInput().length())).getBytes()); - stripeStreamCache.put(new StripeStreamId(stripeId, entry.getKey()), streamSlice); + stripeStreamCache.put(new StripeStreamId(stripeId, entry.getKey()), new CacheableSlice(streamSlice, fileModificationTime)); inputsBuilder.put(entry.getKey(), new OrcDataSourceInput(new BasicSliceInput(streamSlice), toIntExact(streamSlice.getRetainedSize()))); } else { @@ -126,24 +142,31 @@ public List getRowIndexes( StreamId streamId, OrcInputStream inputStream, List bloomFilters, - RuntimeStats runtimeStats) + RuntimeStats runtimeStats, + long fileModificationTime) throws IOException { + StripeStreamId stripeStreamId = new StripeStreamId(stripId, streamId); if (rowGroupIndexCache.isPresent()) { - List rowGroupIndices = rowGroupIndexCache.get().getIfPresent(new StripeStreamId(stripId, streamId)); - if (rowGroupIndices != null) { + CacheableRowGroupIndices cacheableRowGroupIndices = rowGroupIndexCache.get().getIfPresent(stripeStreamId); + if (cacheableRowGroupIndices != null && cacheableRowGroupIndices.getFileModificationTime() == fileModificationTime) { runtimeStats.addMetricValue("OrcRowGroupIndexCacheHit", NONE, 1); - runtimeStats.addMetricValue("OrcRowGroupIndexInMemoryBytesRead", BYTE, rowGroupIndices.stream().mapToLong(RowGroupIndex::getRetainedSizeInBytes).sum()); - return rowGroupIndices; + runtimeStats.addMetricValue("OrcRowGroupIndexInMemoryBytesRead", BYTE, cacheableRowGroupIndices.getRowGroupIndices().stream().mapToLong(RowGroupIndex::getRetainedSizeInBytes).sum()); + return cacheableRowGroupIndices.getRowGroupIndices(); } else { + if (cacheableRowGroupIndices != null) { + rowGroupIndexCache.get().invalidate(stripeStreamId); + // This get call is to increment the miss count for invalidated entries so the stats are recorded correctly. + rowGroupIndexCache.get().getIfPresent(stripeStreamId); + } runtimeStats.addMetricValue("OrcRowGroupIndexCacheHit", NONE, 0); runtimeStats.addMetricValue("OrcRowGroupIndexStorageBytesRead", BYTE, inputStream.getRetainedSizeInBytes()); } } - List rowGroupIndices = delegate.getRowIndexes(metadataReader, hiveWriterVersion, stripId, streamId, inputStream, bloomFilters, runtimeStats); + List rowGroupIndices = delegate.getRowIndexes(metadataReader, hiveWriterVersion, stripId, streamId, inputStream, bloomFilters, runtimeStats, fileModificationTime); if (rowGroupIndexCache.isPresent()) { - rowGroupIndexCache.get().put(new StripeStreamId(stripId, streamId), rowGroupIndices); + rowGroupIndexCache.get().put(stripeStreamId, new CacheableRowGroupIndices(rowGroupIndices, fileModificationTime)); } return rowGroupIndices; } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/DwrfAwareStripeMetadataSource.java b/presto-orc/src/main/java/com/facebook/presto/orc/DwrfAwareStripeMetadataSource.java index c2c0ad56f9dba..3f4994029b065 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/DwrfAwareStripeMetadataSource.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/DwrfAwareStripeMetadataSource.java @@ -52,23 +52,23 @@ public DwrfAwareStripeMetadataSource(StripeMetadataSource delegate, DwrfStripeCa } @Override - public Slice getStripeFooterSlice(OrcDataSource orcDataSource, StripeId stripeId, long footerOffset, int footerLength, boolean cacheable) + public Slice getStripeFooterSlice(OrcDataSource orcDataSource, StripeId stripeId, long footerOffset, int footerLength, boolean cacheable, long fileModificationTime) throws IOException { Optional stripeFooterSlice = stripeCache.getStripeFooterSlice(stripeId, footerLength); if (stripeFooterSlice.isPresent()) { return stripeFooterSlice.get(); } - return delegate.getStripeFooterSlice(orcDataSource, stripeId, footerOffset, footerLength, cacheable); + return delegate.getStripeFooterSlice(orcDataSource, stripeId, footerOffset, footerLength, cacheable, fileModificationTime); } @Override - public Map getInputs(OrcDataSource orcDataSource, StripeId stripeId, Map diskRanges, boolean cacheable) + public Map getInputs(OrcDataSource orcDataSource, StripeId stripeId, Map diskRanges, boolean cacheable, long fileModificationTime) throws IOException { Optional stripeCacheIndexStreamsSlice = stripeCache.getIndexStreamsSlice(stripeId); if (!stripeCacheIndexStreamsSlice.isPresent()) { - return delegate.getInputs(orcDataSource, stripeId, diskRanges, cacheable); + return delegate.getInputs(orcDataSource, stripeId, diskRanges, cacheable, fileModificationTime); } Slice cacheSlice = stripeCacheIndexStreamsSlice.get(); @@ -91,7 +91,7 @@ public Map getInputs(OrcDataSource orcDataSource, ImmutableMap dataStreams = dataStreamsBuilder.build(); if (!dataStreams.isEmpty()) { - Map dataStreamInputs = delegate.getInputs(orcDataSource, stripeId, dataStreams, cacheable); + Map dataStreamInputs = delegate.getInputs(orcDataSource, stripeId, dataStreams, cacheable, fileModificationTime); inputsBuilder.putAll(dataStreamInputs); } @@ -106,9 +106,10 @@ public List getRowIndexes( StreamId streamId, OrcInputStream inputStream, List bloomFilters, - RuntimeStats runtimeStats) + RuntimeStats runtimeStats, + long fileModificationTime) throws IOException { - return delegate.getRowIndexes(metadataReader, hiveWriterVersion, stripeId, streamId, inputStream, bloomFilters, runtimeStats); + return delegate.getRowIndexes(metadataReader, hiveWriterVersion, stripeId, streamId, inputStream, bloomFilters, runtimeStats, fileModificationTime); } } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcBatchRecordReader.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcBatchRecordReader.java index b957d012ea648..cbe13b7527d99 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcBatchRecordReader.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcBatchRecordReader.java @@ -68,7 +68,8 @@ public OrcBatchRecordReader( int initialBatchSize, StripeMetadataSource stripeMetadataSource, boolean cacheable, - RuntimeStats runtimeStats) + RuntimeStats runtimeStats, + long fileModificationTime) throws OrcCorruptionException { @@ -107,7 +108,8 @@ public OrcBatchRecordReader( stripeMetadataSource, cacheable, runtimeStats, - Optional.empty()); + Optional.empty(), + fileModificationTime); } public int nextBatch() diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcReader.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcReader.java index b3227dff84e9d..5921070a41dfd 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcReader.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcReader.java @@ -66,6 +66,7 @@ public class OrcReader public static final int MAX_BATCH_SIZE = 1024; public static final int INITIAL_BATCH_SIZE = 1; public static final int BATCH_SIZE_GROWTH_FACTOR = 2; + public static final long MODIFICATION_TIME_NOT_SET = 0L; private final OrcDataSource orcDataSource; private final ExceptionWrappingMetadataReader metadataReader; @@ -86,10 +87,11 @@ public class OrcReader private final OrcReaderOptions orcReaderOptions; private final boolean cacheable; + private final long fileModificationTime; private final RuntimeStats runtimeStats; - // This is based on the Apache Hive ORC code + @VisibleForTesting public OrcReader( OrcDataSource orcDataSource, OrcEncoding orcEncoding, @@ -102,6 +104,35 @@ public OrcReader( DwrfKeyProvider dwrfKeyProvider, RuntimeStats runtimeStats) throws IOException + { + this( + orcDataSource, + orcEncoding, + orcFileTailSource, + StripeMetadataSourceFactory.of(stripeMetadataSource), + aggregatedMemoryContext, + orcReaderOptions, + cacheable, + dwrfEncryptionProvider, + dwrfKeyProvider, + runtimeStats, + MODIFICATION_TIME_NOT_SET); + } + + // This is based on the Apache Hive ORC code + public OrcReader( + OrcDataSource orcDataSource, + OrcEncoding orcEncoding, + OrcFileTailSource orcFileTailSource, + StripeMetadataSource stripeMetadataSource, + OrcAggregatedMemoryContext aggregatedMemoryContext, + OrcReaderOptions orcReaderOptions, + boolean cacheable, + DwrfEncryptionProvider dwrfEncryptionProvider, + DwrfKeyProvider dwrfKeyProvider, + RuntimeStats runtimeStats, + long fileModificationTime) + throws IOException { this( orcDataSource, @@ -115,7 +146,8 @@ public OrcReader( dwrfEncryptionProvider, dwrfKeyProvider, runtimeStats, - Optional.empty()); + Optional.empty(), + fileModificationTime); } public OrcReader( @@ -128,7 +160,8 @@ public OrcReader( boolean cacheable, DwrfEncryptionProvider dwrfEncryptionProvider, DwrfKeyProvider dwrfKeyProvider, - RuntimeStats runtimeStats) + RuntimeStats runtimeStats, + long fileModificationTime) throws IOException { this( @@ -143,7 +176,8 @@ public OrcReader( dwrfEncryptionProvider, dwrfKeyProvider, runtimeStats, - Optional.empty()); + Optional.empty(), + fileModificationTime); } OrcReader( @@ -158,7 +192,8 @@ public OrcReader( DwrfEncryptionProvider dwrfEncryptionProvider, DwrfKeyProvider dwrfKeyProvider, RuntimeStats runtimeStats, - Optional fileIntrospector) + Optional fileIntrospector, + long fileModificationTime) throws IOException { this.orcReaderOptions = requireNonNull(orcReaderOptions, "orcReaderOptions is null"); @@ -170,7 +205,7 @@ public OrcReader( this.writeValidation = requireNonNull(writeValidation, "writeValidation is null"); this.fileIntrospector = requireNonNull(fileIntrospector, "fileIntrospector is null"); - OrcFileTail orcFileTail = orcFileTailSource.getOrcFileTail(orcDataSource, metadataReader, writeValidation, cacheable); + OrcFileTail orcFileTail = orcFileTailSource.getOrcFileTail(orcDataSource, metadataReader, writeValidation, cacheable, fileModificationTime); fileIntrospector.ifPresent(introspector -> introspector.onFileTail(orcFileTail)); this.bufferSize = orcFileTail.getBufferSize(); @@ -234,6 +269,7 @@ public OrcReader( } this.cacheable = requireNonNull(cacheable, "cacheable is null"); + this.fileModificationTime = fileModificationTime; Optional dwrfStripeCache = Optional.empty(); if (orcFileTail.getDwrfStripeCacheData().isPresent() && footer.getDwrfStripeCacheOffsets().isPresent()) { @@ -366,7 +402,8 @@ public OrcBatchRecordReader createBatchRecordReader( initialBatchSize, stripeMetadataSource, cacheable, - runtimeStats); + runtimeStats, + fileModificationTime); } public OrcSelectiveRecordReader createSelectiveRecordReader( @@ -420,7 +457,8 @@ public OrcSelectiveRecordReader createSelectiveRecordReader( stripeMetadataSource, cacheable, runtimeStats, - fileIntrospector); + fileIntrospector, + fileModificationTime); } private static OrcDataSource wrapWithCacheIfTiny(OrcDataSource dataSource, DataSize maxCacheSize, OrcAggregatedMemoryContext systemMemoryContext) @@ -463,7 +501,8 @@ static void validateFile( dwrfEncryptionProvider, dwrfKeyProvider, new RuntimeStats(), - Optional.empty()); + Optional.empty(), + MODIFICATION_TIME_NOT_SET); try (OrcBatchRecordReader orcRecordReader = orcReader.createBatchRecordReader( readTypes.build(), OrcPredicate.TRUE, diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/OrcSelectiveRecordReader.java b/presto-orc/src/main/java/com/facebook/presto/orc/OrcSelectiveRecordReader.java index 9defe56b2da04..92c5cc1f2d5aa 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/OrcSelectiveRecordReader.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/OrcSelectiveRecordReader.java @@ -188,7 +188,8 @@ public OrcSelectiveRecordReader( StripeMetadataSource stripeMetadataSource, boolean cacheable, RuntimeStats runtimeStats, - Optional fileIntrospector) + Optional fileIntrospector, + long fileModificationTime) { super(includedColumns, requiredSubfields, @@ -231,7 +232,8 @@ public OrcSelectiveRecordReader( stripeMetadataSource, cacheable, runtimeStats, - fileIntrospector); + fileIntrospector, + fileModificationTime); // Hive column indices can't be used to index into arrays because they are negative // for partition and hidden columns. Hence, we create synthetic zero-based indices. diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/StorageStripeMetadataSource.java b/presto-orc/src/main/java/com/facebook/presto/orc/StorageStripeMetadataSource.java index eac745e21b61c..397dba1071e4d 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/StorageStripeMetadataSource.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/StorageStripeMetadataSource.java @@ -32,7 +32,7 @@ public class StorageStripeMetadataSource implements StripeMetadataSource { @Override - public Slice getStripeFooterSlice(OrcDataSource orcDataSource, StripeId stripeId, long footerOffset, int footerLength, boolean cacheable) + public Slice getStripeFooterSlice(OrcDataSource orcDataSource, StripeId stripeId, long footerOffset, int footerLength, boolean cacheable, long fileModificationTime) throws IOException { byte[] tailBuffer = new byte[footerLength]; @@ -41,7 +41,7 @@ public Slice getStripeFooterSlice(OrcDataSource orcDataSource, StripeId stripeId } @Override - public Map getInputs(OrcDataSource orcDataSource, StripeId stripeId, Map diskRanges, boolean cacheable) + public Map getInputs(OrcDataSource orcDataSource, StripeId stripeId, Map diskRanges, boolean cacheable, long fileModificationTime) throws IOException { // @@ -68,7 +68,8 @@ public List getRowIndexes( StreamId streamId, OrcInputStream inputStream, List bloomFilters, - RuntimeStats runtimeStats) + RuntimeStats runtimeStats, + long fileModificationTime) throws IOException { return metadataReader.readRowIndexes(hiveWriterVersion, inputStream, bloomFilters); diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/StripeMetadataSource.java b/presto-orc/src/main/java/com/facebook/presto/orc/StripeMetadataSource.java index d8723f7a8e4df..59746fab887ec 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/StripeMetadataSource.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/StripeMetadataSource.java @@ -26,16 +26,19 @@ import java.util.List; import java.util.Map; +import static java.util.Objects.requireNonNull; + public interface StripeMetadataSource { - Slice getStripeFooterSlice(OrcDataSource orcDataSource, StripeId stripeId, long footerOffset, int footerLength, boolean cacheable) + Slice getStripeFooterSlice(OrcDataSource orcDataSource, StripeId stripeId, long footerOffset, int footerLength, boolean cacheable, long fileModificationTime) throws IOException; Map getInputs( OrcDataSource orcDataSource, StripeId stripeId, Map diskRanges, - boolean cacheable) + boolean cacheable, + long fileModificationTime) throws IOException; List getRowIndexes( @@ -45,6 +48,50 @@ List getRowIndexes( StreamId streamId, OrcInputStream inputStream, List bloomFilters, - RuntimeStats runtimeStats) + RuntimeStats runtimeStats, + long fileModificationTime) throws IOException; + + class CacheableSlice + { + private final Slice slice; + private final long fileModificationTime; + + CacheableSlice(Slice slice, long fileModificationTime) + { + this.slice = requireNonNull(slice, "slice is null"); + this.fileModificationTime = fileModificationTime; + } + + public Slice getSlice() + { + return slice; + } + + public long getFileModificationTime() + { + return fileModificationTime; + } + } + + class CacheableRowGroupIndices + { + private final List rowGroupIndices; + private final long fileModificationTime; + + public CacheableRowGroupIndices(List rowGroupIndices, long fileModificationTime) + { + this.rowGroupIndices = requireNonNull(rowGroupIndices, "rowGroupIndices is null"); + this.fileModificationTime = fileModificationTime; + } + public List getRowGroupIndices() + { + return rowGroupIndices; + } + + public long getFileModificationTime() + { + return fileModificationTime; + } + } } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/StripeReader.java b/presto-orc/src/main/java/com/facebook/presto/orc/StripeReader.java index 7fd42fbba896f..06f7b5c9c2e4f 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/StripeReader.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/StripeReader.java @@ -96,6 +96,7 @@ public class StripeReader private final Optional writeValidation; private final StripeMetadataSource stripeMetadataSource; private final boolean cacheable; + private final long fileModificationTime; private final Multimap dwrfEncryptionGroupColumns; private final RuntimeStats runtimeStats; private final Optional fileIntrospector; @@ -114,7 +115,8 @@ public StripeReader( boolean cacheable, Map dwrfEncryptionGroupMap, RuntimeStats runtimeStats, - Optional fileIntrospector) + Optional fileIntrospector, + long fileModificationTime) { this.orcDataSource = requireNonNull(orcDataSource, "orcDataSource is null"); this.decompressor = requireNonNull(decompressor, "decompressor is null"); @@ -130,6 +132,7 @@ public StripeReader( this.dwrfEncryptionGroupColumns = invertEncryptionGroupMap(requireNonNull(dwrfEncryptionGroupMap, "dwrfEncryptionGroupMap is null")); this.runtimeStats = requireNonNull(runtimeStats, "runtimeStats is null"); this.fileIntrospector = requireNonNull(fileIntrospector, "fileIntrospector is null"); + this.fileModificationTime = fileModificationTime; } private Multimap invertEncryptionGroupMap(Map dwrfEncryptionGroupMap) @@ -346,7 +349,7 @@ private Map readDiskRanges( // // read ranges - Map streamsData = stripeMetadataSource.getInputs(orcDataSource, stripeId, diskRanges, cacheable); + Map streamsData = stripeMetadataSource.getInputs(orcDataSource, stripeId, diskRanges, cacheable, fileModificationTime); // transform streams to OrcInputStream ImmutableMap.Builder streamsBuilder = ImmutableMap.builder(); @@ -484,7 +487,7 @@ public StripeFooter readStripeFooter(StripeId stripeId, StripeInformation stripe int footerLength = toIntExact(stripe.getFooterLength()); // read the footer - Slice footerSlice = stripeMetadataSource.getStripeFooterSlice(orcDataSource, stripeId, footerOffset, footerLength, cacheable); + Slice footerSlice = stripeMetadataSource.getStripeFooterSlice(orcDataSource, stripeId, footerOffset, footerLength, cacheable, fileModificationTime); try (InputStream inputStream = new OrcInputStream( orcDataSource.getId(), // Memory is not accounted as the buffer is expected to be tiny and will be immediately discarded @@ -531,7 +534,7 @@ private Map> readColumnIndexes(Map bloomFilters = bloomFilterIndexes.get(streamId.getColumn()); - List rowGroupIndexes = stripeMetadataSource.getRowIndexes(metadataReader, hiveWriterVersion, stripeId, streamId, inputStream, bloomFilters, runtimeStats); + List rowGroupIndexes = stripeMetadataSource.getRowIndexes(metadataReader, hiveWriterVersion, stripeId, streamId, inputStream, bloomFilters, runtimeStats, fileModificationTime); columnIndexes.put(entry.getKey(), rowGroupIndexes); } } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/cache/CachingOrcFileTailSource.java b/presto-orc/src/main/java/com/facebook/presto/orc/cache/CachingOrcFileTailSource.java index 4c1b8a8fdb933..50eece4c4f685 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/cache/CachingOrcFileTailSource.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/cache/CachingOrcFileTailSource.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.util.Optional; -import java.util.concurrent.ExecutionException; import static com.google.common.base.Throwables.throwIfInstanceOf; import static java.util.Objects.requireNonNull; @@ -41,16 +40,27 @@ public CachingOrcFileTailSource(OrcFileTailSource delegate, Cache writeValidation, boolean cacheable) + public OrcFileTail getOrcFileTail(OrcDataSource orcDataSource, MetadataReader metadataReader, Optional writeValidation, boolean cacheable, long fileModificationTime) throws IOException { + if (!cacheable) { + return delegate.getOrcFileTail(orcDataSource, metadataReader, writeValidation, cacheable, fileModificationTime); + } try { - if (cacheable) { - return cache.get(orcDataSource.getId(), () -> delegate.getOrcFileTail(orcDataSource, metadataReader, writeValidation, cacheable)); + OrcFileTail orcFileTail = cache.getIfPresent(orcDataSource.getId()); + if (orcFileTail != null) { + if (orcFileTail.getFileModificationTime() == fileModificationTime) { + return orcFileTail; + } + cache.invalidate(orcDataSource.getId()); // stale entry + // This get call is to increment the miss count for invalidated entries so the stats are recorded correctly. + cache.getIfPresent(orcDataSource.getId()); } - return delegate.getOrcFileTail(orcDataSource, metadataReader, writeValidation, cacheable); + orcFileTail = delegate.getOrcFileTail(orcDataSource, metadataReader, writeValidation, cacheable, fileModificationTime); + cache.put(orcDataSource.getId(), orcFileTail); + return orcFileTail; } - catch (ExecutionException | UncheckedExecutionException e) { + catch (UncheckedExecutionException e) { throwIfInstanceOf(e.getCause(), IOException.class); throw new IOException("Unexpected error in orc file tail reading after cache miss", e.getCause()); } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/cache/OrcFileTailSource.java b/presto-orc/src/main/java/com/facebook/presto/orc/cache/OrcFileTailSource.java index 8033f78f37539..0c972cd515698 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/cache/OrcFileTailSource.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/cache/OrcFileTailSource.java @@ -23,6 +23,6 @@ public interface OrcFileTailSource { - OrcFileTail getOrcFileTail(OrcDataSource orcDataSource, MetadataReader metadataReader, Optional writeValidation, boolean cacheable) + OrcFileTail getOrcFileTail(OrcDataSource orcDataSource, MetadataReader metadataReader, Optional writeValidation, boolean cacheable, long fileModificationTime) throws IOException; } diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/cache/StorageOrcFileTailSource.java b/presto-orc/src/main/java/com/facebook/presto/orc/cache/StorageOrcFileTailSource.java index 3406c1320a916..fde107d722592 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/cache/StorageOrcFileTailSource.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/cache/StorageOrcFileTailSource.java @@ -64,7 +64,7 @@ public StorageOrcFileTailSource(int expectedFooterSizeInBytes, boolean dwrfStrip } @Override - public OrcFileTail getOrcFileTail(OrcDataSource orcDataSource, MetadataReader metadataReader, Optional writeValidation, boolean cacheable) + public OrcFileTail getOrcFileTail(OrcDataSource orcDataSource, MetadataReader metadataReader, Optional writeValidation, boolean cacheable, long fileModificationTime) throws IOException { long size = orcDataSource.getSize(); @@ -162,7 +162,7 @@ public OrcFileTail getOrcFileTail(OrcDataSource orcDataSource, MetadataReader me dwrfStripeCacheData = Optional.of(new DwrfStripeCacheData(dwrfStripeCacheSlice, dwrfStripeCacheSize, stripeCacheMode)); } - return new OrcFileTail(hiveWriterVersion, bufferSize, compressionKind, footerSlice, footerSize, metadataSlice, metadataSize, dwrfStripeCacheData); + return new OrcFileTail(hiveWriterVersion, bufferSize, compressionKind, footerSlice, footerSize, metadataSlice, metadataSize, dwrfStripeCacheData, fileModificationTime); } /** diff --git a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/OrcFileTail.java b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/OrcFileTail.java index d639449df08a4..6f726edbef3e3 100644 --- a/presto-orc/src/main/java/com/facebook/presto/orc/metadata/OrcFileTail.java +++ b/presto-orc/src/main/java/com/facebook/presto/orc/metadata/OrcFileTail.java @@ -30,6 +30,7 @@ public class OrcFileTail private final Slice metadataSlice; private final int metadataSize; private final Optional dwrfStripeCacheData; + private final long fileModificationTime; public OrcFileTail( HiveWriterVersion hiveWriterVersion, @@ -39,7 +40,8 @@ public OrcFileTail( int footerSize, Slice metadataSlice, int metadataSize, - Optional dwrfStripeCacheData) + Optional dwrfStripeCacheData, + long fileModificationTime) { this.hiveWriterVersion = requireNonNull(hiveWriterVersion, "hiveWriterVersion is null"); this.bufferSize = bufferSize; @@ -49,6 +51,7 @@ public OrcFileTail( this.metadataSlice = requireNonNull(metadataSlice, "metadataSlice is null"); this.metadataSize = metadataSize; this.dwrfStripeCacheData = requireNonNull(dwrfStripeCacheData, "dwrfStripeCacheData is null"); + this.fileModificationTime = fileModificationTime; } public HiveWriterVersion getHiveWriterVersion() @@ -104,4 +107,9 @@ public int getTotalSize() { return getFooterSize() + getMetadataSize() + getDwrfStripeCacheSize(); } + + public long getFileModificationTime() + { + return fileModificationTime; + } } diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/AbstractTestOrcReader.java b/presto-orc/src/test/java/com/facebook/presto/orc/AbstractTestOrcReader.java index db80111decebd..457acb71be179 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/AbstractTestOrcReader.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/AbstractTestOrcReader.java @@ -36,7 +36,6 @@ import com.google.common.collect.ImmutableList.Builder; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Range; -import io.airlift.slice.Slice; import io.airlift.units.DataSize; import io.airlift.units.Duration; import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; @@ -79,6 +78,8 @@ import static com.facebook.presto.orc.OrcTester.createCustomOrcRecordReader; import static com.facebook.presto.orc.OrcTester.createOrcRecordWriter; import static com.facebook.presto.orc.OrcTester.createSettableStructObjectInspector; +import static com.facebook.presto.orc.StripeMetadataSource.CacheableRowGroupIndices; +import static com.facebook.presto.orc.StripeMetadataSource.CacheableSlice; import static com.facebook.presto.testing.DateTimeTestingUtils.sqlTimestampOf; import static com.facebook.presto.testing.TestingConnectorSession.SESSION; import static com.google.common.collect.Iterables.concat; @@ -202,32 +203,32 @@ public void testCaching() .build(); OrcFileTailSource orcFileTailSource = new CachingOrcFileTailSource(new StorageOrcFileTailSource(), orcFileTailCache); - Cache stripeFootercache = CacheBuilder.newBuilder() + Cache stripeFootercache = CacheBuilder.newBuilder() .maximumWeight(new DataSize(1, MEGABYTE).toBytes()) - .weigher((id, footer) -> ((Slice) footer).length()) + .weigher((id, footer) -> ((CacheableSlice) footer).getSlice().length()) .expireAfterAccess(new Duration(10, MINUTES).toMillis(), MILLISECONDS) .recordStats() .build(); - Cache stripeStreamCache = CacheBuilder.newBuilder() + Cache stripeStreamCache = CacheBuilder.newBuilder() .maximumWeight(new DataSize(1, MEGABYTE).toBytes()) - .weigher((id, stream) -> ((Slice) stream).length()) + .weigher((id, stream) -> ((CacheableSlice) stream).getSlice().length()) .expireAfterAccess(new Duration(10, MINUTES).toMillis(), MILLISECONDS) .recordStats() .build(); - Optional>> rowGroupIndexCache = Optional.of(CacheBuilder.newBuilder() + Optional> rowGroupIndexCache = Optional.of(CacheBuilder.newBuilder() .maximumWeight(new DataSize(1, MEGABYTE).toBytes()) - .weigher((id, rowGroupIndices) -> toIntExact(((List) rowGroupIndices).stream().mapToLong(RowGroupIndex::getRetainedSizeInBytes).sum())) + .weigher((id, rowGroupIndices) -> toIntExact(((CacheableRowGroupIndices) rowGroupIndices).getRowGroupIndices().stream().mapToLong(RowGroupIndex::getRetainedSizeInBytes).sum())) .expireAfterAccess(new Duration(10, MINUTES).toMillis(), MILLISECONDS) .recordStats() .build()); StripeMetadataSource stripeMetadataSource = new CachingStripeMetadataSource(new StorageStripeMetadataSource(), stripeFootercache, stripeStreamCache, rowGroupIndexCache); try (TempFile tempFile = createTempFile(10001)) { - OrcBatchRecordReader storageReader = createCustomOrcRecordReader(tempFile, ORC, OrcPredicate.TRUE, ImmutableList.of(BIGINT), INITIAL_BATCH_SIZE, orcFileTailSource, stripeMetadataSource, true, ImmutableMap.of(), false); + OrcBatchRecordReader storageReader = createCustomOrcRecordReader(tempFile, ORC, OrcPredicate.TRUE, ImmutableList.of(BIGINT), INITIAL_BATCH_SIZE, orcFileTailSource, stripeMetadataSource, true, ImmutableMap.of(), false, tempFile.getFile().lastModified()); assertEquals(orcFileTailCache.stats().missCount(), 1); assertEquals(orcFileTailCache.stats().hitCount(), 0); - OrcBatchRecordReader cacheReader = createCustomOrcRecordReader(tempFile, ORC, OrcPredicate.TRUE, ImmutableList.of(BIGINT), INITIAL_BATCH_SIZE, orcFileTailSource, stripeMetadataSource, true, ImmutableMap.of(), false); + OrcBatchRecordReader cacheReader = createCustomOrcRecordReader(tempFile, ORC, OrcPredicate.TRUE, ImmutableList.of(BIGINT), INITIAL_BATCH_SIZE, orcFileTailSource, stripeMetadataSource, true, ImmutableMap.of(), false, tempFile.getFile().lastModified()); assertEquals(orcFileTailCache.stats().missCount(), 1); assertEquals(orcFileTailCache.stats().hitCount(), 1); @@ -250,6 +251,20 @@ public void testCaching() assertEquals(rowGroupIndexCache.get().stats().missCount(), 1); assertEquals(rowGroupIndexCache.get().stats().hitCount(), 1); assertEquals(storageReader.readBlock(0).getInt(0), cacheReader.readBlock(0).getInt(0)); + + // Test cache invalidation based on file modified time. + long fileModificationTime = System.currentTimeMillis(); + // This read will invalidate the entry and increases the hit count and miss count + cacheReader = createCustomOrcRecordReader(tempFile, ORC, OrcPredicate.TRUE, ImmutableList.of(BIGINT), INITIAL_BATCH_SIZE, orcFileTailSource, stripeMetadataSource, true, ImmutableMap.of(), false, fileModificationTime); + assertEquals(orcFileTailCache.stats().missCount(), 2); + assertEquals(orcFileTailCache.stats().hitCount(), 2); + cacheReader.nextBatch(); + assertEquals(stripeFootercache.stats().missCount(), 2); + assertEquals(stripeFootercache.stats().hitCount(), 2); + assertEquals(stripeStreamCache.stats().missCount(), 4); + assertEquals(stripeStreamCache.stats().hitCount(), 4); + assertEquals(rowGroupIndexCache.get().stats().missCount(), 2); + assertEquals(rowGroupIndexCache.get().stats().hitCount(), 2); } } diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/OrcTester.java b/presto-orc/src/test/java/com/facebook/presto/orc/OrcTester.java index 3c6a2497b6b78..1018c310c5619 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/OrcTester.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/OrcTester.java @@ -155,6 +155,7 @@ import static com.facebook.presto.orc.NoOpOrcWriterStats.NOOP_WRITER_STATS; import static com.facebook.presto.orc.NoopOrcAggregatedMemoryContext.NOOP_ORC_AGGREGATED_MEMORY_CONTEXT; import static com.facebook.presto.orc.OrcReader.MAX_BATCH_SIZE; +import static com.facebook.presto.orc.OrcReader.MODIFICATION_TIME_NOT_SET; import static com.facebook.presto.orc.OrcTester.Format.DWRF; import static com.facebook.presto.orc.OrcTester.Format.ORC_11; import static com.facebook.presto.orc.OrcTester.Format.ORC_12; @@ -1143,7 +1144,7 @@ public static void assertFileContentsPresto( return; } - try (OrcBatchRecordReader recordReader = createCustomOrcRecordReader(tempFile, orcEncoding, orcPredicate, types, MAX_BATCH_SIZE, new StorageOrcFileTailSource(), new StorageStripeMetadataSource(), false, intermediateEncryptionKeys, false)) { + try (OrcBatchRecordReader recordReader = createCustomOrcRecordReader(tempFile, orcEncoding, orcPredicate, types, MAX_BATCH_SIZE, new StorageOrcFileTailSource(), new StorageStripeMetadataSource(), false, intermediateEncryptionKeys, false, tempFile.getFile().lastModified())) { assertEquals(recordReader.getReaderPosition(), 0); assertEquals(recordReader.getFilePosition(), 0); @@ -1561,7 +1562,8 @@ static OrcBatchRecordReader createCustomOrcRecordReader(TempFile tempFile, OrcEn new StorageStripeMetadataSource(), cacheable, ImmutableMap.of(), - mapNullKeysEnabled); + mapNullKeysEnabled, + MODIFICATION_TIME_NOT_SET); } static OrcBatchRecordReader createCustomOrcRecordReader( @@ -1574,7 +1576,8 @@ static OrcBatchRecordReader createCustomOrcRecordReader( StripeMetadataSource stripeMetadataSource, boolean cacheable, Map intermediateEncryptionKeys, - boolean mapNullKeysEnabled) + boolean mapNullKeysEnabled, + long fileModificationTime) throws IOException { OrcDataSource orcDataSource = new FileOrcDataSource(tempFile.getFile(), new DataSize(1, MEGABYTE), new DataSize(1, MEGABYTE), new DataSize(1, MEGABYTE), true); @@ -1593,7 +1596,8 @@ static OrcBatchRecordReader createCustomOrcRecordReader( cacheable, new DwrfEncryptionProvider(new UnsupportedEncryptionLibrary(), new TestingEncryptionLibrary()), DwrfKeyProvider.of(intermediateEncryptionKeys), - new RuntimeStats()); + new RuntimeStats(), + fileModificationTime); assertEquals(orcReader.getFooter().getRowsInRowGroup(), 10_000); diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestColumnStatistics.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestColumnStatistics.java index 810cc5ac011bc..2057f2e384080 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestColumnStatistics.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestColumnStatistics.java @@ -959,7 +959,8 @@ private static CapturingOrcFileIntrospector introspectOrcFile(Type type, TempFil NO_ENCRYPTION, DwrfKeyProvider.EMPTY, new RuntimeStats(), - Optional.of(introspector)); + Optional.of(introspector), + file.getFile().lastModified()); OrcSelectiveRecordReader recordReader = reader.createSelectiveRecordReader( ImmutableMap.of(0, type), diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcFileIntrospection.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcFileIntrospection.java index 920bf21905810..283b5d1c2f67b 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcFileIntrospection.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcFileIntrospection.java @@ -140,7 +140,8 @@ private static void readFile(Type type, CapturingOrcFileIntrospector introspecto DwrfEncryptionProvider.NO_ENCRYPTION, DwrfKeyProvider.EMPTY, new RuntimeStats(), - Optional.of(introspector)); + Optional.of(introspector), + tempFile.getFile().lastModified()); OrcSelectiveRecordReader recordReader = reader.createSelectiveRecordReader( ImmutableMap.of(0, type), diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcReaderDwrfStripeCaching.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcReaderDwrfStripeCaching.java index 821a5a0355ec6..b0c5a23b710f7 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcReaderDwrfStripeCaching.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcReaderDwrfStripeCaching.java @@ -254,7 +254,8 @@ private Optional getDwrfStripeCache(File orcFile) false, NO_ENCRYPTION, DwrfKeyProvider.EMPTY, - new RuntimeStats()); + new RuntimeStats(), + orcFile.lastModified()); return stripeMetadataSourceFactory.getDwrfStripeCache(); } diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcRecordReaderDwrfStripeCaching.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcRecordReaderDwrfStripeCaching.java index 6727be978e584..9d19cfb8b4521 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcRecordReaderDwrfStripeCaching.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestOrcRecordReaderDwrfStripeCaching.java @@ -156,7 +156,8 @@ private void assertFileContentCachingEnabled(File orcFile, List forbi false, NO_ENCRYPTION, DwrfKeyProvider.EMPTY, - new RuntimeStats()); + new RuntimeStats(), + orcFile.lastModified()); assertRecordValues(orcDataSource, orcReader); @@ -181,7 +182,8 @@ private void assertFileContentCachingDisabled(File orcFile) false, NO_ENCRYPTION, DwrfKeyProvider.EMPTY, - new RuntimeStats()); + new RuntimeStats(), + orcFile.lastModified()); assertRecordValues(orcDataSource, orcReader); } diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestStorageOrcFileTailSource.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestStorageOrcFileTailSource.java index 582d17b850bd4..abb3ce0f52723 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestStorageOrcFileTailSource.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestStorageOrcFileTailSource.java @@ -34,6 +34,7 @@ import java.io.OutputStream; import java.util.Optional; +import static com.facebook.presto.orc.OrcReader.MODIFICATION_TIME_NOT_SET; import static com.facebook.presto.orc.metadata.DwrfStripeCacheMode.INDEX_AND_FOOTER; import static com.facebook.presto.orc.proto.DwrfProto.CompressionKind.NONE; import static com.facebook.presto.orc.proto.DwrfProto.StripeCacheMode.BOTH; @@ -91,7 +92,7 @@ public void testReadExpectedFooterSize() int expectedFooterSizeInBytes = 567; StorageOrcFileTailSource src = new StorageOrcFileTailSource(expectedFooterSizeInBytes, false); TestingOrcDataSource orcDataSource = new TestingOrcDataSource(createFileOrcDataSource()); - src.getOrcFileTail(orcDataSource, metadataReader, Optional.empty(), false); + src.getOrcFileTail(orcDataSource, metadataReader, Optional.empty(), false, MODIFICATION_TIME_NOT_SET); // make sure only the configured expectedFooterSizeInBytes bytes have been read assertEquals(orcDataSource.getReadCount(), 1); @@ -121,7 +122,7 @@ public void testSkipDwrfStripeCacheIfDisabled() // read the file tail with the disabled "read dwrf stripe cache" feature StorageOrcFileTailSource src = new StorageOrcFileTailSource(tailReadSizeInBytes, false); TestingOrcDataSource orcDataSource = new TestingOrcDataSource(createFileOrcDataSource()); - OrcFileTail orcFileTail = src.getOrcFileTail(orcDataSource, metadataReader, Optional.empty(), false); + OrcFileTail orcFileTail = src.getOrcFileTail(orcDataSource, metadataReader, Optional.empty(), false, MODIFICATION_TIME_NOT_SET); assertEquals(orcFileTail.getMetadataSize(), 0); DwrfProto.Footer actualFooter = readFooter(orcFileTail); @@ -160,7 +161,7 @@ public void testReadDwrfStripeCacheIfEnabled() // read the file tail with the enabled "read dwrf stripe cache" feature StorageOrcFileTailSource src = new StorageOrcFileTailSource(FOOTER_READ_SIZE_IN_BYTES, true); OrcDataSource orcDataSource = createFileOrcDataSource(); - OrcFileTail orcFileTail = src.getOrcFileTail(orcDataSource, metadataReader, Optional.empty(), false); + OrcFileTail orcFileTail = src.getOrcFileTail(orcDataSource, metadataReader, Optional.empty(), false, MODIFICATION_TIME_NOT_SET); assertEquals(orcFileTail.getMetadataSize(), 0); DwrfProto.Footer actualFooter = readFooter(orcFileTail); @@ -190,7 +191,7 @@ public void testReadDwrfStripeCacheIfEnabledButAbsent() // read the file tail with the enabled "read dwrf stripe cache" feature StorageOrcFileTailSource src = new StorageOrcFileTailSource(FOOTER_READ_SIZE_IN_BYTES, true); OrcDataSource orcDataSource = createFileOrcDataSource(); - OrcFileTail orcFileTail = src.getOrcFileTail(orcDataSource, metadataReader, Optional.empty(), false); + OrcFileTail orcFileTail = src.getOrcFileTail(orcDataSource, metadataReader, Optional.empty(), false, MODIFICATION_TIME_NOT_SET); assertEquals(orcFileTail.getMetadataSize(), 0); DwrfProto.Footer actualFooter = readFooter(orcFileTail); diff --git a/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java b/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java index d148898233ed4..9c26584f82e1a 100644 --- a/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java +++ b/presto-orc/src/test/java/com/facebook/presto/orc/TestStreamLayout.java @@ -735,7 +735,8 @@ private static void readFileWithIntrospector(Type type, CapturingOrcFileIntrospe DwrfEncryptionProvider.NO_ENCRYPTION, DwrfKeyProvider.EMPTY, new RuntimeStats(), - Optional.of(introspector)); + Optional.of(introspector), + tempFile.getFile().lastModified()); OrcSelectiveRecordReader recordReader = reader.createSelectiveRecordReader( ImmutableMap.of(0, type),