core: add restore mon quorum in go
This commit adds a Go command for restoring
the mon quorum from a single good mon.
In the background we scale down the operator first,
then run the Ceph commands that remove the bad mons
from the monmap (extracting and injecting the monmap).
Finally, we scale the operator back up, which
brings the mon quorum back.
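
At a high level the command performs the steps above in order. A minimal sketch of that sequence, shelling out to kubectl (the namespaces, mon names, and helper below are illustrative assumptions, not the code added in pkg/mons):

// restore_quorum_sketch.go: illustrates the order of operations only.
package main

import (
	"fmt"
	"log"
	"os/exec"
)

const (
	operatorNS = "rook-ceph" // assumed operator namespace
	clusterNS  = "rook-ceph" // assumed cluster namespace
)

// run executes a kubectl command and prints its combined output.
func run(args ...string) {
	out, err := exec.Command("kubectl", args...).CombinedOutput()
	fmt.Print(string(out))
	if err != nil {
		log.Fatalf("kubectl %v failed: %v", args, err)
	}
}

func main() {
	goodMon := "a"                // the healthy mon to keep
	badMons := []string{"b", "c"} // mons to drop from the monmap

	// 1. Scale down the operator so it does not reconcile mons mid-surgery.
	run("-n", operatorNS, "scale", "deployment", "rook-ceph-operator", "--replicas=0")

	// 2. Monmap surgery (done inside a debug mon pod in the real flow):
	//    extract the monmap from the good mon, remove the bad mons with
	//    monmaptool --rm, and restart the good mon with the rewritten map.
	fmt.Printf("would rewrite the monmap on mon %q, dropping %v\n", goodMon, badMons)

	// 3. Delete the bad mon deployments and services so the operator can
	//    bring up fresh mons afterwards.
	for _, m := range badMons {
		run("-n", clusterNS, "delete", "deployment", "rook-ceph-mon-"+m, "--ignore-not-found")
		run("-n", clusterNS, "delete", "service", "rook-ceph-mon-"+m, "--ignore-not-found")
	}

	// 4. Scale the operator back up; reconciliation restores a full quorum.
	run("-n", operatorNS, "scale", "deployment", "rook-ceph-operator", "--replicas=1")
}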

Signed-off-by: subhamkrai <srai@redhat.com>
subhamkrai committed Apr 27, 2023
1 parent a368ac0 commit 7f32adb
Showing 15 changed files with 524 additions and 86 deletions.
21 changes: 21 additions & 0 deletions .github/workflows/collect-logs/action.yaml
@@ -0,0 +1,21 @@
# This isn't to be used for the go integration tests because their logs are placed in a different location and require a few extra steps.
name: Log Collector
description: Log collector for canary test
inputs:
name:
description: Name to use for the workflow
required: true

runs:
using: "composite"
steps:
- name: collect common logs
shell: bash --noprofile --norc -eo pipefail -x {0}
run: |
tests/collect-logs.sh
- name: Upload canary test result
uses: actions/upload-artifact@v2
with:
name: ${{ inputs.name }}
path: test
25 changes: 24 additions & 1 deletion .github/workflows/go-test.yaml
@@ -8,7 +8,7 @@ defaults:
shell: bash --noprofile --norc -eo pipefail -x {0}

jobs:
with-krew:
go-test:
runs-on: ubuntu-20.04
steps:
- name: checkout
@@ -36,6 +36,17 @@ jobs:
run: |
set -e
kubectl rook-ceph ceph status
# test the mon restore to restore to mon a, delete mons b and c, then add d and e
export ROOK_PLUGIN_SKIP_PROMPTS=true
kubectl rook-ceph mons restore-quorum a
kubectl -n rook-ceph wait pod -l app=rook-ceph-mon-b --for=delete --timeout=90s
kubectl -n rook-ceph wait pod -l app=rook-ceph-mon-c --for=delete --timeout=90s
tests/github-action-helper.sh wait_for_three_mons rook-ceph
kubectl -n rook-ceph wait deployment rook-ceph-mon-d --for condition=Available=True --timeout=90s
kubectl -n rook-ceph wait deployment rook-ceph-mon-e --for condition=Available=True --timeout=90s
kubectl rook-ceph mons
kubectl rook-ceph rbd ls replicapool
@@ -55,3 +66,15 @@
kubectl rook-ceph rook status all
kubectl rook-ceph rook status cephobjectstores
kubectl rook-ceph rook purge-osd 0 --force
- name: collect common logs
if: always()
uses: ./.github/workflows/collect-logs
with:
name: go-test

- name: consider debugging
if: failure()
uses: mxschmitt/action-tmate@v3
with:
use-tmate: ${{ secrets.USE_TMATE }}
2 changes: 1 addition & 1 deletion cmd/commands/ceph.go
@@ -32,6 +32,6 @@ var CephCmd = &cobra.Command{
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
context := GetContext()
fmt.Println(exec.RunCommandInOperatorPod(context, cmd.Use, args, OperatorNamespace, CephClusterNamespace))
fmt.Println(exec.RunCommandInOperatorPod(context, cmd.Use, args, OperatorNamespace, CephClusterNamespace, true))
},
}
19 changes: 18 additions & 1 deletion cmd/commands/mons.go
@@ -29,10 +29,27 @@ var MonCmd = &cobra.Command{
Use: "mons",
Short: "Output mon endpoints",
DisableFlagParsing: true,
Run: func(cmd *cobra.Command, args []string) {
Args: cobra.MaximumNArgs(1),
Run: func(_ *cobra.Command, args []string) {
if len(args) == 0 {
context := GetContext()
fmt.Println(mons.GetMonEndpoint(context, CephClusterNamespace))
}
},
}

// RestoreQuorum represents the restore-quorum command
var RestoreQuorum = &cobra.Command{
Use: "restore-quorum",
Short: "When quorum is lost, restore quorum to the remaining healthy mon",
DisableFlagParsing: true,
Args: cobra.ExactArgs(1),
Run: func(_ *cobra.Command, args []string) {
context := GetContext()
mons.RestoreQuorum(context, OperatorNamespace, CephClusterNamespace, args[0])
},
}

func init() {
MonCmd.AddCommand(RestoreQuorum)
}
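
Restoring quorum is destructive (every mon except the good one is removed), so the CLI gates it behind a confirmation prompt that the go-test workflow above skips with ROOK_PLUGIN_SKIP_PROMPTS=true. A hedged sketch of what that gate might look like (the actual prompt lives in pkg/mons and may differ):

package main

import (
	"bufio"
	"fmt"
	"os"
	"strings"
)

// confirmDestructiveOp asks for confirmation unless ROOK_PLUGIN_SKIP_PROMPTS
// is set to "true", mirroring the environment variable used in CI.
func confirmDestructiveOp(question string) bool {
	if os.Getenv("ROOK_PLUGIN_SKIP_PROMPTS") == "true" {
		return true
	}
	fmt.Printf("%s [yes/no]: ", question)
	answer, _ := bufio.NewReader(os.Stdin).ReadString('\n')
	return strings.TrimSpace(answer) == "yes"
}

func main() {
	if !confirmDestructiveOp("Restore quorum from a single mon? The other mons will be removed") {
		fmt.Println("aborted")
		os.Exit(1)
	}
	fmt.Println("continuing with restore-quorum...")
}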
2 changes: 1 addition & 1 deletion cmd/commands/rbd.go
@@ -32,6 +32,6 @@ var RbdCmd = &cobra.Command{
Args: cobra.MinimumNArgs(1),
Run: func(cmd *cobra.Command, args []string) {
context := GetContext()
fmt.Println(exec.RunCommandInOperatorPod(context, cmd.Use, args, OperatorNamespace, CephClusterNamespace))
fmt.Println(exec.RunCommandInOperatorPod(context, cmd.Use, args, OperatorNamespace, CephClusterNamespace, true))
},
}
2 changes: 1 addition & 1 deletion cmd/commands/rook.go
@@ -37,7 +37,7 @@ var versionCmd = &cobra.Command{
Args: cobra.NoArgs,
Run: func(cmd *cobra.Command, args []string) {
context := GetContext()
fmt.Println(exec.RunCommandInOperatorPod(context, "rook", []string{cmd.Use}, OperatorNamespace, CephClusterNamespace))
fmt.Println(exec.RunCommandInOperatorPod(context, "rook", []string{cmd.Use}, OperatorNamespace, CephClusterNamespace, true))
},
}

31 changes: 14 additions & 17 deletions kubectl-rook-ceph.sh
@@ -230,7 +230,7 @@ function path_cm_rook_ceph_operator_config() {
# 'kubectl rook-ceph mons' commands
####################################################################################################

function run_mons_command () {
function run_mons_command() {
if [ "$#" -ge 1 ] && [ "$1" = "restore-quorum" ]; then
shift # remove the subcommand from the front of the arg list
run_restore_quorum "$@"
@@ -253,16 +253,16 @@ function wait_for_deployment_to_be_running() {
function run_restore_quorum() {
parse_flags parse_image_flag "$@" # parse flags before the good mon name
[[ -z "${REMAINING_ARGS[0]:-""}" ]] && fail_error "Missing healthy mon name"
good_mon="${REMAINING_ARGS[0]}" # get the good mon being used to restore quorum
shift # remove the healthy mon from the front of the arg list
REMAINING_ARGS=("${REMAINING_ARGS[@]:1}") # remove mon name from remaining args
end_of_command_parsing "$@" # end of command tree
good_mon="${REMAINING_ARGS[0]}" # get the good mon being used to restore quorum
shift # remove the healthy mon from the front of the arg list
REMAINING_ARGS=("${REMAINING_ARGS[@]:1}") # remove mon name from remaining args
end_of_command_parsing "$@" # end of command tree

# Parse the endpoints configmap for the mon endpoints
bad_mons=()
mon_endpoints=$(KUBECTL_NS_CLUSTER get cm rook-ceph-mon-endpoints -o jsonpath='{.data.data}')
# split the endpoints into an array, separated by the comma
for single_mon in ${mon_endpoints//,/ } ; do
for single_mon in ${mon_endpoints//,/ }; do
mon_name=$(echo "${single_mon/=/ }" | awk '{print $1}')
mon_endpoint=$(echo "${single_mon/=/ }" | awk '{print $2}')
echo "mon=$mon_name, endpoint=$mon_endpoint"
@@ -335,12 +335,11 @@ function run_restore_quorum() {
--public-bind-addr=$ROOK_POD_IP \
--extract-monmap=$monmap_path

info_msg "Printing monmap"; \
info_msg "Printing monmap"
KUBECTL_NS_CLUSTER exec deploy/rook-ceph-mon-$good_mon-debug -c mon -- monmaptool --print $monmap_path

# remove all the mons except the good one
for bad_mon in "${bad_mons[@]}"
do
for bad_mon in "${bad_mons[@]}"; do
info_msg "Removing mon $bad_mon"
KUBECTL_NS_CLUSTER exec deploy/rook-ceph-mon-$good_mon-debug -c mon -- monmaptool $monmap_path --rm $bad_mon
done
@@ -381,8 +380,7 @@ function run_restore_quorum() {
info_msg "Purging the bad mons: ${bad_mons[*]}"
# ignore errors purging old mons if their resources don't exist
set +e
for bad_mon in "${bad_mons[@]}"
do
for bad_mon in "${bad_mons[@]}"; do
info_msg "purging old mon: $bad_mon"
KUBECTL_NS_CLUSTER delete deploy rook-ceph-mon-$bad_mon
KUBECTL_NS_CLUSTER delete svc rook-ceph-mon-$bad_mon
@@ -433,8 +431,7 @@ function wait_for_mon_status_response() {
sleep_time=5

exit_status=1
while [[ $exit_status != 0 ]]
do
while [[ $exit_status != 0 ]]; do
# Don't fail the script if the ceph command fails
set +e
KUBECTL_NS_CLUSTER exec deploy/rook-ceph-tools -- ceph status --connect-timeout=3
@@ -642,8 +639,8 @@ function run_start_debug() {
# 3) debug start deploymentName
parse_flags parse_image_flag "$@" # parse flags before the deployment name
[[ -z "${REMAINING_ARGS[0]:-""}" ]] && fail_error "Missing mon or osd deployment name"
deployment_name="${REMAINING_ARGS[0]}" # get deployment name
REMAINING_ARGS=("${REMAINING_ARGS[@]:1}") # remove deploy name from remaining args
deployment_name="${REMAINING_ARGS[0]}" # get deployment name
REMAINING_ARGS=("${REMAINING_ARGS[@]:1}") # remove deploy name from remaining args
set +u
parse_flags parse_image_flag "${REMAINING_ARGS[@]}" # parse flags after the deployment name
set -u
@@ -694,8 +691,8 @@ function run_start_debug() {
spec:
$deployment_spec
EOF
info_msg "ensure the debug deployment $deployment_name is scaled up"
KUBECTL_NS_CLUSTER scale deployments "$deployment_name-debug" --replicas=1
info_msg "ensure the debug deployment $deployment_name is scaled up"
KUBECTL_NS_CLUSTER scale deployments "$deployment_name-debug" --replicas=1
}

function run_stop_debug() {
47 changes: 17 additions & 30 deletions pkg/debug/start_debug.go
@@ -26,13 +26,11 @@ import (
"github.com/rook/kubectl-rook-ceph/pkg/k8sutil"
appsv1 "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
corev1 "k8s.io/api/core/v1"
kerrors "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func StartDebug(context *k8sutil.Context, clusterNamespace, deploymentName, alternateImageValue string) {

err := startDebug(context, clusterNamespace, deploymentName, alternateImageValue)
if err != nil {
fmt.Println(err)
@@ -41,11 +39,14 @@ func startDebug(context *k8sutil.Context, clusterNamespace, deploymentName, alte
}

func startDebug(context *k8sutil.Context, clusterNamespace, deploymentName, alternateImageValue string) error {
deployment, err := verifyDeploymentExists(context, clusterNamespace, deploymentName)
originalDeployment, err := GetDeployment(context, clusterNamespace, deploymentName)
if err != nil {
return fmt.Errorf("Missing mon or osd deployment name %s. %v\n", deploymentName, err)
}

// We need to dereference the deployment as it is required for the debug deployment
deployment := *originalDeployment

if alternateImageValue != "" {
log.Printf("setting debug image to %s\n", alternateImageValue)
deployment.Spec.Template.Spec.Containers[0].Image = alternateImageValue
@@ -62,23 +63,21 @@ func startDebug(context *k8sutil.Context, clusterNamespace, deploymentName, alte
deployment.Spec.Template.Spec.Containers[0].Command = []string{"sleep", "infinity"}
deployment.Spec.Template.Spec.Containers[0].Args = []string{}

if err := updateDeployment(context, clusterNamespace, deployment); err != nil {
return fmt.Errorf("Failed to update deployment %s. %v\n", deployment.Name, err)
}

deploymentPodName, err := waitForPodToRun(context, clusterNamespace, deployment.Spec)
labelSelector := fmt.Sprintf("ceph_daemon_type=%s,ceph_daemon_id=%s", deployment.Spec.Template.Labels["ceph_daemon_type"], deployment.Spec.Template.Labels["ceph_daemon_id"])
deploymentPodName, err := k8sutil.WaitForPodToRun(context, clusterNamespace, labelSelector)
if err != nil {
fmt.Println(err)
return err
}

if err := setDeploymentScale(context, clusterNamespace, deployment.Name, 0); err != nil {
if err := SetDeploymentScale(context, clusterNamespace, deployment.Name, 0); err != nil {
return err
}
fmt.Printf("deployment %s scaled down\n", deployment.Name)

fmt.Printf("waiting for the deployment pod %s to be deleted\n", deploymentPodName)
fmt.Printf("waiting for the deployment pod %s to be deleted\n", deploymentPodName.Name)

err = waitForPodDeletion(context, clusterNamespace, deploymentPodName)
err = waitForPodDeletion(context, clusterNamespace, deploymentName)
if err != nil {
fmt.Println(err)
return err
@@ -99,13 +98,14 @@ func startDebug(context *k8sutil.Context, clusterNamespace, deploymentName, alte
}
fmt.Printf("ensure the debug deployment %s is scaled up\n", deploymentName)

if err := setDeploymentScale(context, clusterNamespace, debugDeployment.Name, 1); err != nil {
if err := SetDeploymentScale(context, clusterNamespace, debugDeployment.Name, 1); err != nil {
return err
}

return nil
}

func setDeploymentScale(context *k8sutil.Context, clusterNamespace, deploymentName string, scaleCount int) error {
func SetDeploymentScale(context *k8sutil.Context, clusterNamespace, deploymentName string, scaleCount int) error {
scale := &autoscalingv1.Scale{
ObjectMeta: v1.ObjectMeta{
Name: deploymentName,
@@ -122,11 +122,14 @@ func setDeploymentScale(context *k8sutil.Context, clusterNamespace, deploymentNa
return nil
}

func verifyDeploymentExists(context *k8sutil.Context, clusterNamespace, deploymentName string) (*appsv1.Deployment, error) {
func GetDeployment(context *k8sutil.Context, clusterNamespace, deploymentName string) (*appsv1.Deployment, error) {
fmt.Printf("fetching the deployment %s to be running\n", deploymentName)
deployment, err := context.Clientset.AppsV1().Deployments(clusterNamespace).Get(ctx.TODO(), deploymentName, v1.GetOptions{})
if err != nil {
fmt.Printf("deployment %s doesn't exist. %v", deploymentName, err)
return nil, err
}
fmt.Printf("deployment %s exists\n", deploymentName)
return deployment, nil
}

@@ -138,22 +141,6 @@ func updateDeployment(context *k8sutil.Context, clusterNamespace string, deploym
return nil
}

func waitForPodToRun(context *k8sutil.Context, clusterNamespace string, deploymentSpec appsv1.DeploymentSpec) (string, error) {
labelSelector := fmt.Sprintf("ceph_daemon_type=%s,ceph_daemon_id=%s", deploymentSpec.Template.Labels["ceph_daemon_type"], deploymentSpec.Template.Labels["ceph_daemon_id"])
for i := 0; i < 60; i++ {
pod, _ := context.Clientset.CoreV1().Pods(clusterNamespace).List(ctx.TODO(), v1.ListOptions{LabelSelector: labelSelector})
if pod.Items[0].Status.Phase == corev1.PodRunning && pod.Items[0].DeletionTimestamp.IsZero() {
return pod.Items[0].Name, nil
}

fmt.Println("waiting for pod to be running")
time.Sleep(time.Second * 5)
}

return "", fmt.Errorf("No pod with labels matching %s:%s", deploymentSpec.Template.Labels, deploymentSpec.Template.Labels)

}

func waitForPodDeletion(context *k8sutil.Context, clusterNamespace, podName string) error {
for i := 0; i < 60; i++ {
_, err := context.Clientset.CoreV1().Pods(clusterNamespace).Get(ctx.TODO(), podName, v1.GetOptions{})
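
The waitForPodToRun helper removed above now lives in pkg/k8sutil and takes a label selector directly; the new call site implies it returns the pod object rather than just its name. A hedged reconstruction of the relocated helper, assuming k8sutil.Context exposes a Clientset as in the existing code (the actual function may differ):

package k8sutil

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// WaitForPodToRun polls until a pod matching labelSelector is running and not
// terminating, giving up after roughly five minutes.
func WaitForPodToRun(k8sCtx *Context, clusterNamespace, labelSelector string) (corev1.Pod, error) {
	for i := 0; i < 60; i++ {
		pods, err := k8sCtx.Clientset.CoreV1().Pods(clusterNamespace).List(context.TODO(), v1.ListOptions{LabelSelector: labelSelector})
		if err == nil && len(pods.Items) > 0 {
			pod := pods.Items[0]
			if pod.Status.Phase == corev1.PodRunning && pod.DeletionTimestamp.IsZero() {
				return pod, nil
			}
		}
		fmt.Println("waiting for pod to be running")
		time.Sleep(5 * time.Second)
	}
	return corev1.Pod{}, fmt.Errorf("no running pod matches label selector %q", labelSelector)
}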
5 changes: 2 additions & 3 deletions pkg/debug/stop_debug.go
@@ -38,12 +38,11 @@ func StopDebug(context *k8sutil.Context, clusterNamespace, deploymentName string
}

func stopDebug(context *k8sutil.Context, clusterNamespace, deploymentName string) error {

if !strings.HasSuffix(deploymentName, "-debug") {
deploymentName = deploymentName + "-debug"
}

debugDeployment, err := verifyDeploymentExists(context, clusterNamespace, deploymentName)
debugDeployment, err := GetDeployment(context, clusterNamespace, deploymentName)
if err != nil {
return fmt.Errorf("Missing mon or osd debug deployment name %s. %v\n", deploymentName, err)
}
@@ -55,7 +54,7 @@ func stopDebug(context *k8sutil.Context, clusterNamespace, deploymentName string
}

original_deployment_name := strings.ReplaceAll(deploymentName, "-debug", "")
if err := setDeploymentScale(context, clusterNamespace, original_deployment_name, 1); err != nil {
if err := SetDeploymentScale(context, clusterNamespace, original_deployment_name, 1); err != nil {
return err
}
return nil