Skip to content

Commit

Permalink
Merge branch 'master' into fleetio-source-connector-cursor-pagination
Browse files Browse the repository at this point in the history
  • Loading branch information
jmmizerany committed Feb 15, 2024
2 parents 75eff0b + b05c490 commit 156b5ad
Show file tree
Hide file tree
Showing 503 changed files with 27,561 additions and 5,905 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.50.48
current_version = 0.50.50
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
37 changes: 0 additions & 37 deletions .github/workflows/jacoco_report.yml

This file was deleted.

8 changes: 8 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,14 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.20.9 | 2024-02-15 | [\#35240](https://github.com/airbytehq/airbyte/pull/35240) | Make state emission to platform inside state manager itself. |
| 0.20.8 | 2024-02-15 | [\#35285](https://github.com/airbytehq/airbyte/pull/35285) | Improve blobstore module structure. |
| 0.20.7 | 2024-02-13 | [\#35236](https://github.com/airbytehq/airbyte/pull/35236) | Output logs to files in addition to stdout when running tests. |
| 0.20.6 | 2024-02-12 | [\#35036](https://github.com/airbytehq/airbyte/pull/35036) | Add trace utility to emit analytics messages. |
| 0.20.5 | 2024-02-13 | [\#34869](https://github.com/airbytehq/airbyte/pull/34869) | Don't emit final state in SourceStateIterator when there is an underlying stream failure. |
| 0.20.4 | 2024-02-12 | [\#35042](https://github.com/airbytehq/airbyte/pull/35042) | Use delegate's isDestinationV2 invocation in SshWrappedDestination. |
| 0.20.3 | 2024-02-09 | [\#34580](https://github.com/airbytehq/airbyte/pull/34580) | Support special chars in mysql/mssql database name. |
| 0.20.2 | 2024-02-12 | [\#35144](https://github.com/airbytehq/airbyte/pull/35144) | Make state emission from async framework synchronized. |
| 0.20.1 | 2024-02-11 | [\#35111](https://github.com/airbytehq/airbyte/pull/35111) | Fix GlobalAsyncStateManager stats counting logic. |
| 0.20.0 | 2024-02-09 | [\#34562](https://github.com/airbytehq/airbyte/pull/34562) | Add new test cases to BaseTypingDedupingTest to exercise special characters. |
| 0.19.0 | 2024-02-01 | [\#34745](https://github.com/airbytehq/airbyte/pull/34745) | Reorganize CDK module structure. |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.db;

import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage;

/**
 * Constants and factory methods for the analytics events emitted by database source connectors.
 *
 * <p>
 * Make sure to add every analytics event defined here to
 * https://www.notion.so/Connector-Analytics-Events-892a79a49852465f8d59a18bd84c36de
 */
public final class DbAnalyticsUtils {

  /** Type string of the {@code db-sources-cdc-cursor-invalid} analytics event. */
  public static final String CDC_CURSOR_INVALID_KEY = "db-sources-cdc-cursor-invalid";

  private DbAnalyticsUtils() {
    // Static-only utility class; instances would carry no state and serve no purpose.
  }

  /**
   * Builds the analytics trace payload for the {@link #CDC_CURSOR_INVALID_KEY} event.
   *
   * @return a new message whose type is {@link #CDC_CURSOR_INVALID_KEY} and whose value is
   *         {@code "1"}
   */
  public static AirbyteAnalyticsTraceMessage cdcCursorInvalidMessage() {
    return new AirbyteAnalyticsTraceMessage().withType(CDC_CURSOR_INVALID_KEY).withValue("1");
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
package io.airbyte.cdk.integrations.base;

import io.airbyte.commons.stream.AirbyteStreamStatusHolder;
import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteErrorTraceMessage.FailureType;
import io.airbyte.protocol.models.v0.AirbyteEstimateTraceMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteTraceMessage;
import java.time.Instant;
import java.util.function.Consumer;
import org.apache.commons.lang3.exception.ExceptionUtils;

Expand Down Expand Up @@ -50,6 +52,10 @@ public static void emitEstimateTrace(final long byteEstimate,
.withNamespace(streamNamespace))));
}

/**
 * Wraps the given analytics payload in a trace-type Airbyte message and emits it.
 *
 * @param airbyteAnalyticsTraceMessage analytics payload to emit
 */
public static void emitAnalyticsTrace(final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage) {
  final AirbyteMessage analyticsMessage = makeAnalyticsTraceAirbyteMessage(airbyteAnalyticsTraceMessage);
  emitMessage(analyticsMessage);
}

/**
 * Builds an error trace message from the given throwable and emits it.
 *
 * @param e throwable the error trace is built from
 * @param displayMessage message passed through to the error trace builder
 * @param failureType failure classification passed through to the error trace builder
 */
public static void emitErrorTrace(final Throwable e, final String displayMessage, final FailureType failureType) {
  final AirbyteMessage errorMessage = makeErrorTraceAirbyteMessage(e, displayMessage, failureType);
  emitMessage(errorMessage);
}
Expand Down Expand Up @@ -86,6 +92,14 @@ private static AirbyteMessage makeErrorTraceAirbyteMessage(
.withStackTrace(ExceptionUtils.getStackTrace(e))));
}

/**
 * Creates a {@code TRACE} Airbyte message carrying the given analytics payload, stamped with the
 * current time in epoch milliseconds.
 *
 * @param airbyteAnalyticsTraceMessage analytics payload to embed in the trace
 * @return the wrapping Airbyte message
 */
private static AirbyteMessage makeAnalyticsTraceAirbyteMessage(final AirbyteAnalyticsTraceMessage airbyteAnalyticsTraceMessage) {
  final AirbyteMessage traceEnvelope = new AirbyteMessage().withType(Type.TRACE);
  final AirbyteTraceMessage analyticsTrace = new AirbyteTraceMessage()
      .withAnalytics(airbyteAnalyticsTraceMessage)
      .withType(AirbyteTraceMessage.Type.ANALYTICS)
      .withEmittedAt((double) Instant.now().toEpochMilli());
  return traceEnvelope.withTrace(analyticsTrace);
}

/**
 * Converts the given stream status holder to its trace message and wraps that trace in an Airbyte
 * message via {@code makeAirbyteMessageFromTraceMessage}.
 *
 * @param airbyteStreamStatusHolder holder whose {@code toTraceMessage()} result is wrapped
 * @return the Airbyte message produced by {@code makeAirbyteMessageFromTraceMessage}
 */
private static AirbyteMessage makeStreamStatusTraceAirbyteMessage(final AirbyteStreamStatusHolder airbyteStreamStatusHolder) {
return makeAirbyteMessageFromTraceMessage(airbyteStreamStatusHolder.toTraceMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,9 @@ protected SshTunnel getTunnelInstance(final JsonNode config) throws Exception {
: getInstance(config, hostKey, portKey);
}

/**
 * Defers to the wrapped delegate's {@code isV2Destination()} rather than answering on its own.
 *
 * @return the value returned by {@code delegate.isV2Destination()}
 */
@Override
public boolean isV2Destination() {
return delegate.isV2Destination();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@
import io.airbyte.cdk.integrations.destination_async.buffers.StreamAwareQueue.MessageWithMeta;
import io.airbyte.cdk.integrations.destination_async.state.FlushFailure;
import io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager;
import io.airbyte.cdk.integrations.destination_async.state.PartialStateWithDestinationStats;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -170,7 +167,7 @@ private void flush(final StreamDescriptor desc, final UUID flushWorkerId) {
AirbyteFileUtils.byteCountToDisplaySize(batch.getSizeInBytes()));

flusher.flush(desc, batch.getData().stream().map(MessageWithMeta::message));
emitStateMessages(batch.flushStates(stateIdToCount));
batch.flushStates(stateIdToCount, outputRecordCollector);
}

log.info("Flush Worker ({}) -- Worker finished flushing. Current queue size: {}",
Expand Down Expand Up @@ -220,7 +217,7 @@ public void close() throws Exception {
log.info("Closing flush workers -- all buffers flushed");

// before shutting down the supervisor, flush all state.
emitStateMessages(stateManager.flushStates());
stateManager.flushStates(outputRecordCollector);
supervisorThread.shutdown();
while (!supervisorThread.awaitTermination(5L, TimeUnit.MINUTES)) {
log.info("Waiting for flush worker supervisor to shut down");
Expand All @@ -237,14 +234,6 @@ public void close() throws Exception {
debugLoop.shutdownNow();
}

/**
 * Emits each partial state as a full Airbyte message: the serialized state is deserialized, the
 * accompanying destination stats are attached to its state, and the result is handed to
 * {@code outputRecordCollector}.
 *
 * @param partials partial state messages paired with their destination stats
 */
private void emitStateMessages(final List<PartialStateWithDestinationStats> partials) {
  partials.forEach(partialState -> {
    final AirbyteMessage stateMessage = Jsons.deserialize(partialState.stateMessage().getSerialized(), AirbyteMessage.class);
    stateMessage.getState().setDestinationStats(partialState.stats());
    outputRecordCollector.accept(stateMessage);
  });
}

/**
 * Shortens a flush worker id to the first five characters of its string form.
 *
 * @param flushWorkerId id of the flush worker
 * @return the first five characters of {@code flushWorkerId.toString()}
 */
private static String humanReadableFlushWorkerId(final UUID flushWorkerId) {
  final String fullId = flushWorkerId.toString();
  return fullId.substring(0, 5);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager;
import io.airbyte.cdk.integrations.destination_async.buffers.StreamAwareQueue.MessageWithMeta;
import io.airbyte.cdk.integrations.destination_async.state.GlobalAsyncStateManager;
import io.airbyte.cdk.integrations.destination_async.state.PartialStateWithDestinationStats;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -57,16 +58,13 @@ public void close() throws Exception {
}

/**
* For the batch, marks all the states that have now been flushed. Also returns states that can be
* flushed. This method is descriptrive, it assumes that whatever consumes the state messages emits
* them, internally it purges the states it returns. message that it can.
* For the batch, marks all the states that have now been flushed. Also writes the states that can
* be flushed back to platform via stateManager.
* <p>
*
* @return list of states that can be flushed
*/
public List<PartialStateWithDestinationStats> flushStates(final Map<Long, Long> stateIdToCount) {
public void flushStates(final Map<Long, Long> stateIdToCount, final Consumer<AirbyteMessage> outputRecordCollector) {
stateIdToCount.forEach(stateManager::decrement);
return stateManager.flushStates();
stateManager.flushStates(outputRecordCollector);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@
import com.google.common.base.Strings;
import io.airbyte.cdk.integrations.destination_async.GlobalMemoryManager;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;
import io.airbyte.protocol.models.v0.StreamDescriptor;
import java.util.ArrayList;
import java.time.Instant;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -25,6 +25,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
Expand Down Expand Up @@ -153,16 +154,12 @@ public void decrement(final long stateId, final long count) {
}

/**
* Returns state messages with no more inflight records i.e. counter = 0 across all streams.
* Flushes state messages with no more inflight records i.e. counter = 0 across all streams.
* Intended to be called by {@link io.airbyte.cdk.integrations.destination_async.FlushWorkers} after
* a worker has finished flushing its record batch.
* <p>
* The return list of states should be emitted back to the platform.
*
* @return list of state messages with no more inflight records.
*/
public List<PartialStateWithDestinationStats> flushStates() {
final List<PartialStateWithDestinationStats> output = new ArrayList<>();
public void flushStates(final Consumer<AirbyteMessage> outputRecordCollector) {
Long bytesFlushed = 0L;
synchronized (LOCK) {
for (final Map.Entry<StreamDescriptor, LinkedBlockingDeque<Long>> entry : descToStateIdQ.entrySet()) {
Expand Down Expand Up @@ -195,9 +192,13 @@ public List<PartialStateWithDestinationStats> flushStates() {
if (allRecordsCommitted) {
final StateMessageWithArrivalNumber stateMessage = oldestState.getLeft();
final double flushedRecordsAssociatedWithState = stateIdToCounterForPopulatingDestinationStats.get(oldestStateId).doubleValue();
LOGGER.info("State with arrival number {} emitted", stateMessage.arrivalNumber);
output.add(new PartialStateWithDestinationStats(stateMessage.partialAirbyteStateMessage(),
new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState)));

log.info("State with arrival number {} emitted from thread {} at {}", stateMessage.arrivalNumber(), Thread.currentThread().getName(),
Instant.now().toString());
final AirbyteMessage message = Jsons.deserialize(stateMessage.partialAirbyteStateMessage.getSerialized(), AirbyteMessage.class);
message.getState().setDestinationStats(new AirbyteStateStats().withRecordCount(flushedRecordsAssociatedWithState));
outputRecordCollector.accept(message);

bytesFlushed += oldestState.getRight();

// cleanup
Expand All @@ -213,7 +214,6 @@ public List<PartialStateWithDestinationStats> flushStates() {
}

freeBytes(bytesFlushed);
return output;
}

private Long getStateIdAndIncrement(final StreamDescriptor streamDescriptor, final long increment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteStateStats;

public record PartialStateWithDestinationStats(PartialAirbyteMessage stateMessage, AirbyteStateStats stats) {}
public record PartialStateWithDestinationStats(PartialAirbyteMessage stateMessage, AirbyteStateStats stats, long stateArrivalNumber) {}
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.20.1
version=0.20.9
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public void setUpOut() {
System.setOut(new PrintStream(outContent, true, StandardCharsets.UTF_8));
}

private void assertJsonNodeIsTraceMessage(JsonNode jsonNode) {
private void assertJsonNodeIsTraceMessage(final JsonNode jsonNode) {
// todo: this check could be better by actually trying to convert the JsonNode to an
// AirbyteTraceMessage instance
Assertions.assertEquals("TRACE", jsonNode.get("type").asText());
Expand All @@ -36,15 +36,15 @@ private void assertJsonNodeIsTraceMessage(JsonNode jsonNode) {
@Test
void testEmitSystemErrorTrace() {
AirbyteTraceMessageUtility.emitSystemErrorTrace(Mockito.mock(RuntimeException.class), "this is a system error");
JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
assertJsonNodeIsTraceMessage(outJson);
Assertions.assertEquals("system_error", outJson.get("trace").get("error").get("failure_type").asText());
}

@Test
void testEmitConfigErrorTrace() {
AirbyteTraceMessageUtility.emitConfigErrorTrace(Mockito.mock(RuntimeException.class), "this is a config error");
JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
assertJsonNodeIsTraceMessage(outJson);
Assertions.assertEquals("config_error", outJson.get("trace").get("error").get("failure_type").asText());
}
Expand All @@ -58,11 +58,11 @@ void testEmitErrorTrace() {
@Test
void testCorrectStacktraceFormat() {
try {
int x = 1 / 0;
} catch (Exception e) {
final int x = 1 / 0;
} catch (final Exception e) {
AirbyteTraceMessageUtility.emitSystemErrorTrace(e, "you exploded the universe");
}
JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
final JsonNode outJson = Jsons.deserialize(outContent.toString(StandardCharsets.UTF_8));
Assertions.assertTrue(outJson.get("trace").get("error").get("stack_trace").asText().contains("\n\tat"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public class BufferDequeueTest {

private static final int RECORD_SIZE_20_BYTES = 20;
private static final String DEFAULT_NAMESPACE = "foo_namespace";
public static final String RECORD_20_BYTES = "abc";
private static final String STREAM_NAME = "stream1";
private static final StreamDescriptor STREAM_DESC = new StreamDescriptor().withName(STREAM_NAME);
private static final PartialAirbyteMessage RECORD_MSG_20_BYTES = new PartialAirbyteMessage()
Expand Down
Loading

0 comments on commit 156b5ad

Please sign in to comment.