Skip to content

Commit

Permalink
Add simple locking
Browse files Browse the repository at this point in the history
Move all ES query building into es package. Fixes #1.
  • Loading branch information
hdpe committed Jan 10, 2021
1 parent 84eea6e commit 53de3f7
Show file tree
Hide file tree
Showing 13 changed files with 306 additions and 69 deletions.
2 changes: 2 additions & 0 deletions cmd/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,15 @@ func newContext(envName string) *context.Context {
}

changelog := resource.NewChangelog(conf.Changelog, esClient)
lock := resource.NewLock(conf.Changelog, esClient)
proc := resource.NewPreprocessor(conf.Preprocess)

return &context.Context{
Conf: conf,
Schema: resSchema,
Es: esClient,
Changelog: changelog,
Lock: lock,
Proc: proc,
}
}
9 changes: 7 additions & 2 deletions cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var importCmd = &cobra.Command{
}
return validateEnv(args[2])
},
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
resourceType := args[0]
resourceIdentifier := args[1]
envName := args[2]
Expand All @@ -31,11 +31,16 @@ var importCmd = &cobra.Command{

i := imp.NewImporter(ctx.Changelog, ctx.Schema, ctx.Proc)

getLock(ctx, envName)
defer releaseLock(ctx, envName)

err := i.ImportResource(resourceType, resourceIdentifier)

if err != nil {
fatalError("couldn't import resource: %v", err)
return fmt.Errorf("couldn't import resource: %v", err)
}

return nil
},
}

Expand Down
17 changes: 9 additions & 8 deletions cmd/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,44 +28,45 @@ var migrateCmd = &cobra.Command{
}
return validateEnv(args[0])
},
Run: func(cmd *cobra.Command, args []string) {
RunE: func(cmd *cobra.Command, args []string) error {
envName := args[0]

ctx := newContext(envName)

planner := plan.NewPlanner(ctx.Es, ctx.Conf, ctx.Changelog, ctx.Schema, ctx.Proc, &util.DefaultClock{})

getLock(ctx, envName)
defer releaseLock(ctx, envName)

resPlan, err := planner.Plan()

if err != nil {
fatalError("couldn't plan update: %v", err)
return fmt.Errorf("couldn't plan update: %w", err)
}

logPlan(resPlan, ctx.Conf.Server)

if len(resPlan) == 0 {
os.Exit(0)
}

if !approve {
reader := bufio.NewReader(os.Stdin)
fmt.Print("\nConfirm [Y/n]: ")
text, _ := reader.ReadString('\n')

if strings.ToLower(text) != "y\n" {
println("Cancelled")
os.Exit(0)
return nil
}
}

coll := plan.NewCollector()

for _, item := range resPlan {
if err = item.Execute(ctx.Es, ctx.Changelog, coll); err != nil {
fatalError("couldn't execute %v: %v", item, err)
return fmt.Errorf("couldn't execute %v: %v", item, err)
}
}

println("Complete")
return nil
},
}

Expand Down
13 changes: 13 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cmd

import (
"fmt"
"github.com/hdpe.me/esup/context"
"github.com/spf13/cobra"
"os"
)
Expand All @@ -24,3 +25,15 @@ func fatalError(format string, a ...interface{}) {
println(fmt.Sprintf(format, a...))
os.Exit(1)
}

func getLock(ctx *context.Context, envName string) {
if err := ctx.Lock.Get(envName); err != nil {
fatalError("couldn't get lock: %v", err)
}
}

func releaseLock(ctx *context.Context, envName string) {
if err := ctx.Lock.Release(envName); err != nil {
fatalError("couldn't release lock: %v", err)
}
}
9 changes: 7 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ func NewConfig() (Config, error) {
viper := viperlib.New()
viper.SetDefault("server.address", "http://localhost:9200")
viper.SetDefault("changelog.index", "esup-changelog0")
viper.SetDefault("changelog.lockIndex", "esup-lock0")
viper.SetDefault("pipelines.directory", "./pipelines")
viper.SetDefault("indexSets.directory", "./indexSets")
viper.SetDefault("documents.directory", "./documents")
Expand All @@ -35,7 +36,10 @@ func NewConfig() (Config, error) {
ApiKey: viper.GetString("server.apiKey"),
},
PrototypeConfig{Environment: viper.GetString("prototype.environment")},
ChangelogConfig{Index: viper.GetString("changelog.index")},
ChangelogConfig{
Index: viper.GetString("changelog.index"),
LockIndex: viper.GetString("changelog.lockIndex"),
},
IndexSetsConfig{Directory: viper.GetString("indexSets.directory")},
PipelinesConfig{Directory: viper.GetString("pipelines.directory")},
DocumentsConfig{Directory: viper.GetString("documents.directory")},
Expand Down Expand Up @@ -63,7 +67,8 @@ type PrototypeConfig struct {
}

type ChangelogConfig struct {
Index string
Index string
LockIndex string
}

type IndexSetsConfig struct {
Expand Down
1 change: 1 addition & 0 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ type Context struct {
Schema schema.Schema
Es *es.Client
Changelog *resource.Changelog
Lock *resource.Lock
Proc *resource.Preprocessor
}
57 changes: 54 additions & 3 deletions es/changelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/hdpe.me/esup/util"
"time"
)

type ChangelogEntry struct {
Expand All @@ -12,6 +13,36 @@ type ChangelogEntry struct {
Meta string
}

func CreateChangelogIndex(es *Client, indexName string) error {
return es.CreateIndex(indexName, `{
"mappings": {
"properties": {
"resource_type": {
"type": "keyword"
},
"resource_identifier": {
"type": "keyword"
},
"final_name": {
"type": "keyword"
},
"env_name": {
"type": "keyword"
},
"content": {
"type": "text"
},
"meta": {
"type": "text"
},
"timestamp": {
"type": "date"
}
}
}
}`)
}

func GetChangelogEntry(es *Client, indexName string, resourceType string, resourceIdentifier string,
envName string) (ChangelogEntry, error) {

Expand Down Expand Up @@ -56,11 +87,31 @@ func GetChangelogEntry(es *Client, indexName string, resourceType string, resour
return ChangelogEntry{}, nil
}

_source := res[0]
source := res[0].source

return ChangelogEntry{
IsPresent: true,
Content: _source.Get("content").String(),
Meta: _source.Get("meta").String(),
Content: source.Get("content").String(),
Meta: source.Get("meta").String(),
}, nil
}

func PutChangelogEntry(es *Client, indexName string, resourceType string, resourceIdentifier string, finalName string,
entry ChangelogEntry, envName string) error {

body := map[string]interface{}{
"resource_type": resourceType,
"resource_identifier": resourceIdentifier,
"final_name": finalName,
"content": entry.Content,
"meta": entry.Meta,
"env_name": envName,
"timestamp": time.Now().UTC().Format(systemTimestampLayout),
}

if err := es.IndexDocument(indexName, "", body); err != nil {
return fmt.Errorf("couldn't put changelog entry %v %v: %w", resourceType, resourceIdentifier, err)
}

return nil
}
46 changes: 38 additions & 8 deletions es/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ type Client struct {
client *elasticsearch.Client
}

func (r *Client) Search(indexName string, body map[string]interface{}, o ...func(*esapi.SearchRequest)) ([]gjson.Result, error) {
func (r *Client) Search(indexName string, body map[string]interface{}, o ...func(*esapi.SearchRequest)) ([]Document, error) {
var buf bytes.Buffer

if err := json.NewEncoder(&buf).Encode(body); err != nil {
return []gjson.Result{}, fmt.Errorf("couldn't encode JSON request: %w", err)
return []Document{}, fmt.Errorf("couldn't encode JSON request: %w", err)
}

o0 := func(req *esapi.SearchRequest) {
Expand All @@ -67,30 +67,40 @@ func (r *Client) Search(indexName string, body map[string]interface{}, o ...func
res, err := r.client.Search(req...)

if err != nil {
return []gjson.Result{}, err
return []Document{}, err
}

responseBody, err := getBodyAndVerifyResponse(res)

if err != nil {
return []gjson.Result{}, fmt.Errorf("couldn't get changelog entry: %w", err)
return []Document{}, fmt.Errorf("couldn't search index %v: %w", indexName, err)
}

return gjson.Get(responseBody, "hits.hits.#._source").Array(), nil
docs := make([]Document, 0)
for _, doc := range gjson.Get(responseBody, "hits.hits").Array() {
docs = append(docs, newDocument(doc))
}

return docs, nil
}

func (r *Client) IndexDocument(indexName string, id string, body map[string]interface{}) error {
func (r *Client) IndexDocument(indexName string, id string, body map[string]interface{}, o ...func(*esapi.IndexRequest)) error {
var buf bytes.Buffer

if err := json.NewEncoder(&buf).Encode(body); err != nil {
return fmt.Errorf("couldn't encode JSON request: %w", err)
}

res, err := r.client.Index(indexName, &buf, func(request *esapi.IndexRequest) {
o0 := func(request *esapi.IndexRequest) {
if id != "" {
request.DocumentID = id
}
})
}

req := []func(r *esapi.IndexRequest){o0}
req = append(req, o...)

res, err := r.client.Index(indexName, &buf, req...)

if err != nil {
return err
Expand All @@ -103,6 +113,26 @@ func (r *Client) IndexDocument(indexName string, id string, body map[string]inte
return nil
}

func (r *Client) GetDocument(indexName string, id string) (Document, error) {
res, err := r.client.Get(indexName, id)

if err != nil {
return Document{}, fmt.Errorf("couldn't get document: %w", err)
}

body, err := getBodyOrEmptyAndVerifyResponse(res)

if err != nil {
return Document{}, fmt.Errorf("couldn't get document: %w", err)
}

if body == "" {
return Document{}, nil
}

return newDocument(gjson.Parse(body)), nil
}

func (r *Client) GetIndicesForAlias(alias string) ([]string, error) {
res, err := r.client.Indices.GetAlias(func(req *esapi.IndicesGetAliasRequest) {
req.Name = []string{alias}
Expand Down
27 changes: 27 additions & 0 deletions es/document.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package es

import "github.com/tidwall/gjson"

func newDocument(doc gjson.Result) Document {
return Document{
id: doc.Get("_id").String(),
version: Version{
seqNo: int(doc.Get("_seq_no").Int()),
primaryTerm: int(doc.Get("_primary_term").Int()),
},
source: doc.Get("_source"),
isPresent: true,
}
}

type Document struct {
isPresent bool
id string
version Version
source gjson.Result
}

type Version struct {
seqNo int
primaryTerm int
}
Loading

0 comments on commit 53de3f7

Please sign in to comment.