From f723cbc281d18b57ff56c4d209afb7d825bcb7aa Mon Sep 17 00:00:00 2001 From: Alex Necula Date: Tue, 2 May 2023 10:30:49 +0300 Subject: [PATCH 1/3] Add disk removal endpoint --- Makefile | 4 +- deploy/cruisecontrol/capacityJBOD.json | 28 ++++++- deploy/docker-compose.yml | 7 +- integration_test/remove_disks_test.go | 104 +++++++++++++++++++++++++ pkg/api/remove_disks.go | 88 +++++++++++++++++++++ pkg/client/endpoints.go | 5 ++ pkg/types/broker_stats.go | 36 ++++----- 7 files changed, 247 insertions(+), 25 deletions(-) create mode 100644 integration_test/remove_disks_test.go create mode 100644 pkg/api/remove_disks.go diff --git a/Makefile b/Makefile index f22c1ea..c81b65c 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,8 @@ SHELL = /usr/bin/env bash -o pipefail .SHELLFLAGS = -ec -CRUISE_CONTROL_VERSION ?= 2.5.113 -export CRUISE_CONTROL_IMAGE ?= ghcr.io/banzaicloud/cruise-control:$(CRUISE_CONTROL_VERSION) +CRUISE_CONTROL_VERSION ?= 2.5.118-adbe-20230419-rc +export CRUISE_CONTROL_IMAGE ?= docker.io/adobe/cruise-control:$(CRUISE_CONTROL_VERSION) ##@ General diff --git a/deploy/cruisecontrol/capacityJBOD.json b/deploy/cruisecontrol/capacityJBOD.json index 9356fbb..20890b4 100644 --- a/deploy/cruisecontrol/capacityJBOD.json +++ b/deploy/cruisecontrol/capacityJBOD.json @@ -1,7 +1,33 @@ { "brokerCapacities":[ { - "brokerId": "-1", + "brokerId": "0", + "capacity": { + "DISK": { + "/var/lib/kafka/data0": "100000", + "/var/lib/kafka/data1": "100000" + }, + "CPU": "100", + "NW_IN": "10000", + "NW_OUT": "10000" + }, + "doc": "The default capacity for a broker with multiple logDirs each on a separate heterogeneous disk." + }, + { + "brokerId": "1", + "capacity": { + "DISK": { + "/var/lib/kafka/data0": "100000", + "/var/lib/kafka/data1": "100000" + }, + "CPU": "100", + "NW_IN": "10000", + "NW_OUT": "10000" + }, + "doc": "The default capacity for a broker with multiple logDirs each on a separate heterogeneous disk." + }, + { + "brokerId": "2", "capacity": { "DISK": { "/var/lib/kafka/data0": "100000", diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml index d39a356..062ac25 100644 --- a/deploy/docker-compose.yml +++ b/deploy/docker-compose.yml @@ -246,11 +246,10 @@ services: source: ./cruisecontrol target: /opt/cruise-control/config healthcheck: + # curl is not installed in the adobe image test: [ - "CMD", - "curl", - "-f", - "http://127.0.0.1:8090/" + "CMD-SHELL", + "bash -c 'echo > /dev/tcp/localhost/8090'", ] interval: 10s timeout: 5s diff --git a/integration_test/remove_disks_test.go b/integration_test/remove_disks_test.go new file mode 100644 index 0000000..6d22843 --- /dev/null +++ b/integration_test/remove_disks_test.go @@ -0,0 +1,104 @@ +/* +Copyright © 2021 Cisco and/or its affiliates. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package integration_test + +import ( + "github.com/banzaicloud/go-cruise-control/integration_test/helpers" + "github.com/banzaicloud/go-cruise-control/pkg/api" + "github.com/banzaicloud/go-cruise-control/pkg/types" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Remove Disks", + Label("api:remove_disks", "api:user_tasks", "api:kafka_cluster_load", "api:state"), + Serial, + func() { + const ( + brokerID = 0 + logDir = "/var/lib/kafka/data0" + pollIntervalSeconds = 15 + cruiseControlRemoveDiskTimeoutSeconds = 600 + ) + + BeforeEach(func(ctx SpecContext) { + By("waiting until Cruise Control is ready") + Eventually(ctx, func() bool { + ready, err := helpers.IsCruiseControlReady(ctx, cruisecontrol) + Expect(err).NotTo(HaveOccurred()) + return ready + }, CruiseControlReadyTimeout, pollIntervalSeconds).Should(BeTrue()) + }) + + Describe("Removing a disk in Kafka cluster", func() { + It("should return no error", func(ctx SpecContext) { + By("sending a remove request to Cruise Control") + req := &api.RemoveDisksRequest{} + + req.BrokerIDAndLogDirs = map[int32][]string{ + brokerID: {logDir}, + } + req.Reason = "integration testing" + + resp, err := cruisecontrol.RemoveDisks(ctx, req) + Expect(err).NotTo(HaveOccurred()) + Expect(resp.Failed()).To(BeFalse()) + + By("waiting until the remove task finished") + Eventually(ctx, func() bool { + finished, err := helpers.HasUserTaskFinished(ctx, cruisecontrol, resp.TaskID) + Expect(err).NotTo(HaveOccurred()) + return finished + }, cruiseControlRemoveDiskTimeoutSeconds, pollIntervalSeconds).Should(BeTrue()) + + By("checking that the disk has been drained") + req2 := api.KafkaClusterLoadRequestWithDefaults() + req2.PopulateDiskInfo = true + req2.Reason = "integration testing" + + resp2, err := cruisecontrol.KafkaClusterLoad(ctx, req2) + Expect(err).NotTo(HaveOccurred()) + Expect(resp2.Failed()).To(BeFalse()) + + Expect(resp2.Result.Brokers).ToNot(BeEmpty()) + + var affectedBroker types.BrokerLoadStats + for _, broker := range resp2.Result.Brokers { + if broker.Broker == brokerID { + affectedBroker = broker + break + } + } + + Expect(affectedBroker).ToNot(BeNil()) + + var affectedDiskState types.DiskStats + for logDir, state := range affectedBroker.DiskState { + if logDir == logDir { + affectedDiskState = state + break + } + } + + Expect(affectedDiskState).ToNot(BeNil()) + + replicas := affectedDiskState.NumReplicas + log.V(0).Info("partition replicas on broker disk", "broker_id", brokerID, "logDir", logDir, "replicas", replicas) + Expect(replicas).To(BeNumerically("==", 0)) + }) + }) + }) diff --git a/pkg/api/remove_disks.go b/pkg/api/remove_disks.go new file mode 100644 index 0000000..1a04318 --- /dev/null +++ b/pkg/api/remove_disks.go @@ -0,0 +1,88 @@ +/* +Copyright © 2021 Cisco and/or its affiliates. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package api + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/pkg/errors" + + "github.com/banzaicloud/go-cruise-control/pkg/types" +) + +const ( + EndpointRemoveDisks types.APIEndpoint = "REMOVE_DISKS" +) + +type RemoveDisksRequest struct { + types.GenericRequestWithReason + + // Map of broker id to list of disks to remove. + BrokerIDAndLogDirs map[int32][]string `param:"brokerid_and_logdirs"` + // Whether to dry-run the request or not. + DryRun bool `param:"dryrun"` +} + +type RemoveDisksResponse struct { + types.GenericResponse + + Result *types.OptimizationResult +} + +func (s RemoveDisksRequest) Validate() error { + if len(s.BrokerIDAndLogDirs) == 0 { + return errors.New("broker id and log dirs map must not be empty") + } + + return nil +} + +func (r *RemoveDisksResponse) UnmarshalResponse(resp *http.Response) error { + if err := r.GenericResponse.UnmarshalResponse(resp); err != nil { + return fmt.Errorf("failed to parse HTTP response metadata: %w", err) + } + + var bodyBytes []byte + var err error + + bodyBytes, err = io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("failed to read HTTP response body: %w", err) + } + + var d interface{} + switch resp.StatusCode { + case http.StatusOK: + r.Result = &types.OptimizationResult{} + d = r.Result + case http.StatusAccepted: + r.Progress = &types.ProgressResult{} + d = r.Progress + default: + r.Error = &types.APIError{} + d = r.Error + } + + if err = json.Unmarshal(bodyBytes, d); err != nil { + return fmt.Errorf("failed to parse JSON response: %w", err) + } + + return nil +} diff --git a/pkg/client/endpoints.go b/pkg/client/endpoints.go index 60938f1..465b580 100644 --- a/pkg/client/endpoints.go +++ b/pkg/client/endpoints.go @@ -83,6 +83,11 @@ func (c *Client) RemoveBroker(ctx context.Context, r *api.RemoveBrokerRequest) ( return resp, c.request(ctx, r, resp, api.EndpointRemoveBroker, http.MethodPost) } +func (c *Client) RemoveDisks(ctx context.Context, r *api.RemoveDisksRequest) (*api.RemoveDisksResponse, error) { + resp := &api.RemoveDisksResponse{} + return resp, c.request(ctx, r, resp, api.EndpointRemoveDisks, http.MethodPost) +} + func (c *Client) ResumeSampling(ctx context.Context, r *api.ResumeSamplingRequest) (*api.ResumeSamplingResponse, error) { resp := &api.ResumeSamplingResponse{} return resp, c.request(ctx, r, resp, api.EndpointResumeSampling, http.MethodPost) diff --git a/pkg/types/broker_stats.go b/pkg/types/broker_stats.go index 4589b57..18aa2c2 100644 --- a/pkg/types/broker_stats.go +++ b/pkg/types/broker_stats.go @@ -105,24 +105,24 @@ type HostLoadStats struct { } type BrokerLoadStats struct { - FollowerNwInRate float64 `json:"FollowerNwInRate"` - BrokerState BrokerState `json:"BrokerState"` - Broker int32 `json:"Broker"` - NwOutRate float64 `json:"NwOutRate"` - NumCore float64 `json:"NumCore"` - Host string `json:"Host"` - CPUPct float64 `json:"CpuPct"` - Replicas int32 `json:"Replicas"` - NetworkInCapacity float64 `json:"NetworkInCapacity"` - Rack string `json:"Rack"` - Leaders int32 `json:"Leaders"` - DiskCapacityMB float64 `json:"DiskCapacityMB"` - DiskMB float64 `json:"DiskMB"` - PnwOutRate float64 `json:"PnwOutRate"` - NetworkOutCapacity float64 `json:"NetworkOutCapacity"` - LeaderNwInRate float64 `json:"LeaderNwInRate"` - DiskPct float64 `json:"DiskPct"` - DiskState []DiskStats `json:"DiskState"` + FollowerNwInRate float64 `json:"FollowerNwInRate"` + BrokerState BrokerState `json:"BrokerState"` + Broker int32 `json:"Broker"` + NwOutRate float64 `json:"NwOutRate"` + NumCore float64 `json:"NumCore"` + Host string `json:"Host"` + CPUPct float64 `json:"CpuPct"` + Replicas int32 `json:"Replicas"` + NetworkInCapacity float64 `json:"NetworkInCapacity"` + Rack string `json:"Rack"` + Leaders int32 `json:"Leaders"` + DiskCapacityMB float64 `json:"DiskCapacityMB"` + DiskMB float64 `json:"DiskMB"` + PnwOutRate float64 `json:"PnwOutRate"` + NetworkOutCapacity float64 `json:"NetworkOutCapacity"` + LeaderNwInRate float64 `json:"LeaderNwInRate"` + DiskPct float64 `json:"DiskPct"` + DiskState map[string]DiskStats `json:"DiskState"` } const ( From c20cd79ee48e1cc74298fc64b7f94e3fd6feb679 Mon Sep 17 00:00:00 2001 From: Alex Necula Date: Thu, 4 May 2023 13:08:08 +0300 Subject: [PATCH 2/3] Update copyright and image --- Makefile | 2 +- integration_test/remove_broker_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index c81b65c..94e4395 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,7 @@ SHELL = /usr/bin/env bash -o pipefail .SHELLFLAGS = -ec -CRUISE_CONTROL_VERSION ?= 2.5.118-adbe-20230419-rc +CRUISE_CONTROL_VERSION ?= 2.5.118-adbe-20230504 export CRUISE_CONTROL_IMAGE ?= docker.io/adobe/cruise-control:$(CRUISE_CONTROL_VERSION) ##@ General diff --git a/integration_test/remove_broker_test.go b/integration_test/remove_broker_test.go index 129c8d3..09b7f83 100644 --- a/integration_test/remove_broker_test.go +++ b/integration_test/remove_broker_test.go @@ -1,5 +1,5 @@ /* -Copyright © 2021 Cisco and/or its affiliates. All rights reserved. +Copyright © 2023 Adobe and/or its affiliates. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. From 8a3bf7a85e7db12bafde38be735d497b2cbe37d7 Mon Sep 17 00:00:00 2001 From: Alex Necula Date: Thu, 4 May 2023 13:11:06 +0300 Subject: [PATCH 3/3] Revert license change --- integration_test/remove_broker_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration_test/remove_broker_test.go b/integration_test/remove_broker_test.go index 09b7f83..129c8d3 100644 --- a/integration_test/remove_broker_test.go +++ b/integration_test/remove_broker_test.go @@ -1,5 +1,5 @@ /* -Copyright © 2023 Adobe and/or its affiliates. All rights reserved. +Copyright © 2021 Cisco and/or its affiliates. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.