From 97c32f8479243bf92517929f45958d38c28c89f6 Mon Sep 17 00:00:00 2001 From: Jakob Hahn Date: Wed, 26 Jul 2023 11:57:32 +0200 Subject: [PATCH 1/3] opensearchtransport: remove unneded error returns from connection functions, make defaults const Signed-off-by: Jakob Hahn --- opensearchtransport/connection.go | 86 +++++++++++++++++-------------- 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/opensearchtransport/connection.go b/opensearchtransport/connection.go index d4214f6bc..58e16ce02 100644 --- a/opensearchtransport/connection.go +++ b/opensearchtransport/connection.go @@ -36,28 +36,25 @@ import ( "time" ) -var ( +const ( defaultResurrectTimeoutInitial = 60 * time.Second defaultResurrectTimeoutFactorCutoff = 5 ) // Selector defines the interface for selecting connections from the pool. -// type Selector interface { Select([]*Connection) (*Connection, error) } // ConnectionPool defines the interface for the connection pool. -// type ConnectionPool interface { Next() (*Connection, error) // Next returns the next available connection. - OnSuccess(*Connection) error // OnSuccess reports that the connection was successful. + OnSuccess(*Connection) // OnSuccess reports that the connection was successful. OnFailure(*Connection) error // OnFailure reports that the connection failed. URLs() []*url.URL // URLs returns the list of URLs of available connections. } // Connection represents a connection to a node. -// type Connection struct { sync.Mutex @@ -81,9 +78,11 @@ type singleConnectionPool struct { type statusConnectionPool struct { sync.Mutex - live []*Connection // List of live connections - dead []*Connection // List of dead connections - selector Selector + live []*Connection // List of live connections + dead []*Connection // List of dead connections + selector Selector + resurrectTimeoutInitial time.Duration + resurrectTimeoutFactorCutoff int metrics *metrics } @@ -95,28 +94,33 @@ type roundRobinSelector struct { } // NewConnectionPool creates and returns a default connection pool. -// -func NewConnectionPool(conns []*Connection, selector Selector) (ConnectionPool, error) { +func NewConnectionPool(conns []*Connection, selector Selector) ConnectionPool { if len(conns) == 1 { - return &singleConnectionPool{connection: conns[0]}, nil + return &singleConnectionPool{connection: conns[0]} } + if selector == nil { selector = &roundRobinSelector{curr: -1} } - return &statusConnectionPool{live: conns, selector: selector}, nil + + return &statusConnectionPool{ + live: conns, + selector: selector, + resurrectTimeoutInitial: defaultResurrectTimeoutInitial, + resurrectTimeoutFactorCutoff: defaultResurrectTimeoutFactorCutoff, + } } // Next returns the connection from pool. -// func (cp *singleConnectionPool) Next() (*Connection, error) { return cp.connection, nil } // OnSuccess is a no-op for single connection pool. -func (cp *singleConnectionPool) OnSuccess(c *Connection) error { return nil } +func (cp *singleConnectionPool) OnSuccess(*Connection) {} // OnFailure is a no-op for single connection pool. -func (cp *singleConnectionPool) OnFailure(c *Connection) error { return nil } +func (cp *singleConnectionPool) OnFailure(*Connection) error { return nil } // URLs returns the list of URLs of available connections. func (cp *singleConnectionPool) URLs() []*url.URL { return []*url.URL{cp.connection.URL} } @@ -124,7 +128,6 @@ func (cp *singleConnectionPool) URLs() []*url.URL { return []*url.URL{cp.connect func (cp *singleConnectionPool) connections() []*Connection { return []*Connection{cp.connection} } // Next returns a connection from pool, or an error. -// func (cp *statusConnectionPool) Next() (*Connection, error) { cp.Lock() defer cp.Unlock() @@ -141,29 +144,28 @@ func (cp *statusConnectionPool) Next() (*Connection, error) { cp.resurrect(c, false) return c, nil } + return nil, errors.New("no connection available") } // OnSuccess marks the connection as successful. -// -func (cp *statusConnectionPool) OnSuccess(c *Connection) error { +func (cp *statusConnectionPool) OnSuccess(c *Connection) { c.Lock() defer c.Unlock() // Short-circuit for live connection if !c.IsDead { - return nil + return } c.markAsHealthy() cp.Lock() defer cp.Unlock() - return cp.resurrect(c, true) + cp.resurrect(c, true) } // OnFailure marks the connection as failed. -// func (cp *statusConnectionPool) OnFailure(c *Connection) error { cp.Lock() defer cp.Unlock() @@ -175,12 +177,14 @@ func (cp *statusConnectionPool) OnFailure(c *Connection) error { debugLogger.Logf("Already removed %s\n", c.URL) } c.Unlock() + return nil } if debugLogger != nil { debugLogger.Logf("Removing %s...\n", c.URL) } + c.markAsDead() cp.scheduleResurrect(c) c.Unlock() @@ -201,12 +205,16 @@ func (cp *statusConnectionPool) OnFailure(c *Connection) error { // Check if connection exists in the list, return error if not. index := -1 + for i, conn := range cp.live { if conn == c { index = i } } + if index < 0 { + // Does this error even get raised? Under what conditions can the connection not be in the cp.live list? + // If the connection is marked dead the function already ended return errors.New("connection not in live list") } @@ -218,15 +226,13 @@ func (cp *statusConnectionPool) OnFailure(c *Connection) error { } // URLs returns the list of URLs of available connections. -// func (cp *statusConnectionPool) URLs() []*url.URL { - var urls []*url.URL - cp.Lock() defer cp.Unlock() - for _, c := range cp.live { - urls = append(urls, c.URL) + urls := make([]*url.URL, len(cp.live)) + for idx, c := range cp.live { + urls[idx] = c.URL } return urls @@ -236,14 +242,14 @@ func (cp *statusConnectionPool) connections() []*Connection { var conns []*Connection conns = append(conns, cp.live...) conns = append(conns, cp.dead...) + return conns } // resurrect adds the connection to the list of available connections. // When removeDead is true, it also removes it from the dead list. // The calling code is responsible for locking. -// -func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) error { +func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) { if debugLogger != nil { debugLogger.Logf("Resurrecting %s\n", c.URL) } @@ -253,28 +259,35 @@ func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) error if removeDead { index := -1 + for i, conn := range cp.dead { if conn == c { index = i } } + if index >= 0 { // Remove item; https://github.com/golang/go/wiki/SliceTricks copy(cp.dead[index:], cp.dead[index+1:]) cp.dead = cp.dead[:len(cp.dead)-1] } } - - return nil } // scheduleResurrect schedules the connection to be resurrected. -// func (cp *statusConnectionPool) scheduleResurrect(c *Connection) { - factor := math.Min(float64(c.Failures-1), float64(defaultResurrectTimeoutFactorCutoff)) - timeout := time.Duration(defaultResurrectTimeoutInitial.Seconds() * math.Exp2(factor) * float64(time.Second)) + factor := math.Min(float64(c.Failures-1), float64(cp.resurrectTimeoutFactorCutoff)) + timeout := time.Duration(cp.resurrectTimeoutInitial.Seconds() * math.Exp2(factor) * float64(time.Second)) + if debugLogger != nil { - debugLogger.Logf("Resurrect %s (failures=%d, factor=%1.1f, timeout=%s) in %s\n", c.URL, c.Failures, factor, timeout, c.DeadSince.Add(timeout).Sub(time.Now().UTC()).Truncate(time.Second)) + debugLogger.Logf( + "Resurrect %s (failures=%d, factor=%1.1f, timeout=%s) in %s\n", + c.URL, + c.Failures, + factor, + timeout, + c.DeadSince.Add(timeout).Sub(time.Now().UTC()).Truncate(time.Second), + ) } time.AfterFunc(timeout, func() { @@ -296,7 +309,6 @@ func (cp *statusConnectionPool) scheduleResurrect(c *Connection) { } // Select returns the connection in a round-robin fashion. -// func (s *roundRobinSelector) Select(conns []*Connection) (*Connection, error) { s.Lock() defer s.Unlock() @@ -306,7 +318,6 @@ func (s *roundRobinSelector) Select(conns []*Connection) (*Connection, error) { } // markAsDead marks the connection as dead. -// func (c *Connection) markAsDead() { c.IsDead = true if c.DeadSince.IsZero() { @@ -316,13 +327,11 @@ func (c *Connection) markAsDead() { } // markAsLive marks the connection as alive. -// func (c *Connection) markAsLive() { c.IsDead = false } // markAsHealthy marks the connection as healthy. -// func (c *Connection) markAsHealthy() { c.IsDead = false c.DeadSince = time.Time{} @@ -330,7 +339,6 @@ func (c *Connection) markAsHealthy() { } // String returns a readable connection representation. -// func (c *Connection) String() string { c.Lock() defer c.Unlock() From a08a537ba553edcf920eea273e3a051ce2b5fd65 Mon Sep 17 00:00:00 2001 From: Jakob Hahn Date: Wed, 26 Jul 2023 12:20:01 +0200 Subject: [PATCH 2/3] opensearchtransport: Solve linting complains Signed-off-by: Jakob Hahn --- .../connection_benchmark_test.go | 88 ++-- .../connection_integration_test.go | 6 +- .../connection_internal_test.go | 70 ++-- opensearchtransport/discovery.go | 77 ++-- .../discovery_internal_test.go | 19 +- opensearchtransport/doc.go | 4 +- opensearchtransport/logger.go | 42 +- opensearchtransport/logger_benchmark_test.go | 48 ++- opensearchtransport/logger_internal_test.go | 99 +++-- opensearchtransport/metrics.go | 11 - opensearchtransport/metrics_internal_test.go | 9 +- opensearchtransport/opensearchtransport.go | 119 +++--- .../opensearchtransport_benchmark_test.go | 37 +- ...rchtransport_integration_multinode_test.go | 2 +- .../opensearchtransport_integration_test.go | 1 - .../opensearchtransport_internal_test.go | 391 ++++++++---------- 16 files changed, 481 insertions(+), 542 deletions(-) diff --git a/opensearchtransport/connection_benchmark_test.go b/opensearchtransport/connection_benchmark_test.go index 04d4059cb..fcc45b8f5 100644 --- a/opensearchtransport/connection_benchmark_test.go +++ b/opensearchtransport/connection_benchmark_test.go @@ -24,29 +24,48 @@ // specific language governing permissions and limitations // under the License. -// +build !integration +//go:build !integration +//nolint:testpackage // Can't be testpackage, because it tests the function resurrect() package opensearchtransport import ( "fmt" "log" "net/http" + _ "net/http/pprof" "net/url" "testing" - - _ "net/http/pprof" + "time" ) func init() { - go func() { log.Fatalln(http.ListenAndServe("localhost:6060", nil)) }() + go func() { + server := &http.Server{ + Addr: "localhost:6060", + ReadTimeout: 5 * time.Second, + WriteTimeout: 5 * time.Second, + } + log.Fatalln(server.ListenAndServe()) + }() +} + +func initSingleConnectionPool() *singleConnectionPool { + return &singleConnectionPool{ + connection: &Connection{ + URL: &url.URL{ + Scheme: "http", + Host: "foo1", + }, + }, + } } func BenchmarkSingleConnectionPool(b *testing.B) { b.ReportAllocs() b.Run("Next()", func(b *testing.B) { - pool := &singleConnectionPool{connection: &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}}} + pool := initSingleConnectionPool() b.Run("Single ", func(b *testing.B) { for i := 0; i < b.N; i++ { @@ -71,7 +90,7 @@ func BenchmarkSingleConnectionPool(b *testing.B) { }) b.Run("OnFailure()", func(b *testing.B) { - pool := &singleConnectionPool{connection: &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}}} + pool := initSingleConnectionPool() b.Run("Single ", func(b *testing.B) { c, _ := pool.Next() @@ -98,19 +117,25 @@ func BenchmarkSingleConnectionPool(b *testing.B) { }) } +func createStatusConnectionPool(conns []*Connection) *statusConnectionPool { + return &statusConnectionPool{ + live: conns, + selector: &roundRobinSelector{curr: -1}, + resurrectTimeoutInitial: defaultResurrectTimeoutInitial, + resurrectTimeoutFactorCutoff: defaultResurrectTimeoutFactorCutoff, + } +} + func BenchmarkStatusConnectionPool(b *testing.B) { b.ReportAllocs() - var conns []*Connection + conns := make([]*Connection, 1000) for i := 0; i < 1000; i++ { - conns = append(conns, &Connection{URL: &url.URL{Scheme: "http", Host: fmt.Sprintf("foo%d", i)}}) + conns[i] = &Connection{URL: &url.URL{Scheme: "http", Host: fmt.Sprintf("foo%d", i)}} } b.Run("Next()", func(b *testing.B) { - pool := &statusConnectionPool{ - live: conns, - selector: &roundRobinSelector{curr: -1}, - } + pool := createStatusConnectionPool(conns) b.Run("Single ", func(b *testing.B) { for i := 0; i < b.N; i++ { @@ -147,10 +172,7 @@ func BenchmarkStatusConnectionPool(b *testing.B) { }) b.Run("OnFailure()", func(b *testing.B) { - pool := &statusConnectionPool{ - live: conns, - selector: &roundRobinSelector{curr: -1}, - } + pool := createStatusConnectionPool(conns) b.Run("Single ", func(b *testing.B) { c, err := pool.Next() @@ -199,10 +221,7 @@ func BenchmarkStatusConnectionPool(b *testing.B) { }) b.Run("OnSuccess()", func(b *testing.B) { - pool := &statusConnectionPool{ - live: conns, - selector: &roundRobinSelector{curr: -1}, - } + pool := createStatusConnectionPool(conns) b.Run("Single ", func(b *testing.B) { c, err := pool.Next() @@ -211,9 +230,7 @@ func BenchmarkStatusConnectionPool(b *testing.B) { } for i := 0; i < b.N; i++ { - if err := pool.OnSuccess(c); err != nil { - b.Errorf("Unexpected error: %v", err) - } + pool.OnSuccess(c) } }) @@ -226,9 +243,7 @@ func BenchmarkStatusConnectionPool(b *testing.B) { } for pb.Next() { - if err := pool.OnSuccess(c); err != nil { - b.Errorf("Unexpected error: %v", err) - } + pool.OnSuccess(c) } }) }) @@ -242,19 +257,14 @@ func BenchmarkStatusConnectionPool(b *testing.B) { } for pb.Next() { - if err := pool.OnSuccess(c); err != nil { - b.Errorf("Unexpected error: %v", err) - } + pool.OnSuccess(c) } }) }) }) b.Run("resurrect()", func(b *testing.B) { - pool := &statusConnectionPool{ - live: conns, - selector: &roundRobinSelector{curr: -1}, - } + pool := createStatusConnectionPool(conns) b.Run("Single", func(b *testing.B) { c, err := pool.Next() @@ -268,9 +278,7 @@ func BenchmarkStatusConnectionPool(b *testing.B) { for i := 0; i < b.N; i++ { pool.Lock() - if err := pool.resurrect(c, true); err != nil { - b.Errorf("Unexpected error: %v", err) - } + pool.resurrect(c, true) pool.Unlock() } }) @@ -289,9 +297,7 @@ func BenchmarkStatusConnectionPool(b *testing.B) { for pb.Next() { pool.Lock() - if err := pool.resurrect(c, true); err != nil { - b.Errorf("Unexpected error: %v", err) - } + pool.resurrect(c, true) pool.Unlock() } }) @@ -311,9 +317,7 @@ func BenchmarkStatusConnectionPool(b *testing.B) { for pb.Next() { pool.Lock() - if err := pool.resurrect(c, true); err != nil { - b.Errorf("Unexpected error: %v", err) - } + pool.resurrect(c, true) pool.Unlock() } }) diff --git a/opensearchtransport/connection_integration_test.go b/opensearchtransport/connection_integration_test.go index 59e22c5c1..b657a55ce 100644 --- a/opensearchtransport/connection_integration_test.go +++ b/opensearchtransport/connection_integration_test.go @@ -24,7 +24,7 @@ // specific language governing permissions and limitations // under the License. -// +build integration +//go:build integration package opensearchtransport @@ -42,9 +42,6 @@ func NewServer(addr string, handler http.Handler) *http.Server { } func TestStatusConnectionPool(t *testing.T) { - defaultResurrectTimeoutInitial = time.Second - defer func() { defaultResurrectTimeoutInitial = 60 * time.Second }() - var ( server *http.Server servers []*http.Server @@ -88,6 +85,7 @@ func TestStatusConnectionPool(t *testing.T) { transport, _ := New(cfg) pool := transport.pool.(*statusConnectionPool) + pool.resurrectTimeoutInitial = time.Second for i := 1; i <= 9; i++ { req, _ := http.NewRequest("GET", "/", nil) diff --git a/opensearchtransport/connection_internal_test.go b/opensearchtransport/connection_internal_test.go index 3cb34b492..46c9a3170 100644 --- a/opensearchtransport/connection_internal_test.go +++ b/opensearchtransport/connection_internal_test.go @@ -24,7 +24,7 @@ // specific language governing permissions and limitations // under the License. -// +build !integration +//go:build !integration package opensearchtransport @@ -81,8 +81,8 @@ func TestStatusConnectionPoolNext(t *testing.T) { pool := &statusConnectionPool{ live: []*Connection{ - &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}}, - &Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}}, + {URL: &url.URL{Scheme: "http", Host: "foo1"}}, + {URL: &url.URL{Scheme: "http", Host: "foo2"}}, }, selector: &roundRobinSelector{curr: -1}, } @@ -107,9 +107,9 @@ func TestStatusConnectionPoolNext(t *testing.T) { t.Run("Three URLs", func(t *testing.T) { pool := &statusConnectionPool{ live: []*Connection{ - &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}}, - &Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}}, - &Connection{URL: &url.URL{Scheme: "http", Host: "foo3"}}, + {URL: &url.URL{Scheme: "http", Host: "foo1"}}, + {URL: &url.URL{Scheme: "http", Host: "foo2"}}, + {URL: &url.URL{Scheme: "http", Host: "foo3"}}, }, selector: &roundRobinSelector{curr: -1}, } @@ -117,7 +117,6 @@ func TestStatusConnectionPoolNext(t *testing.T) { var expected string for i := 0; i < 11; i++ { c, err := pool.Next() - if err != nil { t.Errorf("Unexpected error: %s", err) } @@ -143,8 +142,8 @@ func TestStatusConnectionPoolNext(t *testing.T) { pool := &statusConnectionPool{ live: []*Connection{}, dead: []*Connection{ - &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}, Failures: 3}, - &Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}, Failures: 1}, + {URL: &url.URL{Scheme: "http", Host: "foo1"}, Failures: 3}, + {URL: &url.URL{Scheme: "http", Host: "foo2"}, Failures: 1}, }, selector: &roundRobinSelector{curr: -1}, } @@ -180,16 +179,14 @@ func TestStatusConnectionPoolOnSuccess(t *testing.T) { t.Run("Move connection to live list and mark it as healthy", func(t *testing.T) { pool := &statusConnectionPool{ dead: []*Connection{ - &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}, Failures: 3, IsDead: true}, + {URL: &url.URL{Scheme: "http", Host: "foo1"}, Failures: 3, IsDead: true}, }, selector: &roundRobinSelector{curr: -1}, } conn := pool.dead[0] - if err := pool.OnSuccess(conn); err != nil { - t.Fatalf("Unexpected error: %s", err) - } + pool.OnSuccess(conn) if conn.IsDead { t.Errorf("Expected the connection to be live; %s", conn) @@ -213,12 +210,12 @@ func TestStatusConnectionPoolOnFailure(t *testing.T) { t.Run("Remove connection, mark it, and sort dead connections", func(t *testing.T) { pool := &statusConnectionPool{ live: []*Connection{ - &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}}, - &Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}}, + {URL: &url.URL{Scheme: "http", Host: "foo1"}}, + {URL: &url.URL{Scheme: "http", Host: "foo2"}}, }, dead: []*Connection{ - &Connection{URL: &url.URL{Scheme: "http", Host: "foo3"}, Failures: 0}, - &Connection{URL: &url.URL{Scheme: "http", Host: "foo4"}, Failures: 99}, + {URL: &url.URL{Scheme: "http", Host: "foo3"}, Failures: 0}, + {URL: &url.URL{Scheme: "http", Host: "foo4"}, Failures: 99}, }, selector: &roundRobinSelector{curr: -1}, } @@ -228,7 +225,7 @@ func TestStatusConnectionPoolOnFailure(t *testing.T) { if err := pool.OnFailure(conn); err != nil { t.Fatalf("Unexpected error: %s", err) } - + conn.Lock() if !conn.IsDead { t.Errorf("Expected the connection to be dead; %s", conn) } @@ -236,7 +233,10 @@ func TestStatusConnectionPoolOnFailure(t *testing.T) { if conn.DeadSince.IsZero() { t.Errorf("Unexpected value for DeadSince: %s", conn.DeadSince) } + conn.Unlock() + pool.Lock() + defer pool.Unlock() if len(pool.live) != 1 { t.Errorf("Expected 1 live connection, got: %d", len(pool.live)) } @@ -261,9 +261,9 @@ func TestStatusConnectionPoolOnFailure(t *testing.T) { t.Run("Short circuit when the connection is already dead", func(t *testing.T) { pool := &statusConnectionPool{ live: []*Connection{ - &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}}, - &Connection{URL: &url.URL{Scheme: "http", Host: "foo2"}}, - &Connection{URL: &url.URL{Scheme: "http", Host: "foo3"}}, + {URL: &url.URL{Scheme: "http", Host: "foo1"}}, + {URL: &url.URL{Scheme: "http", Host: "foo2"}}, + {URL: &url.URL{Scheme: "http", Host: "foo3"}}, }, selector: &roundRobinSelector{curr: -1}, } @@ -285,15 +285,14 @@ func TestStatusConnectionPoolResurrect(t *testing.T) { t.Run("Mark the connection as dead and add/remove it to the lists", func(t *testing.T) { pool := &statusConnectionPool{ live: []*Connection{}, - dead: []*Connection{&Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}, IsDead: true}}, + dead: []*Connection{{URL: &url.URL{Scheme: "http", Host: "foo1"}, IsDead: true}}, selector: &roundRobinSelector{curr: -1}, } conn := pool.dead[0] - - if err := pool.resurrect(conn, true); err != nil { - t.Fatalf("Unexpected error: %s", err) - } + conn.Lock() + defer conn.Unlock() + pool.resurrect(conn, true) if conn.IsDead { t.Errorf("Expected connection to be dead, got: %s", conn) @@ -310,15 +309,14 @@ func TestStatusConnectionPoolResurrect(t *testing.T) { t.Run("Short circuit removal when the connection is not in the dead list", func(t *testing.T) { pool := &statusConnectionPool{ - dead: []*Connection{&Connection{URL: &url.URL{Scheme: "http", Host: "bar"}, IsDead: true}}, + dead: []*Connection{{URL: &url.URL{Scheme: "http", Host: "bar"}, IsDead: true}}, selector: &roundRobinSelector{curr: -1}, } conn := &Connection{URL: &url.URL{Scheme: "http", Host: "foo1"}, IsDead: true} - - if err := pool.resurrect(conn, true); err != nil { - t.Fatalf("Unexpected error: %s", err) - } + conn.Lock() + defer conn.Unlock() + pool.resurrect(conn, true) if len(pool.live) != 1 { t.Errorf("Expected 1 live connection, got: %s", pool.live) @@ -330,20 +328,19 @@ func TestStatusConnectionPoolResurrect(t *testing.T) { }) t.Run("Schedule resurrect", func(t *testing.T) { - defaultResurrectTimeoutInitial = 0 - defer func() { defaultResurrectTimeoutInitial = 60 * time.Second }() - pool := &statusConnectionPool{ live: []*Connection{}, dead: []*Connection{ - &Connection{ + { URL: &url.URL{Scheme: "http", Host: "foo1"}, Failures: 100, IsDead: true, DeadSince: time.Now().UTC(), }, }, - selector: &roundRobinSelector{curr: -1}, + selector: &roundRobinSelector{curr: -1}, + resurrectTimeoutInitial: 0, + resurrectTimeoutFactorCutoff: defaultResurrectTimeoutFactorCutoff, } conn := pool.dead[0] @@ -375,7 +372,6 @@ func TestConnection(t *testing.T) { ` dead=true failures=10`, conn.String(), ) - if err != nil { t.Fatalf("Unexpected error: %s", err) } diff --git a/opensearchtransport/discovery.go b/opensearchtransport/discovery.go index 37aafbef2..f4463f3f2 100644 --- a/opensearchtransport/discovery.go +++ b/opensearchtransport/discovery.go @@ -27,25 +27,23 @@ package opensearchtransport import ( + "context" "encoding/json" "fmt" - "io/ioutil" + "io" "net/http" "net/url" - "sort" "strings" "sync" "time" ) // Discoverable defines the interface for transports supporting node discovery. -// type Discoverable interface { DiscoverNodes() error } // nodeInfo represents the information about node in a cluster. -// type nodeInfo struct { ID string Name string @@ -58,27 +56,22 @@ type nodeInfo struct { } // DiscoverNodes reloads the client connections by fetching information from the cluster. -// func (c *Client) DiscoverNodes() error { - var conns []*Connection + conns := make([]*Connection, 0) nodes, err := c.getNodesInfo() if err != nil { if debugLogger != nil { debugLogger.Logf("Error getting nodes info: %s\n", err) } - return fmt.Errorf("discovery: get nodes: %s", err) + + return fmt.Errorf("discovery: get nodes: %w", err) } for _, node := range nodes { - var ( - isClusterManagerOnlyNode bool - ) - - roles := append(node.Roles[:0:0], node.Roles...) - sort.Strings(roles) + var isClusterManagerOnlyNode bool - if len(roles) == 1 && (roles[0] == "master" || roles[0] == "cluster_manager") { + if len(node.Roles) == 1 && (node.Roles[0] == "master" || node.Roles[0] == "cluster_manager") { isClusterManagerOnlyNode = true } @@ -87,6 +80,7 @@ func (c *Client) DiscoverNodes() error { if isClusterManagerOnlyNode { skip = "; [SKIP]" } + debugLogger.Logf("Discovered node [%s]; %s; roles=%s%s\n", node.Name, node.URL, node.Roles, skip) } @@ -117,24 +111,18 @@ func (c *Client) DiscoverNodes() error { c.pool = c.poolFunc(conns, c.selector) } else { // TODO: Replace only live connections, leave dead scheduled for resurrect? - c.pool, err = NewConnectionPool(conns, c.selector) - if err != nil { - return err - } + c.pool = NewConnectionPool(conns, c.selector) } return nil } func (c *Client) getNodesInfo() ([]nodeInfo, error) { - var ( - out []nodeInfo - scheme = c.urls[0].Scheme - ) + scheme := c.urls[0].Scheme - req, err := http.NewRequest("GET", "/_nodes/http", nil) + req, err := http.NewRequestWithContext(context.TODO(), http.MethodGet, "/_nodes/http", nil) if err != nil { - return out, err + return nil, err } c.Lock() @@ -142,7 +130,7 @@ func (c *Client) getNodesInfo() ([]nodeInfo, error) { c.Unlock() // TODO: If no connection is returned, fallback to original URLs if err != nil { - return out, err + return nil, err } c.setReqURL(conn.URL, req) @@ -151,39 +139,43 @@ func (c *Client) getNodesInfo() ([]nodeInfo, error) { res, err := c.transport.RoundTrip(req) if err != nil { - return out, err + return nil, err } defer res.Body.Close() - if res.StatusCode > 200 { - body, _ := ioutil.ReadAll(res.Body) - return out, fmt.Errorf("server error: %s: %s", res.Status, body) + if res.StatusCode > http.StatusOK { + body, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("server error: %s: %w", res.Status, err) + } + return nil, fmt.Errorf("server error: %s: %s", res.Status, body) } var env map[string]json.RawMessage if err := json.NewDecoder(res.Body).Decode(&env); err != nil { - return out, err + return nil, err } var nodes map[string]nodeInfo if err := json.Unmarshal(env["nodes"], &nodes); err != nil { - return out, err + return nil, err } + out := make([]nodeInfo, len(nodes)) + idx := 0 + for id, node := range nodes { node.ID = id - u, err := c.getNodeURL(node, scheme) - if err != nil { - return out, err - } + u := c.getNodeURL(node, scheme) node.URL = u - out = append(out, node) + out[idx] = node + idx++ } return out, nil } -func (c *Client) getNodeURL(node nodeInfo, scheme string) (*url.URL, error) { +func (c *Client) getNodeURL(node nodeInfo, scheme string) *url.URL { var ( host string port string @@ -197,25 +189,28 @@ func (c *Client) getNodeURL(node nodeInfo, scheme string) (*url.URL, error) { } else { host = strings.Split(addrs[0], ":")[0] } - port = ports[len(ports)-1] + port = ports[len(ports)-1] u := &url.URL{ Scheme: scheme, Host: host + ":" + port, } - return u, nil + return u } -func (c *Client) scheduleDiscoverNodes(d time.Duration) { +func (c *Client) scheduleDiscoverNodes() { + //nolint:errcheck // errors are logged inside the function go c.DiscoverNodes() c.Lock() defer c.Unlock() + if c.discoverNodesTimer != nil { c.discoverNodesTimer.Stop() } + c.discoverNodesTimer = time.AfterFunc(c.discoverNodesInterval, func() { - c.scheduleDiscoverNodes(c.discoverNodesInterval) + c.scheduleDiscoverNodes() }) } diff --git a/opensearchtransport/discovery_internal_test.go b/opensearchtransport/discovery_internal_test.go index 8218976a0..41ba462e0 100644 --- a/opensearchtransport/discovery_internal_test.go +++ b/opensearchtransport/discovery_internal_test.go @@ -24,7 +24,7 @@ // specific language governing permissions and limitations // under the License. -// +build !integration +//go:build !integration package opensearchtransport @@ -34,7 +34,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "net/url" "os" @@ -47,14 +46,14 @@ func TestDiscovery(t *testing.T) { defaultHandler := func(w http.ResponseWriter, r *http.Request) { f, err := os.Open("testdata/nodes.info.json") if err != nil { - http.Error(w, fmt.Sprintf("Fixture error: %s", err), 500) + http.Error(w, fmt.Sprintf("Fixture error: %s", err), http.StatusInternalServerError) return } io.Copy(w, f) } - srv := &http.Server{Addr: "localhost:10001", Handler: http.HandlerFunc(defaultHandler)} - srvTLS := &http.Server{Addr: "localhost:12001", Handler: http.HandlerFunc(defaultHandler)} + srv := &http.Server{Addr: "localhost:10001", Handler: http.HandlerFunc(defaultHandler), ReadTimeout: 1 * time.Second} + srvTLS := &http.Server{Addr: "localhost:12001", Handler: http.HandlerFunc(defaultHandler), ReadTimeout: 1 * time.Second} go func() { if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { @@ -485,12 +484,10 @@ func TestDiscovery(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - var names []string var urls []*url.URL - for name, node := range tt.args.Nodes { + for _, node := range tt.args.Nodes { u, _ := url.Parse(node.URL) urls = append(urls, u) - names = append(names, name) } newRoundTripper := func() http.RoundTripper { @@ -505,11 +502,11 @@ func TestDiscovery(t *testing.T) { b, _ := json.Marshal(nodes) return &http.Response{ - Status: "200 OK", - StatusCode: 200, + Status: fmt.Sprintf("%d %s", http.StatusOK, http.StatusText(http.StatusOK)), + StatusCode: http.StatusOK, ContentLength: int64(len(b)), Header: http.Header(map[string][]string{"Content-Type": {"application/json"}}), - Body: ioutil.NopCloser(bytes.NewReader(b)), + Body: io.NopCloser(bytes.NewReader(b)), }, nil }, } diff --git a/opensearchtransport/doc.go b/opensearchtransport/doc.go index abefe8614..721e27d5a 100644 --- a/opensearchtransport/doc.go +++ b/opensearchtransport/doc.go @@ -37,7 +37,7 @@ response status codes (by default 502, 503, 504). Use the RetryOnStatus option t The transport will not retry a timeout network error, unless enabled by setting EnableRetryOnTimeout to true. Use the MaxRetries option to configure the number of retries, and set DisableRetry to true -to disable the retry behaviour altogether. +to disable the retry behavior altogether. By default, the retry will be performed without any delay; to configure a backoff interval, implement the RetryBackoff option function; see an example in the package unit tests for information. @@ -45,7 +45,7 @@ implement the RetryBackoff option function; see an example in the package unit t When multiple addresses are passed in configuration, the package will use them in a round-robin fashion, and will keep track of live and dead nodes. The status of dead nodes is checked periodically. -To customize the node selection behaviour, provide a Selector implementation in the configuration. +To customize the node selection behavior, provide a Selector implementation in the configuration. To replace the connection pool entirely, provide a custom ConnectionPool implementation via the ConnectionPoolFunc option. diff --git a/opensearchtransport/logger.go b/opensearchtransport/logger.go index f089e9e63..f7994e9d3 100644 --- a/opensearchtransport/logger.go +++ b/opensearchtransport/logger.go @@ -32,7 +32,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "net/url" "strconv" @@ -43,7 +42,6 @@ import ( var debugLogger DebuggingLogger // Logger defines an interface for logging request and response. -// type Logger interface { // LogRoundTrip should not modify the request or response, except for consuming and closing the body. // Implementations have to check for nil values in request and response. @@ -55,14 +53,12 @@ type Logger interface { } // DebuggingLogger defines the interface for a debugging logger. -// type DebuggingLogger interface { Log(a ...interface{}) error Logf(format string, a ...interface{}) error } // TextLogger prints the log message in plain text. -// type TextLogger struct { Output io.Writer EnableRequestBody bool @@ -70,7 +66,6 @@ type TextLogger struct { } // ColorLogger prints the log message in a terminal-optimized plain text. -// type ColorLogger struct { Output io.Writer EnableRequestBody bool @@ -78,7 +73,6 @@ type ColorLogger struct { } // CurlLogger prints the log message as a runnable curl command. -// type CurlLogger struct { Output io.Writer EnableRequestBody bool @@ -86,7 +80,6 @@ type CurlLogger struct { } // JSONLogger prints the log message as JSON. -// type JSONLogger struct { Output io.Writer EnableRequestBody bool @@ -94,13 +87,11 @@ type JSONLogger struct { } // debuggingLogger prints debug messages as plain text. -// type debuggingLogger struct { Output io.Writer } // LogRoundTrip prints the information about request and response. -// func (l *TextLogger) LogRoundTrip(req *http.Request, res *http.Response, err error, start time.Time, dur time.Duration) error { fmt.Fprintf(l.Output, "%s %s %s [status:%d request:%s]\n", start.Format(time.RFC3339), @@ -138,8 +129,7 @@ func (l *TextLogger) RequestBodyEnabled() bool { return l.EnableRequestBody } func (l *TextLogger) ResponseBodyEnabled() bool { return l.EnableResponseBody } // LogRoundTrip prints the information about request and response. -// -func (l *ColorLogger) LogRoundTrip(req *http.Request, res *http.Response, err error, start time.Time, dur time.Duration) error { +func (l *ColorLogger) LogRoundTrip(req *http.Request, res *http.Response, err error, _ time.Time, dur time.Duration) error { query, _ := url.QueryUnescape(req.URL.RawQuery) if query != "" { query = "?" + query @@ -213,8 +203,7 @@ func (l *ColorLogger) RequestBodyEnabled() bool { return l.EnableRequestBody } func (l *ColorLogger) ResponseBodyEnabled() bool { return l.EnableResponseBody } // LogRoundTrip prints the information about request and response. -// -func (l *CurlLogger) LogRoundTrip(req *http.Request, res *http.Response, err error, start time.Time, dur time.Duration) error { +func (l *CurlLogger) LogRoundTrip(req *http.Request, res *http.Response, _ error, start time.Time, dur time.Duration) error { var b bytes.Buffer var query string @@ -232,7 +221,7 @@ func (l *CurlLogger) LogRoundTrip(req *http.Request, res *http.Response, err err } b.WriteString(`curl`) - if req.Method == "HEAD" { + if req.Method == http.MethodHead { b.WriteString(" --head") } else { fmt.Fprintf(&b, " -X %s", req.Method) @@ -277,8 +266,7 @@ func (l *CurlLogger) LogRoundTrip(req *http.Request, res *http.Response, err err b.WriteRune('\n') - var status string - status = res.Status + status := res.Status fmt.Fprintf(&b, "# => %s [%s] %s\n", start.UTC().Format(time.RFC3339), status, dur.Truncate(time.Millisecond)) if l.ResponseBodyEnabled() && res != nil && res.Body != nil && res.Body != http.NoBody { @@ -307,13 +295,12 @@ func (l *CurlLogger) RequestBodyEnabled() bool { return l.EnableRequestBody } func (l *CurlLogger) ResponseBodyEnabled() bool { return l.EnableResponseBody } // LogRoundTrip prints the information about request and response. -// func (l *JSONLogger) LogRoundTrip(req *http.Request, res *http.Response, err error, start time.Time, dur time.Duration) error { // TODO: Research performance optimization of using sync.Pool bsize := 200 - var b = bytes.NewBuffer(make([]byte, 0, bsize)) - var v = make([]byte, 0, bsize) + b := bytes.NewBuffer(make([]byte, 0, bsize)) + v := make([]byte, 0, bsize) appendTime := func(t time.Time) { v = v[:0] @@ -333,8 +320,6 @@ func (l *JSONLogger) LogRoundTrip(req *http.Request, res *http.Response, err err b.Write(v) } - port := req.URL.Port() - b.WriteRune('{') // -- Timestamp b.WriteString(`"@timestamp":"`) @@ -351,7 +336,7 @@ func (l *JSONLogger) LogRoundTrip(req *http.Request, res *http.Response, err err appendQuote(req.URL.Scheme) b.WriteString(`,"domain":`) appendQuote(req.URL.Hostname()) - if port != "" { + if port := req.URL.Port(); port != "" { b.WriteString(`,"port":`) b.WriteString(port) } @@ -415,14 +400,12 @@ func (l *JSONLogger) RequestBodyEnabled() bool { return l.EnableRequestBody } func (l *JSONLogger) ResponseBodyEnabled() bool { return l.EnableResponseBody } // Log prints the arguments to output in default format. -// func (l *debuggingLogger) Log(a ...interface{}) error { _, err := fmt.Fprint(l.Output, a...) return err } // Logf prints formats the arguments and prints them to output. -// func (l *debuggingLogger) Logf(format string, a ...interface{}) error { _, err := fmt.Fprintf(l.Output, format, a...) return err @@ -444,13 +427,14 @@ func duplicateBody(body io.ReadCloser) (io.ReadCloser, io.ReadCloser, error) { b2 bytes.Buffer tr = io.TeeReader(body, &b2) ) - _, err := b1.ReadFrom(tr) - if err != nil { - return ioutil.NopCloser(io.MultiReader(&b1, errorReader{err: err})), ioutil.NopCloser(io.MultiReader(&b2, errorReader{err: err})), err + + if _, err := b1.ReadFrom(tr); err != nil { + return io.NopCloser(io.MultiReader(&b1, errorReader{err: err})), io.NopCloser(io.MultiReader(&b2, errorReader{err: err})), err } + defer func() { body.Close() }() - return ioutil.NopCloser(&b1), ioutil.NopCloser(&b2), nil + return io.NopCloser(&b1), io.NopCloser(&b2), nil } func resStatusCode(res *http.Response) int { @@ -462,4 +446,4 @@ func resStatusCode(res *http.Response) int { type errorReader struct{ err error } -func (r errorReader) Read(p []byte) (int, error) { return 0, r.err } +func (r errorReader) Read(_ []byte) (int, error) { return 0, r.err } diff --git a/opensearchtransport/logger_benchmark_test.go b/opensearchtransport/logger_benchmark_test.go index 3ed31116d..6e176a1c2 100644 --- a/opensearchtransport/logger_benchmark_test.go +++ b/opensearchtransport/logger_benchmark_test.go @@ -24,13 +24,13 @@ // specific language governing permissions and limitations // under the License. -// +build !integration +//go:build !integration package opensearchtransport_test import ( "bytes" - "io/ioutil" + "io" "net/http" "net/url" "testing" @@ -46,14 +46,17 @@ func BenchmarkTransportLogger(b *testing.B) { tp, _ := opensearchtransport.New(opensearchtransport.Config{ URLs: []*url.URL{{Scheme: "http", Host: "foo"}}, Transport: newFakeTransport(b), - Logger: &opensearchtransport.TextLogger{Output: ioutil.Discard}, + Logger: &opensearchtransport.TextLogger{Output: io.Discard}, }) - req, _ := http.NewRequest("GET", "/abc", nil) - _, err := tp.Perform(req) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + resp, err := tp.Perform(req) if err != nil { b.Fatalf("Unexpected error: %s", err) } + if resp.Body != nil { + resp.Body.Close() + } } }) @@ -62,20 +65,25 @@ func BenchmarkTransportLogger(b *testing.B) { tp, _ := opensearchtransport.New(opensearchtransport.Config{ URLs: []*url.URL{{Scheme: "http", Host: "foo"}}, Transport: newFakeTransport(b), - Logger: &opensearchtransport.TextLogger{Output: ioutil.Discard, EnableRequestBody: true, EnableResponseBody: true}, + Logger: &opensearchtransport.TextLogger{ + Output: io.Discard, + EnableRequestBody: true, + EnableResponseBody: true, + }, }) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) res, err := tp.Perform(req) if err != nil { b.Fatalf("Unexpected error: %s", err) } + res.Body.Close() - body, err := ioutil.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) if err != nil { b.Fatalf("Error reading response body: %s", err) } - res.Body = ioutil.NopCloser(bytes.NewBuffer(body)) + res.Body = io.NopCloser(bytes.NewBuffer(body)) if len(body) < 13 { b.Errorf("Error reading response body bytes, want=13, got=%d", len(body)) } @@ -87,14 +95,17 @@ func BenchmarkTransportLogger(b *testing.B) { tp, _ := opensearchtransport.New(opensearchtransport.Config{ URLs: []*url.URL{{Scheme: "http", Host: "foo"}}, Transport: newFakeTransport(b), - Logger: &opensearchtransport.JSONLogger{Output: ioutil.Discard}, + Logger: &opensearchtransport.JSONLogger{Output: io.Discard}, }) - req, _ := http.NewRequest("GET", "/abc", nil) - _, err := tp.Perform(req) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + resp, err := tp.Perform(req) if err != nil { b.Fatalf("Unexpected error: %s", err) } + if resp != nil && resp.Body != nil { + resp.Body.Close() + } } }) @@ -103,14 +114,21 @@ func BenchmarkTransportLogger(b *testing.B) { tp, _ := opensearchtransport.New(opensearchtransport.Config{ URLs: []*url.URL{{Scheme: "http", Host: "foo"}}, Transport: newFakeTransport(b), - Logger: &opensearchtransport.JSONLogger{Output: ioutil.Discard, EnableRequestBody: true, EnableResponseBody: true}, + Logger: &opensearchtransport.JSONLogger{ + Output: io.Discard, + EnableRequestBody: true, + EnableResponseBody: true, + }, }) - req, _ := http.NewRequest("GET", "/abc", nil) - _, err := tp.Perform(req) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + resp, err := tp.Perform(req) if err != nil { b.Fatalf("Unexpected error: %s", err) } + if resp != nil && resp.Body != nil { + resp.Body.Close() + } } }) } diff --git a/opensearchtransport/logger_internal_test.go b/opensearchtransport/logger_internal_test.go index 27beae7b7..f862c8d35 100644 --- a/opensearchtransport/logger_internal_test.go +++ b/opensearchtransport/logger_internal_test.go @@ -24,7 +24,7 @@ // specific language governing permissions and limitations // under the License. -// +build !integration +//go:build !integration package opensearchtransport @@ -33,7 +33,6 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net/http" "net/url" "os" @@ -54,11 +53,11 @@ func TestTransportLogger(t *testing.T) { return &mockTransp{ RoundTripFunc: func(req *http.Request) (*http.Response, error) { return &http.Response{ - Status: "200 OK", - StatusCode: 200, + Status: fmt.Sprintf("%d %s", http.StatusOK, http.StatusText(http.StatusOK)), + StatusCode: http.StatusOK, ContentLength: 13, Header: http.Header(map[string][]string{"Content-Type": {"application/json"}}), - Body: ioutil.NopCloser(strings.NewReader(`{"foo":"bar"}`)), + Body: io.NopCloser(strings.NewReader(`{"foo":"bar"}`)), }, nil }, } @@ -70,7 +69,7 @@ func TestTransportLogger(t *testing.T) { tp, _ := New(Config{ URLs: []*url.URL{{Scheme: "http", Host: "foo"}}, Transport: newRoundTripper(), - // Logger: ioutil.Discard, + // Logger: io.Discard, }) for i := 0; i < 100; i++ { @@ -78,12 +77,13 @@ func TestTransportLogger(t *testing.T) { go func() { defer wg.Done() - req, _ := http.NewRequest("GET", "/abc", nil) - _, err := tp.Perform(req) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + resp, err := tp.Perform(req) if err != nil { t.Errorf("Unexpected error: %s", err) return } + defer resp.Body.Close() }() } wg.Wait() @@ -96,11 +96,12 @@ func TestTransportLogger(t *testing.T) { Logger: nil, }) - req, _ := http.NewRequest("GET", "/abc", nil) - _, err := tp.Perform(req) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + resp, err := tp.Perform(req) if err != nil { t.Fatalf("Unexpected error: %s", err) } + defer resp.Body.Close() }) t.Run("No HTTP response", func(t *testing.T) { @@ -111,15 +112,16 @@ func TestTransportLogger(t *testing.T) { return nil, errors.New("Mock error") }, }, - Logger: &TextLogger{Output: ioutil.Discard}, + Logger: &TextLogger{Output: io.Discard}, }) - req, _ := http.NewRequest("GET", "/abc", nil) - res, err := tp.Perform(req) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + resp, err := tp.Perform(req) if err == nil { + defer resp.Body.Close() t.Errorf("Expected error: %v", err) } - if res != nil { + if resp != nil { t.Errorf("Expected nil response, got: %v", err) } }) @@ -133,15 +135,16 @@ func TestTransportLogger(t *testing.T) { Logger: &TextLogger{Output: &dst, EnableRequestBody: true, EnableResponseBody: true}, }) - req, _ := http.NewRequest("GET", "/abc?q=a,b", nil) - req.Body = ioutil.NopCloser(strings.NewReader(`{"query":"42"}`)) + req, _ := http.NewRequest(http.MethodGet, "/abc?q=a,b", nil) + req.Body = io.NopCloser(strings.NewReader(`{"query":"42"}`)) res, err := tp.Perform(req) if err != nil { t.Fatalf("Unexpected error: %s", err) } + defer res.Body.Close() - body, err := ioutil.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) if err != nil { t.Fatalf("Error reading response body: %s", err) } @@ -164,15 +167,16 @@ func TestTransportLogger(t *testing.T) { Logger: &TextLogger{Output: &dst, EnableRequestBody: true, EnableResponseBody: true}, }) - req, _ := http.NewRequest("GET", "/abc?q=a,b", nil) - req.Body = ioutil.NopCloser(strings.NewReader(`{"query":"42"}`)) + req, _ := http.NewRequest(http.MethodGet, "/abc?q=a,b", nil) + req.Body = io.NopCloser(strings.NewReader(`{"query":"42"}`)) res, err := tp.Perform(req) if err != nil { t.Fatalf("Unexpected error: %s", err) } + defer res.Body.Close() - _, err = ioutil.ReadAll(res.Body) + _, err = io.ReadAll(res.Body) if err != nil { t.Fatalf("Error reading response body: %s", err) } @@ -209,15 +213,16 @@ func TestTransportLogger(t *testing.T) { Logger: &ColorLogger{Output: &dst, EnableRequestBody: true, EnableResponseBody: true}, }) - req, _ := http.NewRequest("GET", "/abc?q=a,b", nil) - req.Body = ioutil.NopCloser(strings.NewReader(`{"query":"42"}`)) + req, _ := http.NewRequest(http.MethodGet, "/abc?q=a,b", nil) + req.Body = io.NopCloser(strings.NewReader(`{"query":"42"}`)) res, err := tp.Perform(req) if err != nil { t.Fatalf("Unexpected error: %s", err) } + defer res.Body.Close() - _, err = ioutil.ReadAll(res.Body) + _, err = io.ReadAll(res.Body) if err != nil { t.Fatalf("Error reading response body: %s", err) } @@ -263,15 +268,16 @@ func TestTransportLogger(t *testing.T) { Logger: &CurlLogger{Output: &dst, EnableRequestBody: true, EnableResponseBody: true}, }) - req, _ := http.NewRequest("GET", "/abc?q=a,b", nil) - req.Body = ioutil.NopCloser(strings.NewReader(`{"query":"42"}`)) + req, _ := http.NewRequest(http.MethodGet, "/abc?q=a,b", nil) + req.Body = io.NopCloser(strings.NewReader(`{"query":"42"}`)) res, err := tp.Perform(req) if err != nil { t.Fatalf("Unexpected error: %s", err) } + defer res.Body.Close() - _, err = ioutil.ReadAll(res.Body) + _, err = io.ReadAll(res.Body) if err != nil { t.Fatalf("Error reading response body: %s", err) } @@ -299,12 +305,13 @@ func TestTransportLogger(t *testing.T) { Logger: &JSONLogger{Output: &dst}, }) - req, _ := http.NewRequest("GET", "/abc?q=a,b", nil) - req.Body = ioutil.NopCloser(strings.NewReader(`{"query":"42"}`)) - _, err := tp.Perform(req) + req, _ := http.NewRequest(http.MethodGet, "/abc?q=a,b", nil) + req.Body = io.NopCloser(strings.NewReader(`{"query":"42"}`)) + resp, err := tp.Perform(req) if err != nil { t.Fatalf("Unexpected error: %s", err) } + defer resp.Body.Close() output := dst.String() output = strings.TrimSuffix(output, "\n") @@ -336,15 +343,16 @@ func TestTransportLogger(t *testing.T) { Logger: &JSONLogger{Output: &dst, EnableRequestBody: true}, }) - req, _ := http.NewRequest("GET", "/abc?q=a,b", nil) - req.Body = ioutil.NopCloser(strings.NewReader(`{"query":"42"}`)) + req, _ := http.NewRequest(http.MethodGet, "/abc?q=a,b", nil) + req.Body = io.NopCloser(strings.NewReader(`{"query":"42"}`)) res, err := tp.Perform(req) if err != nil { t.Fatalf("Unexpected error: %s", err) } + defer res.Body.Close() - _, err = ioutil.ReadAll(res.Body) + _, err = io.ReadAll(res.Body) if err != nil { t.Fatalf("Error reading response body: %s", err) } @@ -379,13 +387,14 @@ func TestTransportLogger(t *testing.T) { Logger: &CustomLogger{Output: &dst}, }) - req, _ := http.NewRequest("GET", "/abc?q=a,b", nil) - req.Body = ioutil.NopCloser(strings.NewReader(`{"query":"42"}`)) + req, _ := http.NewRequest(http.MethodGet, "/abc?q=a,b", nil) + req.Body = io.NopCloser(strings.NewReader(`{"query":"42"}`)) - _, err := tp.Perform(req) + res, err := tp.Perform(req) if err != nil { t.Fatalf("Unexpected error: %s", err) } + defer res.Body.Close() if !strings.HasPrefix(dst.String(), "GET http://foo/abc?q=a,b") { t.Errorf("Unexpected output: %s", dst.String()) @@ -403,13 +412,13 @@ func TestTransportLogger(t *testing.T) { t.Errorf("Expected input to be closed: %#v", input) } - read, _ := ioutil.ReadAll(&input) + read, _ := io.ReadAll(&input) if len(read) > 0 { t.Errorf("Expected input to be drained: %#v", input.content) } - b1r, _ := ioutil.ReadAll(b1) - b2r, _ := ioutil.ReadAll(b2) + b1r, _ := io.ReadAll(b1) + b2r, _ := io.ReadAll(b2) if len(b1r) != 6 || len(b2r) != 6 { t.Errorf( "Unexpected duplicate content, b1=%q (%db), b2=%q (%db)", @@ -429,17 +438,17 @@ func TestTransportLogger(t *testing.T) { t.Errorf("Unexpected error value, expected [ERROR MOCK], got [%s]", err.Error()) } - read, _ := ioutil.ReadAll(&input) + read, _ := io.ReadAll(&input) if string(read) != "BAR" { t.Errorf("Unexpected undrained part: %q", read) } - b2r, _ := ioutil.ReadAll(b2) + b2r, _ := io.ReadAll(b2) if string(b2r) != "FOO" { t.Errorf("Unexpected value, b2=%q", string(b2r)) } - b1c, err := ioutil.ReadAll(b1) + b1c, err := io.ReadAll(b1) if string(b1c) != "FOO" { t.Errorf("Unexpected value, b1=%q", string(b1c)) } @@ -453,7 +462,7 @@ func TestTransportLogger(t *testing.T) { } func TestDebuggingLogger(t *testing.T) { - logger := &debuggingLogger{Output: ioutil.Discard} + logger := &debuggingLogger{Output: io.Discard} t.Run("Log", func(t *testing.T) { if err := logger.Log("Foo"); err != nil { @@ -474,9 +483,9 @@ type CustomLogger struct { func (l *CustomLogger) LogRoundTrip( req *http.Request, res *http.Response, - err error, - start time.Time, - dur time.Duration, + _ error, + _ time.Time, + _ time.Duration, ) error { fmt.Fprintln(l.Output, req.Method, req.URL, "->", res.Status) return nil diff --git a/opensearchtransport/metrics.go b/opensearchtransport/metrics.go index caef13e18..7a8252c61 100644 --- a/opensearchtransport/metrics.go +++ b/opensearchtransport/metrics.go @@ -36,19 +36,16 @@ import ( ) // Measurable defines the interface for transports supporting metrics. -// type Measurable interface { Metrics() (Metrics, error) } // connectionable defines the interface for transports returning a list of connections. -// type connectionable interface { connections() []*Connection } // Metrics represents the transport metrics. -// type Metrics struct { Requests int `json:"requests"` Failures int `json:"failures"` @@ -58,7 +55,6 @@ type Metrics struct { } // ConnectionMetric represents metric information for a connection. -// type ConnectionMetric struct { URL string `json:"url"` Failures int `json:"failures,omitempty"` @@ -73,19 +69,15 @@ type ConnectionMetric struct { } // metrics represents the inner state of metrics. -// type metrics struct { sync.RWMutex requests int failures int responses map[int]int - - connections []*Connection } // Metrics returns the transport metrics. -// func (c *Client) Metrics() (Metrics, error) { if c.metrics == nil { return Metrics{}, errors.New("transport metrics not enabled") @@ -139,7 +131,6 @@ func (c *Client) Metrics() (Metrics, error) { } // String returns the metrics as a string. -// func (m Metrics) String() string { var ( i int @@ -175,7 +166,6 @@ func (m Metrics) String() string { if i+1 < len(m.Connections) { b.WriteString(", ") } - i++ } b.WriteString("]") @@ -184,7 +174,6 @@ func (m Metrics) String() string { } // String returns the connection information as a string. -// func (cm ConnectionMetric) String() string { var b strings.Builder b.WriteString("{") diff --git a/opensearchtransport/metrics_internal_test.go b/opensearchtransport/metrics_internal_test.go index 609701562..d0265e579 100644 --- a/opensearchtransport/metrics_internal_test.go +++ b/opensearchtransport/metrics_internal_test.go @@ -24,7 +24,7 @@ // specific language governing permissions and limitations // under the License. -// +build !integration +//go:build !integration package opensearchtransport @@ -56,8 +56,11 @@ func TestMetrics(t *testing.T) { tp.metrics.responses[200] = 1 tp.metrics.responses[404] = 2 - req, _ := http.NewRequest("HEAD", "/", nil) - tp.Perform(req) + req, _ := http.NewRequest(http.MethodHead, "/", nil) + resp, err := tp.Perform(req) + if err == nil { + defer resp.Body.Close() + } m, err := tp.Metrics() if err != nil { diff --git a/opensearchtransport/opensearchtransport.go b/opensearchtransport/opensearchtransport.go index fe8e450f4..199d530b5 100644 --- a/opensearchtransport/opensearchtransport.go +++ b/opensearchtransport/opensearchtransport.go @@ -33,46 +33,27 @@ import ( "errors" "fmt" "io" - "io/ioutil" "net" "net/http" "net/url" "os" "regexp" "runtime" - "strconv" "strings" "sync" "time" - "github.com/opensearch-project/opensearch-go/v2/signer" - "github.com/opensearch-project/opensearch-go/v2/internal/version" + "github.com/opensearch-project/opensearch-go/v2/signer" ) const ( // Version returns the package version as a string. - Version = version.Client - - // esCompatHeader defines the env var for Compatibility header. - esCompatHeader = "ELASTIC_CLIENT_APIVERSIONING" -) - -var ( - userAgent string - compatibilityHeader bool - reGoVersion = regexp.MustCompile(`go(\d+\.\d+\..+)`) - - defaultMaxRetries = 3 - defaultRetryOnStatus = [...]int{502, 503, 504} + Version = version.Client + defaultMaxRetries = 3 ) -func init() { - userAgent = initUserAgent() - - compatHeaderEnv := os.Getenv(esCompatHeader) - compatibilityHeader, _ = strconv.ParseBool(compatHeaderEnv) -} +var reGoVersion = regexp.MustCompile(`go(\d+\.\d+\..+)`) // Interface defines the interface for HTTP client. type Interface interface { @@ -114,10 +95,11 @@ type Config struct { type Client struct { sync.Mutex - urls []*url.URL - username string - password string - header http.Header + urls []*url.URL + username string + password string + header http.Header + userAgent string signer signer.Signer @@ -164,17 +146,17 @@ func New(cfg Config) (*Client, error) { cfg.Transport = httpTransport } - if len(cfg.RetryOnStatus) == 0 { - cfg.RetryOnStatus = defaultRetryOnStatus[:] + if len(cfg.RetryOnStatus) == 0 && cfg.RetryOnStatus == nil { + cfg.RetryOnStatus = []int{502, 503, 504} } if cfg.MaxRetries == 0 { cfg.MaxRetries = defaultMaxRetries } - var conns []*Connection - for _, u := range cfg.URLs { - conns = append(conns, &Connection{URL: u}) + conns := make([]*Connection, len(cfg.URLs)) + for idx, u := range cfg.URLs { + conns[idx] = &Connection{URL: u} } client := Client{ @@ -200,10 +182,12 @@ func New(cfg Config) (*Client, error) { poolFunc: cfg.ConnectionPoolFunc, } + client.userAgent = initUserAgent() + if client.poolFunc != nil { client.pool = client.poolFunc(conns, client.selector) } else { - client.pool, _ = NewConnectionPool(conns, client.selector) + client.pool = NewConnectionPool(conns, client.selector) } if cfg.EnableDebugLogger { @@ -223,7 +207,7 @@ func New(cfg Config) (*Client, error) { if client.discoverNodesInterval > 0 { time.AfterFunc(client.discoverNodesInterval, func() { - client.scheduleDiscoverNodes(client.discoverNodesInterval) + client.scheduleDiscoverNodes() }) } @@ -237,14 +221,6 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { err error ) - // Compatibility Header - if compatibilityHeader { - if req.Body != nil { - req.Header.Set("Content-Type", "application/vnd.elasticsearch+json;compatible-with=7") - } - req.Header.Set("Accept", "application/vnd.elasticsearch+json;compatible-with=7") - } - // Record metrics, when enabled if c.metrics != nil { c.metrics.Lock() @@ -261,30 +237,31 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { var buf bytes.Buffer zw := gzip.NewWriter(&buf) if _, err := io.Copy(zw, req.Body); err != nil { - return nil, fmt.Errorf("failed to compress request body: %s", err) + return nil, fmt.Errorf("failed to compress request body: %w", err) } if err := zw.Close(); err != nil { - return nil, fmt.Errorf("failed to compress request body (during close): %s", err) + return nil, fmt.Errorf("failed to compress request body (during close): %w", err) } req.GetBody = func() (io.ReadCloser, error) { r := buf - return ioutil.NopCloser(&r), nil + return io.NopCloser(&r), nil } + //nolint:errcheck // error is always nil req.Body, _ = req.GetBody() req.Header.Set("Content-Encoding", "gzip") req.ContentLength = int64(buf.Len()) - } else if req.GetBody == nil { if !c.disableRetry || (c.logger != nil && c.logger.RequestBodyEnabled()) { var buf bytes.Buffer + //nolint:errcheck // ignored as this is only for logging buf.ReadFrom(req.Body) - req.GetBody = func() (io.ReadCloser, error) { r := buf - return ioutil.NopCloser(&r), nil + return io.NopCloser(&r), nil } + //nolint:errcheck // error is always nil req.Body, _ = req.GetBody() } } @@ -305,7 +282,7 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { if c.logger != nil { c.logRoundTrip(req, nil, err, time.Time{}, time.Duration(0)) } - return nil, fmt.Errorf("cannot get connection: %s", err) + return nil, fmt.Errorf("cannot get connection: %w", err) } // Update request @@ -313,13 +290,13 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { c.setReqAuth(conn.URL, req) if err = c.signRequest(req); err != nil { - return nil, fmt.Errorf("failed to sign request: %s", err) + return nil, fmt.Errorf("failed to sign request: %w", err) } if !c.disableRetry && i > 0 && req.Body != nil && req.Body != http.NoBody { body, err := req.GetBody() if err != nil { - return nil, fmt.Errorf("cannot get request body: %s", err) + return nil, fmt.Errorf("cannot get request body: %w", err) } req.Body = body } @@ -332,6 +309,7 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { // Log request and response if c.logger != nil { if c.logger.RequestBodyEnabled() && req.Body != nil && req.Body != http.NoBody { + //nolint:errcheck // ignored as this is only for logging req.Body, _ = req.GetBody() } c.logRoundTrip(req, res, err, start, dur) @@ -347,17 +325,19 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { // Report the connection as unsuccessful c.Lock() + //nolint:errcheck // Questionable if the function even returns an error c.pool.OnFailure(conn) c.Unlock() // Retry on EOF errors - if err == io.EOF { + if errors.Is(err, io.EOF) { shouldRetry = true } // Retry on network errors, but not on timeout errors, unless configured - if err, ok := err.(net.Error); ok { - if (!err.Timeout() || c.enableRetryOnTimeout) && !c.disableRetry { + var netError net.Error + if errors.As(err, &netError) { + if (!netError.Timeout() || c.enableRetryOnTimeout) && !c.disableRetry { shouldRetry = true } } @@ -392,7 +372,8 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { // Drain and close body when retrying after response if shouldCloseBody && i < c.maxRetries { if res.Body != nil { - io.Copy(ioutil.Discard, res.Body) + //nolint:errcheck // undexpected but okay if it failes + io.Copy(io.Discard, res.Body) res.Body.Close() } } @@ -402,7 +383,7 @@ func (c *Client) Perform(req *http.Request) (*http.Response, error) { time.Sleep(c.retryBackoff(i + 1)) } } - // Read, close and replace the http reponse body to close the connection + // Read, close and replace the http response body to close the connection if res != nil && res.Body != nil { body, err := io.ReadAll(res.Body) res.Body.Close() @@ -420,7 +401,7 @@ func (c *Client) URLs() []*url.URL { return c.pool.URLs() } -func (c *Client) setReqURL(u *url.URL, req *http.Request) *http.Request { +func (c *Client) setReqURL(u *url.URL, req *http.Request) { req.URL.Scheme = u.Scheme req.URL.Host = u.Host @@ -431,25 +412,21 @@ func (c *Client) setReqURL(u *url.URL, req *http.Request) *http.Request { b.WriteString(req.URL.Path) req.URL.Path = b.String() } - - return req } -func (c *Client) setReqAuth(u *url.URL, req *http.Request) *http.Request { +func (c *Client) setReqAuth(u *url.URL, req *http.Request) { if _, ok := req.Header["Authorization"]; !ok { if u.User != nil { password, _ := u.User.Password() req.SetBasicAuth(u.User.Username(), password) - return req + return } if c.username != "" && c.password != "" { req.SetBasicAuth(c.username, c.password) - return req + return } } - - return req } func (c *Client) signRequest(req *http.Request) error { @@ -459,12 +436,11 @@ func (c *Client) signRequest(req *http.Request) error { return nil } -func (c *Client) setReqUserAgent(req *http.Request) *http.Request { - req.Header.Set("User-Agent", userAgent) - return req +func (c *Client) setReqUserAgent(req *http.Request) { + req.Header.Set("User-Agent", c.userAgent) } -func (c *Client) setReqGlobalHeader(req *http.Request) *http.Request { +func (c *Client) setReqGlobalHeader(req *http.Request) { if len(c.header) > 0 { for k, v := range c.header { if req.Header.Get(k) != k { @@ -474,7 +450,6 @@ func (c *Client) setReqGlobalHeader(req *http.Request) *http.Request { } } } - return req } func (c *Client) logRoundTrip( @@ -488,14 +463,18 @@ func (c *Client) logRoundTrip( if res != nil { dupRes = *res } + if c.logger.ResponseBodyEnabled() { if res != nil && res.Body != nil && res.Body != http.NoBody { + //nolint:errcheck // ignored as this is only for logging b1, b2, _ := duplicateBody(res.Body) dupRes.Body = b1 res.Body = b2 } } - c.logger.LogRoundTrip(req, &dupRes, err, start, dur) // errcheck exclude + + //nolint:errcheck // ignored as this is only for logging + c.logger.LogRoundTrip(req, &dupRes, err, start, dur) } func initUserAgent() string { diff --git a/opensearchtransport/opensearchtransport_benchmark_test.go b/opensearchtransport/opensearchtransport_benchmark_test.go index 6730cab8a..977b3e518 100644 --- a/opensearchtransport/opensearchtransport_benchmark_test.go +++ b/opensearchtransport/opensearchtransport_benchmark_test.go @@ -24,12 +24,13 @@ // specific language governing permissions and limitations // under the License. -// +build !integration +//go:build !integration package opensearchtransport_test import ( - "io/ioutil" + "fmt" + "io" "net/http" "net/url" "strings" @@ -38,24 +39,24 @@ import ( "github.com/opensearch-project/opensearch-go/v2/opensearchtransport" ) -var defaultResponse = http.Response{ - Status: "200 OK", - StatusCode: 200, - ContentLength: 13, - Header: http.Header(map[string][]string{"Content-Type": {"application/json"}}), - Body: ioutil.NopCloser(strings.NewReader(`{"foo":"bar"}`)), -} - type FakeTransport struct { FakeResponse *http.Response } -func (t *FakeTransport) RoundTrip(req *http.Request) (*http.Response, error) { +func (t *FakeTransport) RoundTrip(_ *http.Request) (*http.Response, error) { return t.FakeResponse, nil } -func newFakeTransport(b *testing.B) *FakeTransport { - return &FakeTransport{FakeResponse: &defaultResponse} +func newFakeTransport(_ *testing.B) *FakeTransport { + return &FakeTransport{ + FakeResponse: &http.Response{ + Status: fmt.Sprintf("%d %s", http.StatusOK, http.StatusText(http.StatusOK)), + StatusCode: http.StatusOK, + ContentLength: 13, + Header: http.Header(map[string][]string{"Content-Type": {"application/json"}}), + Body: io.NopCloser(strings.NewReader(`{"foo":"bar"}`)), + }, + } } func BenchmarkTransport(b *testing.B) { @@ -68,11 +69,12 @@ func BenchmarkTransport(b *testing.B) { Transport: newFakeTransport(b), }) - req, _ := http.NewRequest("GET", "/abc", nil) - _, err := tp.Perform(req) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + res, err := tp.Perform(req) if err != nil { b.Fatalf("Unexpected error: %s", err) } + defer res.Body.Close() } }) @@ -87,11 +89,12 @@ func BenchmarkTransport(b *testing.B) { Transport: newFakeTransport(b), }) - req, _ := http.NewRequest("GET", "/abc", nil) - _, err := tp.Perform(req) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + res, err := tp.Perform(req) if err != nil { b.Fatalf("Unexpected error: %s", err) } + defer res.Body.Close() } }) } diff --git a/opensearchtransport/opensearchtransport_integration_multinode_test.go b/opensearchtransport/opensearchtransport_integration_multinode_test.go index d8dcb5b92..3173edcc5 100644 --- a/opensearchtransport/opensearchtransport_integration_multinode_test.go +++ b/opensearchtransport/opensearchtransport_integration_multinode_test.go @@ -24,7 +24,7 @@ // specific language governing permissions and limitations // under the License. -// +build integration,multinode +//go:build integration && multinode package opensearchtransport_test diff --git a/opensearchtransport/opensearchtransport_integration_test.go b/opensearchtransport/opensearchtransport_integration_test.go index bb9876afb..614c71ed0 100644 --- a/opensearchtransport/opensearchtransport_integration_test.go +++ b/opensearchtransport/opensearchtransport_integration_test.go @@ -25,7 +25,6 @@ // under the License. //go:build integration -// +build integration package opensearchtransport_test diff --git a/opensearchtransport/opensearchtransport_internal_test.go b/opensearchtransport/opensearchtransport_internal_test.go index 8e3a327ac..fbe1f26a3 100644 --- a/opensearchtransport/opensearchtransport_internal_test.go +++ b/opensearchtransport/opensearchtransport_internal_test.go @@ -24,7 +24,7 @@ // specific language governing permissions and limitations // under the License. -// +build !integration +//go:build !integration package opensearchtransport @@ -33,7 +33,6 @@ import ( "compress/gzip" "fmt" "io" - "io/ioutil" "math/rand" "net/http" "net/url" @@ -43,12 +42,10 @@ import ( "time" ) -var ( - _ = fmt.Print -) +var _ = fmt.Print func init() { - rand.Seed(time.Now().Unix()) + rand.New(rand.NewSource(time.Now().Unix())).Uint64() } type mockTransp struct { @@ -96,13 +93,15 @@ func TestTransport(t *testing.T) { }) t.Run("Custom", func(t *testing.T) { - tp, _ := New(Config{ - URLs: []*url.URL{{}}, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { return &http.Response{Status: "MOCK"}, nil }, + tp, _ := New( + Config{ + URLs: []*url.URL{{}}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { return &http.Response{Status: "MOCK"}, nil }, + }, }, - }) - + ) + //nolint:bodyclose // Mock response does not have a body to close res, err := tp.transport.RoundTrip(&http.Request{URL: &url.URL{}}) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -240,7 +239,7 @@ func (cp *CustomConnectionPool) Next() (*Connection, error) { } func (cp *CustomConnectionPool) OnFailure(c *Connection) error { - var index = -1 + index := -1 for i, u := range cp.urls { if u == c.URL { index = i @@ -252,8 +251,8 @@ func (cp *CustomConnectionPool) OnFailure(c *Connection) error { } return fmt.Errorf("connection not found") } -func (cp *CustomConnectionPool) OnSuccess(c *Connection) error { return nil } -func (cp *CustomConnectionPool) URLs() []*url.URL { return cp.urls } +func (cp *CustomConnectionPool) OnSuccess(_ *Connection) {} +func (cp *CustomConnectionPool) URLs() []*url.URL { return cp.urls } func TestTransportCustomConnectionPool(t *testing.T) { t.Run("Run", func(t *testing.T) { @@ -291,14 +290,18 @@ func TestTransportCustomConnectionPool(t *testing.T) { func TestTransportPerform(t *testing.T) { t.Run("Executes", func(t *testing.T) { u, _ := url.Parse("https://foo.com/bar") - tp, _ := New(Config{ - URLs: []*url.URL{u}, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { return &http.Response{Status: "MOCK"}, nil }, - }}) + tp, _ := New( + Config{ + URLs: []*url.URL{u}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { return &http.Response{Status: "MOCK"}, nil }, + }, + }, + ) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + //nolint:bodyclose // Mock response does not have a body to close res, err := tp.Perform(req) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -313,7 +316,7 @@ func TestTransportPerform(t *testing.T) { u, _ := url.Parse("https://foo.com/bar") tp, _ := New(Config{URLs: []*url.URL{u}}) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) tp.setReqURL(u, req) expected := "https://foo.com/bar/abc" @@ -327,7 +330,7 @@ func TestTransportPerform(t *testing.T) { u, _ := url.Parse("https://foo:bar@example.com") tp, _ := New(Config{URLs: []*url.URL{u}}) - req, _ := http.NewRequest("GET", "/", nil) + req, _ := http.NewRequest(http.MethodGet, "/", nil) tp.setReqAuth(u, req) username, password, ok := req.BasicAuth() @@ -344,7 +347,7 @@ func TestTransportPerform(t *testing.T) { u, _ := url.Parse("http://example.com") tp, _ := New(Config{URLs: []*url.URL{u}, Username: "foo", Password: "bar"}) - req, _ := http.NewRequest("GET", "/", nil) + req, _ := http.NewRequest(http.MethodGet, "/", nil) tp.setReqAuth(u, req) username, password, ok := req.BasicAuth() @@ -361,7 +364,7 @@ func TestTransportPerform(t *testing.T) { u, _ := url.Parse("http://example.com") tp, _ := New(Config{URLs: []*url.URL{u}}) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) tp.setReqUserAgent(req) if !strings.HasPrefix(req.UserAgent(), "opensearch-go") { @@ -377,7 +380,7 @@ func TestTransportPerform(t *testing.T) { { // Set the global HTTP header - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) tp.setReqGlobalHeader(req) if req.Header.Get("X-Foo") != "bar" { @@ -387,7 +390,7 @@ func TestTransportPerform(t *testing.T) { { // Do NOT overwrite an existing request header - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) req.Header.Set("X-Foo", "baz") tp.setReqGlobalHeader(req) @@ -409,7 +412,7 @@ func TestTransportPerform(t *testing.T) { }, ) - req, _ := http.NewRequest("GET", "/", nil) + req, _ := http.NewRequest(http.MethodGet, "/", nil) tp.signRequest(req) if _, ok := req.Header["Sign-Status"]; !ok { @@ -418,14 +421,18 @@ func TestTransportPerform(t *testing.T) { }) t.Run("Error No URL", func(t *testing.T) { - tp, _ := New(Config{ - URLs: []*url.URL{}, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { return &http.Response{Status: "MOCK"}, nil }, - }}) + tp, _ := New( + Config{ + URLs: []*url.URL{}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { return &http.Response{Status: "MOCK"}, nil }, + }, + }, + ) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + //nolint:bodyclose // Mock response does not have a body to close _, err := tp.Perform(req) if err.Error() != `cannot get connection: no connection available` { t.Fatalf("Expected error `cannot get URL`: but got error %q", err) @@ -441,25 +448,28 @@ func TestTransportPerformRetries(t *testing.T) { ) u, _ := url.Parse("http://foo.bar") - tp, _ := New(Config{ - URLs: []*url.URL{u, u, u}, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { - i++ - fmt.Printf("Request #%d", i) - if i == numReqs { - fmt.Print(": OK\n") - return &http.Response{Status: "OK"}, nil - } - fmt.Print(": ERR\n") - return nil, &mockNetError{error: fmt.Errorf("Mock network error (%d)", i)} + tp, _ := New( + Config{ + URLs: []*url.URL{u, u, u}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + i++ + fmt.Printf("Request #%d", i) + if i == numReqs { + fmt.Print(": OK\n") + return &http.Response{Status: "OK"}, nil + } + fmt.Print(": ERR\n") + return nil, &mockNetError{error: fmt.Errorf("Mock network error (%d)", i)} + }, }, - }}) + }, + ) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + //nolint:bodyclose // Mock response does not have a body to close res, err := tp.Perform(req) - if err != nil { t.Fatalf("Unexpected error: %s", err) } @@ -480,25 +490,28 @@ func TestTransportPerformRetries(t *testing.T) { ) u, _ := url.Parse("http://foo.bar") - tp, _ := New(Config{ - URLs: []*url.URL{u, u, u}, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { - i++ - fmt.Printf("Request #%d", i) - if i == numReqs { - fmt.Print(": OK\n") - return &http.Response{Status: "OK"}, nil - } - fmt.Print(": ERR\n") - return nil, io.EOF + tp, _ := New( + Config{ + URLs: []*url.URL{u, u, u}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + i++ + fmt.Printf("Request #%d", i) + if i == numReqs { + fmt.Print(": OK\n") + return &http.Response{Status: "OK"}, nil + } + fmt.Print(": ERR\n") + return nil, io.EOF + }, }, - }}) + }, + ) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + //nolint:bodyclose // Mock response does not have a body to close res, err := tp.Perform(req) - if err != nil { t.Fatalf("Unexpected error: %s", err) } @@ -519,30 +532,33 @@ func TestTransportPerformRetries(t *testing.T) { ) u, _ := url.Parse("http://foo.bar") - tp, _ := New(Config{ - URLs: []*url.URL{u, u, u}, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { - i++ - fmt.Printf("Request #%d", i) - if i == numReqs { - fmt.Print(": 200\n") - return &http.Response{StatusCode: 200}, nil - } - fmt.Print(": 502\n") - return &http.Response{StatusCode: 502}, nil + tp, _ := New( + Config{ + URLs: []*url.URL{u, u, u}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + i++ + fmt.Printf("Request #%d", i) + if i == numReqs { + fmt.Print(": 200\n") + return &http.Response{StatusCode: http.StatusOK}, nil + } + fmt.Print(": 502\n") + return &http.Response{StatusCode: http.StatusBadGateway}, nil + }, }, - }}) + }, + ) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + //nolint:bodyclose // Mock response does not have a body to close res, err := tp.Perform(req) - if err != nil { t.Fatalf("Unexpected error: %s", err) } - if res.StatusCode != 200 { + if res.StatusCode != http.StatusOK { t.Errorf("Unexpected response: %+v", res) } @@ -558,23 +574,25 @@ func TestTransportPerformRetries(t *testing.T) { ) u, _ := url.Parse("http://foo.bar") - tp, _ := New(Config{ - URLs: []*url.URL{u, u, u}, - MaxRetries: numReqs, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { - i++ - fmt.Printf("Request #%d", i) - fmt.Print(": 502\n") - body := ioutil.NopCloser(strings.NewReader(`MOCK`)) - return &http.Response{StatusCode: 502, Body: body}, nil + tp, _ := New( + Config{ + URLs: []*url.URL{u, u, u}, + MaxRetries: numReqs, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + i++ + fmt.Printf("Request #%d", i) + fmt.Print(": 502\n") + body := io.NopCloser(strings.NewReader(`MOCK`)) + return &http.Response{StatusCode: http.StatusBadGateway, Body: body}, nil + }, }, - }}) + }, + ) - req, _ := http.NewRequest("GET", "/", nil) + req, _ := http.NewRequest(http.MethodGet, "/", nil) res, err := tp.Perform(req) - if err != nil { t.Fatalf("Unexpected error: %s", err) } @@ -583,11 +601,11 @@ func TestTransportPerformRetries(t *testing.T) { t.Errorf("Unexpected number of requests, want=%d, got=%d", numReqs, i) } - if res.StatusCode != 502 { + if res.StatusCode != http.StatusBadGateway { t.Errorf("Unexpected response: %+v", res) } - resBody, _ := ioutil.ReadAll(res.Body) + resBody, _ := io.ReadAll(res.Body) res.Body.Close() if string(resBody) != "MOCK" { @@ -602,21 +620,24 @@ func TestTransportPerformRetries(t *testing.T) { ) u, _ := url.Parse("http://foo.bar") - tp, _ := New(Config{ - URLs: []*url.URL{u, u, u}, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { - i++ - fmt.Printf("Request #%d", i) - fmt.Print(": ERR\n") - return nil, &mockNetError{error: fmt.Errorf("Mock network error (%d)", i)} + tp, _ := New( + Config{ + URLs: []*url.URL{u, u, u}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + i++ + fmt.Printf("Request #%d", i) + fmt.Print(": ERR\n") + return nil, &mockNetError{error: fmt.Errorf("Mock network error (%d)", i)} + }, }, - }}) + }, + ) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + //nolint:bodyclose // Mock response does not have a body to close res, err := tp.Perform(req) - if err == nil { t.Fatalf("Expected error, got: %v", err) } @@ -634,21 +655,24 @@ func TestTransportPerformRetries(t *testing.T) { t.Run("Reset request body during retry", func(t *testing.T) { var bodies []string u, _ := url.Parse("https://foo.com/bar") - tp, _ := New(Config{ - URLs: []*url.URL{u}, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { - body, err := ioutil.ReadAll(req.Body) - if err != nil { - panic(err) - } - bodies = append(bodies, string(body)) - return &http.Response{Status: "MOCK", StatusCode: 502}, nil + tp, _ := New( + Config{ + URLs: []*url.URL{u}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + body, err := io.ReadAll(req.Body) + if err != nil { + panic(err) + } + bodies = append(bodies, string(body)) + return &http.Response{Status: "MOCK", StatusCode: http.StatusBadGateway}, nil + }, }, - }}, + }, ) - req, _ := http.NewRequest("POST", "/abc", strings.NewReader("FOOBAR")) + req, _ := http.NewRequest(http.MethodPost, "/abc", strings.NewReader("FOOBAR")) + //nolint:bodyclose // Mock response does not have a body to close res, err := tp.Perform(req) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -669,21 +693,24 @@ func TestTransportPerformRetries(t *testing.T) { var i int u, _ := url.Parse("http://foo.bar") - tp, _ := New(Config{ - URLs: []*url.URL{u, u, u}, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { - i++ - fmt.Printf("Request #%d", i) - fmt.Print(": ERR\n") - return nil, fmt.Errorf("Mock regular error (%d)", i) + tp, _ := New( + Config{ + URLs: []*url.URL{u, u, u}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + i++ + fmt.Printf("Request #%d", i) + fmt.Print(": ERR\n") + return nil, fmt.Errorf("Mock regular error (%d)", i) + }, }, - }}) + }, + ) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + //nolint:bodyclose // Mock response does not have a body to close res, err := tp.Perform(req) - if err == nil { t.Fatalf("Expected error, got: %v", err) } @@ -701,20 +728,23 @@ func TestTransportPerformRetries(t *testing.T) { var i int u, _ := url.Parse("http://foo.bar") - tp, _ := New(Config{ - URLs: []*url.URL{u, u, u}, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { - i++ - fmt.Printf("Request #%d", i) - fmt.Print(": ERR\n") - return nil, &mockNetError{error: fmt.Errorf("Mock network error (%d)", i)} + tp, _ := New( + Config{ + URLs: []*url.URL{u, u, u}, + Transport: &mockTransp{ + RoundTripFunc: func(req *http.Request) (*http.Response, error) { + i++ + fmt.Printf("Request #%d", i) + fmt.Print(": ERR\n") + return nil, &mockNetError{error: fmt.Errorf("Mock network error (%d)", i)} + }, }, + DisableRetry: true, }, - DisableRetry: true, - }) + ) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + //nolint:bodyclose // Mock response does not have a body to close tp.Perform(req) if i != 1 { @@ -756,11 +786,11 @@ func TestTransportPerformRetries(t *testing.T) { }, }) - req, _ := http.NewRequest("GET", "/abc", nil) + req, _ := http.NewRequest(http.MethodGet, "/abc", nil) + //nolint:bodyclose // Mock response does not have a body to close res, err := tp.Perform(req) end := time.Since(start) - if err != nil { t.Fatalf("Unexpected error: %s", err) } @@ -862,6 +892,7 @@ func TestMaxRetries(t *testing.T) { DisableRetry: test.disableRetry, }) + //nolint:bodyclose // Mock response does not have a body to close c.Perform(&http.Request{URL: &url.URL{}, Header: make(http.Header)}) // errcheck ignore if test.expectedCallCount != callCount { @@ -871,74 +902,7 @@ func TestMaxRetries(t *testing.T) { } } -func TestCompatibilityHeader(t *testing.T) { - tests := []struct { - name string - compatibilityHeader bool - bodyPresent bool - expectsHeader []string - }{ - { - name: "Compatibility header disabled", - compatibilityHeader: false, - bodyPresent: false, - expectsHeader: []string{"application/json"}, - }, - { - name: "Compatibility header enabled", - compatibilityHeader: true, - bodyPresent: false, - expectsHeader: []string{"application/vnd.elasticsearch+json;compatible-with=7"}, - }, - { - name: "Compatibility header enabled with body", - compatibilityHeader: true, - bodyPresent: true, - expectsHeader: []string{"application/vnd.elasticsearch+json;compatible-with=7"}, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - compatibilityHeader = test.compatibilityHeader - - c, _ := New(Config{ - URLs: []*url.URL{{}}, - Transport: &mockTransp{ - RoundTripFunc: func(req *http.Request) (*http.Response, error) { - if test.compatibilityHeader { - if !reflect.DeepEqual(req.Header["Accept"], test.expectsHeader) { - t.Errorf("Compatibility header enabled but header is, not in request headers, got: %s, want: %s", req.Header["Accept"], test.expectsHeader) - } - } - if test.bodyPresent { - if !reflect.DeepEqual(req.Header["Content-Type"], test.expectsHeader) { - t.Errorf("Compatibility header with Body enabled, not in request headers, got: %s, want: %s", req.Header["Content-Type"], test.expectsHeader) - } - } - - return &http.Response{ - StatusCode: http.StatusOK, - Status: "MOCK", - }, nil - }, - }, - }) - - req := &http.Request{URL: &url.URL{}, Header: make(http.Header)} - if test.bodyPresent { - req.Body = ioutil.NopCloser(strings.NewReader("{}")) - } - - _, _ = c.Perform(req) - - compatibilityHeader = false - }) - } -} - func TestRequestCompression(t *testing.T) { - tests := []struct { name string compressionFlag bool @@ -978,7 +942,7 @@ func TestRequestCompression(t *testing.T) { var unBuf bytes.Buffer zr, err := gzip.NewReader(&buf) if err != nil { - return nil, fmt.Errorf("decompression error: %v", err) + return nil, fmt.Errorf("decompression error: %w", err) } unBuf.ReadFrom(zr) buf = unBuf @@ -993,8 +957,9 @@ func TestRequestCompression(t *testing.T) { }, }) - req, _ := http.NewRequest("POST", "/abc", bytes.NewBufferString(test.inputBody)) + req, _ := http.NewRequest(http.MethodPost, "/abc", bytes.NewBufferString(test.inputBody)) + //nolint:bodyclose // Mock response does not have a body to close res, err := tp.Perform(req) if err != nil { t.Fatalf("Unexpected error: %s", err) @@ -1008,7 +973,6 @@ func TestRequestCompression(t *testing.T) { } func TestRequestSigning(t *testing.T) { - t.Run("Sign request fails", func(t *testing.T) { u, _ := url.Parse("https://foo:bar@example.com") tp, _ := New( @@ -1024,7 +988,8 @@ func TestRequestSigning(t *testing.T) { }, }, ) - req, _ := http.NewRequest("GET", "/", nil) + req, _ := http.NewRequest(http.MethodGet, "/", nil) + //nolint:bodyclose // Mock response does not have a body to close _, err := tp.Perform(req) if err == nil { t.Fatal("Expected error, but, no error found") From b3cd5b8b50dc5e67dc628ffb23e41df773122478 Mon Sep 17 00:00:00 2001 From: Jakob Hahn Date: Wed, 26 Jul 2023 16:46:38 +0200 Subject: [PATCH 3/3] changelog: Solve linting complains for opensearchtransport Signed-off-by: Jakob Hahn --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7adb27aa4..dbdb36d4c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,6 +41,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Moved @svencowart to emeritus maintainers ([#270](https://github.com/opensearch-project/opensearch-go/pull/270)) - Read, close and replace the http Reponse Body ([#300](https://github.com/opensearch-project/opensearch-go/pull/300)) - Updated and adjusted golangci-lint, solve linting complains for signer ([#352](https://github.com/opensearch-project/opensearch-go/pull/352)) +- Solve linting complains for opensearchtransport ([#353](https://github.com/opensearch-project/opensearch-go/pull/353)) ### Deprecated