Skip to content

Commit

Permalink
[Improve][Connector-V2][Jdbc-Source] Support for Decimal types as spl…
Browse files Browse the repository at this point in the history
…ict keys (#4634)

* [Improve][Connector-V2][Jdbc-Source]Support Compatible Mysql bigint(20) used as a partition_column #4634

Co-authored-by: zhilinli <lzl15844876351@163.com>
  • Loading branch information
zhilinli123 and zhilinli authored Jun 7, 2023
1 parent 4d429ca commit d56bb1b
Show file tree
Hide file tree
Showing 10 changed files with 141 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.time.Duration;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -74,6 +75,10 @@ public TypedOptionBuilder<Integer> intType() {
public TypedOptionBuilder<Long> longType() {
return new TypedOptionBuilder<>(key, new TypeReference<Long>() {});
}
/** Defines that the value of the option should be of {@link BigDecimal} type. */
public TypedOptionBuilder<BigDecimal> bigDecimalType() {
return new TypedOptionBuilder<>(key, new TypeReference<BigDecimal>() {});
}

/** Defines that the value of the option should be of {@link Float} type. */
public TypedOptionBuilder<Float> floatType() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;

import java.math.BigDecimal;
import java.util.List;

@SuppressWarnings("checkstyle:MagicNumber")
Expand Down Expand Up @@ -122,14 +123,14 @@ public interface JdbcOptions {
.noDefaultValue()
.withDescription("partition column");

Option<Long> PARTITION_UPPER_BOUND =
Option<BigDecimal> PARTITION_UPPER_BOUND =
Options.key("partition_upper_bound")
.longType()
.bigDecimalType()
.noDefaultValue()
.withDescription("partition upper bound");
Option<Long> PARTITION_LOWER_BOUND =
Option<BigDecimal> PARTITION_LOWER_BOUND =
Options.key("partition_lower_bound")
.longType()
.bigDecimalType()
.noDefaultValue()
.withDescription("partition lower bound");
Option<Integer> PARTITION_NUM =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import lombok.Data;

import java.io.Serializable;
import java.math.BigDecimal;
import java.util.Optional;

@Data
Expand All @@ -33,8 +34,8 @@ public class JdbcSourceConfig implements Serializable {
private JdbcConnectionConfig jdbcConnectionConfig;
public String query;
private String partitionColumn;
private Long partitionUpperBound;
private Long partitionLowerBound;
private BigDecimal partitionUpperBound;
private BigDecimal partitionLowerBound;
private int fetchSize;
private Integer partitionNumber;

Expand All @@ -60,11 +61,11 @@ public Optional<String> getPartitionColumn() {
return Optional.ofNullable(partitionColumn);
}

public Optional<Long> getPartitionUpperBound() {
public Optional<BigDecimal> getPartitionUpperBound() {
return Optional.ofNullable(partitionUpperBound);
}

public Optional<Long> getPartitionLowerBound() {
public Optional<BigDecimal> getPartitionLowerBound() {
return Optional.ofNullable(partitionLowerBound);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.split;

import java.io.Serializable;
import java.math.BigDecimal;
import java.math.RoundingMode;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand All @@ -39,8 +41,8 @@
*/
public class JdbcNumericBetweenParametersProvider implements JdbcParameterValuesProvider {

private final long minVal;
private final long maxVal;
private final BigDecimal minVal;
private final BigDecimal maxVal;

private long batchSize;
private int batchNum;
Expand All @@ -51,8 +53,8 @@ public class JdbcNumericBetweenParametersProvider implements JdbcParameterValues
* @param minVal the lower bound of the produced "from" values
* @param maxVal the upper bound of the produced "to" values
*/
public JdbcNumericBetweenParametersProvider(long minVal, long maxVal) {
checkArgument(minVal <= maxVal, "minVal must not be larger than maxVal");
public JdbcNumericBetweenParametersProvider(BigDecimal minVal, BigDecimal maxVal) {
checkArgument(minVal.compareTo(maxVal) <= 0, "minVal must not be larger than maxVal");
this.minVal = minVal;
this.maxVal = maxVal;
}
Expand All @@ -64,8 +66,9 @@ public JdbcNumericBetweenParametersProvider(long minVal, long maxVal) {
* @param minVal the lower bound of the produced "from" values
* @param maxVal the upper bound of the produced "to" values
*/
public JdbcNumericBetweenParametersProvider(long fetchSize, long minVal, long maxVal) {
checkArgument(minVal <= maxVal, "minVal must not be larger than maxVal");
public JdbcNumericBetweenParametersProvider(
long fetchSize, BigDecimal minVal, BigDecimal maxVal) {
checkArgument(minVal.compareTo(maxVal) <= 0, "minVal must not be larger than maxVal");
this.minVal = minVal;
this.maxVal = maxVal;
ofBatchSize(fetchSize);
Expand All @@ -74,24 +77,33 @@ public JdbcNumericBetweenParametersProvider(long fetchSize, long minVal, long ma
public JdbcNumericBetweenParametersProvider ofBatchSize(long batchSize) {
checkArgument(batchSize > 0, "Batch size must be positive");

long maxElemCount = (maxVal - minVal) + 1;
if (batchSize > maxElemCount) {
batchSize = maxElemCount;
BigDecimal maxElemCount = (maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
if (BigDecimal.valueOf(batchSize).compareTo(maxElemCount) > 0) {
batchSize = maxElemCount.longValue();
}
this.batchSize = batchSize;
this.batchNum = new Double(Math.ceil((double) maxElemCount / batchSize)).intValue();
this.batchNum =
new Double(
Math.ceil(
(maxElemCount.divide(BigDecimal.valueOf(batchSize)))
.doubleValue()))
.intValue();
return this;
}

public JdbcNumericBetweenParametersProvider ofBatchNum(int batchNum) {
checkArgument(batchNum > 0, "Batch number must be positive");

long maxElemCount = (maxVal - minVal) + 1;
if (batchNum > maxElemCount) {
batchNum = (int) maxElemCount;
BigDecimal maxElemCount = (maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
if (BigDecimal.valueOf(batchNum).compareTo(maxElemCount) > 0) {
batchNum = maxElemCount.intValue();
}
this.batchNum = batchNum;
this.batchSize = new Double(Math.ceil((double) maxElemCount / batchNum)).longValue();
// For the presence of a decimal we take the integer up
this.batchSize =
(maxElemCount.divide(BigDecimal.valueOf(batchNum), 2, RoundingMode.HALF_UP))
.setScale(0, RoundingMode.CEILING)
.longValue();
return this;
}

Expand All @@ -101,15 +113,24 @@ public Serializable[][] getParameterValues() {
batchSize > 0,
"Batch size and batch number must be positive. Have you called `ofBatchSize` or `ofBatchNum`?");

long maxElemCount = (maxVal - minVal) + 1;
long bigBatchNum = maxElemCount - (batchSize - 1) * batchNum;
BigDecimal maxElemCount = (maxVal.subtract(minVal)).add(BigDecimal.valueOf(1));
BigDecimal bigBatchNum =
maxElemCount
.subtract(BigDecimal.valueOf(batchSize - 1))
.multiply(BigDecimal.valueOf(batchNum));

Serializable[][] parameters = new Serializable[batchNum][2];
long start = minVal;
BigDecimal start = minVal;
for (int i = 0; i < batchNum; i++) {
long end = start + batchSize - 1 - (i >= bigBatchNum ? 1 : 0);
parameters[i] = new Long[] {start, end};
start = end + 1;
BigDecimal end =
start.add(BigDecimal.valueOf(batchSize))
.subtract(BigDecimal.valueOf(1))
.subtract(
BigDecimal.valueOf(i).compareTo(bigBatchNum) >= 0
? BigDecimal.ONE
: BigDecimal.ZERO);
parameters[i] = new BigDecimal[] {start, end};
start = end.add(BigDecimal.valueOf(1));
}
return parameters;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.PluginType;
Expand All @@ -45,6 +46,7 @@
import lombok.extern.slf4j.Slf4j;

import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -135,8 +137,8 @@ public static Optional<PartitionParameter> createPartitionParameter(

static PartitionParameter createPartitionParameter(
JdbcSourceConfig config, String columnName, Connection connection) {
long max = Long.MAX_VALUE;
long min = Long.MIN_VALUE;
BigDecimal max = null;
BigDecimal min = null;
if (config.getPartitionLowerBound().isPresent()
&& config.getPartitionUpperBound().isPresent()) {
max = config.getPartitionUpperBound().get();
Expand All @@ -155,11 +157,11 @@ static PartitionParameter createPartitionParameter(
max =
config.getPartitionUpperBound().isPresent()
? config.getPartitionUpperBound().get()
: rs.getLong(1);
: rs.getBigDecimal(1);
min =
config.getPartitionLowerBound().isPresent()
? config.getPartitionLowerBound().get()
: rs.getLong(2);
: rs.getBigDecimal(2);
}
} catch (SQLException e) {
throw new PrepareFailException("jdbc", PluginType.SOURCE, e.toString());
Expand Down Expand Up @@ -200,7 +202,18 @@ static void validationPartitionColumn(String partitionColumn, SeaTunnelRowType r
}

private static boolean isNumericType(SeaTunnelDataType<?> type) {
return type.equals(BasicType.INT_TYPE) || type.equals(BasicType.LONG_TYPE);
int scale = 1;
if (type instanceof DecimalType) {
scale = ((DecimalType) type).getScale() == 0 ? 0 : ((DecimalType) type).getScale();
if (scale != 0) {
throw new JdbcConnectorException(
CommonErrorCode.ILLEGAL_ARGUMENT,
String.format(
"The current field is DecimalType containing decimals: %d Unable to support",
scale));
}
}
return type.equals(BasicType.INT_TYPE) || type.equals(BasicType.LONG_TYPE) || scale == 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@
import lombok.Data;

import java.io.Serializable;
import java.math.BigDecimal;

@Data
@AllArgsConstructor
public class PartitionParameter implements Serializable {

String partitionColumnName;
long minValue;
long maxValue;
BigDecimal minValue;
BigDecimal maxValue;
Integer partitionNumber;
}
Loading

0 comments on commit d56bb1b

Please sign in to comment.