Skip to content

Commit

Permalink
(fix apache#34038) Use CivilTimeEncoder to encode Time values in Avro…
Browse files Browse the repository at this point in the history
…GenericRecordToStorageApiProto (apache#34059)

* (fix apache#34038) Use CivilTimeEncoder to encode Time values in AvroGenericRecordToStorageApiProto

* Update CHANGES

* Don't throw away precision
  • Loading branch information
clairemcginty authored Feb 28, 2025
1 parent 9d92dd7 commit 6fbdaa2
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
## Bugfixes

* (Python) Fixed occasional pipeline stuckness that was affecting Python 3.11 users ([#33966](https://github.com/apache/beam/issues/33966)).
* (Java) Fixed TIME field encodings for BigQuery Storage API writes on GenericRecords ([#34059](https://github.com/apache/beam/pull/34059)).

## Security Fixes
* Fixed (CVE-YYYY-NNNN)[https://www.cve.org/CVERecord?id=CVE-YYYY-NNNN] (Java/Python/Go) ([#X](https://github.com/apache/beam/issues/X)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -174,19 +175,21 @@ static Integer convertDate(Object value) {

static Long convertTime(Object value, boolean micros) {
if (value instanceof org.joda.time.LocalTime) {
return 1_000L * (long) ((org.joda.time.LocalTime) value).getMillisOfDay();
return CivilTimeEncoder.encodePacked64TimeMicros((org.joda.time.LocalTime) value);
} else if (value instanceof java.time.LocalTime) {
return java.util.concurrent.TimeUnit.NANOSECONDS.toMicros(
((java.time.LocalTime) value).toNanoOfDay());
return CivilTimeEncoder.encodePacked64TimeMicros((java.time.LocalTime) value);
} else {
if (micros) {
Preconditions.checkArgument(
value instanceof Long, "Expecting a value as Long type (time).");
return (Long) value;
return CivilTimeEncoder.encodePacked64TimeMicros(
java.time.LocalTime.ofNanoOfDay((TimeUnit.MICROSECONDS.toNanos((long) value))));
} else {
Preconditions.checkArgument(
value instanceof Integer, "Expecting a value as Integer type (time).");
return 1_000L * (Integer) value;
return CivilTimeEncoder.encodePacked64TimeMicros(
java.time.LocalTime.ofNanoOfDay(
(TimeUnit.MILLISECONDS).toNanos(((Integer) value).longValue())));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
Expand Down Expand Up @@ -400,13 +401,14 @@ enum TestEnum {
.set("uuidValue", uuid.toString())
.build();

org.joda.time.LocalTime localTime = org.joda.time.LocalTime.fromMillisOfDay(42_000L);
jodaTimeLogicalTypesRecord =
new GenericRecordBuilder(LOGICAL_TYPES_SCHEMA)
.set("numericValue", numeric)
.set("bigNumericValue", bigNumeric)
.set("dateValue", new org.joda.time.LocalDate(1970, 1, 1).plusDays(42))
.set("timeMicrosValue", org.joda.time.LocalTime.fromMillisOfDay(42_000L))
.set("timeMillisValue", org.joda.time.LocalTime.fromMillisOfDay(42_000L))
.set("timeMicrosValue", localTime)
.set("timeMillisValue", localTime)
.set("timestampMicrosValue", org.joda.time.Instant.ofEpochSecond(42L))
.set("timestampMillisValue", org.joda.time.Instant.ofEpochSecond(42L))
.set(
Expand All @@ -423,8 +425,14 @@ enum TestEnum {
.set("numericValue", numeric)
.set("bigNumericValue", bigNumeric)
.set("dateValue", java.time.LocalDate.ofEpochDay(42L))
.set("timeMicrosValue", java.time.LocalTime.ofSecondOfDay(42L))
.set("timeMillisValue", java.time.LocalTime.ofSecondOfDay(42L))
.set(
"timeMicrosValue",
java.time.LocalTime.ofNanoOfDay(
TimeUnit.MILLISECONDS.toNanos(localTime.getMillisOfDay())))
.set(
"timeMillisValue",
java.time.LocalTime.ofNanoOfDay(
TimeUnit.MILLISECONDS.toNanos(localTime.getMillisOfDay())))
.set("timestampMicrosValue", java.time.Instant.ofEpochSecond(42L))
.set("timestampMillisValue", java.time.Instant.ofEpochSecond(42L))
.set(
Expand Down Expand Up @@ -456,8 +464,8 @@ enum TestEnum {
.put("numericvalue", numericBytes)
.put("bignumericvalue", bigNumericBytes)
.put("datevalue", 42)
.put("timemicrosvalue", 42_000_000L)
.put("timemillisvalue", 42_000_000L)
.put("timemicrosvalue", CivilTimeEncoder.encodePacked64TimeMicros(localTime))
.put("timemillisvalue", CivilTimeEncoder.encodePacked64TimeMicros(localTime))
.put("timestampmicrosvalue", 42_000_000L)
.put("timestampmillisvalue", 42_000_000L)
.put("localtimestampmicrosvalue", 42_000_000L)
Expand Down

0 comments on commit 6fbdaa2

Please sign in to comment.