diff --git a/CHANGELOG.md b/CHANGELOG.md index 0d30a2afc..6ed6331b9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,7 +18,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) ### Added - Github workflow for changelog verification ([#172](https://github.com/opensearch-project/opensearch-go/pull/172)) - Add Go Documentation link for the client ([#182](https://github.com/opensearch-project/opensearch-go/pull/182)) -- Support for Amazon OpenSearch Serverless ([#216](https://github.com/opensearch-project/opensearch-go/pull/216)) +- Support for Amazon OpenSearch Serverless ([#216](https://github.com/opensearch-project/opensearch-go/pull/216), [#252](https://github.com/opensearch-project/opensearch-go/pull/252)) ### Dependencies - Bumps `github.com/stretchr/testify` from 1.8.0 to 1.8.1 diff --git a/USER_GUIDE.md b/USER_GUIDE.md index 7cf07b49b..faaf1e3da 100644 --- a/USER_GUIDE.md +++ b/USER_GUIDE.md @@ -184,37 +184,40 @@ func main() { // Create an AWS request Signer and load AWS configuration using default config folder or env vars. // See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/request-signing.html#request-signing-go - signer, err := requestsigner.NewSigner(session.Options{SharedConfigState: session.SharedConfigEnable}) + signer, err := requestsigner.NewSignerWithService( + session.Options{SharedConfigState: session.SharedConfigEnable}, + requestsigner.OpenSearchService, // Use requestsigner.OpenSearchServerless for Amazon OpenSearch Serverless. + ) if err != nil { - log.Fatal(err) // Do not log.fatal in a production ready app. + log.Fatalf("failed to create signer: %v", err) // Do not log.fatal in a production ready app. } - // Create an opensearch client and use the request-signer + // Create an opensearch client and use the request-signer. client, err := opensearch.NewClient(opensearch.Config{ Addresses: []string{endpoint}, Signer: signer, }) if err != nil { - log.Fatal("client creation err", err) + log.Fatalf("failed to create new opensearch client: %v", err) } ping := opensearchapi.PingRequest{} resp, err := ping.Do(ctx, client) if err != nil { - log.Fatal(err) + log.Fatalf("failed to ping: %v", err) } defer resp.Body.Close() if resp.IsError() { - log.Println("ping response status ", resp.Status()) + log.Printf("ping response status: %q", resp.Status()) respBody, err := io.ReadAll(resp.Body) if err != nil { - log.Fatal("response body read err", err) + log.Fatalf("failed to read response body body: %v", err) } - log.Fatal("ping resp body", respBody) + log.Fatalf("ping resp body: %s", respBody) } log.Println("PING OK") @@ -231,6 +234,7 @@ package main import ( "context" "log" + "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" @@ -251,25 +255,25 @@ func main() { ), ) if err != nil { - log.Fatal(err) // Do not log.fatal in a production ready app. + log.Fatalf("failed to load aws configuraiton: %v", err) // Do not log.fatal in a production ready app. } // Create an AWS request Signer and load AWS configuration using default config folder or env vars. - signer, err := requestsigner.NewSignerWithService(awsCfg, "es") // "aoss" for Amazon OpenSearch Serverless + signer, err := requestsigner.NewSignerWithService(awsCfg, "es") // Use "aoss" for Amazon OpenSearch Serverless if err != nil { - log.Fatal(err) // Do not log.fatal in a production ready app. + log.Fatalf("failed to create signer: %v", err) } - // Create an opensearch client and use the request-signer + // Create an opensearch client and use the request-signer. client, err := opensearch.NewClient(opensearch.Config{ Addresses: []string{endpoint}, Signer: signer, }) if err != nil { - log.Fatal("client creation err", err) + log.Fatalf("failed to create new opensearch client: %v", err) } - indexName = "go-test-index" + indexName := "go-test-index" // Define index mapping. mapping := strings.NewReader(`{ @@ -287,11 +291,9 @@ func main() { } createIndexResponse, err := createIndex.Do(context.Background(), client) if err != nil { - log.Println("Error ", err.Error()) - log.Println("failed to create index ", err) - log.Fatal("create response body read err", err) + log.Fatalf("failed to create index: %v", err) } - log.Println(createIndexResponse) + log.Printf("created index: %#v", createIndexResponse) // Delete previously created index. deleteIndex := opensearchapi.IndicesDeleteRequest{ @@ -300,10 +302,9 @@ func main() { deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) if err != nil { - log.Println("failed to delete index ", err) - log.Fatal("delete index response body read err", err) + log.Fatalf("failed to delete index: %v", err) } - log.Println("deleting index", deleteIndexResponse) + log.Printf("deleted index: %#v", deleteIndexResponse) } func getCredentialProvider(accessKey, secretAccessKey, token string) aws.CredentialsProviderFunc { diff --git a/signer/aws/aws.go b/signer/aws/aws.go index 42abc33b7..3df651e07 100644 --- a/signer/aws/aws.go +++ b/signer/aws/aws.go @@ -11,6 +11,8 @@ package aws import ( "bytes" + "crypto/sha256" + "encoding/hex" "errors" "fmt" "io" @@ -22,28 +24,34 @@ import ( v4 "github.com/aws/aws-sdk-go/aws/signer/v4" ) -// OpenSearchService AWS OpenSearchService Name +// OpenSearchService Amazon OpenSearch Service Name const OpenSearchService = "es" -// Signer is a interface that will implement opensearchtransport.Signer +// OpenSearchServerless Amazon OpenSearch Serverless Name +const OpenSearchServerless = "aoss" + +const emptyBodySHA256 = "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824" + +// Signer is an interface that will implement opensearchtransport.Signer type Signer struct { session session.Session service string } -// NewSigner returns an instance of Signer for AWS OpenSearchService +// NewSigner returns an instance of Signer for configured for Amazon OpenSearch Service. +// Use NewSignerWithService to configure it for another service such as Amazon OpenSearch Serverless. func NewSigner(opts session.Options) (*Signer, error) { return NewSignerWithService(opts, OpenSearchService) } -// NewSignerWithService returns an instance of Signer for given service +// NewSignerWithService returns an instance of Signer for a given service. func NewSignerWithService(opts session.Options, service string) (*Signer, error) { if len(strings.TrimSpace(service)) < 1 { return nil, errors.New("service cannot be empty") } awsSession, err := session.NewSessionWithOptions(opts) if err != nil { - return nil, fmt.Errorf("failed to get session from given option %v due to %s", opts, err) + return nil, fmt.Errorf("failed to get session from given options %v: %v", opts, err) } return &Signer{ session: *awsSession, @@ -51,25 +59,50 @@ func NewSignerWithService(opts session.Options, service string) (*Signer, error) }, nil } -// SignRequest signs the request using SigV4 +// SignRequest signs the request using SigV4. func (s Signer) SignRequest(req *http.Request) error { - signer := v4.NewSigner(s.session.Config.Credentials) - return sign(req, s.session.Config.Region, s.service, signer) - + return sign(req, s.session.Config.Region, s.service, v4.NewSigner(s.session.Config.Credentials)) } -func sign(req *http.Request, region *string, serviceName string, signer *v4.Signer) (err error) { +func sign(req *http.Request, region *string, serviceName string, signer *v4.Signer) error { if region == nil || len(*region) == 0 { return fmt.Errorf("aws region cannot be empty") } - if req.Body == nil { - _, err = signer.Sign(req, nil, serviceName, *region, time.Now().UTC()) - return + + var body io.ReadSeeker + var contentSha256Hash = emptyBodySHA256 + + if req.Body != nil { + b, err := io.ReadAll(req.Body) + if err != nil { + return fmt.Errorf("failed to read request body: %v", err) + } + body = bytes.NewReader(b) + hash, err := hexEncodedSha256(b) + if err != nil { + return fmt.Errorf("failed to calculate hash of request body: %v", err) + } + contentSha256Hash = hash } - buf, err := io.ReadAll(req.Body) - if err != nil { + // Add the "X-Amz-Content-Sha256" header as required by Amazon OpenSearch Serverless. + req.Header.Set("X-Amz-Content-Sha256", contentSha256Hash) + + if _, err := signer.Sign(req, body, serviceName, *region, time.Now().UTC()); err != nil { return err } - _, err = signer.Sign(req, bytes.NewReader(buf), serviceName, *region, time.Now().UTC()) - return + + return nil +} + +func hexEncodedSha256(b []byte) (string, error) { + hasher := sha256.New() + + _, err := hasher.Write(b) + if err != nil { + return "", fmt.Errorf("failed to write: %v", err) + } + + digest := hasher.Sum(nil) + + return hex.EncodeToString(digest), nil } diff --git a/signer/aws/aws_test.go b/signer/aws/aws_test.go index 3c049afbf..48ef4d8d0 100644 --- a/signer/aws/aws_test.go +++ b/signer/aws/aws_test.go @@ -11,26 +11,29 @@ package aws import ( "bytes" + "errors" + "io" "net/http" - "os" "testing" "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" "github.com/stretchr/testify/assert" ) +func TestConstants(t *testing.T) { + assert.Equal(t, "es", OpenSearchService) + assert.Equal(t, "aoss", OpenSearchServerless) + assert.Equal(t, "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824", emptyBodySHA256) +} + func TestV4Signer(t *testing.T) { + t.Run("sign request failed due to no region found", func(t *testing.T) { req, err := http.NewRequest(http.MethodGet, "https://localhost:9200", nil) assert.NoError(t, err) - region := os.Getenv("AWS_REGION") - os.Setenv("AWS_REGION", "") - defer func() { - os.Setenv("AWS_REGION", region) - }() + sessionOptions := session.Options{ Config: aws.Config{ Credentials: credentials.NewStaticCredentials("AKID", "SECRET_KEY", "TOKEN"), @@ -38,19 +41,15 @@ func TestV4Signer(t *testing.T) { } signer, err := NewSigner(sessionOptions) assert.NoError(t, err) - err = signer.SignRequest(req) - assert.EqualErrorf( - t, err, "aws region cannot be empty", "unexpected error") + err = signer.SignRequest(req) + assert.EqualError(t, err, "aws region cannot be empty") }) + t.Run("sign request success", func(t *testing.T) { req, err := http.NewRequest(http.MethodGet, "https://localhost:9200", nil) assert.NoError(t, err) - region := os.Getenv("AWS_REGION") - os.Setenv("AWS_REGION", "us-west-2") - defer func() { - os.Setenv("AWS_REGION", region) - }() + sessionOptions := session.Options{ Config: aws.Config{ Region: aws.String("us-west-2"), @@ -59,11 +58,14 @@ func TestV4Signer(t *testing.T) { } signer, err := NewSigner(sessionOptions) assert.NoError(t, err) + err = signer.SignRequest(req) assert.NoError(t, err) + q := req.Header assert.NotEmpty(t, q.Get("Authorization")) assert.NotEmpty(t, q.Get("X-Amz-Date")) + assert.Equal(t, "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824", q.Get("X-Amz-Content-Sha256")) }) t.Run("sign request success with body", func(t *testing.T) { @@ -71,11 +73,7 @@ func TestV4Signer(t *testing.T) { http.MethodPost, "https://localhost:9200", bytes.NewBuffer([]byte(`some data`))) assert.NoError(t, err) - region := os.Getenv("AWS_REGION") - os.Setenv("AWS_REGION", "us-west-2") - defer func() { - os.Setenv("AWS_REGION", region) - }() + sessionOptions := session.Options{ Config: aws.Config{ Region: aws.String("us-west-2"), @@ -84,39 +82,41 @@ func TestV4Signer(t *testing.T) { } signer, err := NewSigner(sessionOptions) assert.NoError(t, err) + err = signer.SignRequest(req) assert.NoError(t, err) + q := req.Header assert.NotEmpty(t, q.Get("Authorization")) assert.NotEmpty(t, q.Get("X-Amz-Date")) + assert.Equal(t, "1307990e6ba5ca145eb35e99182a9bec46531bc54ddf656a602c780fa0240dee", q.Get("X-Amz-Content-Sha256")) }) - t.Run("sign request success with body for other AWS Services", func(t *testing.T) { + t.Run("sign request success with body for OpenSearch Service Serverless", func(t *testing.T) { req, err := http.NewRequest( http.MethodPost, "https://localhost:9200", bytes.NewBuffer([]byte(`some data`))) assert.NoError(t, err) - region := os.Getenv("AWS_REGION") - os.Setenv("AWS_REGION", "us-west-2") - defer func() { - os.Setenv("AWS_REGION", region) - }() + sessionOptions := session.Options{ Config: aws.Config{ Region: aws.String("us-west-2"), Credentials: credentials.NewStaticCredentials("AKID", "SECRET_KEY", "TOKEN"), }, } - signer, err := NewSignerWithService(sessionOptions, "ec") + signer, err := NewSignerWithService(sessionOptions, OpenSearchServerless) assert.NoError(t, err) + err = signer.SignRequest(req) assert.NoError(t, err) + q := req.Header assert.NotEmpty(t, q.Get("Authorization")) assert.NotEmpty(t, q.Get("X-Amz-Date")) + assert.Equal(t, "1307990e6ba5ca145eb35e99182a9bec46531bc54ddf656a602c780fa0240dee", q.Get("X-Amz-Content-Sha256")) }) - t.Run("sign request failed due to invalid service", func(t *testing.T) { + t.Run("new signer failed due to empty service", func(t *testing.T) { sessionOptions := session.Options{ Config: aws.Config{ Region: aws.String("us-west-2"), @@ -125,6 +125,41 @@ func TestV4Signer(t *testing.T) { } _, err := NewSignerWithService(sessionOptions, "") assert.EqualError(t, err, "service cannot be empty") + }) + t.Run("new signer failed due to blank service", func(t *testing.T) { + sessionOptions := session.Options{ + Config: aws.Config{ + Region: aws.String("us-west-2"), + Credentials: credentials.NewStaticCredentials("AKID", "SECRET_KEY", "TOKEN"), + }, + } + _, err := NewSignerWithService(sessionOptions, " ") + assert.EqualError(t, err, "service cannot be empty") + }) + + t.Run("sign request failed due to invalid body", func(t *testing.T) { + req, err := http.NewRequest(http.MethodPost, "https://localhost:9200", nil) + assert.NoError(t, err) + + req.Body = io.NopCloser(brokenReader("boom")) + + sessionOptions := session.Options{ + Config: aws.Config{ + Region: aws.String("us-west-2"), + Credentials: credentials.NewStaticCredentials("AKID", "SECRET_KEY", "TOKEN"), + }, + } + signer, err := NewSigner(sessionOptions) + assert.NoError(t, err) + + err = signer.SignRequest(req) + assert.EqualError(t, err, "failed to read request body: boom") }) } + +type brokenReader string + +func (br brokenReader) Read([]byte) (int, error) { + return 0, errors.New(string(br)) +}