Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core persistence refactor phase 1 - extract BasePersistence and BaseMetaStoreManager to isolate all "transaction" behaviors #1070

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
efbb666
Remove all tightly coupled EntityCache dependencies in the main persi…
dennishuo Feb 24, 2025
04a636e
Add support for actually giving null EntityCache to Resolver; paramet…
dennishuo Feb 24, 2025
31a837b
Remove PolarisMetaStoreSession from FileIOFactory/FileIOUtil in favor…
dennishuo Feb 24, 2025
dc820d7
Merge branch 'main' of github.com:dennishuo/polaris into dhuo-persist…
dennishuo Feb 25, 2025
7d8942f
Restructure persistence class hierarchy and remove vestigial interfaces
dennishuo Feb 25, 2025
54b762e
Remove all "entitiesDropped" members; these were vestigial from tryin…
dennishuo Feb 25, 2025
c7c9c45
Extract BasePersistence interface as parent interface of PolarisMetaS…
dennishuo Feb 25, 2025
db9e71d
Merge branch 'main' of github.com:dennishuo/polaris into dhuo-persist…
dennishuo Feb 26, 2025
87a8010
Push all evidence of the two-phase lookupEntityByName into only the t…
dennishuo Feb 26, 2025
13ba161
Turn PolarisMetaStoreSession into an abstract class and make lookupEn…
dennishuo Feb 26, 2025
5743dd2
Pushdown all calls to writeToEntities into PolarisMetaStoreSession, a…
dennishuo Feb 26, 2025
b739fd1
Add originalEntity to the writeEntity method to enable compare-and-sw…
dennishuo Feb 26, 2025
b52a4af
Break out external-integration related methods from BasePersistence i…
dennishuo Feb 26, 2025
f9c1fd7
Improve javadoc comments, rename PolarisEntityActiveRecord to EntityN…
dennishuo Feb 27, 2025
f0f06ad
Rename PolarisMetaStoreSession to TransactionalPersistence and move i…
dennishuo Feb 27, 2025
ea0a90f
Also move the PolarisTreeMap* classes into the transactional package
dennishuo Feb 27, 2025
f7d602c
Move PolarisMetaStoreManagerImpl into the "transactional" package per…
dennishuo Feb 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.persistence.LocalPolarisMetaStoreManagerFactory;
import org.apache.polaris.core.persistence.PolarisMetaStoreManager;
import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
import org.apache.polaris.core.persistence.bootstrap.RootCredentialsSet;
import org.apache.polaris.core.persistence.transactional.TransactionalPersistence;
import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;

/**
Expand Down Expand Up @@ -60,7 +60,7 @@ protected PolarisEclipseLinkStore createBackingStore(@Nonnull PolarisDiagnostics
}

@Override
protected PolarisMetaStoreSession createMetaStoreSession(
protected TransactionalPersistence createMetaStoreSession(
@Nonnull PolarisEclipseLinkStore store,
@Nonnull RealmContext realmContext,
@Nullable RootCredentialsSet rootCredentialsSet,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,20 +39,20 @@
import java.util.stream.Collectors;
import org.apache.polaris.core.PolarisCallContext;
import org.apache.polaris.core.context.RealmContext;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisChangeTrackingVersions;
import org.apache.polaris.core.entity.PolarisEntitiesActiveKey;
import org.apache.polaris.core.entity.PolarisEntityActiveRecord;
import org.apache.polaris.core.entity.PolarisEntityCore;
import org.apache.polaris.core.entity.PolarisEntityId;
import org.apache.polaris.core.entity.PolarisEntityType;
import org.apache.polaris.core.entity.PolarisGrantRecord;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.exceptions.AlreadyExistsException;
import org.apache.polaris.core.persistence.PolarisMetaStoreManagerImpl;
import org.apache.polaris.core.persistence.PolarisMetaStoreSession;
import org.apache.polaris.core.persistence.BaseMetaStoreManager;
import org.apache.polaris.core.persistence.PrincipalSecretsGenerator;
import org.apache.polaris.core.persistence.RetryOnConcurrencyException;
import org.apache.polaris.core.persistence.transactional.TransactionalPersistence;
import org.apache.polaris.core.storage.PolarisStorageConfigurationInfo;
import org.apache.polaris.core.storage.PolarisStorageIntegration;
import org.apache.polaris.core.storage.PolarisStorageIntegrationProvider;
Expand All @@ -68,7 +68,7 @@
* EclipseLink implementation of a Polaris metadata store supporting persisting and retrieving all
* Polaris metadata from/to the configured database systems.
*/
public class PolarisEclipseLinkMetaStoreSessionImpl implements PolarisMetaStoreSession {
public class PolarisEclipseLinkMetaStoreSessionImpl extends TransactionalPersistence {
private static final Logger LOGGER =
LoggerFactory.getLogger(PolarisEclipseLinkMetaStoreSessionImpl.class);

Expand Down Expand Up @@ -271,14 +271,6 @@ public void writeToEntitiesActive(
this.store.writeToEntitiesActive(localSession.get(), entity);
}

/** {@inheritDoc} */
@Override
public void writeToEntitiesDropped(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
// write it
this.store.writeToEntitiesDropped(localSession.get(), entity);
}

/** {@inheritDoc} */
@Override
public void writeToEntitiesChangeTracking(
Expand Down Expand Up @@ -312,14 +304,6 @@ public void deleteFromEntitiesActive(
this.store.deleteFromEntitiesActive(localSession.get(), new PolarisEntitiesActiveKey(entity));
}

/** {@inheritDoc} */
@Override
public void deleteFromEntitiesDropped(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
// delete it
this.store.deleteFromEntitiesDropped(localSession.get(), entity.getCatalogId(), entity.getId());
}

/**
* {@inheritDoc}
*
Expand Down Expand Up @@ -371,14 +355,6 @@ public void deleteAll(@Nonnull PolarisCallContext callCtx) {
.toList();
}

/** {@inheritDoc} */
@Override
public int lookupEntityVersion(
@Nonnull PolarisCallContext callCtx, long catalogId, long entityId) {
ModelEntity model = this.store.lookupEntity(localSession.get(), catalogId, entityId);
return model == null ? 0 : model.getEntityVersion();
}

/** {@inheritDoc} */
@Override
public @Nonnull List<PolarisChangeTrackingVersions> lookupEntityVersions(
Expand All @@ -404,7 +380,7 @@ public int lookupEntityVersion(
/** {@inheritDoc} */
@Override
@Nullable
public PolarisEntityActiveRecord lookupEntityActive(
public EntityNameLookupRecord lookupEntityActive(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisEntitiesActiveKey entityActiveKey) {
// lookup the active entity slice
return ModelEntityActive.toEntityActive(
Expand All @@ -414,7 +390,7 @@ public PolarisEntityActiveRecord lookupEntityActive(
/** {@inheritDoc} */
@Override
@Nonnull
public List<PolarisEntityActiveRecord> lookupEntityActiveBatch(
public List<EntityNameLookupRecord> lookupEntityActiveBatch(
@Nonnull PolarisCallContext callCtx,
@Nonnull List<PolarisEntitiesActiveKey> entityActiveKeys) {
// now build a list to quickly verify that nothing has changed
Expand All @@ -425,31 +401,31 @@ public List<PolarisEntityActiveRecord> lookupEntityActiveBatch(

/** {@inheritDoc} */
@Override
public @Nonnull List<PolarisEntityActiveRecord> listActiveEntities(
public @Nonnull List<EntityNameLookupRecord> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType) {
return listActiveEntities(callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue());
return listEntities(callCtx, catalogId, parentId, entityType, Predicates.alwaysTrue());
}

@Override
public @Nonnull List<PolarisEntityActiveRecord> listActiveEntities(
public @Nonnull List<EntityNameLookupRecord> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
@Nonnull PolarisEntityType entityType,
@Nonnull Predicate<PolarisBaseEntity> entityFilter) {
// full range scan under the parent for that type
return listActiveEntities(
return listEntities(
callCtx,
catalogId,
parentId,
entityType,
Integer.MAX_VALUE,
entityFilter,
entity ->
new PolarisEntityActiveRecord(
new EntityNameLookupRecord(
entity.getCatalogId(),
entity.getId(),
entity.getParentId(),
Expand All @@ -459,7 +435,7 @@ public List<PolarisEntityActiveRecord> lookupEntityActiveBatch(
}

@Override
public @Nonnull <T> List<T> listActiveEntities(
public @Nonnull <T> List<T> listEntities(
@Nonnull PolarisCallContext callCtx,
long catalogId,
long parentId,
Expand Down Expand Up @@ -674,7 +650,7 @@ PolarisStorageIntegration<T> createStorageIntegration(
PolarisStorageIntegration<T> loadPolarisStorageIntegration(
@Nonnull PolarisCallContext callCtx, @Nonnull PolarisBaseEntity entity) {
PolarisStorageConfigurationInfo storageConfig =
PolarisMetaStoreManagerImpl.readStorageConfiguration(callCtx, entity);
BaseMetaStoreManager.extractStorageConfiguration(callCtx, entity);
return storageIntegrationProvider.getStorageIntegrationForConfig(storageConfig);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
import org.apache.polaris.core.entity.PolarisBaseEntity;
import org.apache.polaris.core.entity.PolarisEntitiesActiveKey;
import org.apache.polaris.core.entity.PolarisEntityActiveRecord;
import org.apache.polaris.core.entity.PolarisEntityCore;
import org.apache.polaris.core.entity.PolarisEntityId;
import org.apache.polaris.core.entity.PolarisEntityType;
Expand All @@ -38,7 +38,6 @@
import org.apache.polaris.jpa.models.ModelEntity;
import org.apache.polaris.jpa.models.ModelEntityActive;
import org.apache.polaris.jpa.models.ModelEntityChangeTracking;
import org.apache.polaris.jpa.models.ModelEntityDropped;
import org.apache.polaris.jpa.models.ModelGrantRecord;
import org.apache.polaris.jpa.models.ModelPrincipalSecrets;
import org.slf4j.Logger;
Expand Down Expand Up @@ -100,18 +99,7 @@ void writeToEntitiesActive(EntityManager session, PolarisBaseEntity entity) {

ModelEntityActive model = lookupEntityActive(session, new PolarisEntitiesActiveKey(entity));
if (model == null) {
session.persist(ModelEntityActive.fromEntityActive(new PolarisEntityActiveRecord(entity)));
}
}

void writeToEntitiesDropped(EntityManager session, PolarisBaseEntity entity) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelEntityDropped entityDropped =
lookupEntityDropped(session, entity.getCatalogId(), entity.getId());
if (entityDropped == null) {
session.persist(ModelEntityDropped.fromEntity(entity));
session.persist(ModelEntityActive.fromEntityActive(new EntityNameLookupRecord(entity)));
}
}

Expand Down Expand Up @@ -158,16 +146,6 @@ void deleteFromEntitiesActive(EntityManager session, PolarisEntitiesActiveKey ke
session.remove(entity);
}

void deleteFromEntitiesDropped(EntityManager session, long catalogId, long entityId) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

ModelEntityDropped entity = lookupEntityDropped(session, catalogId, entityId);
diagnosticServices.check(entity != null, "dropped_entity_not_found");

session.remove(entity);
}

void deleteFromEntitiesChangeTracking(EntityManager session, PolarisEntityCore entity) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();
Expand Down Expand Up @@ -216,7 +194,6 @@ void deleteAll(EntityManager session) {

session.createQuery("DELETE from ModelEntity").executeUpdate();
session.createQuery("DELETE from ModelEntityActive").executeUpdate();
session.createQuery("DELETE from ModelEntityDropped").executeUpdate();
session.createQuery("DELETE from ModelEntityChangeTracking").executeUpdate();
session.createQuery("DELETE from ModelGrantRecord").executeUpdate();
session.createQuery("DELETE from ModelPrincipalSecrets").executeUpdate();
Expand Down Expand Up @@ -319,21 +296,6 @@ List<ModelEntity> lookupFullEntitiesActive(
return query.getResultList();
}

ModelEntityDropped lookupEntityDropped(EntityManager session, long catalogId, long entityId) {
diagnosticServices.check(session != null, "session_is_null");
checkInitialized();

return session
.createQuery(
"SELECT m from ModelEntityDropped m where m.catalogId=:catalogId and m.id=:id",
ModelEntityDropped.class)
.setParameter("catalogId", catalogId)
.setParameter("id", entityId)
.getResultStream()
.findFirst()
.orElse(null);
}

ModelEntityChangeTracking lookupEntityChangeTracking(
EntityManager session, long catalogId, long entityId) {
diagnosticServices.check(session != null, "session_is_null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@
import org.apache.polaris.core.PolarisDiagnostics;
import org.apache.polaris.core.entity.PolarisPrincipalSecrets;
import org.apache.polaris.core.persistence.BasePolarisMetaStoreManagerTest;
import org.apache.polaris.core.persistence.PolarisMetaStoreManagerImpl;
import org.apache.polaris.core.persistence.PolarisTestMetaStoreManager;
import org.apache.polaris.core.persistence.transactional.PolarisMetaStoreManagerImpl;
import org.apache.polaris.jpa.models.ModelPrincipalSecrets;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import jakarta.persistence.Entity;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
import org.apache.polaris.core.entity.PolarisEntityActiveRecord;
import org.apache.polaris.core.entity.EntityNameLookupRecord;
import org.apache.polaris.core.entity.PolarisEntitySubType;
import org.apache.polaris.core.entity.PolarisEntityType;

Expand Down Expand Up @@ -128,7 +128,7 @@ public ModelEntityActive build() {
}
}

public static ModelEntityActive fromEntityActive(PolarisEntityActiveRecord record) {
public static ModelEntityActive fromEntityActive(EntityNameLookupRecord record) {
return ModelEntityActive.builder()
.catalogId(record.getCatalogId())
.id(record.getId())
Expand All @@ -139,12 +139,12 @@ public static ModelEntityActive fromEntityActive(PolarisEntityActiveRecord recor
.build();
}

public static PolarisEntityActiveRecord toEntityActive(ModelEntityActive model) {
public static EntityNameLookupRecord toEntityActive(ModelEntityActive model) {
if (model == null) {
return null;
}

return new PolarisEntityActiveRecord(
return new EntityNameLookupRecord(
model.catalogId, model.id, model.parentId, model.name, model.typeCode, model.subTypeCode);
}
}
Loading