
Implement Progress Bar Persistence Read/Write #20787

Merged: 21 commits, merged Dec 28, 2022

Commits
0be298c
Checkpoint: Add interface methods.
davinchia Dec 20, 2022
d6e12a7
Correct the attempt id column to be a bigint.
davinchia Dec 24, 2022
326c555
Correct the wrong constraints.
davinchia Dec 24, 2022
402913c
Start adding tests. Alphabetise the SyncStats.yaml. Add get StreamSta…
davinchia Dec 24, 2022
513c068
Implement the right migration.
davinchia Dec 24, 2022
5bf2b49
Add estimated fields to the SyncStats message.
davinchia Dec 24, 2022
6d3661f
Change interface to a combined stats POJO object. Modify query to sor…
davinchia Dec 24, 2022
6699e03
Merge remote-tracking branch 'origin/master' into davinchia/implement…
davinchia Dec 27, 2022
ed35076
Get the basic test passing.
davinchia Dec 27, 2022
055a599
Add upsert handling for sync stats table.
davinchia Dec 27, 2022
3d3a5bc
Edit migration to allow for namespace to be null. Implement writing t…
davinchia Dec 27, 2022
4c6701a
Add upsert handling for stream stats table. Buff up test suite.
davinchia Dec 27, 2022
1059b86
Checkpoint: Everything works except upsert for a stream with null nam…
davinchia Dec 27, 2022
8d304f4
Get everything passing.
davinchia Dec 27, 2022
efca05e
Clean up.
davinchia Dec 27, 2022
5500bc2
Merge branch 'master' into davinchia/implement-progress-bar-persistence
davinchia Dec 27, 2022
6b7eddd
Remove bad annotation.
davinchia Dec 27, 2022
56436ff
Add field for namespace.
davinchia Dec 27, 2022
44f6bb6
Merge branch 'master' into davinchia/implement-progress-bar-persistence
davinchia Dec 27, 2022
c25d472
Respond to PR feedback.
davinchia Dec 28, 2022
b3a1027
Respond to PR feedback.
davinchia Dec 28, 2022
2 changes: 2 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
@@ -3970,6 +3970,8 @@ components:
properties:
streamName:
type: string
streamNamespace:
type: string
stats:
$ref: "#/components/schemas/AttemptStats"
AttemptFailureSummary:

@@ -137,7 +137,7 @@ void testBootloaderAppBlankDb() throws Exception {
bootloader.load();

val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.40.18.002", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.26.001", jobsMigrator.getLatestMigration().getVersion().getVersion());

val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
5 changes: 5 additions & 0 deletions airbyte-commons/src/main/resources/log4j2.xml
@@ -209,6 +209,11 @@
If a database has tons of DDL queries, the logs would be filled with such messages-->
<Logger name="io.debezium.relational.history" level="OFF" />

<!--Uncomment the following to debug JOOQ generated SQL queries.-->
<!--<Logger name="org.jooq.tools.LoggerListener" level="debug">-->
<!-- <AppenderRef ref="Console"/>-->
<!--</Logger>-->

</Loggers>

</Configuration>

@@ -9,18 +9,23 @@ required:
- bytesEmitted
additionalProperties: true
properties:
recordsEmitted:
type: integer
bytesEmitted:
type: integer
sourceStateMessagesEmitted:
description: Number of State messages emitted by the Source Connector
type: integer
destinationStateMessagesEmitted:
description: Number of State messages emitted by the Destination Connector
type: integer
recordsCommitted:
type: integer # if unset, committed records could not be computed
destinationWriteEndTime:
Comment from davinchia (Contributor, Author):

I sorted this file and added the estimated fields.

type: integer
description: The exit time of the destination container/pod
destinationWriteStartTime:
type: integer
description: The boot time of the destination container/pod
estimatedBytes:
type: integer
description: The total estimated number of bytes for the sync
estimatedRecords:
type: integer
description: The total estimated number of records for the sync
meanSecondsBeforeSourceStateMessageEmitted:
type: integer
maxSecondsBeforeSourceStateMessageEmitted:
@@ -29,21 +34,22 @@
type: integer
meanSecondsBetweenStateMessageEmittedandCommitted:
type: integer
replicationStartTime:
recordsEmitted:
type: integer
description: The start of the replication activity
recordsCommitted:
type: integer # if unset, committed records could not be computed
replicationEndTime:
type: integer
description: The end of the replication activity
sourceReadStartTime:
replicationStartTime:
type: integer
description: The boot time of the source container/pod
description: The start of the replication activity
sourceReadEndTime:
type: integer
description: The exit time of the source container/pod
destinationWriteStartTime:
sourceReadStartTime:
type: integer
description: The boot time of the destination container/pod
destinationWriteEndTime:
description: The boot time of the source container/pod
sourceStateMessagesEmitted:
description: Number of State messages emitted by the Source Connector
type: integer
description: The exit time of the destination container/pod
@@ -0,0 +1,42 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.db.instance.jobs.migrations;

import static org.jooq.impl.DSL.constraint;

import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class V0_40_26_001__CorrectStreamStatsTable extends BaseJavaMigration {

private static final Logger LOGGER = LoggerFactory.getLogger(V0_40_26_001__CorrectStreamStatsTable.class);

@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());

// Warning: please do not use any jOOQ generated code to write a migration.
// As database schema changes, the generated jOOQ code can be deprecated. So
// old migration may not compile if there is any generated code.
try (final DSLContext ctx = DSL.using(context.getConnection())) {
// This actually needs to be bigint to match the id column on the attempts table.
String streamStats = "stream_stats";
ctx.alterTable(streamStats).alter("attempt_id").set(SQLDataType.BIGINT.nullable(false)).execute();
// Not all streams provide a namespace.
ctx.alterTable(streamStats).alter("stream_namespace").set(SQLDataType.VARCHAR.nullable(true)).execute();

// The constraint should also take into account the stream namespace. Drop the constraint and
// recreate it.
ctx.alterTable(streamStats).dropUnique("stream_stats_attempt_id_stream_name_key").execute();
ctx.alterTable(streamStats).add(constraint("uniq_stream_attempt").unique("attempt_id", "stream_name", "stream_namespace")).execute();
Review comment from a contributor:

I think this constraint doesn't behave as you expect because of how SQL handles NULL values: rows with a NULL namespace are ignored by the constraint. For the intended behavior there should be two constraints, one across all three columns for rows where the namespace isn't null, and one on attempt_id/stream_name for rows where the stream namespace is null (a sketch of this follows this file's diff).

Last time I looked at this with jOOQ, I ended up checking this in our application logic, because some solutions were Postgres-specific and I was not able to figure out the jOOQ way of describing it.

Reply from davinchia (Contributor, Author), Dec 28, 2022:

I did check the current constraint separately (manually writing to the database) and it does include the NULL namespace case (i.e. errors out). This makes me think this is a JOOQ thing.

I also checked both constraints and in combination with the above, decided this is simpler than adding another constraint that only exists because of JOOQ.

}
}

}
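
The two-constraint alternative the reviewer describes can be expressed as a pair of Postgres partial unique indexes. The sketch below is illustrative only and is not part of the merged migration: the index names are made up, and it assumes jOOQ's createUniqueIndex(...).where(...) support for partial indexes.

import org.jooq.DSLContext;
import org.jooq.impl.DSL;

final class PartialUniqueIndexSketch {

  // Hypothetical sketch: enforce uniqueness separately for rows with and without a namespace,
  // so that (attempt_id, stream_name) pairs with a NULL namespace are also deduplicated.
  static void createPartialUniqueIndexes(final DSLContext ctx) {
    ctx.createUniqueIndex("uniq_stream_attempt_with_namespace")
        .on(DSL.table("stream_stats"),
            DSL.field("attempt_id"), DSL.field("stream_name"), DSL.field("stream_namespace"))
        .where(DSL.field("stream_namespace").isNotNull())
        .execute();
    ctx.createUniqueIndex("uniq_stream_attempt_null_namespace")
        .on(DSL.table("stream_stats"),
            DSL.field("attempt_id"), DSL.field("stream_name"))
        .where(DSL.field("stream_namespace").isNull())
        .execute();
  }

}

The merged PR instead keeps the single three-column uniq_stream_attempt constraint and handles the NULL-namespace case in application logic via PersistenceHelpers.isNullOrEquals, as seen in the stream stats upsert further down.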

@@ -63,8 +63,8 @@ create table "public"."normalization_summaries"(
);
create table "public"."stream_stats"(
"id" uuid not null,
"attempt_id" int4 not null,
"stream_namespace" varchar(2147483647) not null,
"attempt_id" int8 not null,
"stream_namespace" varchar(2147483647) null,
"stream_name" varchar(2147483647) not null,
"records_emitted" int8 null,
"bytes_emitted" int8 null,
@@ -122,10 +122,11 @@ create index "jobs_status_idx" on "public"."jobs"("status" asc);
create unique index "normalization_summaries_pkey" on "public"."normalization_summaries"("id" asc);
create index "normalization_summary_attempt_id_idx" on "public"."normalization_summaries"("attempt_id" asc);
create index "index" on "public"."stream_stats"("attempt_id" asc);
create unique index "stream_stats_attempt_id_stream_name_key" on "public"."stream_stats"(
create unique index "stream_stats_pkey" on "public"."stream_stats"("id" asc);
create unique index "uniq_stream_attempt" on "public"."stream_stats"(
"attempt_id" asc,
"stream_name" asc
"stream_name" asc,
"stream_namespace" asc
);
create unique index "stream_stats_pkey" on "public"."stream_stats"("id" asc);
create index "attempt_id_idx" on "public"."sync_stats"("attempt_id" asc);
create unique index "sync_stats_pkey" on "public"."sync_stats"("id" asc);

@@ -7,6 +7,7 @@
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.ATTEMPTS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.JOBS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.NORMALIZATION_SUMMARIES;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.STREAM_STATS;
import static io.airbyte.db.instance.jobs.jooq.generated.Tables.SYNC_STATS;

import com.fasterxml.jackson.core.JsonProcessingException;
@@ -33,7 +34,9 @@
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.config.persistence.PersistenceHelpers;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
@@ -337,8 +340,6 @@ public Optional<String> getAttemptTemporalWorkflowId(final long jobId, final int
public void writeOutput(final long jobId, final int attemptNumber, final JobOutput output)
throws IOException {
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
final SyncStats syncStats = output.getSync().getStandardSyncSummary().getTotalStats();
final NormalizationSummary normalizationSummary = output.getSync().getNormalizationSummary();

jobDatabase.transaction(ctx -> {
ctx.update(ATTEMPTS)
@@ -348,10 +349,12 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp
.execute();
final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);

final SyncStats syncStats = output.getSync().getStandardSyncSummary().getTotalStats();
if (syncStats != null) {
writeSyncStats(now, syncStats, attemptId, ctx);
saveToSyncStatsTable(now, syncStats, attemptId, ctx);
}

final NormalizationSummary normalizationSummary = output.getSync().getNormalizationSummary();
if (normalizationSummary != null) {
ctx.insertInto(NORMALIZATION_SUMMARIES)
.set(NORMALIZATION_SUMMARIES.ID, UUID.randomUUID())
@@ -369,14 +372,65 @@ public void writeOutput(final long jobId, final int attemptNumber, final JobOutp

}

private static void writeSyncStats(final OffsetDateTime now, final SyncStats syncStats, final Long attemptId, final DSLContext ctx) {
@Override
public void writeStats(final long jobId,
final int attemptNumber,
final long estimatedRecords,
final long estimatedBytes,
final long recordsEmitted,
final long bytesEmitted,
final List<StreamSyncStats> streamStats)
throws IOException {
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
jobDatabase.transaction(ctx -> {
final var attemptId = getAttemptId(jobId, attemptNumber, ctx);

final var syncStats = new SyncStats()
.withEstimatedRecords(estimatedRecords)
.withEstimatedBytes(estimatedBytes)
.withRecordsEmitted(recordsEmitted)
.withBytesEmitted(bytesEmitted);
saveToSyncStatsTable(now, syncStats, attemptId, ctx);

saveToStreamStatsTable(now, streamStats, attemptId, ctx);
return null;
});

}

private static void saveToSyncStatsTable(final OffsetDateTime now, final SyncStats syncStats, final Long attemptId, final DSLContext ctx) {
// Although JOOQ supports upsert using the onConflict statement, we cannot use it as the table
// currently has duplicate records and also doesn't contain the unique constraint on the attempt_id
// column JOOQ requires. We are forced to check for existence.
final var isExisting = ctx.fetchExists(SYNC_STATS, SYNC_STATS.ATTEMPT_ID.eq(attemptId));
if (isExisting) {
ctx.update(SYNC_STATS)
.set(SYNC_STATS.UPDATED_AT, now)
.set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted())
.set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted())
.set(SYNC_STATS.ESTIMATED_RECORDS, syncStats.getEstimatedRecords())
.set(SYNC_STATS.ESTIMATED_BYTES, syncStats.getEstimatedBytes())
.set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted())
.set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted())
.set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted())
.set(SYNC_STATS.MAX_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMaxSecondsBeforeSourceStateMessageEmitted())
.set(SYNC_STATS.MEAN_SECONDS_BEFORE_SOURCE_STATE_MESSAGE_EMITTED, syncStats.getMeanSecondsBeforeSourceStateMessageEmitted())
.set(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMaxSecondsBetweenStateMessageEmittedandCommitted())
.set(SYNC_STATS.MEAN_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED, syncStats.getMeanSecondsBetweenStateMessageEmittedandCommitted())
.where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
.execute();
return;
}

ctx.insertInto(SYNC_STATS)
.set(SYNC_STATS.ID, UUID.randomUUID())
.set(SYNC_STATS.UPDATED_AT, now)
.set(SYNC_STATS.CREATED_AT, now)
.set(SYNC_STATS.ATTEMPT_ID, attemptId)
.set(SYNC_STATS.UPDATED_AT, now)
.set(SYNC_STATS.BYTES_EMITTED, syncStats.getBytesEmitted())
.set(SYNC_STATS.RECORDS_EMITTED, syncStats.getRecordsEmitted())
.set(SYNC_STATS.ESTIMATED_RECORDS, syncStats.getEstimatedRecords())
.set(SYNC_STATS.ESTIMATED_BYTES, syncStats.getEstimatedBytes())
.set(SYNC_STATS.RECORDS_COMMITTED, syncStats.getRecordsCommitted())
.set(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED, syncStats.getSourceStateMessagesEmitted())
.set(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED, syncStats.getDestinationStateMessagesEmitted())
@@ -387,6 +441,50 @@ private static void writeSyncStats(final OffsetDateTime now, final SyncStats syn
.execute();
}

private static void saveToStreamStatsTable(final OffsetDateTime now,
final List<StreamSyncStats> perStreamStats,
final Long attemptId,
final DSLContext ctx) {
Optional.ofNullable(perStreamStats).orElse(Collections.emptyList()).forEach(
streamStats -> {
// We cannot entirely rely on JOOQ's generated SQL for upserts as it does not support null fields
// for conflict detection. We are forced to separately check for existence.
final var stats = streamStats.getStats();
final var isExisting =
ctx.fetchExists(STREAM_STATS, STREAM_STATS.ATTEMPT_ID.eq(attemptId).and(STREAM_STATS.STREAM_NAME.eq(streamStats.getStreamName()))
.and(PersistenceHelpers.isNullOrEquals(STREAM_STATS.STREAM_NAMESPACE, streamStats.getStreamNamespace())));
if (isExisting) {
ctx.update(STREAM_STATS)
.set(STREAM_STATS.UPDATED_AT, now)
.set(STREAM_STATS.BYTES_EMITTED, stats.getBytesEmitted())
.set(STREAM_STATS.RECORDS_EMITTED, stats.getRecordsEmitted())
.set(STREAM_STATS.ESTIMATED_RECORDS, stats.getEstimatedRecords())
.set(STREAM_STATS.ESTIMATED_BYTES, stats.getEstimatedBytes())
.where(STREAM_STATS.ATTEMPT_ID.eq(attemptId))
.execute();
return;
}

ctx.insertInto(STREAM_STATS)
.set(STREAM_STATS.ID, UUID.randomUUID())
.set(STREAM_STATS.ATTEMPT_ID, attemptId)
.set(STREAM_STATS.STREAM_NAME, streamStats.getStreamName())
.set(STREAM_STATS.STREAM_NAMESPACE, streamStats.getStreamNamespace())
.set(STREAM_STATS.CREATED_AT, now)
.set(STREAM_STATS.UPDATED_AT, now)
.set(STREAM_STATS.BYTES_EMITTED, stats.getBytesEmitted())
.set(STREAM_STATS.RECORDS_EMITTED, stats.getRecordsEmitted())
.set(STREAM_STATS.ESTIMATED_BYTES, stats.getEstimatedBytes())
.set(STREAM_STATS.ESTIMATED_RECORDS, stats.getEstimatedRecords())
.set(STREAM_STATS.UPDATED_AT, now)
.set(STREAM_STATS.BYTES_EMITTED, stats.getBytesEmitted())
.set(STREAM_STATS.RECORDS_EMITTED, stats.getRecordsEmitted())
.set(STREAM_STATS.ESTIMATED_BYTES, stats.getEstimatedBytes())
.set(STREAM_STATS.ESTIMATED_RECORDS, stats.getEstimatedRecords())
.execute();
});
}

@Override
public void writeAttemptFailureSummary(final long jobId, final int attemptNumber, final AttemptFailureSummary failureSummary) throws IOException {
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);
@@ -400,14 +498,16 @@ public void writeAttemptFailureSummary(final long jobId, final int attemptNumber
}

@Override
public List<SyncStats> getSyncStats(final long jobId, final int attemptNumber) throws IOException {
public AttemptStats getAttemptStats(final long jobId, final int attemptNumber) throws IOException {
return jobDatabase
.query(ctx -> {
final Long attemptId = getAttemptId(jobId, attemptNumber, ctx);
return ctx.select(DSL.asterisk()).from(DSL.table("sync_stats")).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
.fetch(getSyncStatsRecordMapper())
.stream()
.toList();
final var syncStats = ctx.select(DSL.asterisk()).from(SYNC_STATS).where(SYNC_STATS.ATTEMPT_ID.eq(attemptId))
.orderBy(SYNC_STATS.UPDATED_AT.desc())
.fetchOne(getSyncStatsRecordMapper());
final var perStreamStats = ctx.select(DSL.asterisk()).from(STREAM_STATS).where(STREAM_STATS.ATTEMPT_ID.eq(attemptId))
.fetch(getStreamStatsRecordsMapper());
return new AttemptStats(syncStats, perStreamStats);
});
}

@@ -423,7 +523,8 @@ public List<NormalizationSummary> getNormalizationSummary(final long jobId, fina
});
}

private static Long getAttemptId(final long jobId, final int attemptNumber, final DSLContext ctx) {
@VisibleForTesting
static Long getAttemptId(final long jobId, final int attemptNumber, final DSLContext ctx) {
final Optional<Record> record =
ctx.fetch("SELECT id from attempts where job_id = ? AND attempt_number = ?", jobId,
attemptNumber).stream().findFirst();
@@ -432,6 +533,7 @@ private static Long getAttemptId(final long jobId, final int attemptNumber, fina

private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() {
return record -> new SyncStats().withBytesEmitted(record.get(SYNC_STATS.BYTES_EMITTED)).withRecordsEmitted(record.get(SYNC_STATS.RECORDS_EMITTED))
.withEstimatedBytes(record.get(SYNC_STATS.ESTIMATED_BYTES)).withEstimatedRecords(record.get(SYNC_STATS.ESTIMATED_RECORDS))
.withSourceStateMessagesEmitted(record.get(SYNC_STATS.SOURCE_STATE_MESSAGES_EMITTED))
.withDestinationStateMessagesEmitted(record.get(SYNC_STATS.DESTINATION_STATE_MESSAGES_EMITTED))
.withRecordsCommitted(record.get(SYNC_STATS.RECORDS_COMMITTED))
@@ -441,8 +543,19 @@ private static RecordMapper<Record, SyncStats> getSyncStatsRecordMapper() {
.withMaxSecondsBetweenStateMessageEmittedandCommitted(record.get(SYNC_STATS.MAX_SECONDS_BETWEEN_STATE_MESSAGE_EMITTED_AND_COMMITTED));
}

private static RecordMapper<Record, StreamSyncStats> getStreamStatsRecordsMapper() {
return record -> {
final var stats = new SyncStats()
.withEstimatedRecords(record.get(STREAM_STATS.ESTIMATED_RECORDS)).withEstimatedBytes(record.get(STREAM_STATS.ESTIMATED_BYTES))
.withRecordsEmitted(record.get(STREAM_STATS.RECORDS_EMITTED)).withBytesEmitted(record.get(STREAM_STATS.BYTES_EMITTED));
return new StreamSyncStats()
.withStreamName(record.get(STREAM_STATS.STREAM_NAME)).withStreamNamespace(record.get(STREAM_STATS.STREAM_NAMESPACE))
.withStats(stats);
};
}

private static RecordMapper<Record, NormalizationSummary> getNormalizationSummaryRecordMapper() {
final RecordMapper<Record, NormalizationSummary> recordMapper = record -> {
return record -> {
try {
return new NormalizationSummary().withStartTime(record.get(NORMALIZATION_SUMMARIES.START_TIME).toInstant().toEpochMilli())
.withEndTime(record.get(NORMALIZATION_SUMMARIES.END_TIME).toInstant().toEpochMilli())
@@ -451,7 +564,6 @@ private static RecordMapper<Record, NormalizationSummary> getNormalizationSummar
throw new RuntimeException(e);
}
};
return recordMapper;
}

private static List<FailureReason> deserializeFailureReasons(final Record record) throws JsonProcessingException {
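
The PersistenceHelpers.isNullOrEquals helper used in saveToStreamStatsTable is imported above but its body is not shown in this diff. Below is a minimal sketch of what such a null-safe equality condition could look like in jOOQ; it is an illustration only, not the actual Airbyte implementation.

import org.jooq.Condition;
import org.jooq.Field;

final class PersistenceHelpersSketch {

  // Null-safe match: when the value is null, match rows where the column IS NULL;
  // otherwise use a regular equality comparison.
  static <T> Condition isNullOrEquals(final Field<T> field, final T value) {
    return value == null ? field.isNull() : field.eq(value);
  }

}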
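For reference, once sync_stats has a unique constraint on attempt_id and no duplicate rows, the fetchExists/update/insert pattern in saveToSyncStatsTable could collapse into a single jOOQ upsert. This is a hedged sketch under that assumption, using raw table and field names and only a subset of the columns; it is not how the merged PR writes stats.

import java.time.OffsetDateTime;
import java.util.UUID;
import org.jooq.DSLContext;
import org.jooq.impl.DSL;

final class SyncStatsUpsertSketch {

  // Illustrative only: assumes a unique index on sync_stats.attempt_id, which the current
  // table does not have (hence the existence check in the PR).
  static void upsertSyncStats(final DSLContext ctx, final OffsetDateTime now, final Long attemptId,
                              final Long recordsEmitted, final Long bytesEmitted) {
    ctx.insertInto(DSL.table("sync_stats"))
        .set(DSL.field("id"), UUID.randomUUID())
        .set(DSL.field("attempt_id"), attemptId)
        .set(DSL.field("created_at"), now)
        .set(DSL.field("updated_at"), now)
        .set(DSL.field("records_emitted"), recordsEmitted)
        .set(DSL.field("bytes_emitted"), bytesEmitted)
        .onConflict(DSL.field("attempt_id"))
        .doUpdate()
        .set(DSL.field("updated_at"), now)
        .set(DSL.field("records_emitted"), recordsEmitted)
        .set(DSL.field("bytes_emitted"), bytesEmitted)
        .execute();
  }

}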