Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

golangci-lint: Solve Complains for opensearchtransport #353

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Moved @svencowart to emeritus maintainers ([#270](https://github.com/opensearch-project/opensearch-go/pull/270))
- Read, close and replace the http Reponse Body ([#300](https://github.com/opensearch-project/opensearch-go/pull/300))
- Updated and adjusted golangci-lint, solve linting complains for signer ([#352](https://github.com/opensearch-project/opensearch-go/pull/352))
- Solve linting complains for opensearchtransport ([#353](https://github.com/opensearch-project/opensearch-go/pull/353))

### Deprecated

Expand Down
86 changes: 47 additions & 39 deletions opensearchtransport/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,25 @@ import (
"time"
)

var (
const (
defaultResurrectTimeoutInitial = 60 * time.Second
defaultResurrectTimeoutFactorCutoff = 5
)

// Selector defines the interface for selecting connections from the pool.
//
type Selector interface {
Select([]*Connection) (*Connection, error)
}

// ConnectionPool defines the interface for the connection pool.
//
type ConnectionPool interface {
Next() (*Connection, error) // Next returns the next available connection.
OnSuccess(*Connection) error // OnSuccess reports that the connection was successful.
OnSuccess(*Connection) // OnSuccess reports that the connection was successful.
OnFailure(*Connection) error // OnFailure reports that the connection failed.
URLs() []*url.URL // URLs returns the list of URLs of available connections.
}

// Connection represents a connection to a node.
//
type Connection struct {
sync.Mutex

Expand All @@ -81,9 +78,11 @@ type singleConnectionPool struct {
type statusConnectionPool struct {
sync.Mutex

live []*Connection // List of live connections
dead []*Connection // List of dead connections
selector Selector
live []*Connection // List of live connections
dead []*Connection // List of dead connections
selector Selector
resurrectTimeoutInitial time.Duration
resurrectTimeoutFactorCutoff int

metrics *metrics
}
Expand All @@ -95,36 +94,40 @@ type roundRobinSelector struct {
}

// NewConnectionPool creates and returns a default connection pool.
//
func NewConnectionPool(conns []*Connection, selector Selector) (ConnectionPool, error) {
func NewConnectionPool(conns []*Connection, selector Selector) ConnectionPool {
if len(conns) == 1 {
return &singleConnectionPool{connection: conns[0]}, nil
return &singleConnectionPool{connection: conns[0]}
}

if selector == nil {
selector = &roundRobinSelector{curr: -1}
}
return &statusConnectionPool{live: conns, selector: selector}, nil

return &statusConnectionPool{
live: conns,
selector: selector,
resurrectTimeoutInitial: defaultResurrectTimeoutInitial,
resurrectTimeoutFactorCutoff: defaultResurrectTimeoutFactorCutoff,
}
}

// Next returns the connection from pool.
//
func (cp *singleConnectionPool) Next() (*Connection, error) {
return cp.connection, nil
}

// OnSuccess is a no-op for single connection pool.
func (cp *singleConnectionPool) OnSuccess(c *Connection) error { return nil }
func (cp *singleConnectionPool) OnSuccess(*Connection) {}

// OnFailure is a no-op for single connection pool.
func (cp *singleConnectionPool) OnFailure(c *Connection) error { return nil }
func (cp *singleConnectionPool) OnFailure(*Connection) error { return nil }

// URLs returns the list of URLs of available connections.
func (cp *singleConnectionPool) URLs() []*url.URL { return []*url.URL{cp.connection.URL} }

func (cp *singleConnectionPool) connections() []*Connection { return []*Connection{cp.connection} }

// Next returns a connection from pool, or an error.
//
func (cp *statusConnectionPool) Next() (*Connection, error) {
cp.Lock()
defer cp.Unlock()
Expand All @@ -141,29 +144,28 @@ func (cp *statusConnectionPool) Next() (*Connection, error) {
cp.resurrect(c, false)
return c, nil
}

return nil, errors.New("no connection available")
}

// OnSuccess marks the connection as successful.
//
func (cp *statusConnectionPool) OnSuccess(c *Connection) error {
func (cp *statusConnectionPool) OnSuccess(c *Connection) {
c.Lock()
defer c.Unlock()

// Short-circuit for live connection
if !c.IsDead {
return nil
return
}

c.markAsHealthy()

cp.Lock()
defer cp.Unlock()
return cp.resurrect(c, true)
cp.resurrect(c, true)
}

// OnFailure marks the connection as failed.
//
func (cp *statusConnectionPool) OnFailure(c *Connection) error {
cp.Lock()
defer cp.Unlock()
Expand All @@ -175,12 +177,14 @@ func (cp *statusConnectionPool) OnFailure(c *Connection) error {
debugLogger.Logf("Already removed %s\n", c.URL)
}
c.Unlock()

return nil
}

if debugLogger != nil {
debugLogger.Logf("Removing %s...\n", c.URL)
}

c.markAsDead()
cp.scheduleResurrect(c)
c.Unlock()
Expand All @@ -201,12 +205,16 @@ func (cp *statusConnectionPool) OnFailure(c *Connection) error {

// Check if connection exists in the list, return error if not.
index := -1

for i, conn := range cp.live {
if conn == c {
index = i
}
}

if index < 0 {
// Does this error even get raised? Under what conditions can the connection not be in the cp.live list?
// If the connection is marked dead the function already ended
return errors.New("connection not in live list")
}

Expand All @@ -218,15 +226,13 @@ func (cp *statusConnectionPool) OnFailure(c *Connection) error {
}

// URLs returns the list of URLs of available connections.
//
func (cp *statusConnectionPool) URLs() []*url.URL {
var urls []*url.URL

cp.Lock()
defer cp.Unlock()

for _, c := range cp.live {
urls = append(urls, c.URL)
urls := make([]*url.URL, len(cp.live))
for idx, c := range cp.live {
urls[idx] = c.URL
}

return urls
Expand All @@ -236,14 +242,14 @@ func (cp *statusConnectionPool) connections() []*Connection {
var conns []*Connection
conns = append(conns, cp.live...)
conns = append(conns, cp.dead...)

return conns
}

// resurrect adds the connection to the list of available connections.
// When removeDead is true, it also removes it from the dead list.
// The calling code is responsible for locking.
//
func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) error {
func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) {
if debugLogger != nil {
debugLogger.Logf("Resurrecting %s\n", c.URL)
}
Expand All @@ -253,28 +259,35 @@ func (cp *statusConnectionPool) resurrect(c *Connection, removeDead bool) error

if removeDead {
index := -1

for i, conn := range cp.dead {
if conn == c {
index = i
}
}

if index >= 0 {
// Remove item; https://github.com/golang/go/wiki/SliceTricks
copy(cp.dead[index:], cp.dead[index+1:])
cp.dead = cp.dead[:len(cp.dead)-1]
}
}

return nil
}

// scheduleResurrect schedules the connection to be resurrected.
//
func (cp *statusConnectionPool) scheduleResurrect(c *Connection) {
factor := math.Min(float64(c.Failures-1), float64(defaultResurrectTimeoutFactorCutoff))
timeout := time.Duration(defaultResurrectTimeoutInitial.Seconds() * math.Exp2(factor) * float64(time.Second))
factor := math.Min(float64(c.Failures-1), float64(cp.resurrectTimeoutFactorCutoff))
timeout := time.Duration(cp.resurrectTimeoutInitial.Seconds() * math.Exp2(factor) * float64(time.Second))

if debugLogger != nil {
debugLogger.Logf("Resurrect %s (failures=%d, factor=%1.1f, timeout=%s) in %s\n", c.URL, c.Failures, factor, timeout, c.DeadSince.Add(timeout).Sub(time.Now().UTC()).Truncate(time.Second))
debugLogger.Logf(
"Resurrect %s (failures=%d, factor=%1.1f, timeout=%s) in %s\n",
c.URL,
c.Failures,
factor,
timeout,
c.DeadSince.Add(timeout).Sub(time.Now().UTC()).Truncate(time.Second),
)
}

time.AfterFunc(timeout, func() {
Expand All @@ -296,7 +309,6 @@ func (cp *statusConnectionPool) scheduleResurrect(c *Connection) {
}

// Select returns the connection in a round-robin fashion.
//
func (s *roundRobinSelector) Select(conns []*Connection) (*Connection, error) {
s.Lock()
defer s.Unlock()
Expand All @@ -306,7 +318,6 @@ func (s *roundRobinSelector) Select(conns []*Connection) (*Connection, error) {
}

// markAsDead marks the connection as dead.
//
func (c *Connection) markAsDead() {
c.IsDead = true
if c.DeadSince.IsZero() {
Expand All @@ -316,21 +327,18 @@ func (c *Connection) markAsDead() {
}

// markAsLive marks the connection as alive.
//
func (c *Connection) markAsLive() {
c.IsDead = false
}

// markAsHealthy marks the connection as healthy.
//
func (c *Connection) markAsHealthy() {
c.IsDead = false
c.DeadSince = time.Time{}
c.Failures = 0
}

// String returns a readable connection representation.
//
func (c *Connection) String() string {
c.Lock()
defer c.Unlock()
Expand Down
Loading