Skip to content

Commit

Permalink
fix: delimited format should write decimals in a format it can read (#…
Browse files Browse the repository at this point in the history
…6238)

* fix: delimited format should write decimals in a format it can read

fixes: #6234

The `DELIMITED` format no longer writes `DECIMAL` numbers using thousand separators.  Decimals are simply written as numbers.  For example, what was previously serialized as `"1,234,567.89"` is now serialized as `1234567.89`.

Co-authored-by: Andy Coates <big-andy-coates@users.noreply.github.com>
  • Loading branch information
big-andy-coates and big-andy-coates authored Sep 22, 2020
1 parent 0ee402f commit 626965e
Show file tree
Hide file tree
Showing 18 changed files with 492 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Schema.Type;
Expand Down Expand Up @@ -113,19 +112,6 @@ public static int precision(final Schema schema) {
}
}

/**
 * Formats the decimal as a plain number string, adding trailing zeros if necessary.
 *
 * <p>The output is locale-independent: the decimal separator is always {@code '.'} and no
 * grouping (thousands) separators are written, e.g. {@code 1234567.89} is formatted as
 * {@code "1234567.89"} rather than {@code "1,234,567.89"}.
 *
 * @param precision the precision of the decimal type; not used when formatting
 * @param scale the minimum number of fraction digits to write
 * @param value the value
 * @return the formatted string
 */
public static String format(final int precision, final int scale, final BigDecimal value) {
  // An explicit pattern and fixed symbols are used instead of the no-arg constructor, which
  // picks up the JVM default locale and enables grouping. "0.###" keeps at least one integer
  // digit and at most three fraction digits, and declares no grouping.
  final DecimalFormat format =
      new DecimalFormat("0.###", DecimalFormatSymbols.getInstance(Locale.ROOT));

  // Raises the maximum fraction digits as well when scale exceeds it.
  format.setMinimumFractionDigits(scale);

  return format.format(value);
}

/**
* @see #ensureFit(BigDecimal, SqlDecimal)
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,33 +318,6 @@ public void shouldGetSchemaFromDecimal10_5() {
assertThat(schema, is(SqlTypes.decimal(10, 5)));
}

// Formatting a value whose scale already matches should return it unchanged.
@Test
public void shouldConvertString() {
  // Given:
  final BigDecimal input = new BigDecimal("12.1");

  // When:
  final String formatted = DecimalUtil.format(3, 1, input);

  // Then:
  assertThat(formatted, is("12.1"));
}

// A requested scale larger than the value's own should pad the fraction with zeros.
@Test
public void shouldConvertToStringAndAddTrailingZeros() {
  // Given:
  final BigDecimal input = new BigDecimal("12.1");

  // When:
  final String formatted = DecimalUtil.format(4, 2, input);

  // Then:
  assertThat(formatted, is("12.10"));
}

// A precision far wider than the value should not pad the integer part.
@Test
public void shouldConvertToStringButNotAddLeadingZeros() {
  // Given:
  final BigDecimal input = new BigDecimal("12.1");

  // When:
  final String formatted = DecimalUtil.format(100, 1, input);

  // Then:
  assertThat(formatted, is("12.1"));
}

@Test
public void shouldFailIfBuilderWithZeroPrecision() {
// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.zookeeper.ZooKeeperClientException;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -939,10 +939,7 @@ private static Pair<String, SqlType> castString(
final SqlType schema = expr.getRight();
final String exprStr;
if (schema.baseType() == SqlBaseType.DECIMAL) {
final SqlDecimal decimal = (SqlDecimal) schema;
final int precision = decimal.getPrecision();
final int scale = decimal.getScale();
exprStr = String.format("DecimalUtil.format(%d, %d, %s)", precision, scale, expr.getLeft());
exprStr = expr.getLeft() + ".toPlainString()";
} else {
if (ksqlConfig.getBoolean(KsqlConfig.KSQL_STRING_CASE_CONFIG_TOGGLE)) {
exprStr = "Objects.toString(" + expr.getLeft() + ", null)";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -891,7 +891,7 @@ public void shouldGenerateCorrectCodeForDecimalToStringCast() {
final String java = sqlToJavaVisitor.process(cast);

// Then:
assertThat(java, is("DecimalUtil.format(2, 1, COL8)"));
assertThat(java, is("COL8.toPlainString()"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
{
"plan" : [ {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INPUT (ID STRING KEY, V DECIMAL(33, 16)) WITH (KAFKA_TOPIC='test_topic', VALUE_FORMAT='DELIMITED');",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INPUT",
"schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)",
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM INTERMEDIATE AS SELECT *\nFROM INPUT INPUT\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "INTERMEDIATE",
"schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)",
"topicName" : "INTERMEDIATE",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "INPUT" ],
"sink" : "INTERMEDIATE",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "INTERMEDIATE"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "test_topic",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"sourceSchema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "V AS V" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"topicName" : "INTERMEDIATE"
},
"queryId" : "CSAS_INTERMEDIATE_0"
}
}, {
"@type" : "ksqlPlanV1",
"statementText" : "CREATE STREAM OUTPUT AS SELECT *\nFROM INTERMEDIATE INTERMEDIATE\nEMIT CHANGES",
"ddlCommand" : {
"@type" : "createStreamV1",
"sourceName" : "OUTPUT",
"schema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)",
"topicName" : "OUTPUT",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"orReplace" : false
},
"queryPlan" : {
"sources" : [ "INTERMEDIATE" ],
"sink" : "OUTPUT",
"physicalPlan" : {
"@type" : "streamSinkV1",
"properties" : {
"queryContext" : "OUTPUT"
},
"source" : {
"@type" : "streamSelectV1",
"properties" : {
"queryContext" : "Project"
},
"source" : {
"@type" : "streamSourceV1",
"properties" : {
"queryContext" : "KsqlTopic/Source"
},
"topicName" : "INTERMEDIATE",
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"sourceSchema" : "`ID` STRING KEY, `V` DECIMAL(33, 16)"
},
"keyColumnNames" : [ "ID" ],
"selectExpressions" : [ "V AS V" ]
},
"formats" : {
"keyFormat" : {
"format" : "KAFKA"
},
"valueFormat" : {
"format" : "DELIMITED"
}
},
"topicName" : "OUTPUT"
},
"queryId" : "CSAS_OUTPUT_1"
}
} ],
"configs" : {
"ksql.extension.dir" : "ext",
"ksql.streams.cache.max.bytes.buffering" : "0",
"ksql.security.extension.class" : null,
"metric.reporters" : "",
"ksql.transient.prefix" : "transient_",
"ksql.query.status.running.threshold.seconds" : "300",
"ksql.streams.default.deserialization.exception.handler" : "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler",
"ksql.output.topic.name.prefix" : "",
"ksql.query.pull.enable.standby.reads" : "false",
"ksql.query.error.max.queue.size" : "10",
"ksql.internal.topic.min.insync.replicas" : "1",
"ksql.streams.shutdown.timeout.ms" : "300000",
"ksql.internal.topic.replicas" : "1",
"ksql.insert.into.values.enabled" : "true",
"ksql.query.pull.max.allowed.offset.lag" : "9223372036854775807",
"ksql.query.pull.max.qps" : "2147483647",
"ksql.access.validator.enable" : "auto",
"ksql.streams.bootstrap.servers" : "localhost:0",
"ksql.query.pull.metrics.enabled" : "false",
"ksql.create.or.replace.enabled" : "true",
"ksql.metrics.extension" : null,
"ksql.hidden.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.cast.strings.preserve.nulls" : "true",
"ksql.authorization.cache.max.entries" : "10000",
"ksql.pull.queries.enable" : "true",
"ksql.suppress.enabled" : "false",
"ksql.sink.window.change.log.additional.retention" : "1000000",
"ksql.readonly.topics" : "_confluent.*,__confluent.*,_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets,connect-status,connect-statuses",
"ksql.query.persistent.active.limit" : "2147483647",
"ksql.persistence.wrap.single.values" : null,
"ksql.authorization.cache.expiry.time.secs" : "30",
"ksql.query.retry.backoff.initial.ms" : "15000",
"ksql.schema.registry.url" : "",
"ksql.properties.overrides.denylist" : "",
"ksql.streams.auto.offset.reset" : "earliest",
"ksql.connect.url" : "http://localhost:8083",
"ksql.service.id" : "some.ksql.service.id",
"ksql.streams.default.production.exception.handler" : "io.confluent.ksql.errors.ProductionExceptionHandlerUtil$LogAndFailProductionExceptionHandler",
"ksql.enable.metastore.backup" : "false",
"ksql.streams.commit.interval.ms" : "2000",
"ksql.streams.auto.commit.interval.ms" : "0",
"ksql.streams.topology.optimization" : "all",
"ksql.query.retry.backoff.max.ms" : "900000",
"ksql.streams.num.stream.threads" : "4",
"ksql.timestamp.throw.on.invalid" : "false",
"ksql.metrics.tags.custom" : "",
"ksql.udfs.enabled" : "true",
"ksql.udf.enable.security.manager" : "true",
"ksql.connect.worker.config" : "",
"ksql.udf.collect.metrics" : "false",
"ksql.persistent.prefix" : "query_",
"ksql.metastore.backup.location" : "",
"ksql.error.classifier.regex" : "",
"ksql.suppress.buffer.size.bytes" : "-1"
}
}
Loading

0 comments on commit 626965e

Please sign in to comment.