Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add disk removal endpoint #1

Merged
merged 3 commits into from
May 4, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
SHELL = /usr/bin/env bash -o pipefail
.SHELLFLAGS = -ec

# Use the Adobe cruise-control build, which ships the REMOVE_DISKS endpoint.
# NOTE: `?=` only assigns when the variable is unset, so there must be exactly
# one default per variable — the stale banzaicloud defaults were removed
# because listing them first would make these lines dead.
CRUISE_CONTROL_VERSION ?= 2.5.118-adbe-20230419-rc
export CRUISE_CONTROL_IMAGE ?= docker.io/adobe/cruise-control:$(CRUISE_CONTROL_VERSION)

##@ General

Expand Down
28 changes: 27 additions & 1 deletion deploy/cruisecontrol/capacityJBOD.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,33 @@
{
"brokerCapacities":[
{
"brokerId": "0",
"capacity": {
"DISK": {
"/var/lib/kafka/data0": "100000",
"/var/lib/kafka/data1": "100000"
},
"CPU": "100",
"NW_IN": "10000",
"NW_OUT": "10000"
},
"doc": "The default capacity for a broker with multiple logDirs each on a separate heterogeneous disk."
},
{
"brokerId": "1",
"capacity": {
"DISK": {
"/var/lib/kafka/data0": "100000",
"/var/lib/kafka/data1": "100000"
},
"CPU": "100",
"NW_IN": "10000",
"NW_OUT": "10000"
},
"doc": "The default capacity for a broker with multiple logDirs each on a separate heterogeneous disk."
},
{
"brokerId": "2",
"capacity": {
"DISK": {
"/var/lib/kafka/data0": "100000",
Expand Down
7 changes: 3 additions & 4 deletions deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,10 @@ services:
source: ./cruisecontrol
target: /opt/cruise-control/config
healthcheck:
  # curl is not installed in the adobe image, so probe the REST port with
  # bash's /dev/tcp redirection instead (fails when nothing listens on 8090)
  test: [
    "CMD-SHELL",
    "bash -c 'echo > /dev/tcp/localhost/8090'",
  ]
  interval: 10s
  timeout: 5s
Expand Down
104 changes: 104 additions & 0 deletions integration_test/remove_disks_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
Copyright © 2021 Cisco and/or its affiliates. All rights reserved.
[Reviewer note on this file header: This should be Adobe]
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration_test

import (
"github.com/banzaicloud/go-cruise-control/integration_test/helpers"
"github.com/banzaicloud/go-cruise-control/pkg/api"
"github.com/banzaicloud/go-cruise-control/pkg/types"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

// Integration test for the REMOVE_DISKS endpoint: asks Cruise Control to
// drain one disk of one broker, waits for the user task to finish, then
// verifies via KafkaClusterLoad that no replicas remain on that disk.
var _ = Describe("Remove Disks",
	Label("api:remove_disks", "api:user_tasks", "api:kafka_cluster_load", "api:state"),
	Serial,
	func() {
		const (
			brokerID                              = 0
			logDir                                = "/var/lib/kafka/data0"
			pollIntervalSeconds                   = 15
			cruiseControlRemoveDiskTimeoutSeconds = 600
		)

		BeforeEach(func(ctx SpecContext) {
			By("waiting until Cruise Control is ready")
			Eventually(ctx, func() bool {
				ready, err := helpers.IsCruiseControlReady(ctx, cruisecontrol)
				Expect(err).NotTo(HaveOccurred())
				return ready
			}, CruiseControlReadyTimeout, pollIntervalSeconds).Should(BeTrue())
		})

		Describe("Removing a disk in Kafka cluster", func() {
			It("should return no error", func(ctx SpecContext) {
				By("sending a remove request to Cruise Control")
				req := &api.RemoveDisksRequest{}

				req.BrokerIDAndLogDirs = map[int32][]string{
					brokerID: {logDir},
				}
				req.Reason = "integration testing"

				resp, err := cruisecontrol.RemoveDisks(ctx, req)
				Expect(err).NotTo(HaveOccurred())
				Expect(resp.Failed()).To(BeFalse())

				By("waiting until the remove task finished")
				Eventually(ctx, func() bool {
					finished, err := helpers.HasUserTaskFinished(ctx, cruisecontrol, resp.TaskID)
					Expect(err).NotTo(HaveOccurred())
					return finished
				}, cruiseControlRemoveDiskTimeoutSeconds, pollIntervalSeconds).Should(BeTrue())

				By("checking that the disk has been drained")
				req2 := api.KafkaClusterLoadRequestWithDefaults()
				req2.PopulateDiskInfo = true
				req2.Reason = "integration testing"

				resp2, err := cruisecontrol.KafkaClusterLoad(ctx, req2)
				Expect(err).NotTo(HaveOccurred())
				Expect(resp2.Failed()).To(BeFalse())

				Expect(resp2.Result.Brokers).ToNot(BeEmpty())

				// BUG FIX: the original ranged over DiskState with a loop
				// variable also named logDir, shadowing the logDir constant so
				// `logDir == logDir` was a tautology that matched the first
				// entry. Also, ToNot(BeNil()) on struct values (not pointers)
				// can never fail, so explicit found-flags are used instead.
				var affectedBroker types.BrokerLoadStats
				brokerFound := false
				for _, broker := range resp2.Result.Brokers {
					if broker.Broker == brokerID {
						affectedBroker = broker
						brokerFound = true
						break
					}
				}
				Expect(brokerFound).To(BeTrue())

				// DiskState is keyed by log-dir path, so look the disk up directly.
				affectedDiskState, diskFound := affectedBroker.DiskState[logDir]
				Expect(diskFound).To(BeTrue())

				replicas := affectedDiskState.NumReplicas
				log.V(0).Info("partition replicas on broker disk", "broker_id", brokerID, "logDir", logDir, "replicas", replicas)
				Expect(replicas).To(BeNumerically("==", 0))
			})
		})
	})
88 changes: 88 additions & 0 deletions pkg/api/remove_disks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
Copyright © 2021 Cisco and/or its affiliates. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package api

import (
"encoding/json"
"fmt"
"io"
"net/http"

"github.com/pkg/errors"

"github.com/banzaicloud/go-cruise-control/pkg/types"
)

const (
	// EndpointRemoveDisks is the Cruise Control REST API endpoint name used
	// to drain and remove disks from brokers.
	EndpointRemoveDisks types.APIEndpoint = "REMOVE_DISKS"
)

// RemoveDisksRequest holds the parameters of a REMOVE_DISKS API request.
type RemoveDisksRequest struct {
	types.GenericRequestWithReason

	// Map of broker id to list of disks to remove.
	BrokerIDAndLogDirs map[int32][]string `param:"brokerid_and_logdirs"`
	// Whether to dry-run the request or not.
	DryRun bool `param:"dryrun"`
}

// RemoveDisksResponse is the response to a REMOVE_DISKS API request.
type RemoveDisksResponse struct {
	types.GenericResponse

	// Result carries the optimization proposal/result returned on HTTP 200.
	Result *types.OptimizationResult
}

// Validate returns an error when the request does not target at least one
// broker/log-dir pair, since a REMOVE_DISKS call without targets is a no-op.
func (s RemoveDisksRequest) Validate() error {
	if len(s.BrokerIDAndLogDirs) > 0 {
		return nil
	}
	return errors.New("broker id and log dirs map must not be empty")
}

// UnmarshalResponse decodes the HTTP response of a REMOVE_DISKS request.
// The generic response metadata is parsed first; the body is then decoded
// into Result (200), Progress (202) or Error (anything else).
func (r *RemoveDisksResponse) UnmarshalResponse(resp *http.Response) error {
	if err := r.GenericResponse.UnmarshalResponse(resp); err != nil {
		return fmt.Errorf("failed to parse HTTP response metadata: %w", err)
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return fmt.Errorf("failed to read HTTP response body: %w", err)
	}

	// Pick the decode target based on the HTTP status code.
	var target interface{}
	switch {
	case resp.StatusCode == http.StatusOK:
		r.Result = &types.OptimizationResult{}
		target = r.Result
	case resp.StatusCode == http.StatusAccepted:
		r.Progress = &types.ProgressResult{}
		target = r.Progress
	default:
		r.Error = &types.APIError{}
		target = r.Error
	}

	if err := json.Unmarshal(body, target); err != nil {
		return fmt.Errorf("failed to parse JSON response: %w", err)
	}

	return nil
}
5 changes: 5 additions & 0 deletions pkg/client/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ func (c *Client) RemoveBroker(ctx context.Context, r *api.RemoveBrokerRequest) (
return resp, c.request(ctx, r, resp, api.EndpointRemoveBroker, http.MethodPost)
}

// RemoveDisks issues a POST request against the REMOVE_DISKS endpoint,
// asking Cruise Control to drain and remove the given broker disks.
func (c *Client) RemoveDisks(ctx context.Context, r *api.RemoveDisksRequest) (*api.RemoveDisksResponse, error) {
	resp := &api.RemoveDisksResponse{}
	err := c.request(ctx, r, resp, api.EndpointRemoveDisks, http.MethodPost)
	return resp, err
}

func (c *Client) ResumeSampling(ctx context.Context, r *api.ResumeSamplingRequest) (*api.ResumeSamplingResponse, error) {
resp := &api.ResumeSamplingResponse{}
return resp, c.request(ctx, r, resp, api.EndpointResumeSampling, http.MethodPost)
Expand Down
36 changes: 18 additions & 18 deletions pkg/types/broker_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,24 +105,24 @@ type HostLoadStats struct {
}

// BrokerLoadStats describes the load statistics of a single broker as
// reported by the Cruise Control load endpoints.
//
// NOTE: the rendered diff duplicated every field (the old version with
// `DiskState []DiskStats` plus the new one); only the new field set is kept,
// since duplicate struct fields do not compile.
type BrokerLoadStats struct {
	FollowerNwInRate   float64     `json:"FollowerNwInRate"`
	BrokerState        BrokerState `json:"BrokerState"`
	Broker             int32       `json:"Broker"`
	NwOutRate          float64     `json:"NwOutRate"`
	NumCore            float64     `json:"NumCore"`
	Host               string      `json:"Host"`
	CPUPct             float64     `json:"CpuPct"`
	Replicas           int32       `json:"Replicas"`
	NetworkInCapacity  float64     `json:"NetworkInCapacity"`
	Rack               string      `json:"Rack"`
	Leaders            int32       `json:"Leaders"`
	DiskCapacityMB     float64     `json:"DiskCapacityMB"`
	DiskMB             float64     `json:"DiskMB"`
	PnwOutRate         float64     `json:"PnwOutRate"`
	NetworkOutCapacity float64     `json:"NetworkOutCapacity"`
	LeaderNwInRate     float64     `json:"LeaderNwInRate"`
	DiskPct            float64     `json:"DiskPct"`
	// DiskState maps each log-directory path to the statistics of the disk
	// backing it, so individual disks can be looked up by log dir.
	DiskState map[string]DiskStats `json:"DiskState"`
}

const (
Expand Down