Skip to content

Commit

Permalink
use launchdarkly feature flagging for column selection (#22577)
Browse files Browse the repository at this point in the history
* use launchdarkly feature flagging for column selection

* check for a null workspace id
  • Loading branch information
mfsiega-airbyte authored Feb 10, 2023
1 parent cb695fd commit 8554b5b
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.airbyte.config.ReplicationOutput;
import io.airbyte.config.StandardSyncInput;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.FieldSelectionEnabled;
import io.airbyte.featureflag.Workspace;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricEmittingApps;
Expand Down Expand Up @@ -53,6 +55,7 @@
import java.nio.file.Path;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -158,6 +161,13 @@ public Optional<String> runJob() throws Exception {
sourceLauncherConfig.getDockerImage());

log.info("Setting up replication worker...");
final UUID workspaceId = syncInput.getWorkspaceId();
// NOTE: we apply field selection if the feature flag client says so (recommended) or the old
// environment-variable flags say so (deprecated).
// The latter FeatureFlagHelper will be removed once the flag client is fully deployed.
final boolean fieldSelectionEnabled = workspaceId != null &&
(featureFlagClient.enabled(FieldSelectionEnabled.INSTANCE, new Workspace(workspaceId))
|| FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));
final var replicationWorker = new DefaultReplicationWorker(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
Expand All @@ -173,7 +183,7 @@ public Optional<String> runJob() throws Exception {
new RecordSchemaValidator(featureFlagClient, syncInput.getWorkspaceId(), WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter,
new ConnectorConfigUpdater(sourceApi, destinationApi),
FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, syncInput.getWorkspaceId()));
fieldSelectionEnabled);

log.info("Running replication worker...");
final var jobRoot = TemporalUtils.getJobRoot(configs.getWorkspaceRoot(),
Expand Down
4 changes: 4 additions & 0 deletions airbyte-featureflag/src/main/kotlin/Flags.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,14 @@ object LogConnectorMessages : EnvVar(envVar = "LOG_CONNECTOR_MESSAGES")
object StreamCapableState : EnvVar(envVar = "USE_STREAM_CAPABLE_STATE")
object AutoDetectSchema : EnvVar(envVar = "AUTO_DETECT_SCHEMA")
object NeedStateValidation : EnvVar(envVar = "NEED_STATE_VALIDATION")
// NOTE: this is deprecated in favor of FieldSelectionEnabled and will be removed once that flag is fully deployed.
object ApplyFieldSelection : EnvVar(envVar = "APPLY_FIELD_SELECTION")

object PerfBackgroundJsonValidation : Temporary(key = "performance.backgroundJsonSchemaValidation")

object FieldSelectionEnabled : Temporary(key="connection.columnSelection")

// NOTE: this is deprecated in favor of FieldSelectionEnabled and will be removed once that flag is fully deployed.
object FieldSelectionWorkspaces : EnvVar(envVar = "FIELD_SELECTION_WORKSPACES") {
override fun enabled(ctx: Context): Boolean {
val enabledWorkspaceIds: List<String> = fetcher(key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
import io.airbyte.featureflag.FeatureFlagClient;
import io.airbyte.featureflag.FieldSelectionEnabled;
import io.airbyte.featureflag.Workspace;
import io.airbyte.metrics.lib.ApmTraceUtils;
import io.airbyte.metrics.lib.MetricAttribute;
import io.airbyte.metrics.lib.MetricClient;
Expand Down Expand Up @@ -313,6 +315,14 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
final MetricClient metricClient = MetricClientFactory.getMetricClient();
final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage());

final UUID workspaceId = syncInput.getWorkspaceId();
// NOTE: we apply field selection if the feature flag client says so (recommended) or the old
// environment-variable flags say so (deprecated).
// The latter FeatureFlagHelper will be removed once the flag client is fully deployed.
final boolean fieldSelectionEnabled = workspaceId != null &&
(featureFlagClient.enabled(FieldSelectionEnabled.INSTANCE, new Workspace(workspaceId))
|| FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, workspaceId));

return new DefaultReplicationWorker(
jobRunConfig.getJobId(),
Math.toIntExact(jobRunConfig.getAttemptId()),
Expand All @@ -329,7 +339,7 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
new RecordSchemaValidator(featureFlagClient, syncInput.getWorkspaceId(), WorkerUtils.mapStreamNamesToSchemas(syncInput)),
metricReporter,
new ConnectorConfigUpdater(airbyteApiClient.getSourceApi(), airbyteApiClient.getDestinationApi()),
FeatureFlagHelper.isFieldSelectionEnabledForWorkspace(featureFlags, syncInput.getWorkspaceId()));
fieldSelectionEnabled);
};
}

Expand Down

0 comments on commit 8554b5b

Please sign in to comment.