Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stdlib): added timezone support to hourSelection function #4757

Merged
merged 3 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions internal/date/date.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/internal/zoneinfo"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/values"
)

Expand All @@ -25,3 +27,41 @@ func GetTimeInLocation(t values.Time, location string, offset values.Duration) (
}
return values.NewTime(t), nil
}

func GetLocationFromObjArgs(args values.Object) (string, values.Duration, error) {
a := interpreter.NewArguments(args)
return GetLocationFromFluxArgs(a)
}

func GetLocationFromFluxArgs(args interpreter.Arguments) (string, values.Duration, error) {
location, err := args.GetRequiredObject("location")
if err != nil {
return "UTC", values.ConvertDurationNsecs(0), err
}
return GetLocation(location)
}

func GetLocation(location values.Object) (string, values.Duration, error) {
var (
name, offset values.Value
ok bool
)

name, ok = location.Get("zone")
if !ok {
return "UTC", values.ConvertDurationNsecs(0), errors.New(codes.Invalid, "zone property missing from location record")
} else if got := name.Type().Nature(); got != semantic.String {
return "UTC", values.ConvertDurationNsecs(0), errors.Newf(codes.Invalid, "zone property for location must be of type %s, got %s", semantic.String, got)
}

if offset, ok = location.Get("offset"); ok {
if got := offset.Type().Nature(); got != semantic.Duration {
return "UTC", values.ConvertDurationNsecs(0), errors.Newf(codes.Invalid, "offset property for location must be of type %s, got %s", semantic.Duration, got)
}
}

if name.IsNull() {
return "UTC", offset.Duration(), nil
}
return name.Str(), offset.Duration(), nil
}
4 changes: 2 additions & 2 deletions interpreter/interpreter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -883,8 +883,8 @@ func TestStack(t *testing.T) {
FunctionName: "window",
Location: ast.SourceLocation{
File: "universe/universe.flux",
Start: ast.Position{Line: 3696, Column: 12},
End: ast.Position{Line: 3696, Column: 51},
Start: ast.Position{Line: 3716, Column: 12},
End: ast.Position{Line: 3716, Column: 51},
Source: `window(every: inf, timeColumn: timeDst)`,
},
},
Expand Down
4 changes: 2 additions & 2 deletions libflux/go/libflux/buildinfo.gen.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/histogram_test.flux": "a468e662ecf1adc1d9d035ff041a503cd08efe7354f096a88da12ff7edc347ef",
"stdlib/universe/holt_winters_panic_test.flux": "c7681aadb9e4584b0002e47782af18c0c70734234c24adc0a631686cc3e32cb2",
"stdlib/universe/holt_winters_test.flux": "47a236ef553000fb36cdf2756d32a9a923a1b478a974c1369d08d8c6ba3535f9",
"stdlib/universe/hour_selection_test.flux": "4713a9f12e529a237f9a61672577c0d09339c225658052cfc5d889a2c4bfefb3",
"stdlib/universe/hour_selection_test.flux": "b2eecf84519685941ce8fd7f873b4d85d5041c6aa5673a587f2db8cdc9c26ae3",
"stdlib/universe/increase_test.flux": "dc02027842468c99c3ff0dadadde6050a2e38c7ea4bd70c7cf0a7eb10e3f93d7",
"stdlib/universe/integral_columns_test.flux": "485b0319d5bbbd781cf6dbaa0cfcc56ae8d7b25ee9c02815d4b61fcfece993e9",
"stdlib/universe/integral_interpolate_test.flux": "adebcba18f65e119dd3ccb0143b4de9593628765686d8932e679e85f007e786f",
Expand Down Expand Up @@ -597,7 +597,7 @@ var sourceHashes = map[string]string{
"stdlib/universe/union_heterogeneous_test.flux": "5158d9efdf88ec10945e84f72cb7ed9669b25f967b7b4f778d2456d52ecd34da",
"stdlib/universe/union_test.flux": "8529f2f609d9876f975ad648dec24eb60b569e8ac1c38e4f980e2234e68149a9",
"stdlib/universe/unique_test.flux": "02180f651906f48cc57cc777d443a64bd2155394ac0ade68888f47f5900e5408",
"stdlib/universe/universe.flux": "49bf9e5b082b45dd2deec2a3281b24a068bc340797b7d466bc2a048a88d13cb6",
"stdlib/universe/universe.flux": "c8a3bbd18143113965ca29f94e8ff86c68eea21427fb1e971b4b8334ff2f8802",
"stdlib/universe/universe_truncateTimeColumn_test.flux": "7ca30c57336b22ae430a082d33589fa919f4558c3ed6dbe37ecde367d602435c",
"stdlib/universe/window_aggregate_test.flux": "e2dba18647b2a180cafe81dd00a43077eb489421d229785e63b270eb79ddb2b1",
"stdlib/universe/window_default_start_align_test.flux": "162f9d452fed411edfdcb11ef1b8ae0c565d87bbc351596d277c7db55fe67e8f",
Expand Down
56 changes: 10 additions & 46 deletions stdlib/date/date.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import (
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/date"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/interpreter"
"github.com/influxdata/flux/interval"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/flux/semantic"
"github.com/influxdata/flux/values"
)

Expand All @@ -39,7 +37,7 @@ func init() {
if err != nil {
return nil, err
}
location, offset, err := getLocationFromArgs(args)
location, offset, err := date.GetLocationFromObjArgs(args)
if err != nil {
return nil, err
}
Expand All @@ -58,7 +56,7 @@ func init() {
if err != nil {
return nil, err
}
location, offset, err := getLocationFromArgs(args)
location, offset, err := date.GetLocationFromObjArgs(args)
if err != nil {
return nil, err
}
Expand All @@ -77,7 +75,7 @@ func init() {
if err != nil {
return nil, err
}
location, offset, err := getLocationFromArgs(args)
location, offset, err := date.GetLocationFromObjArgs(args)
if err != nil {
return nil, err
}
Expand All @@ -96,7 +94,7 @@ func init() {
if err != nil {
return nil, err
}
location, offset, err := getLocationFromArgs(args)
location, offset, err := date.GetLocationFromObjArgs(args)
if err != nil {
return nil, err
}
Expand All @@ -115,7 +113,7 @@ func init() {
if err != nil {
return nil, err
}
location, offset, err := getLocationFromArgs(args)
location, offset, err := date.GetLocationFromObjArgs(args)
if err != nil {
return nil, err
}
Expand All @@ -134,7 +132,7 @@ func init() {
if err != nil {
return nil, err
}
location, offset, err := getLocationFromArgs(args)
location, offset, err := date.GetLocationFromObjArgs(args)
if err != nil {
return nil, err
}
Expand All @@ -153,7 +151,7 @@ func init() {
if err != nil {
return nil, err
}
location, offset, err := getLocationFromArgs(args)
location, offset, err := date.GetLocationFromObjArgs(args)
if err != nil {
return nil, err
}
Expand All @@ -172,7 +170,7 @@ func init() {
if err != nil {
return nil, err
}
location, offset, err := getLocationFromArgs(args)
location, offset, err := date.GetLocationFromObjArgs(args)
if err != nil {
return nil, err
}
Expand All @@ -192,7 +190,7 @@ func init() {
if err != nil {
return nil, err
}
location, offset, err := getLocationFromArgs(args)
location, offset, err := date.GetLocationFromObjArgs(args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -262,7 +260,7 @@ func init() {
if err != nil {
return nil, err
}
location, _, err := getLocationFromArgs(args)
location, _, err := date.GetLocationFromObjArgs(args)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -307,40 +305,6 @@ func getTime(args values.Object) (values.Value, error) {
return tArg, nil
}

func getLocationFromArgs(args values.Object) (string, values.Duration, error) {
a := interpreter.NewArguments(args)
location, err := a.GetRequiredObject("location")
if err != nil {
return "UTC", values.ConvertDurationNsecs(0), err
}
return getLocation(location)
}

func getLocation(location values.Object) (string, values.Duration, error) {
var (
name, offset values.Value
ok bool
)

name, ok = location.Get("zone")
if !ok {
return "UTC", values.ConvertDurationNsecs(0), errors.New(codes.Invalid, "zone property missing from location record")
} else if got := name.Type().Nature(); got != semantic.String {
return "UTC", values.ConvertDurationNsecs(0), errors.Newf(codes.Invalid, "zone property for location must be of type %s, got %s", semantic.String, got)
}

if offset, ok = location.Get("offset"); ok {
if got := offset.Type().Nature(); got != semantic.Duration {
return "UTC", values.ConvertDurationNsecs(0), errors.Newf(codes.Invalid, "offset property for location must be of type %s, got %s", semantic.Duration, got)
}
}

if name.IsNull() {
return "UTC", offset.Duration(), nil
}
return name.Str(), offset.Duration(), nil
}

func getTimeableTime(ctx context.Context, args values.Object) (values.Time, error) {
var tm values.Time
t, err := getTime(args)
Expand Down
3 changes: 2 additions & 1 deletion stdlib/date/durations.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/date"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/internal/function"
"github.com/influxdata/flux/internal/zoneinfo"
Expand Down Expand Up @@ -63,7 +64,7 @@ func addDuration(ctx context.Context, t values.Value, d values.Duration, scale i
return nil, err
}

name, offset, err := getLocation(loc)
name, offset, err := date.GetLocation(loc)
if err != nil {
return nil, err
}
Expand Down
59 changes: 41 additions & 18 deletions stdlib/universe/hour_selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,25 @@ import (
"github.com/influxdata/flux"
"github.com/influxdata/flux/codes"
"github.com/influxdata/flux/execute"
"github.com/influxdata/flux/internal/date"
"github.com/influxdata/flux/internal/errors"
"github.com/influxdata/flux/plan"
"github.com/influxdata/flux/runtime"
"github.com/influxdata/flux/values"
)

const HourSelectionKind = "hourSelection"
const HourSelectionKind = "_hourSelection"

type HourSelectionOpSpec struct {
Start int64 `json:"start"`
Stop int64 `json:"stop"`
TimeColumn string `json:"timeColumn"`
Start int64 `json:"start"`
Stop int64 `json:"stop"`
Location string `json:"location"`
Offset values.Duration `json:"offset"`
TimeColumn string `json:"timeColumn"`
}

func init() {
hourSelectionSignature := runtime.MustLookupBuiltinType("universe", "hourSelection")
hourSelectionSignature := runtime.MustLookupBuiltinType("universe", "_hourSelection")

runtime.RegisterPackageValue("universe", HourSelectionKind, flux.MustValue(flux.FunctionValue(HourSelectionKind, createHourSelectionOpSpec, hourSelectionSignature)))
flux.RegisterOpSpec(HourSelectionKind, newHourSelectionOp)
Expand All @@ -45,6 +49,13 @@ func createHourSelectionOpSpec(args flux.Arguments, a *flux.Administration) (flu
}
spec.Stop = stop

location, offset, err := date.GetLocationFromFluxArgs(args)
if err != nil {
return nil, err
}
spec.Location = location
spec.Offset = offset

if label, ok, err := args.GetString("timeColumn"); err != nil {
return nil, err
} else if ok {
Expand All @@ -66,9 +77,11 @@ func (s *HourSelectionOpSpec) Kind() flux.OperationKind {

type HourSelectionProcedureSpec struct {
plan.DefaultCost
Start int64 `json:"start"`
Stop int64 `json:"stop"`
TimeColumn string `json:"timeColumn"`
Start int64 `json:"start"`
Stop int64 `json:"stop"`
Location string `json:"location"`
Offset values.Duration `json:"offset"`
TimeColumn string `json:"timeColumn"`
}

func newHourSelectionProcedure(qs flux.OperationSpec, pa plan.Administration) (plan.ProcedureSpec, error) {
Expand All @@ -80,6 +93,8 @@ func newHourSelectionProcedure(qs flux.OperationSpec, pa plan.Administration) (p
return &HourSelectionProcedureSpec{
Start: spec.Start,
Stop: spec.Stop,
Location: spec.Location,
Offset: spec.Offset,
TimeColumn: spec.TimeColumn,
}, nil
}
Expand Down Expand Up @@ -116,18 +131,22 @@ type hourSelectionTransformation struct {
d execute.Dataset
cache execute.TableBuilderCache

start int64
stop int64
timeCol string
start int64
stop int64
location string
offset values.Duration
timeCol string
}

func NewHourSelectionTransformation(d execute.Dataset, cache execute.TableBuilderCache, spec *HourSelectionProcedureSpec) *hourSelectionTransformation {
return &hourSelectionTransformation{
d: d,
cache: cache,
start: spec.Start,
stop: spec.Stop,
timeCol: spec.TimeColumn,
d: d,
cache: cache,
start: spec.Start,
stop: spec.Stop,
location: spec.Location,
offset: spec.Offset,
timeCol: spec.TimeColumn,
}
}

Expand Down Expand Up @@ -162,8 +181,12 @@ func (t *hourSelectionTransformation) Process(id execute.DatasetID, tbl flux.Tab
if nullCheck := cr.Times(colIdx); nullCheck.IsNull(i) {
continue
}
curr := execute.Time(cr.Times(colIdx).Value(i)).Time().Hour()
if (int64(curr) >= t.start && int64(curr) <= t.stop) || (t.start > t.stop && (int64(curr) >= t.start || int64(curr) <= t.stop)) {
lTime, err := date.GetTimeInLocation(execute.Time(cr.Times(colIdx).Value(i)), t.location, t.offset)
if err != nil {
return nil
}
lHour := int64(lTime.Time().Time().Hour())
if (lHour >= t.start && lHour <= t.stop) || (t.start > t.stop && (lHour >= t.start || lHour <= t.stop)) {
for k := range cr.Cols() {
if err := builder.AppendValue(k, execute.ValueForRow(cr, i, k)); err != nil {
return err
Expand Down
25 changes: 25 additions & 0 deletions stdlib/universe/hour_selection_test.flux
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package universe_test

import "testing"
import "csv"
import "timezone"

option now = () => 2030-01-01T00:00:00Z

Expand Down Expand Up @@ -65,3 +66,27 @@ testcase hour_selection_overnight_range {

testing.diff(got: got, want: want)
}

testcase hour_selection_location {
option location = timezone.location(name: "America/Los_Angeles")

got =
csv.from(csv: inData)
|> range(start: 2018-12-01T00:00:00Z)
|> hourSelection(start: 9, stop: 10, timeColumn: "_time")

want =
csv.from(
csv:
"
#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,string,string,dateTime:RFC3339,unsignedLong
#group,false,false,true,true,true,true,false,false
#default,_result,,,,,,,
,result,table,_start,_stop,_measurement,_field,_time,_value
,,0,2018-12-01T00:00:00Z,2030-01-01T00:00:00Z,Sgf,DlXwgrw,2018-12-18T17:11:25Z,33
,,0,2018-12-01T00:00:00Z,2030-01-01T00:00:00Z,Sgf,DlXwgrw,2018-12-19T18:11:35Z,63
",
)

testing.diff(got: got, want: want)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good. I checked that 2018-12-19T18:11:35Z is within the 9-10 range in LA time, and it turns out that is 10:11 in LA time. But the semantics of hourSelection is that the stop hour is inclusive. This was a surprise to me! So this looks good.

Loading