Merge pull request #32 from wesen/task/add-aggregations
Add aggregation support and improve error reporting
wesen authored Jan 29, 2025
2 parents 22f9091 + 4b6dba7 commit 3cc7a3e
Showing 5 changed files with 510 additions and 12 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -18,3 +18,4 @@ dist/
dir-env

.envrc
.history
25 changes: 25 additions & 0 deletions .vscode/launch.json
@@ -0,0 +1,25 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [

{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${fileDirname}"
},
{
"name": "Run Index Stats",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}/cmd/escuse-me",
"args": ["mento", "index-stats", "--output", "yaml"],
"envFile": "${workspaceFolder}/.envrc"
}
]
}
3 changes: 2 additions & 1 deletion cmd/escuse-me/main.go
@@ -3,6 +3,8 @@ package main
import (
"embed"
"fmt"
"os"

clay "github.com/go-go-golems/clay/pkg"
edit_command "github.com/go-go-golems/clay/pkg/cmds/edit-command"
ls_commands "github.com/go-go-golems/clay/pkg/cmds/ls-commands"
@@ -23,7 +25,6 @@ import (
"github.com/pkg/errors"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"os"
)

var rootCmd = &cobra.Command{
114 changes: 103 additions & 11 deletions pkg/cmds/cmd.go
@@ -1,10 +1,12 @@
package cmds

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"strings"

"github.com/elastic/go-elasticsearch/v8"
@@ -216,6 +218,53 @@ func (esc *ElasticSearchCommand) RenderQueryToJSON(parameters map[string]interfa
return string(js), nil
}

func (esc *ElasticSearchCommand) processAggregations(
ctx context.Context,
gp middlewares.Processor,
aggName string,
rawAgg json.RawMessage,
) error {
var agg map[string]interface{}
if err := json.Unmarshal(rawAgg, &agg); err != nil {
return errors.Wrap(err, "failed to unmarshal aggregation")
}

row := orderedmap.New[string, interface{}]()
row.Set("aggregation", aggName)

// Process buckets if they exist
if buckets, ok := agg["buckets"].([]interface{}); ok {
for _, bucket := range buckets {
bucketMap, ok := bucket.(map[string]interface{})
if !ok {
continue
}

bucketRow := orderedmap.New[string, interface{}]()
bucketRow.Set("aggregation", aggName)

// Add all bucket fields to the row
for k, v := range bucketMap {
bucketRow.Set(k, v)
}

if err := gp.AddRow(ctx, bucketRow); err != nil {
return err
}
}
return nil
}

// Process non-bucket aggregations
for k, v := range agg {
if k != "buckets" && k != "meta" {
row.Set(k, v)
}
}

return gp.AddRow(ctx, row)
}

func (esc *ElasticSearchCommand) RunQueryIntoGlaze(
ctx context.Context,
es *elasticsearch.Client,
@@ -240,22 +289,22 @@ func (esc *ElasticSearchCommand) RunQueryIntoGlaze(

queryReader := strings.NewReader(query)

os := []func(*esapi.SearchRequest){
os_ := []func(*esapi.SearchRequest){
es.Search.WithContext(ctx),
es.Search.WithBody(queryReader),
es.Search.WithTrackTotalHits(true),
}

os = append(os, es.Search.WithExplain(esHelperSettings.Explain))
os_ = append(os_, es.Search.WithExplain(esHelperSettings.Explain))
if esHelperSettings.Index != "" {
os = append(os, es.Search.WithIndex(esHelperSettings.Index))
os_ = append(os_, es.Search.WithIndex(esHelperSettings.Index))
} else if esc.DefaultIndex != "" {
os = append(os, es.Search.WithIndex(esc.DefaultIndex))
os_ = append(os_, es.Search.WithIndex(esc.DefaultIndex))
} else {
return errors.New("No index specified")
}

res, err := es.Search(os...)
res, err := es.Search(os_...)
if err != nil {
return errors.Wrapf(err, "Could not run query")
}
@@ -269,13 +318,42 @@ func (esc *ElasticSearchCommand) RunQueryIntoGlaze(

if res.IsError() {
var e map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&e); err != nil {
body, err := io.ReadAll(res.Body)
if err != nil {
return errors.Wrap(err, "Error reading response body")
}

if err := json.Unmarshal(body, &e); err != nil {
return errors.New("Error parsing the response body")
}

if esHelperSettings.RawResults {
// Parse and pretty print JSON error response to stderr
var prettyJSON bytes.Buffer
if err := json.Indent(&prettyJSON, body, "", " "); err != nil {
fmt.Fprintf(os.Stderr, "%s\n", string(body)) // Fallback to raw if JSON parsing fails
} else {
fmt.Fprintf(os.Stderr, "%s\n", prettyJSON.String())
}
} else {
// Print the response status and error information.
errMessage := fmt.Sprintf("[%s] %s: %s", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["reason"])
return errors.New(errMessage)
// Extract and print just the error reason and root cause
if errorObj, ok := e["error"].(map[string]interface{}); ok {
if reason, ok := errorObj["reason"].(string); ok {
fmt.Fprintf(os.Stderr, "Error reason: %s\n", reason)
}
if rootCauses, ok := errorObj["root_cause"].([]interface{}); ok && len(rootCauses) > 0 {
if rootCause, ok := rootCauses[0].(map[string]interface{}); ok {
rootType, _ := rootCause["type"].(string)
rootReason, _ := rootCause["reason"].(string)
fmt.Fprintf(os.Stderr, "Root cause: [%s] %s\n", rootType, rootReason)
}
}
}
}

// Print the response status and error information.
errMessage := fmt.Sprintf("[%s] %s: %s", res.Status(), e["error"].(map[string]interface{})["type"], e["error"].(map[string]interface{})["reason"])
return errors.New(errMessage)
}

body, err := io.ReadAll(res.Body)
@@ -286,7 +364,12 @@ func (esc *ElasticSearchCommand) RunQueryIntoGlaze(
if esHelperSettings.RawResults {
// For raw results, just output the entire response as a single row
row := orderedmap.New[string, interface{}]()
row.Set("raw_results", json.RawMessage(body))
var obj interface{}
err := json.Unmarshal(body, &obj)
if err != nil {
return err
}
row.Set("raw_results", obj)
err = gp.AddRow(ctx, row)
if err != nil {
return err
@@ -299,15 +382,23 @@ func (esc *ElasticSearchCommand) RunQueryIntoGlaze(
return errors.New("Error parsing the response body")
}

// Process hits
for _, hit := range r.Hits.Hits {
row := hit.Source
row.Set("_score", hit.Score)
err = gp.AddRow(ctx, row)
if err != nil {
return err
}
}

// TODO(manuel, 2023-02-22) Add explain functionality
// Process aggregations if they exist
if len(r.Aggregations) > 0 {
for aggName, rawAgg := range r.Aggregations {
if err := esc.processAggregations(ctx, gp, aggName, rawAgg); err != nil {
return errors.Wrapf(err, "failed to process aggregation %s", aggName)
}
}
}

return nil
@@ -320,4 +411,5 @@ type ElasticSearchResult struct {
Score float64 `json:"_score"`
} `json:"hits,omitempty"`
} `json:"hits"`
Aggregations map[string]json.RawMessage `json:"aggregations,omitempty"`
}
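
For readers who want to see the flattening behavior in isolation, here is a minimal standalone sketch (not part of this commit) of what processAggregations does: a bucket aggregation such as a terms aggregation becomes one row per bucket, each tagged with the aggregation name, while a metric aggregation becomes a single row of its scalar fields. The aggregation names and the sample payload are invented for illustration, and plain maps printed with fmt stand in for the project's ordered rows and Glaze processor.

package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// Sample "aggregations" section of an Elasticsearch search response.
// The aggregation and field names are made up for illustration.
const sampleAggregations = `{
  "tags": {
    "doc_count_error_upper_bound": 0,
    "sum_other_doc_count": 12,
    "buckets": [
      {"key": "golang", "doc_count": 42},
      {"key": "elasticsearch", "doc_count": 17}
    ]
  },
  "avg_score": {
    "value": 3.7
  }
}`

func main() {
	var aggs map[string]json.RawMessage
	if err := json.Unmarshal([]byte(sampleAggregations), &aggs); err != nil {
		log.Fatal(err)
	}

	for aggName, rawAgg := range aggs {
		var agg map[string]interface{}
		if err := json.Unmarshal(rawAgg, &agg); err != nil {
			log.Fatal(err)
		}

		// Bucket aggregations: one row per bucket, tagged with the aggregation name.
		if buckets, ok := agg["buckets"].([]interface{}); ok {
			for _, bucket := range buckets {
				bucketMap, ok := bucket.(map[string]interface{})
				if !ok {
					continue
				}
				row := map[string]interface{}{"aggregation": aggName}
				for k, v := range bucketMap {
					row[k] = v
				}
				fmt.Println(row)
			}
			continue
		}

		// Metric (non-bucket) aggregations: a single row of the remaining fields.
		row := map[string]interface{}{"aggregation": aggName}
		for k, v := range agg {
			if k != "buckets" && k != "meta" {
				row[k] = v
			}
		}
		fmt.Println(row)
	}
}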
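
Likewise, a minimal sketch of the improved error reporting, assuming the usual shape of an Elasticsearch error body (an error object carrying a reason and a root_cause array): instead of surfacing only the raw type/reason pair, the new branch prints the reason and the first root cause to stderr. The sample body below is a generic illustration, not output captured from this repository.

package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
)

// A generic example of an Elasticsearch error response body.
const sampleErrorBody = `{
  "error": {
    "type": "search_phase_execution_exception",
    "reason": "all shards failed",
    "root_cause": [
      {"type": "query_shard_exception", "reason": "No mapping found for [missing_field] in order to sort on"}
    ]
  },
  "status": 400
}`

func main() {
	var e map[string]interface{}
	if err := json.Unmarshal([]byte(sampleErrorBody), &e); err != nil {
		log.Fatal(err)
	}

	// Extract and print just the error reason and the first root cause,
	// mirroring the non-raw-results branch added in this commit.
	if errorObj, ok := e["error"].(map[string]interface{}); ok {
		if reason, ok := errorObj["reason"].(string); ok {
			fmt.Fprintf(os.Stderr, "Error reason: %s\n", reason)
		}
		if rootCauses, ok := errorObj["root_cause"].([]interface{}); ok && len(rootCauses) > 0 {
			if rootCause, ok := rootCauses[0].(map[string]interface{}); ok {
				rootType, _ := rootCause["type"].(string)
				rootReason, _ := rootCause["reason"].(string)
				fmt.Fprintf(os.Stderr, "Root cause: [%s] %s\n", rootType, rootReason)
			}
		}
	}
}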