diff --git a/CHANGES.md b/CHANGES.md index 4f9439efbec1..2025146d3044 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -84,6 +84,7 @@ ## Bugfixes * (Python) Fixed occasional pipeline stuckness that was affecting Python 3.11 users ([#33966](https://github.com/apache/beam/issues/33966)). +* (Java) Fixed TIME field encodings for BigQuery Storage API writes on GenericRecords ([#34059](https://github.com/apache/beam/pull/34059)). ## Security Fixes * Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)). diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java index d55501d3e583..3813b000a65b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProto.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.function.BiFunction; import java.util.function.Function; import java.util.stream.Collectors; @@ -174,19 +175,21 @@ static Integer convertDate(Object value) { static Long convertTime(Object value, boolean micros) { if (value instanceof org.joda.time.LocalTime) { - return 1_000L * (long) ((org.joda.time.LocalTime) value).getMillisOfDay(); + return CivilTimeEncoder.encodePacked64TimeMicros((org.joda.time.LocalTime) value); } else if (value instanceof java.time.LocalTime) { - return java.util.concurrent.TimeUnit.NANOSECONDS.toMicros( - ((java.time.LocalTime) value).toNanoOfDay()); + return CivilTimeEncoder.encodePacked64TimeMicros((java.time.LocalTime) value); } else { if (micros) { Preconditions.checkArgument( value instanceof Long, "Expecting a value as Long type (time)."); - return (Long) value; + return CivilTimeEncoder.encodePacked64TimeMicros( + java.time.LocalTime.ofNanoOfDay((TimeUnit.MICROSECONDS.toNanos((long) value)))); } else { Preconditions.checkArgument( value instanceof Integer, "Expecting a value as Integer type (time)."); - return 1_000L * (Integer) value; + return CivilTimeEncoder.encodePacked64TimeMicros( + java.time.LocalTime.ofNanoOfDay( + (TimeUnit.MILLISECONDS).toNanos(((Integer) value).longValue()))); } } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java index 371b867ffd5e..ecf49cd6d8bb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/AvroGenericRecordToStorageApiProtoTest.java @@ -40,6 +40,7 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import org.apache.avro.Conversions; import org.apache.avro.LogicalTypes; @@ -400,13 +401,14 @@ enum TestEnum { .set("uuidValue", uuid.toString()) .build(); + org.joda.time.LocalTime localTime = org.joda.time.LocalTime.fromMillisOfDay(42_000L); jodaTimeLogicalTypesRecord = new GenericRecordBuilder(LOGICAL_TYPES_SCHEMA) .set("numericValue", numeric) .set("bigNumericValue", bigNumeric) .set("dateValue", new org.joda.time.LocalDate(1970, 1, 1).plusDays(42)) - .set("timeMicrosValue", org.joda.time.LocalTime.fromMillisOfDay(42_000L)) - .set("timeMillisValue", org.joda.time.LocalTime.fromMillisOfDay(42_000L)) + .set("timeMicrosValue", localTime) + .set("timeMillisValue", localTime) .set("timestampMicrosValue", org.joda.time.Instant.ofEpochSecond(42L)) .set("timestampMillisValue", org.joda.time.Instant.ofEpochSecond(42L)) .set( @@ -423,8 +425,14 @@ enum TestEnum { .set("numericValue", numeric) .set("bigNumericValue", bigNumeric) .set("dateValue", java.time.LocalDate.ofEpochDay(42L)) - .set("timeMicrosValue", java.time.LocalTime.ofSecondOfDay(42L)) - .set("timeMillisValue", java.time.LocalTime.ofSecondOfDay(42L)) + .set( + "timeMicrosValue", + java.time.LocalTime.ofNanoOfDay( + TimeUnit.MILLISECONDS.toNanos(localTime.getMillisOfDay()))) + .set( + "timeMillisValue", + java.time.LocalTime.ofNanoOfDay( + TimeUnit.MILLISECONDS.toNanos(localTime.getMillisOfDay()))) .set("timestampMicrosValue", java.time.Instant.ofEpochSecond(42L)) .set("timestampMillisValue", java.time.Instant.ofEpochSecond(42L)) .set( @@ -456,8 +464,8 @@ enum TestEnum { .put("numericvalue", numericBytes) .put("bignumericvalue", bigNumericBytes) .put("datevalue", 42) - .put("timemicrosvalue", 42_000_000L) - .put("timemillisvalue", 42_000_000L) + .put("timemicrosvalue", CivilTimeEncoder.encodePacked64TimeMicros(localTime)) + .put("timemillisvalue", CivilTimeEncoder.encodePacked64TimeMicros(localTime)) .put("timestampmicrosvalue", 42_000_000L) .put("timestampmillisvalue", 42_000_000L) .put("localtimestampmicrosvalue", 42_000_000L)