[Iceberg] Add manifest file caching for HMS-based deployments #24481

Merged
3 changes: 2 additions & 1 deletion presto-docs/src/main/sphinx/connector/iceberg.rst
@@ -515,7 +515,8 @@ Property Name Description
``iceberg.io.manifest.cache.max-total-bytes`` Maximum size of the manifest cache, in bytes. ``104857600``

``iceberg.io.manifest.cache.expiration-interval-ms`` Maximum time duration in milliseconds for which an entry ``60000``
stays in the manifest cache.
stays in the manifest cache. Set to 0 to disable entry
expiration.

``iceberg.io.manifest.cache.max-content-length`` Maximum length of a manifest file to be considered for ``8388608``
caching in bytes. Manifest files with a length exceeding
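For context, a minimal catalog configuration exercising these settings might look like the following sketch. The property names come from the table above, except ``iceberg.io.manifest.cache.enabled``, which is assumed from the ``manifestCachingEnabled`` field in ``IcebergConfig`` later in this PR:

# etc/catalog/iceberg.properties (illustrative sketch, not part of this PR)
connector.name=iceberg
iceberg.catalog.type=hive
# assumed name, inferred from IcebergConfig.manifestCachingEnabled
iceberg.io.manifest.cache.enabled=true
iceberg.io.manifest.cache.max-total-bytes=104857600
iceberg.io.manifest.cache.expiration-interval-ms=60000
iceberg.io.manifest.cache.max-content-length=8388608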
presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsCachedInputFile.java (new file)
@@ -0,0 +1,119 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.airlift.log.Logger;
import org.apache.iceberg.io.ByteBufferInputStream;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.SeekableInputStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static org.apache.iceberg.io.IOUtil.readRemaining;

public class HdfsCachedInputFile
implements InputFile
{
private static final Logger LOG = Logger.get(HdfsCachedInputFile.class);

private final InputFile delegate;
private final ManifestFileCacheKey cacheKey;
private final ManifestFileCache cache;

public HdfsCachedInputFile(InputFile delegate, ManifestFileCacheKey cacheKey, ManifestFileCache cache)
{
this.delegate = requireNonNull(delegate, "delegate is null");
this.cacheKey = requireNonNull(cacheKey, "cacheKey is null");
this.cache = requireNonNull(cache, "cache is null");
}

@Override
public long getLength()
{
ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
if (cachedContent != null) {
return cachedContent.getLength();
}
return delegate.getLength();
}

@Override
public SeekableInputStream newStream()
{
ManifestFileCachedContent cachedContent = cache.getIfPresent(cacheKey);
if (cachedContent != null) {
return ByteBufferInputStream.wrap(cachedContent.getData());
}

long fileLength = delegate.getLength();
if (cache.isEnabled() && fileLength <= cache.getMaxFileLength()) {
try {
ManifestFileCachedContent content = readFully(delegate, fileLength, cache.getBufferChunkSize());
cache.put(cacheKey, content);
cache.recordFileSize(content.getLength());
return ByteBufferInputStream.wrap(content.getData());
}
catch (IOException e) {
LOG.warn("Failed to cache input file: {}. Falling back to direct read.", delegate.location(), e);
}
}

return delegate.newStream();
}

@Override
public String location()
{
return delegate.location();
}

@Override
public boolean exists()
{
return cache.getIfPresent(cacheKey) != null || delegate.exists();
}

private static ManifestFileCachedContent readFully(InputFile input, long fileLength, long chunkSize)
throws IOException
{
try (SeekableInputStream stream = input.newStream()) {
long totalBytesToRead = fileLength;
List<ByteBuffer> buffers = new ArrayList<>(
((int) (fileLength / chunkSize)) +
(fileLength % chunkSize == 0 ? 0 : 1));

while (totalBytesToRead > 0) {
int bytesToRead = (int) Math.min(chunkSize, totalBytesToRead);
byte[] buf = new byte[bytesToRead];
int bytesRead = readRemaining(stream, buf, 0, bytesToRead);
totalBytesToRead -= bytesRead;

if (bytesRead < bytesToRead) {
throw new IOException(
format("Failed to read %d bytes from file %s: only %d bytes were read",
fileLength, input.location(), fileLength - totalBytesToRead));
}
buffers.add(ByteBuffer.wrap(buf));
}
return new ManifestFileCachedContent(buffers, fileLength);
}
}
}
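The class above is a read-through wrapper: serve from the cache when the content is present, otherwise read the whole manifest once, populate the cache, and stream from memory. A self-contained sketch of the core hit/miss path using Guava (which this PR also uses); ReadThroughFileCache and its members are illustrative stand-ins, not part of the PR, and the direct-read error fallback is omitted:

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class ReadThroughFileCache
{
private final Cache<String, byte[]> cache = CacheBuilder.newBuilder()
.maximumWeight(100 * 1024 * 1024) // cap the total number of cached bytes
.weigher((String key, byte[] data) -> data.length)
.build();

public InputStream open(Path file)
throws IOException
{
byte[] cached = cache.getIfPresent(file.toString());
if (cached != null) {
return new ByteArrayInputStream(cached); // hit: no filesystem I/O
}
byte[] data = Files.readAllBytes(file); // miss: read once
cache.put(file.toString(), data); // remember for next time
return new ByteArrayInputStream(data);
}
}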
presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsFileIO.java
@@ -17,25 +17,30 @@
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.spi.PrestoException;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

import java.io.IOException;
import java.util.Optional;

import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

public class HdfsFileIO
implements FileIO
{
private final HdfsEnvironment environment;
private final HdfsContext context;
private final ManifestFileCache manifestFileCache;

public HdfsFileIO(HdfsEnvironment environment, HdfsContext context)
public HdfsFileIO(ManifestFileCache manifestFileCache, HdfsEnvironment environment, HdfsContext context)
{
this.environment = requireNonNull(environment, "environment is null");
this.context = requireNonNull(context, "context is null");
this.manifestFileCache = requireNonNull(manifestFileCache, "manifestFileCache is null");
}

@Override
@@ -44,6 +49,25 @@ public InputFile newInputFile(String path)
return new HdfsInputFile(new Path(path), environment, context);
}

@Override
public InputFile newInputFile(String path, long length)
{
return new HdfsInputFile(new Path(path), environment, context, Optional.of(length));
}

@Override
public InputFile newInputFile(ManifestFile manifest)
{
checkArgument(
manifest.keyMetadata() == null,
"Cannot decrypt manifest: %s (use EncryptingFileIO)",
manifest.path());
InputFile inputFile = new HdfsInputFile(new Path(manifest.path()), environment, context, Optional.of(manifest.length()));
return manifestFileCache.isEnabled() ?
new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(manifest.path()), manifestFileCache) :
inputFile;
}

@Override
public OutputFile newOutputFile(String path)
{
@@ -61,4 +85,12 @@ public void deleteFile(String pathString)
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to delete file: " + path, e);
}
}

protected InputFile newCachedInputFile(String path)
{
InputFile inputFile = new HdfsInputFile(new Path(path), environment, context);
return manifestFileCache.isEnabled() ?
new HdfsCachedInputFile(inputFile, new ManifestFileCacheKey(path), manifestFileCache) :
inputFile;
}
}
presto-iceberg/src/main/java/com/facebook/presto/iceberg/HdfsInputFile.java
@@ -22,6 +22,8 @@
import org.apache.iceberg.io.SeekableInputStream;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;

import static com.facebook.presto.iceberg.IcebergErrorCode.ICEBERG_FILESYSTEM_ERROR;
import static java.util.Objects.requireNonNull;
@@ -32,25 +34,42 @@ public class HdfsInputFile
private final InputFile delegate;
private final HdfsEnvironment environment;
private final String user;
private final AtomicLong length;

public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context)
public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context, Optional<Long> length)
{
requireNonNull(path, "path is null");
this.environment = requireNonNull(environment, "environment is null");
this.length = new AtomicLong(length.orElse(-1L));
requireNonNull(context, "context is null");
try {
this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
if (this.length.get() < 0) {
this.delegate = HadoopInputFile.fromPath(path, environment.getFileSystem(context, path), environment.getConfiguration(context, path));
}
else {
this.delegate = HadoopInputFile.fromPath(path, this.length.get(), environment.getFileSystem(context, path), environment.getConfiguration(context, path));
}
}
catch (IOException e) {
throw new PrestoException(ICEBERG_FILESYSTEM_ERROR, "Failed to create input file: " + path, e);
}
this.user = context.getIdentity().getUser();
}

public HdfsInputFile(Path path, HdfsEnvironment environment, HdfsContext context)
{
this(path, environment, context, Optional.empty());
}

@Override
public long getLength()
{
return environment.doAs(user, delegate::getLength);
// Resolve the length lazily on first access and memoize it; -1 marks "unknown"
return length.updateAndGet(value -> {
if (value < 0) {
return environment.doAs(user, delegate::getLength);
}
return value;
});
}

@Override
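The getLength() change memoizes the file length so repeated calls avoid a doAs round trip to the filesystem. A minimal sketch of the same idiom in isolation (LazyLength is illustrative, not a type in this PR); note that updateAndGet may invoke the loader more than once under contention, which is harmless here because a file's length never changes:

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

final class LazyLength
{
private static final long UNKNOWN = -1;

private final AtomicLong length = new AtomicLong(UNKNOWN);
private final LongSupplier loader;

LazyLength(LongSupplier loader)
{
this.loader = loader;
}

long get()
{
// First call resolves via the loader; later calls return the memoized value
return length.updateAndGet(value -> value < 0 ? loader.getAsLong() : value);
}
}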
presto-iceberg/src/main/java/com/facebook/presto/iceberg/HiveTableOperations.java
@@ -105,7 +105,7 @@ public class HiveTableOperations
private final String tableName;
private final Optional<String> owner;
private final Optional<String> location;
private final FileIO fileIO;
private final HdfsFileIO fileIO;
private final IcebergHiveTableOperationsConfig config;

private TableMetadata currentMetadata;
@@ -121,10 +121,11 @@ public HiveTableOperations(
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
ManifestFileCache manifestFileCache,
String database,
String table)
{
this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
metastore,
metastoreContext,
config,
@@ -140,12 +141,13 @@ public HiveTableOperations(
HdfsEnvironment hdfsEnvironment,
HdfsContext hdfsContext,
IcebergHiveTableOperationsConfig config,
ManifestFileCache manifestFileCache,
String database,
String table,
String owner,
String location)
{
this(new HdfsFileIO(hdfsEnvironment, hdfsContext),
this(new HdfsFileIO(manifestFileCache, hdfsEnvironment, hdfsContext),
metastore,
metastoreContext,
config,
Expand All @@ -156,7 +158,7 @@ public HiveTableOperations(
}

private HiveTableOperations(
FileIO fileIO,
HdfsFileIO fileIO,
ExtendedHiveMetastore metastore,
MetastoreContext metastoreContext,
IcebergHiveTableOperationsConfig config,
@@ -409,7 +411,7 @@ private void refreshFromMetadataLocation(String newLocation)
config.getTableRefreshMaxRetryTime().toMillis(),
config.getTableRefreshBackoffScaleFactor())
.run(metadataLocation -> newMetadata.set(
TableMetadataParser.read(fileIO, io().newInputFile(metadataLocation))));
TableMetadataParser.read(fileIO, fileIO.newCachedInputFile(metadataLocation))));
}
catch (RuntimeException e) {
throw new TableNotFoundException(getSchemaTableName(), "Table metadata is missing", e);
@@ -93,6 +93,8 @@

import javax.inject.Singleton;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.ExecutorService;

@@ -213,6 +215,26 @@ public StatisticsFileCache createStatisticsFileCache(IcebergConfig config, MBean
return statisticsFileCache;
}

@Singleton
@Provides
public ManifestFileCache createManifestFileCache(IcebergConfig config, MBeanExporter exporter)
{
CacheBuilder<ManifestFileCacheKey, ManifestFileCachedContent> delegate = CacheBuilder.newBuilder()
Review comment from @agrawalreetika (Member), Mar 1, 2025:

Two questions here:

1. Did we consider using the Caffeine cache itself for manifest file caching? Could Caffeine's Window TinyLFU eviction be effective here for an optimal hit rate?
2. For the cache key, can we just use the manifest file path string, or are there issues with that?

Reply from @ZacBlanco (Contributor, Author):

I opted to create a new type for the cache key, even though it is just the file path, in case we need to modify it more easily in the future.

As for Caffeine, I spoke about this with Anant, but it is more complicated to use due to a lack of utility classes and existing methods for JMX metric support. It is a large enough effort that moving to Caffeine should be a separate PR. I already have a draft up for this; however, it still needs a little work before it is ready to review: #24608

.maximumWeight(config.getMaxManifestCacheSize())
.<ManifestFileCacheKey, ManifestFileCachedContent>weigher((key, entry) -> (int) entry.getData().stream().mapToLong(ByteBuffer::capacity).sum())
.recordStats();
if (config.getManifestCacheExpireDuration() > 0) {
delegate.expireAfterWrite(Duration.ofMillis(config.getManifestCacheExpireDuration()));
}
ManifestFileCache manifestFileCache = new ManifestFileCache(
delegate.build(),
config.getManifestCachingEnabled(),
config.getManifestCacheMaxContentLength(),
config.getManifestCacheMaxChunkSize().toBytes());
exporter.export(generatedNameOf(ManifestFileCache.class, connectorId), manifestFileCache);
return manifestFileCache;
}
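As a companion to the Caffeine discussion above, here is a rough sketch of what the equivalent construction could look like in Caffeine, whose default Window TinyLFU policy would replace Guava's LRU-style eviction. This is purely illustrative and omits the JMX export wiring the author mentions as the hard part; the key and value types are the ones introduced in this PR:

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import java.nio.ByteBuffer;
import java.time.Duration;

public class CaffeineManifestCacheSketch
{
// Sketch only, not part of this PR: the Guava CacheBuilder above translated to Caffeine
public static Cache<ManifestFileCacheKey, ManifestFileCachedContent> build(IcebergConfig config)
{
Caffeine<ManifestFileCacheKey, ManifestFileCachedContent> builder = Caffeine.newBuilder()
.maximumWeight(config.getMaxManifestCacheSize())
// weigh entries by their total buffered bytes, mirroring the Guava weigher above
.weigher((ManifestFileCacheKey key, ManifestFileCachedContent entry) ->
(int) entry.getData().stream().mapToLong(ByteBuffer::capacity).sum())
.recordStats();
if (config.getManifestCacheExpireDuration() > 0) {
builder.expireAfterWrite(Duration.ofMillis(config.getManifestCacheExpireDuration()));
}
return builder.build();
}
}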

@ForCachingHiveMetastore
@Singleton
@Provides
presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConfig.java
@@ -68,10 +68,11 @@ public class IcebergConfig

private EnumSet<ColumnStatisticType> hiveStatisticsMergeFlags = EnumSet.noneOf(ColumnStatisticType.class);
private String fileIOImpl = HadoopFileIO.class.getName();
private boolean manifestCachingEnabled;
private boolean manifestCachingEnabled = true;
Review comment from a Member:

Is this intended?

Reply from @ZacBlanco (Contributor, Author), Feb 6, 2025:

Yes, this is intentional. Performance is significantly worse with it disabled, and I don't think there are any known downsides to enabling it by default other than an increased memory footprint.

private long maxManifestCacheSize = IO_MANIFEST_CACHE_MAX_TOTAL_BYTES_DEFAULT;
private long manifestCacheExpireDuration = IO_MANIFEST_CACHE_EXPIRATION_INTERVAL_MS_DEFAULT;
private long manifestCacheMaxContentLength = IO_MANIFEST_CACHE_MAX_CONTENT_LENGTH_DEFAULT;
private DataSize manifestCacheMaxChunkSize = succinctDataSize(2, MEGABYTE);
private int splitManagerThreads = Runtime.getRuntime().availableProcessors();
private DataSize maxStatisticsFileCacheSize = succinctDataSize(256, MEGABYTE);

@@ -362,6 +363,20 @@ public IcebergConfig setManifestCacheMaxContentLength(long manifestCacheMaxConte
return this;
}

public DataSize getManifestCacheMaxChunkSize()
{
return manifestCacheMaxChunkSize;
}

@Min(1024)
@Config("iceberg.io.manifest.cache.max-chunk-size")
@ConfigDescription("Maximum length of a buffer used to cache manifest file content. Only applicable to HIVE catalog.")
public IcebergConfig setManifestCacheMaxChunkSize(DataSize manifestCacheMaxChunkSize)
{
this.manifestCacheMaxChunkSize = manifestCacheMaxChunkSize;
return this;
}

@Min(0)
public int getSplitManagerThreads()
{
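Since manifest caching is now on by default (per the discussion above), deployments that prefer the previous behavior need to opt out explicitly. A sketch of the relevant catalog properties; the enabled flag's name is assumed from the manifestCachingEnabled field rather than shown in this diff:

# Illustrative sketch; iceberg.io.manifest.cache.enabled is an assumed name
iceberg.io.manifest.cache.enabled=false
# New in this PR: chunk size for buffers holding cached manifest content (default 2MB)
iceberg.io.manifest.cache.max-chunk-size=2MB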