Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jun 6, 2024
1 parent 1be53b3 commit eef7413
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ airbyteJavaConnector {
'gcs-destinations',
'core',
]
useLocalCdk = false
useLocalCdk = true
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
dockerImageTag: 2.6.1
dockerImageTag: 2.6.2
dockerRepository: airbyte/destination-bigquery
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
githubIssueLabel: destination-bigquery
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ object BigQueryConsumerFactory {
catalog = catalog,
bufferManager =
BufferManager(
defaultNamespace,
(Runtime.getRuntime().maxMemory() * 0.4).toLong(),
),
defaultNamespace = Optional.of(defaultNamespace),
)
}

Expand All @@ -59,9 +59,9 @@ object BigQueryConsumerFactory {
catalog = catalog,
bufferManager =
BufferManager(
defaultNamespace,
(Runtime.getRuntime().maxMemory() * 0.5).toLong(),
),
defaultNamespace = Optional.of(defaultNamespace),
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ import java.io.ByteArrayInputStream
import java.io.IOException
import java.util.*
import java.util.function.Consumer
import org.apache.commons.lang3.StringUtils

private val log = KotlinLogging.logger {}

Expand Down Expand Up @@ -190,7 +189,6 @@ class BigQueryDestination : BaseConnector(), Destination {
): SerializedAirbyteMessageConsumer {
val uploadingMethod = getLoadingMethod(config)
val defaultNamespace = getDatasetId(config)
setDefaultStreamNamespace(catalog, defaultNamespace)
val disableTypeDedupe = getDisableTypeDedupFlag(config)
val datasetLocation = getDatasetLocation(config)
val projectId = config[bqConstants.CONFIG_PROJECT_ID].asText()
Expand Down Expand Up @@ -222,6 +220,7 @@ class BigQueryDestination : BaseConnector(), Destination {
val parsedCatalog =
parseCatalog(
sqlGenerator,
defaultNamespace,
rawNamespaceOverride.orElse(JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE),
catalog,
)
Expand Down Expand Up @@ -309,28 +308,18 @@ class BigQueryDestination : BaseConnector(), Destination {
)
}

private fun setDefaultStreamNamespace(catalog: ConfiguredAirbyteCatalog, namespace: String) {
// Set the default originalNamespace on streams with null originalNamespace. This means we
// don't
// need to repeat this
// logic in the rest of the connector.
// (record messages still need to handle null namespaces though, which currently happens in
// e.g.
// AsyncStreamConsumer#accept)
// This probably should be shared logic amongst destinations eventually.
for (stream in catalog.streams) {
if (StringUtils.isEmpty(stream.stream.namespace)) {
stream.stream.withNamespace(namespace)
}
}
}

private fun parseCatalog(
sqlGenerator: BigQuerySqlGenerator,
defaultNamespace: String,
rawNamespaceOverride: String,
catalog: ConfiguredAirbyteCatalog
): ParsedCatalog {
val catalogParser = CatalogParser(sqlGenerator, rawNamespaceOverride)
val catalogParser =
CatalogParser(
sqlGenerator,
defaultNamespace = defaultNamespace,
rawNamespace = rawNamespaceOverride,
)

return catalogParser.parseCatalog(catalog)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void testBuildColumnId() {

@Test
void columnCollision() {
final CatalogParser parser = new CatalogParser(generator);
final CatalogParser parser = new CatalogParser(generator, "default_ns");
assertEquals(
new StreamConfig(
new StreamId("bar", "foo", "airbyte_internal", "bar_raw__stream_foo", "bar", "foo"),
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/bigquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ tutorials:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 2.6.2 | 2024-05-30 | [38331](https://github.com/airbytehq/airbyte/pull/38331) | Internal code changes in preparation for future feature release |
| 2.6.1 | 2024-05-29 | [38770](https://github.com/airbytehq/airbyte/pull/38770) | Internal code change (switch to CDK artifact) |
| 2.6.0 | 2024-05-28 | [38359](https://github.com/airbytehq/airbyte/pull/38359) | Propagate airbyte_meta from sources; add generation_id column |
| 2.5.1 | 2024-05-22 | [38591](https://github.com/airbytehq/airbyte/pull/38591) | Bugfix to include forward-slash when cleaning up stage |
Expand Down

0 comments on commit eef7413

Please sign in to comment.