diff --git a/README.md b/README.md index 731fbbcd7..eefd530df 100644 --- a/README.md +++ b/README.md @@ -243,7 +243,7 @@ See the [wiki](https://github.com/olivere/elastic/wiki) for more details. - [x] Nested - [x] Range - [x] Reverse Nested - - [ ] Sampler + - [x] Sampler - [x] Significant Terms - [x] Terms - Pipeline Aggregations diff --git a/search_aggs.go b/search_aggs.go index 6b0f2a01f..8e13a539a 100644 --- a/search_aggs.go +++ b/search_aggs.go @@ -323,6 +323,21 @@ func (a Aggregations) SignificantTerms(name string) (*AggregationBucketSignifica return nil, false } +// Sampler returns sampler aggregation results. +// See: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-sampler-aggregation.html +func (a Aggregations) Sampler(name string) (*AggregationSingleBucket, bool) { + if raw, found := a[name]; found { + agg := new(AggregationSingleBucket) + if raw == nil { + return agg, true + } + if err := json.Unmarshal(*raw, agg); err == nil { + return agg, true + } + } + return nil, false +} + // Range returns range aggregation results. // See: http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-aggregations-bucket-range-aggregation.html func (a Aggregations) Range(name string) (*AggregationBucketRangeItems, bool) { diff --git a/search_aggs_bucket_date_histogram.go b/search_aggs_bucket_date_histogram.go index 8399bfd6b..231c51ef8 100644 --- a/search_aggs_bucket_date_histogram.go +++ b/search_aggs_bucket_date_histogram.go @@ -10,6 +10,7 @@ package elastic type DateHistogramAggregation struct { field string script *Script + missing interface{} subAggregations map[string]Aggregation meta map[string]interface{} @@ -42,6 +43,12 @@ func (a *DateHistogramAggregation) Script(script *Script) *DateHistogramAggregat return a } +// Missing configures the value to use when documents miss a value. 
+func (a *DateHistogramAggregation) Missing(missing interface{}) *DateHistogramAggregation { + a.missing = missing + return a +} + func (a *DateHistogramAggregation) SubAggregation(name string, subAggregation Aggregation) *DateHistogramAggregation { a.subAggregations[name] = subAggregation return a @@ -219,6 +226,9 @@ func (a *DateHistogramAggregation) Source() (interface{}, error) { } opts["script"] = src } + if a.missing != nil { + opts["missing"] = a.missing + } opts["interval"] = a.interval if a.minDocCount != nil { diff --git a/search_aggs_bucket_date_histogram_test.go b/search_aggs_bucket_date_histogram_test.go index a400a02ef..3c826ce9e 100644 --- a/search_aggs_bucket_date_histogram_test.go +++ b/search_aggs_bucket_date_histogram_test.go @@ -30,3 +30,20 @@ func TestDateHistogramAggregation(t *testing.T) { t.Errorf("expected\n%s\n,got:\n%s", expected, got) } } + +func TestDateHistogramAggregationWithMissing(t *testing.T) { + agg := NewDateHistogramAggregation().Field("date").Interval("year").Missing("1900") + src, err := agg.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"date_histogram":{"field":"date","interval":"year","missing":"1900"}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} diff --git a/search_aggs_bucket_histogram.go b/search_aggs_bucket_histogram.go index 5cd4d8313..7821adbc0 100644 --- a/search_aggs_bucket_histogram.go +++ b/search_aggs_bucket_histogram.go @@ -12,6 +12,7 @@ package elastic type HistogramAggregation struct { field string script *Script + missing interface{} subAggregations map[string]Aggregation meta map[string]interface{} @@ -40,6 +41,12 @@ func (a *HistogramAggregation) Script(script *Script) *HistogramAggregation { return a } +// Missing configures the value to use when documents miss a value. 
+func (a *HistogramAggregation) Missing(missing interface{}) *HistogramAggregation { + a.missing = missing + return a +} + func (a *HistogramAggregation) SubAggregation(name string, subAggregation Aggregation) *HistogramAggregation { a.subAggregations[name] = subAggregation return a @@ -193,6 +200,9 @@ func (a *HistogramAggregation) Source() (interface{}, error) { } opts["script"] = src } + if a.missing != nil { + opts["missing"] = a.missing + } opts["interval"] = a.interval if a.order != "" { diff --git a/search_aggs_bucket_histogram_test.go b/search_aggs_bucket_histogram_test.go index d4d2da370..6a5d5fb92 100644 --- a/search_aggs_bucket_histogram_test.go +++ b/search_aggs_bucket_histogram_test.go @@ -42,3 +42,20 @@ func TestHistogramAggregationWithMetaData(t *testing.T) { t.Errorf("expected\n%s\n,got:\n%s", expected, got) } } + +func TestHistogramAggregationWithMissing(t *testing.T) { + agg := NewHistogramAggregation().Field("price").Interval(50).Missing("n/a") + src, err := agg.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"histogram":{"field":"price","interval":50,"missing":"n/a"}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} diff --git a/search_aggs_bucket_range.go b/search_aggs_bucket_range.go index b97ea2f35..bc017c60f 100644 --- a/search_aggs_bucket_range.go +++ b/search_aggs_bucket_range.go @@ -18,6 +18,7 @@ import ( type RangeAggregation struct { field string script *Script + missing interface{} subAggregations map[string]Aggregation meta map[string]interface{} keyed *bool @@ -48,6 +49,12 @@ func (a *RangeAggregation) Script(script *Script) *RangeAggregation { return a } +// Missing configures the value to use when documents miss a value. 
+func (a *RangeAggregation) Missing(missing interface{}) *RangeAggregation { + a.missing = missing + return a +} + func (a *RangeAggregation) SubAggregation(name string, subAggregation Aggregation) *RangeAggregation { a.subAggregations[name] = subAggregation return a @@ -163,6 +170,9 @@ func (a *RangeAggregation) Source() (interface{}, error) { } opts["script"] = src } + if a.missing != nil { + opts["missing"] = a.missing + } if a.keyed != nil { opts["keyed"] = *a.keyed diff --git a/search_aggs_bucket_range_test.go b/search_aggs_bucket_range_test.go index b6b7b91b8..f0fd5f5fd 100644 --- a/search_aggs_bucket_range_test.go +++ b/search_aggs_bucket_range_test.go @@ -134,3 +134,23 @@ func TestRangeAggregationWithMetaData(t *testing.T) { t.Errorf("expected\n%s\n,got:\n%s", expected, got) } } + +func TestRangeAggregationWithMissing(t *testing.T) { + agg := NewRangeAggregation().Field("price").Missing(0) + agg = agg.AddRange(nil, 50) + agg = agg.AddRange(50, 100) + agg = agg.AddRange(100, nil) + src, err := agg.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"range":{"field":"price","missing":0,"ranges":[{"to":50},{"from":50,"to":100},{"from":100}]}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} diff --git a/search_aggs_bucket_sampler.go b/search_aggs_bucket_sampler.go new file mode 100644 index 000000000..9a6df15ec --- /dev/null +++ b/search_aggs_bucket_sampler.go @@ -0,0 +1,145 @@ +// Copyright 2012-2016 Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +// SamplerAggregation is a filtering aggregation used to limit any +// sub aggregations' processing to a sample of the top-scoring documents. 
+// Optionally, diversity settings can be used to limit the number of matches +// that share a common value such as an "author". +// See: https://www.elastic.co/guide/en/elasticsearch/reference/2.x/search-aggregations-bucket-sampler-aggregation.html +type SamplerAggregation struct { + field string + script *Script + missing interface{} + subAggregations map[string]Aggregation + meta map[string]interface{} + + shardSize int + maxDocsPerValue int + executionHint string +} + +func NewSamplerAggregation() *SamplerAggregation { + return &SamplerAggregation{ + shardSize: -1, + maxDocsPerValue: -1, + subAggregations: make(map[string]Aggregation), + } +} + +func (a *SamplerAggregation) Field(field string) *SamplerAggregation { + a.field = field + return a +} + +func (a *SamplerAggregation) Script(script *Script) *SamplerAggregation { + a.script = script + return a +} + +// Missing configures the value to use when documents miss a value. +func (a *SamplerAggregation) Missing(missing interface{}) *SamplerAggregation { + a.missing = missing + return a +} + +func (a *SamplerAggregation) SubAggregation(name string, subAggregation Aggregation) *SamplerAggregation { + a.subAggregations[name] = subAggregation + return a +} + +// Meta sets the meta data to be included in the aggregation response. +func (a *SamplerAggregation) Meta(metaData map[string]interface{}) *SamplerAggregation { + a.meta = metaData + return a +} + +// ShardSize sets the maximum number of docs returned from each shard. 
+func (a *SamplerAggregation) ShardSize(shardSize int) *SamplerAggregation { + a.shardSize = shardSize + return a +} + +func (a *SamplerAggregation) MaxDocsPerValue(maxDocsPerValue int) *SamplerAggregation { + a.maxDocsPerValue = maxDocsPerValue + return a +} + +func (a *SamplerAggregation) ExecutionHint(hint string) *SamplerAggregation { + a.executionHint = hint + return a +} + +func (a *SamplerAggregation) Source() (interface{}, error) { + // Example: + // { + // "aggs" : { + // "sample" : { + // "sampler" : { + // "field" : "user.id", + // "shard_size" : 200 + // }, + // "aggs": { + // "keywords": { + // "significant_terms": { + // "field": "text" + // } + // } + // } + // } + // } + // } + // + // This method returns only the { "sampler" : { ... } } part. + + source := make(map[string]interface{}) + opts := make(map[string]interface{}) + source["sampler"] = opts + + // ValuesSourceAggregationBuilder + if a.field != "" { + opts["field"] = a.field + } + if a.script != nil { + src, err := a.script.Source() + if err != nil { + return nil, err + } + opts["script"] = src + } + if a.missing != nil { + opts["missing"] = a.missing + } + + if a.shardSize >= 0 { + opts["shard_size"] = a.shardSize + } + if a.maxDocsPerValue >= 0 { + opts["max_docs_per_value"] = a.maxDocsPerValue + } + if a.executionHint != "" { + opts["execution_hint"] = a.executionHint + } + + // AggregationBuilder (SubAggregations) + if len(a.subAggregations) > 0 { + aggsMap := make(map[string]interface{}) + source["aggregations"] = aggsMap + for name, aggregate := range a.subAggregations { + src, err := aggregate.Source() + if err != nil { + return nil, err + } + aggsMap[name] = src + } + } + + // Add Meta data if available + if len(a.meta) > 0 { + source["meta"] = a.meta + } + + return source, nil +} diff --git a/search_aggs_bucket_sampler_test.go b/search_aggs_bucket_sampler_test.go new file mode 100644 index 000000000..da4ca5534 --- /dev/null +++ b/search_aggs_bucket_sampler_test.go @@ -0,0 +1,52 @@ 
+// Copyright 2012-2016 Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "encoding/json" + "testing" +) + +func TestSamplerAggregation(t *testing.T) { + keywordsAgg := NewSignificantTermsAggregation().Field("text") + agg := NewSamplerAggregation(). + Field("user.id"). + ShardSize(200). + SubAggregation("keywords", keywordsAgg) + src, err := agg.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"aggregations":{"keywords":{"significant_terms":{"field":"text"}}},"sampler":{"field":"user.id","shard_size":200}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} + +func TestSamplerAggregationWithMissing(t *testing.T) { + keywordsAgg := NewSignificantTermsAggregation().Field("text") + agg := NewSamplerAggregation(). + Field("user.id"). + Missing("n/a"). 
+ SubAggregation("keywords", keywordsAgg) + src, err := agg.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"aggregations":{"keywords":{"significant_terms":{"field":"text"}}},"sampler":{"field":"user.id","missing":"n/a"}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} diff --git a/search_aggs_bucket_terms.go b/search_aggs_bucket_terms.go index 1f921eb1c..2d3c0d1ad 100644 --- a/search_aggs_bucket_terms.go +++ b/search_aggs_bucket_terms.go @@ -10,6 +10,7 @@ package elastic type TermsAggregation struct { field string script *Script + missing interface{} subAggregations map[string]Aggregation meta map[string]interface{} @@ -50,6 +51,12 @@ func (a *TermsAggregation) Script(script *Script) *TermsAggregation { return a } +// Missing configures the value to use when documents miss a value. +func (a *TermsAggregation) Missing(missing interface{}) *TermsAggregation { + a.missing = missing + return a +} + func (a *TermsAggregation) SubAggregation(name string, subAggregation Aggregation) *TermsAggregation { a.subAggregations[name] = subAggregation return a @@ -244,6 +251,9 @@ func (a *TermsAggregation) Source() (interface{}, error) { } opts["script"] = src } + if a.missing != nil { + opts["missing"] = a.missing + } // TermsBuilder if a.size != nil && *a.size >= 0 { diff --git a/search_aggs_bucket_terms_test.go b/search_aggs_bucket_terms_test.go index 5dbfa7218..e5f979333 100644 --- a/search_aggs_bucket_terms_test.go +++ b/search_aggs_bucket_terms_test.go @@ -85,3 +85,20 @@ func TestTermsAggregationWithMetaData(t *testing.T) { t.Errorf("expected\n%s\n,got:\n%s", expected, got) } } + +func TestTermsAggregationWithMissing(t *testing.T) { + agg := NewTermsAggregation().Field("gender").Size(10).Missing("n/a") + src, err := agg.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + 
if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"terms":{"field":"gender","missing":"n/a","size":10}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} diff --git a/search_aggs_test.go b/search_aggs_test.go index 99cd91d45..ef6ec2112 100644 --- a/search_aggs_test.go +++ b/search_aggs_test.go @@ -86,6 +86,7 @@ func TestAggs(t *testing.T) { percentileRanksRetweetsAgg := NewPercentileRanksAggregation().Field("retweets").Values(25, 50, 75) cardinalityAgg := NewCardinalityAggregation().Field("user") significantTermsAgg := NewSignificantTermsAggregation().Field("message") + samplerAgg := NewSamplerAggregation().Field("user").SubAggregation("tagged_with", NewTermsAggregation().Field("tags")) retweetsRangeAgg := NewRangeAggregation().Field("retweets").Lt(10).Between(10, 100).Gt(100) retweetsKeyedRangeAgg := NewRangeAggregation().Field("retweets").Keyed(true).Lt(10).Between(10, 100).Gt(100) dateRangeAgg := NewDateRangeAggregation().Field("created").Lt("2012-01-01").Between("2012-01-01", "2013-01-01").Gt("2013-01-01") @@ -119,6 +120,7 @@ func TestAggs(t *testing.T) { builder = builder.Aggregation("percentileRanksRetweets", percentileRanksRetweetsAgg) builder = builder.Aggregation("usersCardinality", cardinalityAgg) builder = builder.Aggregation("significantTerms", significantTermsAgg) + builder = builder.Aggregation("sample", samplerAgg) builder = builder.Aggregation("retweetsRange", retweetsRangeAgg) builder = builder.Aggregation("retweetsKeyedRange", retweetsKeyedRangeAgg) builder = builder.Aggregation("dateRange", dateRangeAgg) @@ -588,6 +590,25 @@ func TestAggs(t *testing.T) { t.Errorf("expected %v; got: %v", 0, len(stAggRes.Buckets)) } + // sampler + samplerAggRes, found := agg.Sampler("sample") + if !found { + t.Errorf("expected %v; got: %v", true, found) + } + if samplerAggRes == nil { + t.Fatalf("expected != nil; got: nil") + } + if samplerAggRes.DocCount != 2 { + 
t.Errorf("expected %v; got: %v", 2, samplerAggRes.DocCount) + } + sub, found := samplerAggRes.Aggregations["tagged_with"] + if !found { + t.Fatalf("expected sub aggregation %q", "tagged_with") + } + if sub == nil { + t.Fatalf("expected sub aggregation %q; got: %v", "tagged_with", sub) + } + // retweetsRange rangeAggRes, found := agg.Range("retweetsRange") if !found { @@ -2050,6 +2071,49 @@ func TestAggsBucketSignificantTerms(t *testing.T) { } } +func TestAggsBucketSampler(t *testing.T) { + s := `{ + "sample" : { + "doc_count": 1000, + "keywords": { + "doc_count": 1000, + "buckets" : [ + { + "key": "bend", + "doc_count": 58, + "score": 37.982536582524276, + "bg_count": 103 + } + ] + } + } +}` + + aggs := new(Aggregations) + err := json.Unmarshal([]byte(s), &aggs) + if err != nil { + t.Fatalf("expected no error decoding; got: %v", err) + } + + agg, found := aggs.Sampler("sample") + if !found { + t.Fatalf("expected aggregation to be found; got: %v", found) + } + if agg == nil { + t.Fatalf("expected aggregation != nil; got: %v", agg) + } + if agg.DocCount != 1000 { + t.Fatalf("expected aggregation DocCount != %d; got: %d", 1000, agg.DocCount) + } + sub, found := agg.Aggregations["keywords"] + if !found { + t.Fatalf("expected sub aggregation %q", "keywords") + } + if sub == nil { + t.Fatalf("expected sub aggregation %q; got: %v", "keywords", sub) + } +} + func TestAggsBucketRange(t *testing.T) { s := `{ "price_ranges" : {