Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for the desired nodes API #5650

Merged
merged 24 commits into from
May 30, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/apis/elasticsearch/v1/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ type ConditionType string
const (
ElasticsearchIsReachable ConditionType = "ElasticsearchIsReachable"
ReconciliationComplete ConditionType = "ReconciliationComplete"
ResourcesAwareManagement ConditionType = "ResourcesAwareManagement"
RunningDesiredVersion ConditionType = "RunningDesiredVersion"
)

Expand Down
1 change: 1 addition & 0 deletions pkg/controller/elasticsearch/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type Role struct {
type Client interface {
AllocationSetter
AutoscalingClient
DesiredNodesClient
ShardLister
LicenseClient
// Close idle connections in the underlying http client.
Expand Down
61 changes: 61 additions & 0 deletions pkg/controller/elasticsearch/client/desired_nodes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package client

import (
"context"
"fmt"

"github.com/elastic/cloud-on-k8s/pkg/controller/common/version"
)

var desiredNodesMinVersion = version.MustParse("8.1.0")
barkbay marked this conversation as resolved.
Show resolved Hide resolved

type DesiredNodesClient interface {
IsDesiredNodesSupported() bool
// UpdateDesiredNodes updates the desired nodes API.
UpdateDesiredNodes(ctx context.Context, historyID string, version int64, desiredNodes DesiredNodes) error
// DeleteDesiredNodes deletes the desired nodes from the cluster state.
DeleteDesiredNodes(ctx context.Context) error
}

type DesiredNodes struct {
DesiredNodes []DesiredNode `json:"nodes"`
}

type DesiredNode struct {
Settings map[string]interface{} `json:"settings"`
Processors int `json:"processors"`
Memory string `json:"memory"`
Storage string `json:"storage"`
NodeVersion string `json:"node_version"`
}

func (c *baseClient) UpdateDesiredNodes(_ context.Context, _ string, _ int64, _ DesiredNodes) error {
return c.desiredNodesNotAvailable()
}

func (c *baseClient) DeleteDesiredNodes(_ context.Context) error {
return c.desiredNodesNotAvailable()
}

func (c *baseClient) desiredNodesNotAvailable() error {
return fmt.Errorf("the desired node API is not available in Elasticsearch %s, it requires %s", c.version, desiredNodesMinVersion)
barkbay marked this conversation as resolved.
Show resolved Hide resolved
}

func (c *baseClient) IsDesiredNodesSupported() bool {
return c.version.GTE(desiredNodesMinVersion)
}

func (c *clientV8) UpdateDesiredNodes(ctx context.Context, historyID string, version int64, desiredNodes DesiredNodes) error {
return c.put(
ctx,
fmt.Sprintf("/_internal/desired_nodes/%s/%d", historyID, version),
&desiredNodes, nil)
}

func (c *clientV8) DeleteDesiredNodes(ctx context.Context) error {
return c.delete(ctx, "/_internal/desired_nodes")
}
79 changes: 79 additions & 0 deletions pkg/controller/elasticsearch/driver/desired_nodes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package driver

import (
"context"
"errors"
"fmt"

"go.elastic.co/apm"
corev1 "k8s.io/api/core/v1"

esv1 "github.com/elastic/cloud-on-k8s/pkg/apis/elasticsearch/v1"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/pkg/controller/common/tracing"
esclient "github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/client"
"github.com/elastic/cloud-on-k8s/pkg/controller/elasticsearch/nodespec"
"github.com/elastic/cloud-on-k8s/pkg/utils/k8s"
)

func (d *defaultDriver) updateDesiredNodes(
ctx context.Context,
k8sClient k8s.Client,
esClient esclient.Client,
esReachable bool,
expectedResources nodespec.ResourcesList,
) *reconciler.Results {
span, ctx := apm.StartSpan(ctx, "update_desired_nodes", tracing.SpanTypeApp)
defer span.End()
results := &reconciler.Results{}
// We compute the desired nodes state to update the condition
var resourceNotAvailableErr *nodespec.ResourceNotAvailable
nodes, requeue, err := expectedResources.ToDesiredNodes(ctx, k8sClient, d.ES.Spec.Version)
switch {
case err == nil:
d.ReconcileState.ReportCondition(
esv1.ResourcesAwareManagement,
corev1.ConditionTrue,
fmt.Sprintf("Successfully calculated compute and storage resources from Elasticsearch resource generation %d", d.ES.Generation),
)
case errors.As(err, &resourceNotAvailableErr):
// It is not possible to build the desired node spec because of the Elasticsearch specification
d.ReconcileState.ReportCondition(
esv1.ResourcesAwareManagement,
corev1.ConditionFalse,
fmt.Sprintf("Cannot get compute and storage resources from Elasticsearch resource generation %d: %s", d.ES.Generation, err.Error()),
)
// It is fine to continue, error is only reported through the condition.
// We should however clear the desired nodes API since we are a degraded (not resources aware) mode.
barkbay marked this conversation as resolved.
Show resolved Hide resolved
if esReachable {
return results.WithError(esClient.DeleteDesiredNodes(ctx))
barkbay marked this conversation as resolved.
Show resolved Hide resolved
}
return results.WithReconciliationState(defaultRequeue.WithReason("Desired nodes API must be cleared"))
default:
// Unknown error: not nil and not ResourceNotAvailable
d.ReconcileState.ReportCondition(
esv1.ResourcesAwareManagement,
corev1.ConditionUnknown,
fmt.Sprintf("Error while calculating compute and storage resources from Elasticsearch resource generation %d: %s", d.ES.Generation, err.Error()),
)
return results.WithError(err)
}
if requeue {
results.WithReconciliationState(defaultRequeue.WithReason("Desired nodes API must be updated"))
barkbay marked this conversation as resolved.
Show resolved Hide resolved
}
if esReachable {
err := esClient.UpdateDesiredNodes(ctx, string(d.ES.UID), d.ES.Generation, esclient.DesiredNodes{DesiredNodes: nodes})
if err != nil {
return results.WithReconciliationState(
defaultRequeue.WithReason(fmt.Sprintf("error while updating desired nodes state in Elasticsearch: %s", err)),
)
}
} else {
results.WithReconciliationState(defaultRequeue.WithReason("Waiting for Elasticsearch to be available to update the desired nodes API"))
}
return results
}
7 changes: 7 additions & 0 deletions pkg/controller/elasticsearch/driver/nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,13 @@ func (d *defaultDriver) reconcileNodeSpecs(
return results.WithError(err)
}

if esClient.IsDesiredNodesSupported() {
results.WithResults(d.updateDesiredNodes(ctx, d.Client, esClient, esReachable, expectedResources))
if results.HasError() {
return results
}
}

esState := NewMemoizingESState(ctx, esClient)
// Phase 1: apply expected StatefulSets resources and scale up.
upscaleCtx := upscaleCtx{
Expand Down
Loading