Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Map number(integer) into an integer rather than a float #20730

Merged
merged 10 commits into from
Jan 4, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,6 @@
dockerImageTag: 1.2.9
documentationUrl: https://docs.airbyte.com/integrations/destinations/bigquery
icon: bigquery.svg
normalizationConfig:
normalizationRepository: airbyte/normalization
normalizationTag: 0.2.25
normalizationIntegrationType: bigquery
resourceRequirements:
jobSpecific:
- jobType: sync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -68,15 +69,15 @@ private ArrayFormatter getArrayFormatter() {
return arrayFormatter;
}

public void setArrayFormatter(ArrayFormatter arrayFormatter) {
public void setArrayFormatter(final ArrayFormatter arrayFormatter) {
this.arrayFormatter = arrayFormatter;
this.jsonSchema = formatJsonSchema(this.originalJsonSchema.deepCopy());
this.bigQuerySchema = getBigQuerySchema(jsonSchema);
}

@Override
protected JsonNode formatJsonSchema(final JsonNode jsonSchema) {
var modifiedJsonSchema = jsonSchema.deepCopy(); // Issue #5912 is reopened (PR #11166) formatAllOfAndAnyOfFields(namingResolver, jsonSchema);
final var modifiedJsonSchema = jsonSchema.deepCopy(); // Issue #5912 is reopened (PR #11166) formatAllOfAndAnyOfFields(namingResolver, jsonSchema);
getArrayFormatter().populateEmptyArrays(modifiedJsonSchema);
getArrayFormatter().surroundArraysByObjects(modifiedJsonSchema);
return modifiedJsonSchema;
Expand Down Expand Up @@ -117,7 +118,7 @@ private JsonNode formatData(final FieldList fields, final JsonNode root) {
if (fields == null) {
return root;
}
JsonNode formattedData;
final JsonNode formattedData;
if (root.isObject()) {
formattedData = getObjectNode(fields, root);
} else if (root.isArray()) {
Expand Down Expand Up @@ -151,7 +152,7 @@ private JsonNode getArrayNode(final FieldList fields, final JsonNode root) {
} else {
subFields = arrayField.getSubFields();
}
List<JsonNode> arrayItems = MoreIterators.toList(root.elements()).stream()
final List<JsonNode> arrayItems = MoreIterators.toList(root.elements()).stream()
.map(p -> formatData(subFields, p))
.toList();

Expand Down Expand Up @@ -245,15 +246,15 @@ private static JsonNode getFileDefinition(final JsonNode fieldDefinition) {
}

private static JsonNode allOfAndAnyOfFieldProcessing(final String fieldName, final JsonNode fieldDefinition) {
ObjectReader reader = mapper.readerFor(new TypeReference<List<JsonNode>>() {});
List<JsonNode> list;
final ObjectReader reader = mapper.readerFor(new TypeReference<List<JsonNode>>() {});
final List<JsonNode> list;
try {
list = reader.readValue(fieldDefinition.get(fieldName));
} catch (IOException e) {
} catch (final IOException e) {
throw new IllegalStateException(
String.format("Failed to read and process the following field - %s", fieldDefinition));
}
ObjectNode objectNode = mapper.createObjectNode();
final ObjectNode objectNode = mapper.createObjectNode();
list.forEach(field -> {
objectNode.set("big_query_" + field.get("type").asText(), field);
});
Expand All @@ -268,8 +269,8 @@ private static JsonNode allOfAndAnyOfFieldProcessing(final String fieldName, fin
private static Builder getField(final StandardNameTransformer namingResolver, final String key, final JsonNode fieldDefinition) {
final String fieldName = namingResolver.getIdentifier(key);
final Builder builder = Field.newBuilder(fieldName, StandardSQLTypeName.STRING);
JsonNode updatedFileDefinition = getFileDefinition(fieldDefinition);
JsonNode type = updatedFileDefinition.get(TYPE_FIELD);
final JsonNode updatedFileDefinition = getFileDefinition(fieldDefinition);
final JsonNode type = updatedFileDefinition.get(TYPE_FIELD);
final JsonNode airbyteType = updatedFileDefinition.get(AIRBYTE_TYPE);
final List<JsonSchemaType> fieldTypes = getTypes(fieldName, type);
for (int i = 0; i < fieldTypes.size(); i++) {
Expand All @@ -288,9 +289,11 @@ private static Builder getField(final StandardNameTransformer namingResolver, fi
builder.setType(primaryType.getBigQueryType());
}
case NUMBER -> {
if (airbyteType != null && airbyteType.asText().equals("big_integer")) {
if (airbyteType != null
&& StringUtils.equalsAnyIgnoreCase(airbyteType.asText(),
"big_integer", "integer")) {
builder.setType(StandardSQLTypeName.INT64);
} else {
} else {
builder.setType(primaryType.getBigQueryType());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/bigquery",
"supportsIncremental": true,
"supportsNormalization": false,
"supportsDBT": true,
"supportsDBT": false,
"supported_destination_sync_modes": ["overwrite", "append"],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,6 @@ protected JsonNode getFailCheckConfig() {
return config;
}

@Override
protected boolean supportsDBT() {
return true;
}

@Override
protected boolean implementsNamespaces() {
return true;
Expand Down