
RabbitMQ Operator - Quiesce Cluster Blueprint



I recently had a situation where I needed to quiesce an entire RabbitMQ cluster (managed by the RabbitMQ Cluster Operator) before its namespace was backed up. This is what I came up with as a solution; it might be useful to someone else.

There's some background info available if you want to follow the full story.

 

TL;DR

  • Stop all RabbitMQ messages
  • Back up the cluster
  • Start RabbitMQ messages again

 

Prerequisites:

  • Custom image that has jq, kubectl, and the Kanister tools (see the Dockerfile below)
  • Kubernetes ServiceAccount in the rabbitmq-cluster-namespace namespace (see the RBAC manifests below)
  • ImagePullSecret, as the custom image will more than likely live in a private registry

 

Solution (each action below is attached to the Kasten policy as a hook; a sketch of that wiring follows this list):

  • Pre-hook - quiesceRabbitCluster
    • Used to stop the RabbitMQ cluster
      • targets the StatefulSet
  • Post-hook - activateRabbitCluster
    • Used to start the RabbitMQ cluster
      • targets the StatefulSet
  • Error-hook - revertRabbitCluster
    • Used to return the RabbitMQ cluster to an operational state
      • targets the StatefulSet
  • Pre-hook-Restore - destroyRabbitCluster (READ THE CAUTION IN THE BLUEPRINT BEFORE USING)
    • DELETES the RabbitMQ cluster before restoring
      • targets the StatefulSet
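
For context on how these actions get invoked: in Kasten K10, the pre/post/error hooks are configured on the backup policy and point at the blueprint action names above. Below is a minimal, illustrative sketch of that wiring; the exact placement and field names can vary between K10 versions, so check the K10 hooks documentation rather than treating this as definitive.

# Illustrative only - in recent K10 versions this sits under the backup action
# of the policy (e.g. backupParameters.hooks); adjust to your K10 version.
hooks:
  preHook:
    blueprint: rabbitmq-cluster-prehook
    actionName: quiesceRabbitCluster
  onSuccessHook:
    blueprint: rabbitmq-cluster-prehook
    actionName: activateRabbitCluster
  onFailureHook:
    blueprint: rabbitmq-cluster-prehook
    actionName: revertRabbitCluster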

 

Dockerfile: (had to use Ubuntu, as the Kanister tools do not install on Alpine; I have a bug open for that)

# Stage 1: Build environment
FROM ubuntu AS builder

# Install APT packages
RUN apt update && apt install bash git curl wget libdigest-sha-perl -y

# Install yq
RUN wget https://github.com/mikefarah/yq/releases/latest/download/yq_linux_amd64 -O /usr/bin/yq && chmod +x /usr/bin/yq

# Install kubectl
RUN curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" \
    && chmod +x kubectl && mv kubectl /usr/local/bin/kubectl

# Install helm
RUN wget -c https://get.helm.sh/helm-v3.12.0-linux-amd64.tar.gz -O - | tar -xz && mv linux-amd64/helm /usr/local/bin/helm

# Install helm plugins
RUN helm plugin install https://github.com/databus23/helm-diff

RUN helm plugin install https://github.com/quintush/helm-unittest


# Install Kanister Tools
RUN curl https://raw.githubusercontent.com/kanisterio/kanister/master/scripts/get.sh | bash

# Stage 2: Final image
FROM ubuntu

# Copy necessary binaries from the build environment
COPY --from=builder /usr/bin/yq /usr/bin/yq
COPY --from=builder /usr/local/bin/kubectl /usr/local/bin/kubectl
COPY --from=builder /usr/local/bin/helm /usr/local/bin/helm
COPY --from=builder /root/.local/share/helm/plugins/ /root/.local/share/helm/plugins/
COPY --from=builder /usr/local/bin/kando /usr/local/bin/kando
COPY --from=builder /usr/local/bin/kanctl /usr/local/bin/kanctl

# Install runtime packages (curl/wget are installed here rather than copied from
# the builder, so their shared libraries exist in the final image)
RUN apt update && apt install git jq curl wget -y
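
For completeness, here is roughly how you'd build/push the image and create the ImagePullSecret that the blueprint's podOverride references. The registry, tag, secret name, and credentials below are placeholders; swap in your own.

# Build and push the runner image (registry/tag are placeholders)
docker build -t registry.example.com/tools/k8s_runner:latest .
docker push registry.example.com/tools/k8s_runner:latest

# Create the pull secret in the RabbitMQ namespace; its name is what goes into
# the blueprint's podOverride.imagePullSecrets in place of <ImagePullSecret>
kubectl create secret docker-registry my-registry-pull-secret \
  --docker-server=registry.example.com \
  --docker-username=<user> \
  --docker-password=<token> \
  -n rabbitmq-cluster-namespace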


RBAC: (ServiceAccount for the blueprint's task pods, plus the Role and RoleBinding it needs to exec into the RabbitMQ pods)

# serviceAccount for Kanister k8s
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa-kanister-bp-rabbit
  namespace: rabbitmq-cluster-namespace
...
# RBAC to exec into pods
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: rabbitmq-cluster-namespace-exec
  namespace: rabbitmq-cluster-namespace
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list"]
  - apiGroups: [""]
    resources: ["pods/exec"]
    verbs: ["create"]
...
# Rolebind for sa-kanister-bp-rabbit
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: rolebind-sa-kanister-bp-rabbit
  namespace: rabbitmq-cluster-namespace
subjects:
- kind: ServiceAccount
  name: sa-kanister-bp-rabbit
  apiGroup: ""
roleRef:
  kind: Role
  name: rabbitmq-cluster-namespace-exec
  apiGroup: rbac.authorization.k8s.io
...
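
Assuming you save the manifests above as rbac.yaml (the file name is arbitrary), you can apply them and sanity-check the permissions via impersonation before wiring up the blueprint:

kubectl apply -f rbac.yaml

# Both checks should print "yes"
kubectl auth can-i list pods -n rabbitmq-cluster-namespace \
  --as=system:serviceaccount:rabbitmq-cluster-namespace:sa-kanister-bp-rabbit
kubectl auth can-i create pods/exec -n rabbitmq-cluster-namespace \
  --as=system:serviceaccount:rabbitmq-cluster-namespace:sa-kanister-bp-rabbit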

 

Rabbitmq-cluster-prehook.yaml

# rabbitmq-cluster-prehook.yaml
---
apiVersion: cr.kanister.io/v1alpha1
kind: Blueprint
metadata:
  name: rabbitmq-cluster-prehook
  namespace: kasten
actions:
  # Use as a pre-hook to quiesce cluster
  quiesceRabbitCluster:
    outputArtifacts:
      rabbitClusterStatus:
        keyValue:
          rabbitclusteroperational: "{{ .Phases.rabbitClusterStatus.Output.rabbitclusteroperational }}"
    phases:
        # Get RabbitMQ Cluster status to determine if any processes should proceed
      - name: rabbitClusterStatus
        func: KubeTask
        args:
          podOverride:
            serviceAccountName: "sa-kanister-bp-rabbit"
            imagePullSecrets:
              - name: <ImagePullSecret>
          image: git.thinkon.com:5050/compass/global/images/k8s_runner:latest
          namespace: "{{ .StatefulSet.Namespace }}"
          command:
            - bash
            - -c
            - |
              export LOCALFILEREPO="/tmp"
              export RABBITCLUSTERSTATUSFILE="rabbitcluster_status.json"
              export TARGETFILE="$LOCALFILEREPO/$RABBITCLUSTERSTATUSFILE"
              export NAMESPACE={{ .StatefulSet.Namespace }}
              export RABBITPOD={{ index .StatefulSet.Pods 0 }}
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              export KUBECTLEXEC="kubectl exec -n $NAMESPACE $RABBITPOD -c rabbitmq --"
              $KUBECTLEXEC $RABBITSBIN/rabbitmqctl cluster_status --formatter=json > $TARGETFILE
              if [ ! -s $TARGETFILE ]
              then
                echo "RabbitMQ cluster status JSON file is missing" >> 2&1
                exit 1
              fi
              export RABBITCLUSTEROPERATIONAL="$(cat $LOCALFILEREPO/$RABBITCLUSTERSTATUSFILE | jq '.maintenance_status | to_entries | map(.value == "not under maintenance") | all')"
              kando output rabbitclusteroperational $RABBITCLUSTEROPERATIONAL

        # Drain RabbitMQ nodes (stop all message processing)
        # This will essentially stop your Rabbit cluster
      - name: drainRabbitNodes
        func: KubeExecAll
        args:
          namespace: "{{ .StatefulSet.Namespace }}"
          pods: "{{ range .StatefulSet.Pods }} {{.}}{{end}}"
          containers: "rabbitmq"
          command:
            - bash
            - -c
            - |
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              export RABBITCLUSTEROPERATIONAL={{ index .Phases.rabbitClusterStatus.Output.rabbitclusteroperational | toString }}
              if [[ $RABBITCLUSTEROPERATIONAL == "true" ]]
              then
                $RABBITSBIN/rabbitmq-upgrade --timeout 10 drain
              else
                echo "Kanister will exit: RabbitMQ cluster has nodes in maintenance mode"
                exit 1
              fi

        # Wait for Rabbit nodes to stop
      - name: quiesceWaitRabbitNodesStop
        func: WaitV2
        args:
          timeout: 240s
          conditions:
            anyOf:
                # Condition if all expected replicas are not ready (statefulSet=NotReady)
              - condition: '{{if and (eq .status.availableReplicas 0) (not .status.readyReplicas)}}true{{else}}false{{end}}'
                objectReference:
                  apiVersion: "v1"
                  group: "apps"
                  resource: "statefulsets"
                  name: "{{ .Object.metadata.name }}"
                  namespace: "{{ .Object.metadata.namespace }}"

        # Set vhost connections to '0' (block any connections)
      - name: blockVhostConnections
        func: KubeTask
        args:
          podOverride:
            serviceAccountName: "sa-kanister-bp-rabbit"
            imagePullSecrets:
              - name: <ImagePullSecret>
          image: git.thinkon.com:5050/compass/global/images/k8s_runner:latest
          namespace: "{{ .StatefulSet.Namespace }}"
          command:
            - bash
            - -c
            - |
              export LOCALFILEREPO="/tmp"
              export RABBITCLUSTERVHOSTSFILE="rabbitcluster_vhosts.json"
              export TARGETFILE="$LOCALFILEREPO/$RABBITCLUSTERVHOSTSFILE"
              export RABBITPOD={{ index .StatefulSet.Pods 0 }}
              export NAMESPACE={{ .StatefulSet.Namespace }}
              export KUBECTLEXEC="kubectl exec -n $NAMESPACE $RABBITPOD -c rabbitmq --"
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              export VHOSTMAXCONNECTIONS='0'
              $KUBECTLEXEC $RABBITSBIN/rabbitmqctl list_vhosts --formatter=json > $TARGETFILE
              if [ ! -s $TARGETFILE ]
              then
                echo "Kanister will exit: RabbitMQ cluster vhost JSON file is missing" >> 2&1
                exit 1
              fi
              export RABBITCLUSTERVHOSTS="$(cat $TARGETFILE | jq -r '.[].name')"
              echo "$RABBITCLUSTERVHOSTS" | while IFS= read -r vhost
              do
                $KUBECTLEXEC $RABBITSBIN/rabbitmqctl set_vhost_limits -p $vhost "{\"max-connections\": $VHOSTMAXCONNECTIONS}"
              done

        # Bring Rabbit nodes back up so Kasten can snapshot them, as per normal
      - name: quiesceReviveRabbitNodes
        func: KubeExecAll
        args:
          namespace: "{{ .StatefulSet.Namespace }}"
          pods: "{{ range .StatefulSet.Pods }} {{.}}{{end}}"
          containers: "rabbitmq"
          command:
            - bash
            - -c
            - |
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              $RABBITSBIN/rabbitmq-upgrade --timeout 10 revive

        # Wait for Rabbit nodes to be Ready
      - name: quiesceWaitRabbitNodesStart
        func: WaitV2
        args:
          timeout: 240s
          conditions:
            anyOf:
                # Condition if all expected replicas are ready (statefulSet=Ready)
              - condition: '{{ if and (eq .status.replicas .status.readyReplicas) (eq .status.replicas .status.availableReplicas) }}true{{else}}false{{end}}'
                objectReference:
                  apiVersion: "v1"
                  group: "apps"
                  resource: "statefulsets"
                  name: "{{ .Object.metadata.name }}"
                  namespace: "{{ .Object.metadata.namespace }}"
  # Use as post-hook to put cluster back into operation
  activateRabbitCluster:
    phases:
        # Remove vhost connection block (allow message processing)
      - name: releaseVhostConnections
        func: KubeTask
        args:
          podOverride:
            serviceAccountName: "sa-kanister-bp-rabbit"
            imagePullSecrets:
              - name: <ImagePullSecret>
          image: git.thinkon.com:5050/compass/global/images/k8s_runner:latest
          namespace: "{{ .StatefulSet.Namespace }}"
          command:
            - bash
            - -c
            - |
              export LOCALFILEREPO="/tmp"
              export RABBITCLUSTERVHOSTSFILE="rabbitcluster_vhosts.json"
              export TARGETFILE="$LOCALFILEREPO/$RABBITCLUSTERVHOSTSFILE"
              export RABBITPOD={{ index .StatefulSet.Pods 0 }}
              export NAMESPACE={{ .StatefulSet.Namespace }}
              export KUBECTLEXEC="kubectl exec -n $NAMESPACE $RABBITPOD -c rabbitmq --"
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              export VHOSTMAXCONNECTIONS='-1'
              $KUBECTLEXEC $RABBITSBIN/rabbitmqctl list_vhosts --formatter=json > $TARGETFILE
              if [ ! -s $TARGETFILE ]
              then
                echo "Kanister will exit: RabbitMQ cluster vhost JSON file is missing" >> 2&1
                exit 1
              fi
              export RABBITCLUSTERVHOSTS="$(cat $TARGETFILE | jq -r '.[].name')"
              echo "$RABBITCLUSTERVHOSTS" | while IFS= read -r vhost
              do
                $KUBECTLEXEC $RABBITSBIN/rabbitmqctl set_vhost_limits -p $vhost "{\"max-connections\": $VHOSTMAXCONNECTIONS}"
              done

  # Use as error-hook to leave the cluster operational
  revertRabbitCluster:
    phases:
        # Remove vhost connection block (allow message processing)
      - name: revertVhostConnections
        func: KubeTask
        args:
          podOverride:
            serviceAccountName: "sa-kanister-bp-rabbit"
            imagePullSecrets:
              - name: <ImagePullSecret>
          image: git.thinkon.com:5050/compass/global/images/k8s_runner:latest
          namespace: "{{ .StatefulSet.Namespace }}"
          command:
            - bash
            - -c
            - |
              export LOCALFILEREPO="/tmp"
              export RABBITCLUSTERVHOSTSFILE="rabbitcluster_vhosts.json"
              export TARGETFILE="$LOCALFILEREPO/$RABBITCLUSTERVHOSTSFILE"
              export RABBITPOD={{ index .StatefulSet.Pods 0 }}
              export NAMESPACE={{ .StatefulSet.Namespace }}
              export KUBECTLEXEC="kubectl exec -n $NAMESPACE $RABBITPOD -c rabbitmq --"
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              export VHOSTMAXCONNECTIONS='-1'
              $KUBECTLEXEC $RABBITSBIN/rabbitmqctl list_vhosts --formatter=json > $TARGETFILE
              if [ ! -s $TARGETFILE ]
              then
                echo "Kanister will exit: RabbitMQ cluster vhost JSON file is missing" >> 2&1
                exit 1
              fi
              export RABBITCLUSTERVHOSTS="$(cat $TARGETFILE | jq -r '.[].name')"
              echo "$RABBITCLUSTERVHOSTS" | while IFS= read -r vhost
              do
                $KUBECTLEXEC $RABBITSBIN/rabbitmqctl set_vhost_limits -p $vhost "{\"max-connections\": $VHOSTMAXCONNECTIONS}"
              done      

        # Bring Rabbit nodes back up
      - name: revertReviveRabbitNodes
        func: KubeExecAll
        args:
          namespace: "{{ .StatefulSet.Namespace }}"
          pods: "{{ range .StatefulSet.Pods }} {{.}}{{end}}"
          containers: "rabbitmq"
          command:
            - bash
            - -c
            - |
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              $RABBITSBIN/rabbitmq-upgrade --timeout 10 revive

        # Wait for Rabbit nodes to be Ready 
      - name: revertWaitRabbitNodesStart
        func: WaitV2
        args:
          timeout: 240s
          conditions:
            anyOf:
                # Condition if all expected replicas are ready (statefulSet=Ready)
              - condition: '{{ if and (eq .status.replicas .status.readyReplicas) (eq .status.replicas .status.availableReplicas) }}true{{else}}false{{end}}'
                objectReference:
                  apiVersion: "v1"
                  group: "apps"
                  resource: "statefulsets"
                  name: "{{ .Object.metadata.name }}"
                  namespace: "{{ .Object.metadata.namespace }}"

  # Use as a pre-restore-hook to remove the cluster
  # CAUTION - this will delete your entire Rabbit cluster
  # This is required because the RabbitMQ Operator prevents Kasten from scaling down or replacing PVs
  # https://github.com/rabbitmq/cluster-operator/issues/1491
  destroyRabbitCluster:
    phases:
      - name: deleteRabbitCluster
        func: KubeOps
        args:
          operation: delete
          objectReference:
            apiVersion: "v1beta1"
            group: "rabbitmq.com"
            resource: "rabbitmqclusters"
            name: '{{trimSuffix "-server" .StatefulSet.Name}}'
            namespace: "{{ .StatefulSet.Namespace }}"
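
A quick note on the safety check the whole quiesce action hinges on: the rabbitClusterStatus phase feeds the rabbitmqctl cluster_status JSON into jq and only reports the cluster as operational when every node is "not under maintenance". Here's a standalone sketch of that filter against hand-written sample JSON (the node names and the second status value are just illustrative):

cat <<'EOF' | jq '.maintenance_status | to_entries | map(.value == "not under maintenance") | all'
{
  "maintenance_status": {
    "rabbit@rabbitmq-cluster-server-0": "not under maintenance",
    "rabbit@rabbitmq-cluster-server-1": "being drained"
  }
}
EOF
# Prints false, so drainRabbitNodes would exit 1 rather than drain an already-degraded cluster.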

 

1 comment

Chris.Childerhose
  • November 28, 2023

Very interesting script and workaround. We use RabbitMQ, so I might pass this along to our team.

