Skip to content

Commit

Permalink
Procedure to invalidate directory list cache
Browse files Browse the repository at this point in the history
  • Loading branch information
agrawalreetika committed Mar 6, 2024
1 parent c23d1bd commit 7afa178
Show file tree
Hide file tree
Showing 8 changed files with 215 additions and 3 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/product-tests-basic-environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ jobs:
export MAVEN_OPTS="${MAVEN_INSTALL_OPTS}"
./mvnw install ${MAVEN_FAST_INSTALL} -am -pl '!presto-docs,!presto-spark-package,!presto-spark-launcher,!presto-spark-testing,!presto-test-coverage'
- name: Run Product Tests Basic Environment
run: presto-product-tests/bin/run_on_docker.sh multinode -x quarantine,big_query,storage_formats,profile_specific_tests,tpcds,cassandra,mysql_connector,postgresql_connector,mysql,kafka,avro
run: presto-product-tests/bin/run_on_docker.sh multinode -x quarantine,big_query,storage_formats,profile_specific_tests,tpcds,cassandra,mysql_connector,postgresql_connector,mysql,kafka,avro,hive_list_caching
8 changes: 8 additions & 0 deletions presto-docs/src/main/sphinx/connector/hive.rst
Original file line number Diff line number Diff line change
Expand Up @@ -855,6 +855,14 @@ The following procedures are available:
file system paths to use lowercase (e.g. ``col_x=SomeValue``). Partitions on the file system
not conforming to this convention are ignored, unless the argument is set to ``false``.

* ``system.invalidate_directory_list_cache()``

Flush the entire directory list cache.

* ``system.invalidate_directory_list_cache(directory_path)``

Invalidate the directory list cache for the specified ``directory_path``.

Extra Hidden Columns
--------------------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
Expand All @@ -35,6 +36,7 @@
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static java.util.Objects.requireNonNull;
Expand All @@ -44,8 +46,7 @@ public class CachingDirectoryLister
{
private final Cache<Path, List<HiveFileInfo>> cache;
private final CachedTableChecker cachedTableChecker;

protected final DirectoryLister delegate;
private final DirectoryLister delegate;

@Inject
public CachingDirectoryLister(@ForCachingDirectoryLister DirectoryLister delegate, HiveClientConfig hiveClientConfig)
Expand Down Expand Up @@ -120,6 +121,24 @@ public HiveFileInfo next()
};
}

/**
 * Invalidates cached directory listings.
 * <p>
 * When {@code directoryPath} is absent the request is delegated to {@link #flushCache()}.
 * When it is present, only the cache entry keyed by that path is invalidated.
 *
 * @param directoryPath the directory whose cached listing should be dropped, or empty to flush
 * @throws PrestoException with {@code INVALID_PROCEDURE_ARGUMENT} if the path is an empty string
 * or if no listing is currently cached for the path
 */
public void invalidateDirectoryListCache(Optional<String> directoryPath)
{
    if (!directoryPath.isPresent()) {
        flushCache();
        return;
    }

    String directory = directoryPath.get();
    if (directory.isEmpty()) {
        throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Directory path can not be a empty string");
    }

    Path path = new Path(directory);
    if (cache.getIfPresent(path) == null) {
        // Report the raw path; concatenating the Optional itself would print "Optional[...]"
        throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Given directory path is not cached : " + directory);
    }
    cache.invalidate(path);
}

@Managed
public void flushCache()
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;

import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
import com.facebook.presto.spi.procedure.Procedure;
import com.facebook.presto.spi.procedure.Procedure.Argument;
import com.google.common.collect.ImmutableList;

import javax.inject.Inject;
import javax.inject.Provider;

import java.lang.invoke.MethodHandle;
import java.util.Optional;

import static com.facebook.presto.common.block.MethodHandleUtil.methodHandle;
import static com.facebook.presto.common.type.StandardTypes.VARCHAR;
import static com.facebook.presto.hive.HiveSessionProperties.isUseListDirectoryCache;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_PROCEDURE_ARGUMENT;
import static java.util.Objects.requireNonNull;

/**
 * Provides the {@code system.invalidate_directory_list_cache} procedure, which forwards
 * invalidation requests to the injected {@link DirectoryLister} when it is a
 * {@link CachingDirectoryLister}.
 */
public class DirectoryListCacheInvalidationProcedure
        implements Provider<Procedure>
{
    // Handle to directoryListCacheInvalidation(ConnectorSession, String); bound to this instance in get()
    private static final MethodHandle CACHE_DATA_INVALIDATION = methodHandle(
            DirectoryListCacheInvalidationProcedure.class,
            "directoryListCacheInvalidation",
            ConnectorSession.class,
            String.class);

    private final DirectoryLister directoryLister;

    @Inject
    public DirectoryListCacheInvalidationProcedure(DirectoryLister directoryLister)
    {
        this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
    }

    /**
     * Builds the procedure definition with a single {@code directory_path} VARCHAR argument.
     */
    @Override
    public Procedure get()
    {
        return new Procedure(
                "system",
                "invalidate_directory_list_cache",
                ImmutableList.of(
                        new Argument("directory_path", VARCHAR, false, null)),
                CACHE_DATA_INVALIDATION.bindTo(this));
    }

    /**
     * Procedure entry point.
     *
     * @param session the calling session, used to check whether the directory list cache is in use
     * @param directoryPath the directory to invalidate; {@code null} is passed on as an empty Optional
     */
    public void directoryListCacheInvalidation(ConnectorSession session, String directoryPath)
    {
        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(getClass().getClassLoader())) {
            doInvalidateDirectoryListCache(session, Optional.ofNullable(directoryPath));
        }
    }

    /**
     * Forwards the request to the caching lister.
     *
     * @throws PrestoException with {@code INVALID_PROCEDURE_ARGUMENT} when the session does not use the
     * directory list cache, or when the injected lister is not a {@link CachingDirectoryLister}
     */
    private void doInvalidateDirectoryListCache(ConnectorSession session, Optional<String> directoryPath)
    {
        // The instanceof guard turns a would-be ClassCastException into the same user-facing error
        // that is reported when the cache is disabled for the session.
        if (!isUseListDirectoryCache(session) || !(directoryLister instanceof CachingDirectoryLister)) {
            throw new PrestoException(INVALID_PROCEDURE_ARGUMENT, "Directory list cache is not enabled on this catalog");
        }
        ((CachingDirectoryLister) directoryLister).invalidateDirectoryListCache(directoryPath);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,6 @@ public void configure(Binder binder)
Multibinder<Procedure> procedures = newSetBinder(binder, Procedure.class);
procedures.addBinding().toProvider(CreateEmptyPartitionProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(SyncPartitionMetadataProcedure.class).in(Scopes.SINGLETON);
procedures.addBinding().toProvider(DirectoryListCacheInvalidationProcedure.class).in(Scopes.SINGLETON);
}
}
25 changes: 25 additions & 0 deletions presto-product-tests/conf/presto/etc/catalog/hivecached.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#
# WARNING
# ^^^^^^^
# This configuration file is for development only and should NOT be
# used in production. For example configuration, see the Presto documentation.
#

connector.name=hive-hadoop2
hive.metastore.uri=thrift://hadoop-master:9083
hive.metastore.thrift.client.socks-proxy=hadoop-master:1180
hive.config.resources=/docker/volumes/conf/presto/etc/hive-default-fs-site.xml
hive.allow-add-column=true
hive.allow-drop-column=true
hive.allow-rename-column=true
hive.allow-drop-table=true
hive.allow-rename-table=true
hive.metastore-cache-ttl=0s
hive.fs.cache.max-size=10
hive.max-partitions-per-scan=100
hive.collect-column-statistics-on-write=true

# List file cache
hive.file-status-cache-expire-time=24h
hive.file-status-cache-size=100000000
hive.file-status-cache-tables=*
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public final class TestGroups
public static final String KAFKA = "kafka";
public static final String AVRO = "avro";
public static final String ICEBERG = "iceberg";
public static final String HIVE_LIST_CACHING = "hive_list_caching";

private TestGroups() {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.tests.hive;

import io.prestodb.tempto.AfterTestWithContext;
import io.prestodb.tempto.BeforeTestWithContext;
import io.prestodb.tempto.ProductTest;
import io.prestodb.tempto.query.QueryResult;
import org.testng.annotations.Test;

import static com.facebook.presto.tests.TestGroups.HIVE_LIST_CACHING;
import static io.prestodb.tempto.query.QueryExecutor.query;
import static org.testng.Assert.assertEquals;

/**
 * Product test for {@code system.invalidate_directory_list_cache()} on the {@code hivecached}
 * catalog. It reads the caching directory lister's JMX hit and miss counters before and after
 * the procedure call.
 */
@Test(singleThreaded = true)
public class TestDirectoryListCacheInvalidation
        extends ProductTest
{
    @BeforeTestWithContext
    public void setUp()
    {
        // Recreate the table so each run starts from a known state
        query("DROP TABLE IF EXISTS hivecached.default.region_cache");
        query("CREATE TABLE hivecached.default.region_cache AS SELECT * FROM tpch.tiny.region");
    }

    @Test(groups = {HIVE_LIST_CACHING})
    public void testDirectoryListCacheInvalidation()
    {
        String cacheStatsSql = "SELECT sum(hitcount), sum(misscount) from jmx.current.\"com.facebook.presto.hive:name=hivecached,type=cachingdirectorylister\"";
        String tableScanSql = "SELECT * FROM hivecached.default.region_cache";

        // Nothing has been read yet, so both counters are expected to be zero
        QueryResult initialStats = query(cacheStatsSql);
        assertEquals(hits(initialStats), 0L);
        assertEquals(misses(initialStats), 0L);

        // Two scans of the same table: one miss followed by one hit is expected
        query(tableScanSql);
        query(tableScanSql);

        QueryResult statsAfterScans = query(cacheStatsSql);
        long hitsAfterScans = hits(statsAfterScans);
        long missesAfterScans = misses(statsAfterScans);

        assertEquals(hitsAfterScans, 1L);
        assertEquals(missesAfterScans, 1L);

        // Flush the whole directory list cache through the procedure
        query("CALL hivecached.system.invalidate_directory_list_cache()");

        query(tableScanSql);
        QueryResult statsAfterInvalidation = query(cacheStatsSql);

        long hitsAfterInvalidation = hits(statsAfterInvalidation);
        long missesAfterInvalidation = misses(statsAfterInvalidation);
        // The listing is no longer cached, so only the miss counter moves
        assertEquals(hitsAfterInvalidation, 1L);
        assertEquals(missesAfterInvalidation, 2L);
    }

    @AfterTestWithContext
    public void tearDown()
    {
        query("DROP TABLE IF EXISTS hivecached.default.region_cache");
    }

    // First column of the single stats row: sum(hitcount)
    private static long hits(QueryResult stats)
    {
        return (long) stats.row(0).get(0);
    }

    // Second column of the single stats row: sum(misscount)
    private static long misses(QueryResult stats)
    {
        return (long) stats.row(0).get(1);
    }
}

0 comments on commit 7afa178

Please sign in to comment.