From 0c27c74500fd77fd74d862fab1bb5fb72a3f4cd2 Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Thu, 2 Feb 2023 12:29:55 -0500 Subject: [PATCH] Trace refresh schema operations --- .../sync/RefreshSchemaActivityImpl.java | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java index d3ecbb1709b80..b959351864d68 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/RefreshSchemaActivityImpl.java @@ -5,6 +5,8 @@ package io.airbyte.workers.temporal.sync; import static io.airbyte.metrics.lib.ApmTraceConstants.ACTIVITY_TRACE_OPERATION_NAME; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.CONNECTION_ID_KEY; +import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.SOURCE_ID_KEY; import datadog.trace.api.Trace; import io.airbyte.api.client.generated.SourceApi; @@ -13,8 +15,10 @@ import io.airbyte.api.client.model.generated.SourceDiscoverSchemaRequestBody; import io.airbyte.api.client.model.generated.SourceIdRequestBody; import io.airbyte.commons.features.EnvVariableFeatureFlags; +import io.airbyte.metrics.lib.ApmTraceUtils; import jakarta.inject.Singleton; import java.time.OffsetDateTime; +import java.util.Map; import java.util.UUID; import lombok.extern.slf4j.Slf4j; @@ -25,48 +29,54 @@ public class RefreshSchemaActivityImpl implements RefreshSchemaActivity { private final SourceApi sourceApi; private final EnvVariableFeatureFlags envVariableFeatureFlags; - public RefreshSchemaActivityImpl(SourceApi sourceApi, - EnvVariableFeatureFlags envVariableFeatureFlags) { + public RefreshSchemaActivityImpl(final SourceApi sourceApi, + final EnvVariableFeatureFlags envVariableFeatureFlags) { this.sourceApi = sourceApi; this.envVariableFeatureFlags = envVariableFeatureFlags; } @Override @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) - public boolean shouldRefreshSchema(UUID sourceCatalogId) { + public boolean shouldRefreshSchema(final UUID sourceCatalogId) { if (!envVariableFeatureFlags.autoDetectSchema()) { return false; } + ApmTraceUtils.addTagsToTrace(Map.of(SOURCE_ID_KEY, sourceCatalogId)); return !schemaRefreshRanRecently(sourceCatalogId); } @Override - public void refreshSchema(UUID sourceCatalogId, UUID connectionId) { + @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) + public void refreshSchema(final UUID sourceCatalogId, final UUID connectionId) { if (!envVariableFeatureFlags.autoDetectSchema()) { return; } - SourceDiscoverSchemaRequestBody requestBody = + ApmTraceUtils.addTagsToTrace(Map.of(CONNECTION_ID_KEY, connectionId, SOURCE_ID_KEY, sourceCatalogId)); + + final SourceDiscoverSchemaRequestBody requestBody = new SourceDiscoverSchemaRequestBody().sourceId(sourceCatalogId).disableCache(true).connectionId(connectionId); try { sourceApi.discoverSchemaForSource(requestBody); } catch (final Exception e) { + ApmTraceUtils.addExceptionToTrace(e); // catching this exception because we don't want to block replication due to a failed schema refresh log.error("Attempted schema refresh, but failed with error: ", e); } } - private boolean schemaRefreshRanRecently(UUID sourceCatalogId) { + private boolean schemaRefreshRanRecently(final UUID sourceCatalogId) { try { - SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceCatalogId); - ActorCatalogWithUpdatedAt mostRecentFetchEvent = sourceApi.getMostRecentSourceActorCatalog(sourceIdRequestBody); + final SourceIdRequestBody sourceIdRequestBody = new SourceIdRequestBody().sourceId(sourceCatalogId); + final ActorCatalogWithUpdatedAt mostRecentFetchEvent = sourceApi.getMostRecentSourceActorCatalog(sourceIdRequestBody); if (mostRecentFetchEvent.getUpdatedAt() == null) { return false; } return mostRecentFetchEvent.getUpdatedAt() > OffsetDateTime.now().minusHours(24l).toEpochSecond(); - } catch (ApiException e) { + } catch (final ApiException e) { + ApmTraceUtils.addExceptionToTrace(e); // catching this exception because we don't want to block replication due to a failed schema refresh log.info("Encountered an error fetching most recent actor catalog fetch event: ", e); return true;