Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bootstrap): bootstrap template mcps #11518

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ project.ext.externalDependency = [
'annotationApi': 'javax.annotation:javax.annotation-api:1.3.2',
'jakartaAnnotationApi': 'jakarta.annotation:jakarta.annotation-api:3.0.0',
'classGraph': 'io.github.classgraph:classgraph:4.8.172',
'mustache': 'com.github.spullara.mustache.java:compiler:0.9.14'
]

allprojects {
Expand Down
2 changes: 2 additions & 0 deletions datahub-upgrade/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ dependencies {
implementation project(':metadata-dao-impl:kafka-producer')
implementation externalDependency.charle

implementation externalDependency.mustache
implementation externalDependency.javaxInject
implementation(externalDependency.hadoopClient) {
exclude group: 'net.minidev', module: 'json-smart'
Expand Down Expand Up @@ -83,6 +84,7 @@ dependencies {
testImplementation externalDependency.springBootTest
testImplementation externalDependency.mockito
testImplementation externalDependency.testng
testImplementation 'uk.org.webcompere:system-stubs-testng:2.1.7'
testRuntimeOnly externalDependency.logbackClassic

constraints {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
public interface UpgradeManager {

/** Register an {@link Upgrade} with the manager. */
void register(Upgrade upgrade);
UpgradeManager register(Upgrade upgrade);

/** Kick off an {@link Upgrade} by identifier. */
UpgradeResult execute(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes two {@link BootstrapMCP} beans, one constructed as blocking
 * and one as non-blocking. Both are built from the same {@code systemUpdate.bootstrap.mcpConfig}
 * property value.
 */
@Configuration
public class BootstrapMCPConfig {

  /** Injected value of the {@code systemUpdate.bootstrap.mcpConfig} property. */
  @Nonnull
  @Value("${systemUpdate.bootstrap.mcpConfig}")
  private String bootstrapMCPConfig;

  /** Bean {@code bootstrapMCPNonBlocking}: a {@link BootstrapMCP} built with blocking = false. */
  @Bean(name = "bootstrapMCPNonBlocking")
  public BootstrapMCP bootstrapMCPNonBlocking(
      final OperationContext opContext, EntityService<?> entityService) throws IOException {
    return newBootstrapMCP(opContext, entityService, false);
  }

  /** Bean {@code bootstrapMCPBlocking}: a {@link BootstrapMCP} built with blocking = true. */
  @Bean(name = "bootstrapMCPBlocking")
  public BootstrapMCP bootstrapMCPBlocking(
      final OperationContext opContext, EntityService<?> entityService) throws IOException {
    return newBootstrapMCP(opContext, entityService, true);
  }

  /** Shared construction path for both beans; only the blocking flag differs between them. */
  private BootstrapMCP newBootstrapMCP(
      final OperationContext opContext, final EntityService<?> entityService, final boolean blocking)
      throws IOException {
    return new BootstrapMCP(opContext, bootstrapMCPConfig, entityService, blocking);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.datahub.upgrade.system.SystemUpdate;
import com.linkedin.datahub.upgrade.system.SystemUpdateBlocking;
import com.linkedin.datahub.upgrade.system.SystemUpdateNonBlocking;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import com.linkedin.gms.factory.config.ConfigurationProvider;
import com.linkedin.gms.factory.kafka.DataHubKafkaProducerFactory;
Expand All @@ -31,6 +32,7 @@
import io.datahubproject.metadata.services.RestrictedService;
import java.util.List;
import javax.annotation.Nonnull;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
Expand All @@ -54,21 +56,31 @@ public class SystemUpdateConfig {
public SystemUpdate systemUpdate(
final List<BlockingSystemUpgrade> blockingSystemUpgrades,
final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
final DataHubStartupStep dataHubStartupStep) {
return new SystemUpdate(blockingSystemUpgrades, nonBlockingSystemUpgrades, dataHubStartupStep);
final DataHubStartupStep dataHubStartupStep,
@Qualifier("bootstrapMCPBlocking") @NonNull final BootstrapMCP bootstrapMCPBlocking,
@Qualifier("bootstrapMCPNonBlocking") @NonNull final BootstrapMCP bootstrapMCPNonBlocking) {
return new SystemUpdate(
blockingSystemUpgrades,
nonBlockingSystemUpgrades,
dataHubStartupStep,
bootstrapMCPBlocking,
bootstrapMCPNonBlocking);
}

@Bean(name = "systemUpdateBlocking")
public SystemUpdateBlocking systemUpdateBlocking(
final List<BlockingSystemUpgrade> blockingSystemUpgrades,
final DataHubStartupStep dataHubStartupStep) {
return new SystemUpdateBlocking(blockingSystemUpgrades, List.of(), dataHubStartupStep);
final DataHubStartupStep dataHubStartupStep,
@Qualifier("bootstrapMCPBlocking") @NonNull final BootstrapMCP bootstrapMCPBlocking) {
return new SystemUpdateBlocking(
blockingSystemUpgrades, dataHubStartupStep, bootstrapMCPBlocking);
}

@Bean(name = "systemUpdateNonBlocking")
public SystemUpdateNonBlocking systemUpdateNonBlocking(
final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades) {
return new SystemUpdateNonBlocking(List.of(), nonBlockingSystemUpgrades, null);
final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Qualifier("bootstrapMCPNonBlocking") @NonNull final BootstrapMCP bootstrapMCPNonBlocking) {
return new SystemUpdateNonBlocking(nonBlockingSystemUpgrades, bootstrapMCPNonBlocking);
}

@Value("#{systemEnvironment['DATAHUB_REVISION'] ?: '0'}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public class DefaultUpgradeContext implements UpgradeContext {
private final List<String> args;
private final Map<String, Optional<String>> parsedArgs;

DefaultUpgradeContext(
public DefaultUpgradeContext(
@Nonnull OperationContext opContext,
Upgrade upgrade,
UpgradeReport report,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ public class DefaultUpgradeManager implements UpgradeManager {
private final Map<String, Upgrade> _upgrades = new HashMap<>();

@Override
public void register(@Nonnull Upgrade upgrade) {
public UpgradeManager register(@Nonnull Upgrade upgrade) {
_upgrades.put(upgrade.id(), upgrade);
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeCleanupStep;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -22,7 +23,9 @@ public class SystemUpdate implements Upgrade {
public SystemUpdate(
@NonNull final List<BlockingSystemUpgrade> blockingSystemUpgrades,
@NonNull final List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Nullable final DataHubStartupStep dataHubStartupStep) {
@Nullable final DataHubStartupStep dataHubStartupStep,
@Nullable final BootstrapMCP bootstrapMCPBlocking,
@Nullable final BootstrapMCP bootstrapMCPNonBlocking) {

steps = new LinkedList<>();
cleanupSteps = new LinkedList<>();
Expand All @@ -32,11 +35,23 @@ public SystemUpdate(
cleanupSteps.addAll(
blockingSystemUpgrades.stream().flatMap(up -> up.cleanupSteps().stream()).toList());

// bootstrap blocking only
if (bootstrapMCPBlocking != null) {
steps.addAll(bootstrapMCPBlocking.steps());
cleanupSteps.addAll(bootstrapMCPBlocking.cleanupSteps());
}

// emit system update message if blocking upgrade(s) present
if (dataHubStartupStep != null && !blockingSystemUpgrades.isEmpty()) {
steps.add(dataHubStartupStep);
}

// bootstrap non-blocking only
if (bootstrapMCPNonBlocking != null) {
steps.addAll(bootstrapMCPNonBlocking.steps());
cleanupSteps.addAll(bootstrapMCPNonBlocking.cleanupSteps());
}

// add non-blocking upgrades last
steps.addAll(nonBlockingSystemUpgrades.stream().flatMap(up -> up.steps().stream()).toList());
cleanupSteps.addAll(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
package com.linkedin.datahub.upgrade.system;

import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import java.util.List;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;

public class SystemUpdateBlocking extends SystemUpdate {

public SystemUpdateBlocking(
@NonNull List<BlockingSystemUpgrade> blockingSystemUpgrades,
@NonNull List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Nullable DataHubStartupStep dataHubStartupStep) {
super(blockingSystemUpgrades, nonBlockingSystemUpgrades, dataHubStartupStep);
@NonNull DataHubStartupStep dataHubStartupStep,
@NonNull final BootstrapMCP bootstrapMCPBlocking) {
super(blockingSystemUpgrades, List.of(), dataHubStartupStep, bootstrapMCPBlocking, null);
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
package com.linkedin.datahub.upgrade.system;

import com.linkedin.datahub.upgrade.system.elasticsearch.steps.DataHubStartupStep;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.BootstrapMCP;
import java.util.List;
import lombok.NonNull;
import org.jetbrains.annotations.Nullable;

public class SystemUpdateNonBlocking extends SystemUpdate {

public SystemUpdateNonBlocking(
@NonNull List<BlockingSystemUpgrade> blockingSystemUpgrades,
@NonNull List<NonBlockingSystemUpgrade> nonBlockingSystemUpgrades,
@Nullable DataHubStartupStep dataHubStartupStep) {
super(blockingSystemUpgrades, nonBlockingSystemUpgrades, dataHubStartupStep);
final BootstrapMCP bootstrapMCPNonBlocking) {
super(List.of(), nonBlockingSystemUpgrades, null, null, bootstrapMCPNonBlocking);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.linkedin.datahub.upgrade.system.bootstrapmcps;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.Upgrade;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;

/**
 * {@link Upgrade} whose steps are produced by {@link BootstrapMCPUtil} from a bootstrap MCP
 * configuration value.
 *
 * <p>When the configuration value is {@code null} or empty, the upgrade has no steps.
 */
public class BootstrapMCP implements Upgrade {
  private final List<UpgradeStep> _steps;

  /**
   * @param opContext operation context forwarded to step generation
   * @param bootstrapMCPConfig configuration value forwarded to step generation; {@code null} or
   *     empty yields an upgrade without steps
   * @param entityService entity service forwarded to step generation
   * @param isBlocking blocking flag forwarded to step generation
   * @throws IOException declared because step generation may throw it
   */
  public BootstrapMCP(
      OperationContext opContext,
      @Nullable String bootstrapMCPConfig,
      EntityService<?> entityService,
      boolean isBlocking)
      throws IOException {
    final boolean configMissing = bootstrapMCPConfig == null || bootstrapMCPConfig.isEmpty();
    if (configMissing) {
      // Nothing configured: expose an empty, immutable step list.
      _steps = ImmutableList.of();
    } else {
      _steps =
          BootstrapMCPUtil.generateSteps(opContext, isBlocking, bootstrapMCPConfig, entityService);
    }
  }

  /** Returns the simple name of the runtime class as the upgrade identifier. */
  @Override
  public String id() {
    final Class<?> runtimeType = getClass();
    return runtimeType.getSimpleName();
  }

  /** Returns the steps computed at construction time. */
  @Override
  public List<UpgradeStep> steps() {
    return _steps;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package com.linkedin.datahub.upgrade.system.bootstrapmcps;

import static com.linkedin.metadata.Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.datahub.upgrade.system.bootstrapmcps.model.BootstrapMCPConfigFile;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.upgrade.DataHubUpgradeState;
import io.datahubproject.metadata.context.OperationContext;
import java.io.IOException;
import java.util.List;
import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

/**
 * Upgrade step that builds an {@link AspectsBatch} from a single bootstrap MCP template and ingests
 * it through the {@link EntityService}.
 *
 * <p>The step id is {@code bootstrap-<template name>-<template version>}. After a successful
 * ingest the step calls {@link BootstrapStep#setUpgradeResult} for the URN derived from that id.
 * Unless the template is marked as forced, later runs are skipped when the {@code
 * DATA_HUB_UPGRADE_RESULT_ASPECT_NAME} aspect already exists for that URN.
 */
@Slf4j
public class BootstrapMCPStep implements UpgradeStep {
  private final String upgradeId;
  private final Urn upgradeIdUrn;

  private final OperationContext opContext;
  private final EntityService<?> entityService;
  @Getter private final BootstrapMCPConfigFile.MCPTemplate mcpTemplate;

  /**
   * @param opContext operation context used to generate and ingest the batch
   * @param entityService service used for ingestion and for the upgrade-result bookkeeping
   * @param mcpTemplate template describing the MCPs to bootstrap; its name and version form the
   *     step id
   */
  public BootstrapMCPStep(
      OperationContext opContext,
      EntityService<?> entityService,
      BootstrapMCPConfigFile.MCPTemplate mcpTemplate) {
    this.opContext = opContext;
    this.entityService = entityService;
    this.mcpTemplate = mcpTemplate;
    this.upgradeId =
        String.join("-", List.of("bootstrap", mcpTemplate.getName(), mcpTemplate.getVersion()));
    this.upgradeIdUrn = BootstrapStep.getUpgradeUrn(this.upgradeId);
  }

  /** Returns the {@code bootstrap-<name>-<version>} identifier computed at construction. */
  @Override
  public String id() {
    return upgradeId;
  }

  /** Returns the function that performs the ingest; see {@link #ingestTemplate}. */
  @Override
  public Function<UpgradeContext, UpgradeStepResult> executable() {
    return this::ingestTemplate;
  }

  /**
   * Generates the batch for the template, ingests it and records the upgrade result.
   *
   * <p>The batch is generated and ingested with the operation context supplied at construction,
   * whereas the upgrade result is written with the context carried by {@code context}.
   *
   * @param context upgrade context of the current run
   * @return a {@code FAILED} result when an {@link IOException} is raised while generating or
   *     ingesting the batch, otherwise a {@code SUCCEEDED} result
   */
  private UpgradeStepResult ingestTemplate(UpgradeContext context) {
    try {
      AspectsBatch batch = BootstrapMCPUtil.generateAspectBatch(opContext, mcpTemplate);
      log.info("Ingesting {} MCPs", batch.getItems().size());
      entityService.ingestProposal(opContext, batch, mcpTemplate.isAsync());
    } catch (IOException e) {
      log.error("Error bootstrapping MCPs", e);
      return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.FAILED);
    }

    // Only reached when the ingest call returned without an IOException.
    BootstrapStep.setUpgradeResult(context.opContext(), upgradeIdUrn, entityService);

    return new DefaultUpgradeStepResult(id(), DataHubUpgradeState.SUCCEEDED);
  }

  /**
   * Returns whether the upgrade should proceed if the step fails after exceeding the maximum
   * retries. The value is taken from the template.
   */
  @Override
  public boolean isOptional() {
    return mcpTemplate.isOptional();
  }

  /**
   * Returns whether the step should be skipped.
   *
   * <p>A forced template is never skipped. Otherwise the step is skipped when the upgrade-result
   * aspect already exists for this step's upgrade URN.
   */
  @Override
  public boolean skip(UpgradeContext context) {
    if (mcpTemplate.isForce()) {
      log.info("{} forced run.", id());
      return false;
    }

    boolean previouslyRun =
        entityService.exists(
            context.opContext(), upgradeIdUrn, DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, true);
    if (previouslyRun) {
      log.info("{} was already run. Skipping.", id());
    }
    return previouslyRun;
  }
}
Loading
Loading