Skip to content

Commit

Permalink
Add missing value and sampler aggregation
Browse files Browse the repository at this point in the history
All aggregations derived from ValuesSourceAggregationBuilder in Java
accept a `missing` setting: the value to use for documents that have
no value for the aggregated field [1].

Also added the missing Sampler aggregation [2].

[1]
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-terms-aggregation.html#_missing_value_12
[2]
https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-sampler-aggregation.html
  • Loading branch information
olivere committed Feb 3, 2016
1 parent b51c3b3 commit aaba83e
Show file tree
Hide file tree
Showing 13 changed files with 388 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ See the [wiki](https://github.com/olivere/elastic/wiki) for more details.
- [x] Nested
- [x] Range
- [x] Reverse Nested
- [ ] Sampler
- [x] Sampler
- [x] Significant Terms
- [x] Terms
- Pipeline Aggregations
Expand Down
15 changes: 15 additions & 0 deletions search_aggs.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,21 @@ func (a Aggregations) SignificantTerms(name string) (*AggregationBucketSignifica
return nil, false
}

// Sampler looks up the aggregation stored under name and decodes it as a
// sampler (single bucket) result.
//
// The boolean is false when no entry exists for name or when the raw JSON
// cannot be decoded. An entry that is present but nil yields an empty
// bucket and true.
// See: https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-sampler-aggregation.html
func (a Aggregations) Sampler(name string) (*AggregationSingleBucket, bool) {
	msg, ok := a[name]
	if !ok {
		return nil, false
	}

	bucket := &AggregationSingleBucket{}
	if msg == nil {
		// Entry exists but carries no payload: report an empty bucket.
		return bucket, true
	}
	if json.Unmarshal(*msg, bucket) != nil {
		return nil, false
	}
	return bucket, true
}

// Range returns range aggregation results.
// See: http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/search-aggregations-bucket-range-aggregation.html
func (a Aggregations) Range(name string) (*AggregationBucketRangeItems, bool) {
Expand Down
10 changes: 10 additions & 0 deletions search_aggs_bucket_date_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ package elastic
type DateHistogramAggregation struct {
field string
script *Script
missing interface{}
subAggregations map[string]Aggregation
meta map[string]interface{}

Expand Down Expand Up @@ -42,6 +43,12 @@ func (a *DateHistogramAggregation) Script(script *Script) *DateHistogramAggregat
return a
}

// Missing sets the value to use for documents that have no value for the
// aggregated field. Source writes it to the "missing" option only when it
// is non-nil. The receiver is returned so that calls can be chained.
func (a *DateHistogramAggregation) Missing(missing interface{}) *DateHistogramAggregation {
a.missing = missing
return a
}

func (a *DateHistogramAggregation) SubAggregation(name string, subAggregation Aggregation) *DateHistogramAggregation {
a.subAggregations[name] = subAggregation
return a
Expand Down Expand Up @@ -219,6 +226,9 @@ func (a *DateHistogramAggregation) Source() (interface{}, error) {
}
opts["script"] = src
}
if a.missing != nil {
opts["missing"] = a.missing
}

opts["interval"] = a.interval
if a.minDocCount != nil {
Expand Down
17 changes: 17 additions & 0 deletions search_aggs_bucket_date_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,20 @@ func TestDateHistogramAggregation(t *testing.T) {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

// TestDateHistogramAggregationWithMissing verifies that a configured missing
// value is serialized into the "missing" option of a date_histogram.
func TestDateHistogramAggregationWithMissing(t *testing.T) {
	const want = `{"date_histogram":{"field":"date","interval":"year","missing":"1900"}}`

	builder := NewDateHistogramAggregation().
		Field("date").
		Interval("year").
		Missing("1900")

	source, err := builder.Source()
	if err != nil {
		t.Fatal(err)
	}
	raw, err := json.Marshal(source)
	if err != nil {
		t.Fatalf("marshaling to JSON failed: %v", err)
	}
	if got := string(raw); got != want {
		t.Errorf("expected\n%s\n,got:\n%s", want, got)
	}
}
10 changes: 10 additions & 0 deletions search_aggs_bucket_histogram.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package elastic
type HistogramAggregation struct {
field string
script *Script
missing interface{}
subAggregations map[string]Aggregation
meta map[string]interface{}

Expand Down Expand Up @@ -40,6 +41,12 @@ func (a *HistogramAggregation) Script(script *Script) *HistogramAggregation {
return a
}

// Missing sets the value to use for documents that have no value for the
// aggregated field. Source writes it to the "missing" option only when it
// is non-nil. The receiver is returned so that calls can be chained.
func (a *HistogramAggregation) Missing(missing interface{}) *HistogramAggregation {
a.missing = missing
return a
}

func (a *HistogramAggregation) SubAggregation(name string, subAggregation Aggregation) *HistogramAggregation {
a.subAggregations[name] = subAggregation
return a
Expand Down Expand Up @@ -193,6 +200,9 @@ func (a *HistogramAggregation) Source() (interface{}, error) {
}
opts["script"] = src
}
if a.missing != nil {
opts["missing"] = a.missing
}

opts["interval"] = a.interval
if a.order != "" {
Expand Down
17 changes: 17 additions & 0 deletions search_aggs_bucket_histogram_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,3 +42,20 @@ func TestHistogramAggregationWithMetaData(t *testing.T) {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

// TestHistogramAggregationWithMissing verifies that a configured missing
// value is serialized into the "missing" option of a histogram.
func TestHistogramAggregationWithMissing(t *testing.T) {
	const want = `{"histogram":{"field":"price","interval":50,"missing":"n/a"}}`

	builder := NewHistogramAggregation().
		Field("price").
		Interval(50).
		Missing("n/a")

	source, err := builder.Source()
	if err != nil {
		t.Fatal(err)
	}
	raw, err := json.Marshal(source)
	if err != nil {
		t.Fatalf("marshaling to JSON failed: %v", err)
	}
	if got := string(raw); got != want {
		t.Errorf("expected\n%s\n,got:\n%s", want, got)
	}
}
10 changes: 10 additions & 0 deletions search_aggs_bucket_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
type RangeAggregation struct {
field string
script *Script
missing interface{}
subAggregations map[string]Aggregation
meta map[string]interface{}
keyed *bool
Expand Down Expand Up @@ -48,6 +49,12 @@ func (a *RangeAggregation) Script(script *Script) *RangeAggregation {
return a
}

// Missing sets the value to use for documents that have no value for the
// aggregated field. Source writes it to the "missing" option only when it
// is non-nil. The receiver is returned so that calls can be chained.
func (a *RangeAggregation) Missing(missing interface{}) *RangeAggregation {
a.missing = missing
return a
}

func (a *RangeAggregation) SubAggregation(name string, subAggregation Aggregation) *RangeAggregation {
a.subAggregations[name] = subAggregation
return a
Expand Down Expand Up @@ -163,6 +170,9 @@ func (a *RangeAggregation) Source() (interface{}, error) {
}
opts["script"] = src
}
if a.missing != nil {
opts["missing"] = a.missing
}

if a.keyed != nil {
opts["keyed"] = *a.keyed
Expand Down
20 changes: 20 additions & 0 deletions search_aggs_bucket_range_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,3 +134,23 @@ func TestRangeAggregationWithMetaData(t *testing.T) {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

// TestRangeAggregationWithMissing verifies that a configured missing value
// is serialized into the "missing" option of a range aggregation alongside
// its ranges.
func TestRangeAggregationWithMissing(t *testing.T) {
	const want = `{"range":{"field":"price","missing":0,"ranges":[{"to":50},{"from":50,"to":100},{"from":100}]}}`

	builder := NewRangeAggregation().
		Field("price").
		Missing(0).
		AddRange(nil, 50).
		AddRange(50, 100).
		AddRange(100, nil)

	source, err := builder.Source()
	if err != nil {
		t.Fatal(err)
	}
	raw, err := json.Marshal(source)
	if err != nil {
		t.Fatalf("marshaling to JSON failed: %v", err)
	}
	if got := string(raw); got != want {
		t.Errorf("expected\n%s\n,got:\n%s", want, got)
	}
}
145 changes: 145 additions & 0 deletions search_aggs_bucket_sampler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright 2012-2016 Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

// SamplerAggregation is a filtering aggregation used to limit any
// sub aggregations' processing to a sample of the top-scoring documents.
// Optionally, diversity settings can be used to limit the number of matches
// that share a common value such as an "author".
//
// Create instances with NewSamplerAggregation, which initializes the
// sub-aggregation map and the -1 "unset" sentinels used below.
// See: https://www.elastic.co/guide/en/elasticsearch/reference/2.x/search-aggregations-bucket-sampler-aggregation.html
type SamplerAggregation struct {
// field is written to the "field" option; omitted when empty.
field string
// script is serialized into the "script" option; omitted when nil.
script *Script
// missing is written to the "missing" option; omitted when nil.
missing interface{}
// subAggregations maps a name to the nested aggregation emitted under
// the "aggregations" key.
subAggregations map[string]Aggregation
// meta is emitted under the "meta" key when non-empty.
meta map[string]interface{}

// shardSize is written to "shard_size"; negative values mean unset.
shardSize int
// maxDocsPerValue is written to "max_docs_per_value"; negative values
// mean unset.
maxDocsPerValue int
// executionHint is written to "execution_hint"; omitted when empty.
executionHint string
}

// NewSamplerAggregation creates a SamplerAggregation with an empty
// sub-aggregation map. shardSize and maxDocsPerValue start at -1, which
// Source treats as "not set" and leaves out of the generated request.
func NewSamplerAggregation() *SamplerAggregation {
return &SamplerAggregation{
shardSize: -1,
maxDocsPerValue: -1,
subAggregations: make(map[string]Aggregation),
}
}

// Field sets the field name that Source writes to the "field" option.
// An empty name is omitted from the generated request. The receiver is
// returned so that calls can be chained.
func (a *SamplerAggregation) Field(field string) *SamplerAggregation {
a.field = field
return a
}

// Script sets the script that Source serializes into the "script" option.
// A nil script is omitted from the generated request. The receiver is
// returned so that calls can be chained.
func (a *SamplerAggregation) Script(script *Script) *SamplerAggregation {
a.script = script
return a
}

// Missing sets the value to use for documents that have no value for the
// aggregated field. Source writes it to the "missing" option only when it
// is non-nil. The receiver is returned so that calls can be chained.
func (a *SamplerAggregation) Missing(missing interface{}) *SamplerAggregation {
a.missing = missing
return a
}

// SubAggregation registers subAggregation under name, replacing any entry
// previously stored under the same name. Source emits all registered
// entries under the "aggregations" key. The sub-aggregation map must be
// initialized, which NewSamplerAggregation takes care of. The receiver is
// returned so that calls can be chained.
func (a *SamplerAggregation) SubAggregation(name string, subAggregation Aggregation) *SamplerAggregation {
a.subAggregations[name] = subAggregation
return a
}

// Meta sets the meta data to be included in the aggregation response.
// The map is stored as given (not copied); Source emits it under the
// "meta" key only when it is non-empty. The receiver is returned so that
// calls can be chained.
func (a *SamplerAggregation) Meta(metaData map[string]interface{}) *SamplerAggregation {
a.meta = metaData
return a
}

// ShardSize sets the maximum number of docs returned from each shard.
// Source writes it to the "shard_size" option; a negative value leaves the
// option out of the request. The receiver is returned so that calls can be
// chained.
func (a *SamplerAggregation) ShardSize(shardSize int) *SamplerAggregation {
a.shardSize = shardSize
return a
}

// MaxDocsPerValue sets the value that Source writes to the
// "max_docs_per_value" option; a negative value leaves the option out of
// the request. Presumably this is the diversity limit on documents that
// share a common value, as described on SamplerAggregation; confirm
// against the Elasticsearch reference. The receiver is returned so that
// calls can be chained.
func (a *SamplerAggregation) MaxDocsPerValue(maxDocsPerValue int) *SamplerAggregation {
a.maxDocsPerValue = maxDocsPerValue
return a
}

// ExecutionHint sets the string that Source writes to the "execution_hint"
// option. An empty hint is omitted from the request; the value is not
// validated here. The receiver is returned so that calls can be chained.
func (a *SamplerAggregation) ExecutionHint(hint string) *SamplerAggregation {
a.executionHint = hint
return a
}

// Source builds the JSON-serializable representation of the aggregation.
//
// The result is a map with a "sampler" key holding the configured options,
// plus optional "aggregations" and "meta" keys. An error is returned if
// serializing the script or any sub-aggregation fails.
func (a *SamplerAggregation) Source() (interface{}, error) {
	// For orientation, a full request looks like this:
	//
	// {
	//   "aggs" : {
	//     "sample" : {
	//       "sampler" : {
	//         "field" : "user.id",
	//         "shard_size" : 200
	//       },
	//       "aggs": {
	//         "keywords": {
	//           "significant_terms": {
	//             "field": "text"
	//           }
	//         }
	//       }
	//     }
	//   }
	// }
	//
	// Only the { "sampler" : { ... } } part (with its siblings) is built here.

	params := map[string]interface{}{}

	// Options shared with every values-source based aggregation.
	if a.field != "" {
		params["field"] = a.field
	}
	if a.script != nil {
		scriptSrc, err := a.script.Source()
		if err != nil {
			return nil, err
		}
		params["script"] = scriptSrc
	}
	if a.missing != nil {
		params["missing"] = a.missing
	}

	// Sampler-specific options; negative numbers and "" mean "unset".
	if a.shardSize >= 0 {
		params["shard_size"] = a.shardSize
	}
	if a.maxDocsPerValue >= 0 {
		params["max_docs_per_value"] = a.maxDocsPerValue
	}
	if a.executionHint != "" {
		params["execution_hint"] = a.executionHint
	}

	result := map[string]interface{}{"sampler": params}

	// Nested aggregations sit next to "sampler", not inside it.
	if len(a.subAggregations) > 0 {
		children := make(map[string]interface{}, len(a.subAggregations))
		for childName, child := range a.subAggregations {
			childSrc, err := child.Source()
			if err != nil {
				return nil, err
			}
			children[childName] = childSrc
		}
		result["aggregations"] = children
	}

	// Meta data is attached only when there is something to attach.
	if len(a.meta) > 0 {
		result["meta"] = a.meta
	}

	return result, nil
}
52 changes: 52 additions & 0 deletions search_aggs_bucket_sampler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2012-2016 Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

package elastic

import (
"encoding/json"
"testing"
)

// TestSamplerAggregation verifies the JSON produced for a sampler with a
// field, a shard size and one significant_terms sub-aggregation.
func TestSamplerAggregation(t *testing.T) {
	const want = `{"aggregations":{"keywords":{"significant_terms":{"field":"text"}}},"sampler":{"field":"user.id","shard_size":200}}`

	keywords := NewSignificantTermsAggregation().Field("text")
	sampler := NewSamplerAggregation().
		Field("user.id").
		ShardSize(200).
		SubAggregation("keywords", keywords)

	source, err := sampler.Source()
	if err != nil {
		t.Fatal(err)
	}
	raw, err := json.Marshal(source)
	if err != nil {
		t.Fatalf("marshaling to JSON failed: %v", err)
	}
	if got := string(raw); got != want {
		t.Errorf("expected\n%s\n,got:\n%s", want, got)
	}
}

// TestSamplerAggregationWithMissing verifies that a configured missing value
// is serialized into the "missing" option of a sampler aggregation.
func TestSamplerAggregationWithMissing(t *testing.T) {
	const want = `{"aggregations":{"keywords":{"significant_terms":{"field":"text"}}},"sampler":{"field":"user.id","missing":"n/a"}}`

	keywords := NewSignificantTermsAggregation().Field("text")
	sampler := NewSamplerAggregation().
		Field("user.id").
		Missing("n/a").
		SubAggregation("keywords", keywords)

	source, err := sampler.Source()
	if err != nil {
		t.Fatal(err)
	}
	raw, err := json.Marshal(source)
	if err != nil {
		t.Fatalf("marshaling to JSON failed: %v", err)
	}
	if got := string(raw); got != want {
		t.Errorf("expected\n%s\n,got:\n%s", want, got)
	}
}
Loading

0 comments on commit aaba83e

Please sign in to comment.