Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core persistence refactor phase 1 - extract BasePersistence and BaseMetaStoreManager to isolate all "transaction" behaviors #1070

Merged
Merged
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
efbb666
Remove all tightly coupled EntityCache dependencies in the main persi…
dennishuo Feb 24, 2025
04a636e
Add support for actually giving null EntityCache to Resolver; paramet…
dennishuo Feb 24, 2025
31a837b
Remove PolarisMetaStoreSession from FileIOFactory/FileIOUtil in favor…
dennishuo Feb 24, 2025
dc820d7
Merge branch 'main' of github.com:dennishuo/polaris into dhuo-persist…
dennishuo Feb 25, 2025
7d8942f
Restructure persistence class hierarchy and remove vestigial interfaces
dennishuo Feb 25, 2025
54b762e
Remove all "entitiesDropped" members; these were vestigial from tryin…
dennishuo Feb 25, 2025
c7c9c45
Extract BasePersistence interface as parent interface of PolarisMetaS…
dennishuo Feb 25, 2025
db9e71d
Merge branch 'main' of github.com:dennishuo/polaris into dhuo-persist…
dennishuo Feb 26, 2025
87a8010
Push all evidence of the two-phase lookupEntityByName into only the t…
dennishuo Feb 26, 2025
13ba161
Turn PolarisMetaStoreSession into an abstract class and make lookupEn…
dennishuo Feb 26, 2025
5743dd2
Pushdown all calls to writeToEntities into PolarisMetaStoreSession, a…
dennishuo Feb 26, 2025
b739fd1
Add originalEntity to the writeEntity method to enable compare-and-sw…
dennishuo Feb 26, 2025
b52a4af
Break out external-integration related methods from BasePersistence i…
dennishuo Feb 26, 2025
f9c1fd7
Improve javadoc comments, rename PolarisEntityActiveRecord to EntityN…
dennishuo Feb 27, 2025
f0f06ad
Rename PolarisMetaStoreSession to TransactionalPersistence and move i…
dennishuo Feb 27, 2025
ea0a90f
Also move the PolarisTreeMap* classes into the transactional package
dennishuo Feb 27, 2025
f7d602c
Move PolarisMetaStoreManagerImpl into the "transactional" package per…
dennishuo Feb 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Remove PolarisMetaStoreSession from FileIOFactory/FileIOUtil in favor…
… of CallContext

This appeared to be some leaky divergence that occurred after CallContext had been
removed, but PolarisMetaStoreSession really is only a low-level implementation detail
that should never be handled by BasePolarisCatalog/FileIOFactory.

This plumbs CallContext explicitly into the FileIOFactory and FileIOUtil methods and
thus removes a large source of CallContext.getCurrentContext calls; now the threadlocal
doesn't have to be set at all in BasePolarisCatalogTest.
dennishuo committed Feb 24, 2025
commit 31a837be25b18e783d9698a01923f84076013570
Original file line number Diff line number Diff line change
@@ -161,6 +161,7 @@ public Map<String, String> getConfigOverrides() {
@Inject PolarisDiagnostics diagServices;

private BasePolarisCatalog catalog;
private CallContext callContext;
private AwsStorageConfigInfo storageConfigModel;
private StsClient stsClient;
private String realmName;
@@ -199,8 +200,7 @@ public void before(TestInfo testInfo) {
new PolarisEntityManager(
metaStoreManager, new StorageCredentialCache(), new EntityCache(metaStoreManager));

CallContext callContext = CallContext.of(realmContext, polarisContext);
CallContext.setCurrentContext(callContext);
callContext = CallContext.of(realmContext, polarisContext);

PrincipalEntity rootEntity =
new PrincipalEntity(
@@ -527,7 +527,7 @@ public void testValidateNotificationFailToCreateFileIO() {
final String tableMetadataLocation = tableLocation + "metadata/";
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
CallContext.getCurrentContext(), entityManager, securityContext, catalog().name());
callContext, entityManager, securityContext, catalog().name());
FileIOFactory fileIOFactory =
spy(
new DefaultFileIOFactory(
@@ -538,7 +538,7 @@ public void testValidateNotificationFailToCreateFileIO() {
new BasePolarisCatalog(
entityManager,
metaStoreManager,
CallContext.getCurrentContext(),
callContext,
passthroughView,
securityContext,
Mockito.mock(TaskExecutor.class),
@@ -854,7 +854,6 @@ public void testUpdateNotificationCreateTableWithLocalFilePrefix() {
.setName(catalogWithoutStorage)
.build());

CallContext callContext = CallContext.getCurrentContext();
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
callContext, entityManager, securityContext, catalogWithoutStorage);
@@ -919,7 +918,6 @@ public void testUpdateNotificationCreateTableWithHttpPrefix() {
.setName(catalogName)
.build());

CallContext callContext = CallContext.getCurrentContext();
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
callContext, entityManager, securityContext, catalogName);
@@ -1434,7 +1432,7 @@ public void testDropTableWithPurge() {
new RealmEntityManagerFactory(metaStoreManagerFactory),
metaStoreManagerFactory,
configurationStore))
.apply(taskEntity, () -> realmName);
.apply(taskEntity, callContext);
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class);
}

@@ -1461,8 +1459,6 @@ public void testDropTableWithPurgeDisabled() {
.addProperty(PolarisConfiguration.DROP_WITH_PURGE_ENABLED.catalogConfig(), "false")
.setStorageConfigurationInfo(noPurgeStorageConfigModel, storageLocation)
.build());
RealmContext realmContext = () -> "realm";
CallContext callContext = CallContext.of(realmContext, polarisContext);
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
callContext, entityManager, securityContext, noPurgeCatalogName);
@@ -1542,9 +1538,6 @@ public void testRetriableException() {

@Test
public void testFileIOWrapper() {
RealmContext realmContext = () -> "realm";
CallContext callContext = CallContext.of(realmContext, polarisContext);
CallContext.setCurrentContext(callContext);
PolarisPassthroughResolutionView passthroughView =
new PolarisPassthroughResolutionView(
callContext, entityManager, securityContext, CATALOG_NAME);
@@ -1600,15 +1593,15 @@ public void testFileIOWrapper() {
new FileIOFactory() {
@Override
public FileIO loadFileIO(
@NotNull RealmContext realmContext,
@NotNull CallContext callContext,
@NotNull String ioImplClassName,
@NotNull Map<String, String> properties,
@NotNull TableIdentifier identifier,
@NotNull Set<String> tableLocations,
@NotNull Set<PolarisStorageActions> storageActions,
@NotNull PolarisResolvedPathWrapper resolvedEntityPath) {
return measured.loadFileIO(
realmContext,
callContext,
"org.apache.iceberg.inmemory.InMemoryFileIO",
Map.of(),
TABLE,
Original file line number Diff line number Diff line change
@@ -76,7 +76,7 @@ private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) {
new FileIOFactory() {
@Override
public FileIO loadFileIO(
@NotNull RealmContext realmContext,
@NotNull CallContext callContext,
@NotNull String ioImplClassName,
@NotNull Map<String, String> properties,
@NotNull TableIdentifier identifier,
Original file line number Diff line number Diff line change
@@ -75,7 +75,7 @@ private TaskFileIOSupplier buildTaskFileIOSupplier(FileIO fileIO) {
new FileIOFactory() {
@Override
public FileIO loadFileIO(
@Nonnull RealmContext realmContext,
@Nonnull CallContext callContext,
@Nonnull String ioImplClassName,
@Nonnull Map<String, String> properties,
@Nonnull TableIdentifier identifier,
Original file line number Diff line number Diff line change
@@ -836,10 +836,9 @@ public Map<String, String> getCredentialConfig(
return Map.of();
}
return FileIOUtil.refreshCredentials(
callContext.getRealmContext(),
callContext,
entityManager,
getCredentialVendor(),
callContext.getPolarisCallContext().getMetaStore(),
callContext.getPolarisCallContext().getConfigurationStore(),
tableIdentifier,
getLocationsAllowedToBeAccessed(tableMetadata),
@@ -1614,7 +1613,7 @@ private FileIO loadFileIOForTableLike(
// Reload fileIO based on table specific context
FileIO fileIO =
fileIOFactory.loadFileIO(
callContext.getRealmContext(),
callContext,
ioImplClassName,
tableProperties,
identifier,
@@ -2077,13 +2076,7 @@ private FileIO loadFileIO(String ioImpl, Map<String, String> properties) {
new PolarisResolvedPathWrapper(List.of(resolvedCatalogEntity));
Set<PolarisStorageActions> storageActions = Set.of(PolarisStorageActions.ALL);
return fileIOFactory.loadFileIO(
callContext.getRealmContext(),
ioImpl,
properties,
identifier,
locations,
storageActions,
resolvedPath);
callContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath);
}

private void blockedUserSpecifiedWriteLocation(Map<String, String> properties) {
Original file line number Diff line number Diff line change
@@ -32,6 +32,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.PolarisConfigurationStore;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
@@ -70,13 +71,14 @@ public DefaultFileIOFactory(

@Override
public FileIO loadFileIO(
@Nonnull RealmContext realmContext,
@Nonnull CallContext callContext,
@Nonnull String ioImplClassName,
@Nonnull Map<String, String> properties,
@Nonnull TableIdentifier identifier,
@Nonnull Set<String> tableLocations,
@Nonnull Set<PolarisStorageActions> storageActions,
@Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
RealmContext realmContext = callContext.getRealmContext();
PolarisEntityManager entityManager =
realmEntityManagerFactory.getOrCreateEntityManager(realmContext);
PolarisCredentialVendor credentialVendor =
@@ -93,10 +95,9 @@ public FileIO loadFileIO(
.map(
storageInfo ->
FileIOUtil.refreshCredentials(
realmContext,
callContext,
entityManager,
credentialVendor,
metaStoreSession,
configurationStore,
identifier,
tableLocations,
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
import java.util.Set;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.PolarisStorageActions;

@@ -41,7 +41,7 @@ public interface FileIOFactory {
* <p>This method may obtain subscoped credentials to restrict the FileIO's permissions, ensuring
* secure and limited access to the table's data and locations.
*
* @param realmContext the realm for which the FileIO is being loaded.
* @param callContext the call for which the FileIO is being loaded.
* @param ioImplClassName the class name of the FileIO implementation to load.
* @param properties configuration properties for the FileIO.
* @param identifier the table identifier.
@@ -51,7 +51,7 @@ public interface FileIOFactory {
* @return a configured FileIO instance.
*/
FileIO loadFileIO(
@Nonnull RealmContext realmContext,
@Nonnull CallContext callContext,
@Nonnull String ioImplClassName,
@Nonnull Map<String, String> properties,
@Nonnull TableIdentifier identifier,
Original file line number Diff line number Diff line change
@@ -25,11 +25,9 @@
import org.apache.polaris.core.PolarisConfiguration;
import org.apache.polaris.core.PolarisConfigurationStore;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.PolarisEntity;
import org.apache.polaris.core.entity.PolarisEntityConstants;
import org.apache.polaris.core.persistence.PolarisEntityManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.PolarisCredentialVendor;
import org.apache.polaris.core.storage.PolarisStorageActions;
@@ -78,16 +76,14 @@ public static Optional<PolarisEntity> findStorageInfoFromHierarchy(
* </ul>
*/
public static Map<String, String> refreshCredentials(
RealmContext realmContext,
CallContext callContext,
PolarisEntityManager entityManager,
PolarisCredentialVendor credentialVendor,
PolarisMetaStoreSession metaStoreSession,
PolarisConfigurationStore configurationStore,
TableIdentifier tableIdentifier,
Set<String> tableLocations,
Set<PolarisStorageActions> storageActions,
PolarisEntity entity) {
CallContext callContext = CallContext.getCurrentContext();

boolean skipCredentialSubscopingIndirection =
configurationStore.getConfiguration(
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.PolarisConfigurationStore;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.PolarisStorageActions;
@@ -52,7 +52,7 @@ public WasbTranslatingFileIOFactory(

@Override
public FileIO loadFileIO(
@Nonnull RealmContext realmContext,
@Nonnull CallContext callContext,
@Nonnull String ioImplClassName,
@Nonnull Map<String, String> properties,
@Nonnull TableIdentifier identifier,
@@ -61,7 +61,7 @@ public FileIO loadFileIO(
@Nonnull PolarisResolvedPathWrapper resolvedEntityPath) {
return new WasbTranslatingFileIO(
defaultFileIOFactory.loadFileIO(
realmContext,
callContext,
ioImplClassName,
properties,
identifier,
Original file line number Diff line number Diff line change
@@ -73,7 +73,7 @@ public boolean canHandleTask(TaskEntity task) {
public boolean handleTask(TaskEntity task, CallContext callContext) {
ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
TableIdentifier tableId = cleanupTask.getTableId();
try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext.getRealmContext())) {
try (FileIO authorizedFileIO = fileIOSupplier.apply(task, callContext)) {
if (task.getTaskType() == AsyncTaskType.MANIFEST_FILE_CLEANUP) {
ManifestFile manifestFile = decodeManifestData(cleanupTask.getManifestFileData());
return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
Original file line number Diff line number Diff line change
@@ -89,7 +89,7 @@ public boolean handleTask(TaskEntity cleanupTask, CallContext callContext) {
// It's likely the cleanupTask has already been completed, but wasn't dropped successfully.
// Log a
// warning and move on
try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, callContext.getRealmContext())) {
try (FileIO fileIO = fileIOSupplier.apply(cleanupTask, callContext)) {
if (!TaskUtils.exists(tableEntity.getMetadataLocation(), fileIO)) {
LOGGER
.atWarn()
Original file line number Diff line number Diff line change
@@ -28,7 +28,7 @@
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.entity.PolarisTaskConstants;
import org.apache.polaris.core.entity.TableLikeEntity;
import org.apache.polaris.core.entity.TaskEntity;
@@ -38,7 +38,7 @@
import org.apache.polaris.service.catalog.io.FileIOFactory;

@ApplicationScoped
public class TaskFileIOSupplier implements BiFunction<TaskEntity, RealmContext, FileIO> {
public class TaskFileIOSupplier implements BiFunction<TaskEntity, CallContext, FileIO> {
private final FileIOFactory fileIOFactory;

@Inject
@@ -47,7 +47,7 @@ public TaskFileIOSupplier(FileIOFactory fileIOFactory) {
}

@Override
public FileIO apply(TaskEntity task, RealmContext realmContext) {
public FileIO apply(TaskEntity task, CallContext callContext) {
Map<String, String> internalProperties = task.getInternalPropertiesAsMap();
Map<String, String> properties = new HashMap<>(internalProperties);

@@ -65,6 +65,6 @@ public FileIO apply(TaskEntity task, RealmContext realmContext) {
CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.io.ResolvingFileIO");

return fileIOFactory.loadFileIO(
realmContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath);
callContext, ioImpl, properties, identifier, locations, storageActions, resolvedPath);
}
}
Original file line number Diff line number Diff line change
@@ -189,7 +189,7 @@ public void testLoadFileIOForCleanupTask() {
Assertions.assertThat(tasks).hasSize(1);
TaskEntity taskEntity = TaskEntity.of(tasks.get(0));
FileIO fileIO =
new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, realmContext);
new TaskFileIOSupplier(testServices.fileIOFactory()).apply(taskEntity, callContext);
Assertions.assertThat(fileIO).isNotNull().isInstanceOf(InMemoryFileIO.class);

// 1. BasePolarisCatalog:doCommit: for writing the table during the creation
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.apache.polaris.core.PolarisConfigurationStore;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.persistence.MetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisResolvedPathWrapper;
import org.apache.polaris.core.storage.PolarisStorageActions;
@@ -64,7 +64,7 @@ public MeasuredFileIOFactory(

@Override
public FileIO loadFileIO(
@Nonnull RealmContext realmContext,
@Nonnull CallContext callContext,
@Nonnull String ioImplClassName,
@Nonnull Map<String, String> properties,
@Nonnull TableIdentifier identifier,
@@ -79,7 +79,7 @@ public FileIO loadFileIO(
MeasuredFileIO wrapped =
new MeasuredFileIO(
defaultFileIOFactory.loadFileIO(
realmContext,
callContext,
ioImplClassName,
properties,
identifier,