Skip to content

Commit

Permalink
[cdk, source-mysql/postgres/mssql/mongodb-v2] log record count to sta…
Browse files Browse the repository at this point in the history
…te message (#42869)

Co-authored-by: Yue Li <yue.li@airbyte.io>
Co-authored-by: Yue Li <61070669+theyueli@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 7, 2024
1 parent 7ab66a8 commit 97d822f
Show file tree
Hide file tree
Showing 15 changed files with 41 additions and 23 deletions.
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ corresponds to that version.

| Version | Date | Pull Request | Subject |
|:-----------|:-----------|:------------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.44.2 | 2024-08-06 | [\#42869](https://github.com/airbytehq/airbyte/pull/42869) | Add logs about counting info to state message. |
| 0.44.1 | 2024-08-01 | [\#42550](https://github.com/airbytehq/airbyte/pull/42550) | Fix error on reporting counts. |
| 0.44.0 | 2024-08-01 | [\#42405](https://github.com/airbytehq/airbyte/pull/42405) | s3-destinations: Use async framework, adapt to support refreshes |
| 0.43.6 | 2024-07-30 | [\#42540](https://github.com/airbytehq/airbyte/pull/42540) | Fix generationId handling for destinations |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.44.1
version=0.44.2
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.cdk.integrations.source.relationaldb.state
import com.google.common.collect.AbstractIterator
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteStateStats
import io.airbyte.protocol.models.v0.AirbyteStreamNameNamespacePair
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.github.oshai.kotlinlogging.KotlinLogging
import java.time.Duration
Expand All @@ -22,6 +23,7 @@ open class SourceStateIterator<T>(
) : AbstractIterator<AirbyteMessage>(), MutableIterator<AirbyteMessage> {
private var hasEmittedFinalState = false
private var recordCount = 0L
private var streamRecordCount = mutableMapOf<AirbyteStreamNameNamespacePair, Long>()
private var lastCheckpoint: Instant = Instant.now()

override fun computeNext(): AirbyteMessage? {
Expand All @@ -45,8 +47,11 @@ open class SourceStateIterator<T>(
stateMessage!!.withSourceStats(
AirbyteStateStats().withRecordCount(recordCount.toDouble())
)
LOGGER.info { "sending state message, with count per stream: $streamRecordCount " }

recordCount = 0L
streamRecordCount.clear()

lastCheckpoint = Instant.now()
return AirbyteMessage().withType(AirbyteMessage.Type.STATE).withState(stateMessage)
}
Expand All @@ -56,7 +61,15 @@ open class SourceStateIterator<T>(
val message = messageIterator.next()
val processedMessage =
sourceStateMessageProducer.processRecordMessage(stream, message)
recordCount++
if (processedMessage.type == AirbyteMessage.Type.RECORD) {
val pair =
AirbyteStreamNameNamespacePair(
processedMessage.record.stream,
processedMessage.record.namespace
)
streamRecordCount[pair] = streamRecordCount.getOrPut(pair) { 0 } + 1
recordCount++
}
return processedMessage
} catch (e: Exception) {
throw FailedRecordIteratorException(e)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.43.4'
cdkVersionRequired = '0.44.2'
features = ['db-sources', 'datastore-mongo']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e
dockerImageTag: 1.5.1
dockerImageTag: 1.5.2
dockerRepository: airbyte/source-mongodb-v2
documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2
githubIssueLabel: source-mongodb-v2
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.1'
cdkVersionRequired = '0.44.2'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.1.6
dockerImageTag: 4.1.7
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.1'
cdkVersionRequired = '0.44.2'
features = ['db-sources']
useLocalCdk = false
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.6.7
dockerImageTag: 3.6.8
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ java {
}

airbyteJavaConnector {
cdkVersionRequired = '0.44.1'
cdkVersionRequired = '0.44.2'
features = ['db-sources', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerImageTag: 3.6.12
dockerImageTag: 3.6.13
dockerRepository: airbyte/source-postgres
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
githubIssueLabel: source-postgres
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/mongodb-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ For more information regarding configuration parameters, please see [MongoDb Doc
<summary>Expand to review</summary>

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------- |
|:--------| :--------- | :------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------- |
| 1.5.2 | 2024-08-06 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |
| 1.5.1 | 2024-08-01 | [42549](https://github.com/airbytehq/airbyte/pull/42549) | Centered the connector icon. |
| 1.5.0 | 2024-07-26 | [42561](https://github.com/airbytehq/airbyte/pull/42561) | Implement WASS algorithm. |
| 1.4.3 | 2024-07-22 | [39145](https://github.com/airbytehq/airbyte/pull/39145) | Warn (vs fail) on different \_id types in collection. |
Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/mssql.md
Original file line number Diff line number Diff line change
Expand Up @@ -422,10 +422,11 @@ WHERE actor_definition_id ='b5ea17b1-f170-46dc-bc31-cc744ca984c1' AND (configura

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 4.1.7 | 2024-08-06 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |
| 4.1.6 | 2024-07-30 | [42550](https://github.com/airbytehq/airbyte/pull/42550) | Correctly report stream states. |
| 4.1.5 | 2024-07-29 | [42852](https://github.com/airbytehq/airbyte/pull/42852) | Bump CDK version to latest to use new bug fixes on error translation. |
| 4.1.4 | 2024-07-23 | [42421](https://github.com/airbytehq/airbyte/pull/42421) | Remove final transient error emitter iterators. |
| 4.1.3 | |2024-07-22 | [42411](https://github.com/airbytehq/airbyte/pull/42411) | Hide the "initial load timeout in hours" field by default in UI |
| 4.1.3 | |2024-07-22 | [42411](https://github.com/airbytehq/airbyte/pull/42411) | Hide the "initial load timeout in hours" field by default in UI |
| 4.1.2 | 2024-07-22 | [42024](https://github.com/airbytehq/airbyte/pull/42024) | Fix a NPE bug on resuming from a failed attempt. |
| 4.1.1 | 2024-07-19 | [42122](https://github.com/airbytehq/airbyte/pull/42122) | Improve wass error message + logging. |
| 4.1.0 | 2024-07-17 | [42078](https://github.com/airbytehq/airbyte/pull/42078) | WASS analytics + bug fixes. |
Expand Down
23 changes: 12 additions & 11 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,18 @@ Any database or table encoding combination of charset and collation is supported

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.6.7 | 2024-07-30 | [42550](https://github.com/airbytehq/airbyte/pull/42550) | Correctly report stream states. |
| 3.6.6 | 2024-07-29 | [42852](https://github.com/airbytehq/airbyte/pull/42852) | Bump CDK version to latest to use new bug fixes on error translation. |
| 3.6.5 | 2024-07-24 | [42417](https://github.com/airbytehq/airbyte/pull/42417) | Handle null error message in ConnectorExceptionHandler. |
| 3.6.4 | 2024-07-23 | [42421](https://github.com/airbytehq/airbyte/pull/42421) | Remove final transient error emitter iterators. |
| 3.6.3 | 2024-07-22 | [42024](https://github.com/airbytehq/airbyte/pull/42024) | Fix a NPE bug on resuming from a failed attempt. |
| 3.6.2 | 2024-07-17 | [42087](https://github.com/airbytehq/airbyte/pull/42087) | Adding more error translations for MySql source. |
| 3.6.1 | 2024-07-19 | [42122](https://github.com/airbytehq/airbyte/pull/42122) | Improve wass error message + logging. |
| 3.6.0 | 2024-07-17 | [40208](https://github.com/airbytehq/airbyte/pull/40208) | Start using the new error MySql source error handler that comes with a new error translation layer. |
| 3.5.1 | 2024-07-17 | [42043](https://github.com/airbytehq/airbyte/pull/42043) | Adopt latest CDK + fixes. |
| 3.5.0 | 2024-07-11 | [38240](https://github.com/airbytehq/airbyte/pull/38240) | Implement WASS. |
| 3.4.12 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz heartbeat. |
| 3.6.8 | 2024-07-30 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |
| 3.6.7 | 2024-07-30 | [42550](https://github.com/airbytehq/airbyte/pull/42550) | Correctly report stream states. |
| 3.6.6 | 2024-07-29 | [42852](https://github.com/airbytehq/airbyte/pull/42852) | Bump CDK version to latest to use new bug fixes on error translation. |
| 3.6.5 | 2024-07-24 | [42417](https://github.com/airbytehq/airbyte/pull/42417) | Handle null error message in ConnectorExceptionHandler. |
| 3.6.4 | 2024-07-23 | [42421](https://github.com/airbytehq/airbyte/pull/42421) | Remove final transient error emitter iterators. |
| 3.6.3 | 2024-07-22 | [42024](https://github.com/airbytehq/airbyte/pull/42024) | Fix a NPE bug on resuming from a failed attempt. |
| 3.6.2 | 2024-07-17 | [42087](https://github.com/airbytehq/airbyte/pull/42087) | Adding more error translations for MySql source. |
| 3.6.1 | 2024-07-19 | [42122](https://github.com/airbytehq/airbyte/pull/42122) | Improve wass error message + logging. |
| 3.6.0 | 2024-07-17 | [40208](https://github.com/airbytehq/airbyte/pull/40208) | Start using the new error MySql source error handler that comes with a new error translation layer. |
| 3.5.1 | 2024-07-17 | [42043](https://github.com/airbytehq/airbyte/pull/42043) | Adopt latest CDK + fixes. |
| 3.5.0 | 2024-07-11 | [38240](https://github.com/airbytehq/airbyte/pull/38240) | Implement WASS. |
| 3.4.12 | 2024-07-01 | [40516](https://github.com/airbytehq/airbyte/pull/40516) | Remove dbz heartbeat. |
| 3.4.11 | 2024-06-26 | [40561](https://github.com/airbytehq/airbyte/pull/40561) | Support PlanetScale MySQL's per-query row limit. |
| 3.4.10 | 2024-06-14 | [39349](https://github.com/airbytehq/airbyte/pull/39349) | Full refresh stream sending internal count metadata. |
| 3.4.9 | 2024-06-11 | [39405](https://github.com/airbytehq/airbyte/pull/39405) | Adopt latest CDK. |
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp

| Version | Date | Pull Request | Subject |
|---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.6.13 | 2024-07-30 | [42869](https://github.com/airbytehq/airbyte/pull/42869) | Adopt latest CDK. |
| 3.6.12 | 2024-07-30 | [42550](https://github.com/airbytehq/airbyte/pull/42550) | Correctly report stream states. |
| 3.6.11 | 2024-07-29 | [42852](https://github.com/airbytehq/airbyte/pull/42852) | Bump CDK version to latest to use new bug fixes on error translation. |
| 3.6.10 | 2024-07-23 | [42417](https://github.com/airbytehq/airbyte/pull/42417) | Handle null error message in ConnectorExceptionHandler. |
Expand Down

0 comments on commit 97d822f

Please sign in to comment.