
I had a situation where I needed to quiesce an entire RabbitMQ cluster (controlled by the RabbitMQ Operator) before its namespace was backed up, so I came up with the following solution.  It might be useful to someone else.

There is some background info if you want to follow along.

 

TL;DR

  • Stop all Rabbit messages
  • Back up the cluster
  • Start Rabbit messages

 

Prerequisites:

  • Custom image that has jq, kubectl, and the Kanister tools (see the Dockerfile below)
  • K8s ServiceAccount in rabbitmq-cluster-namespace (see the RBAC file below)
  • ImagePullSecret - the custom image will more than likely live in a private registry (a sample kubectl command for creating one follows this list)

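A minimal sketch of creating that pull secret with kubectl - the secret name and credentials are placeholders (the registry host just mirrors the image path used in the blueprint), and the secret has to live in the namespace the blueprint pods run in:

# Example only: substitute your own secret name, registry, and credentials
kubectl create secret docker-registry my-registry-pull-secret \
  --namespace rabbitmq-cluster-namespace \
  --docker-server=git.thinkon.com:5050 \
  --docker-username=<registry-user> \
  --docker-password=<registry-token>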
 

Solution (a sample Kasten policy hook reference follows this list):

  • Pre-hook - quiesceRabbitCluster
    • Used to stop the Rabbit cluster
      • Targets the StatefulSet
  • Post-hook - activateRabbitCluster
    • Used to start the Rabbit cluster
      • Targets the StatefulSet
  • Error-hook - revertRabbitCluster
    • Used to bring the Rabbit cluster back to an operational state
      • Targets the StatefulSet
  • Pre-hook-Restore - destroyRabbitCluster (READ THE CAUTION BEFORE USING)
    • DELETES the Rabbit cluster before restoring
      • Targets the StatefulSet

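For reference, here is a rough sketch of how these actions could be referenced from a Kasten K10 backup policy as snapshot hooks. Treat the field layout as an assumption to verify against your K10 version's Policy spec; only the blueprint and action names come from this post.

# Hypothetical excerpt of a K10 Policy backup action's hooks section
hooks:
  preHook:
    blueprint: rabbitmq-cluster-prehook
    actionName: quiesceRabbitCluster
  onSuccess:
    blueprint: rabbitmq-cluster-prehook
    actionName: activateRabbitCluster
  onFailure:
    blueprint: rabbitmq-cluster-prehook
    actionName: revertRabbitCluster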
 

Dockerfile: (I had to use Ubuntu, as the Kanister tools do not install on Alpine; I have a bug open for that)

# Stage 1: Build environment
FROM ubuntu as builder

# Install APT packages
RUN apt update && apt install bash git curl wget libdigest-sha-perl -y

# Install yq
RUN wget https://github.com/mikefarah/yq/releases/latest/download/yq_linux_amd64 -O /usr/bin/yq && chmod +x /usr/bin/yq

# Install kubectl
RUN curl -LO "https://dl.k8s.io/release/$(curl -L -s https://dl.k8s.io/release/stable.txt)/bin/linux/amd64/kubectl" \
&& chmod +x kubectl && mv kubectl /usr/local/bin/kubectl

# Install helm
RUN wget -c https://get.helm.sh/helm-v3.12.0-linux-amd64.tar.gz -O - | tar -xz && mv linux-amd64/helm /usr/local/bin/helm

# Install helm plugins
RUN helm plugin install https://github.com/databus23/helm-diff

RUN helm plugin install https://github.com/quintush/helm-unittest


# Install Kanister Tools
RUN curl https://raw.githubusercontent.com/kanisterio/kanister/master/scripts/get.sh | bash

# Stage 2: Final image
FROM ubuntu

# Copy necessary binaries from the build environment
COPY --from=builder /usr/bin/curl /usr/bin/curl
COPY --from=builder /usr/bin/yq /usr/bin/yq
COPY --from=builder /usr/bin/wget /usr/bin/wget
COPY --from=builder /usr/local/bin/kubectl /usr/local/bin/kubectl
COPY --from=builder /usr/local/bin/helm /usr/local/bin/helm
COPY --from=builder /root/.local/share/helm/plugins/ /root/.local/share/helm/plugins/
COPY --from=builder /usr/local/bin/kando /usr/local/bin/kando
COPY --from=builder /usr/local/bin/kanctl /usr/local/bin/kanctl

# Install additional packages needed for runtime, if any
RUN apt update && apt install git jq -y
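Building and pushing the image might look something like this; the tag simply reuses the registry path referenced in the blueprint, so substitute your own:

# Build and push the custom runner image (example tag)
docker build -t git.thinkon.com:5050/compass/global/images/k8s_runner:latest .
docker push git.thinkon.com:5050/compass/global/images/k8s_runner:latest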


RBAC: (ServiceAccount for the blueprint's pods to use)

# ServiceAccount for the Kanister blueprint pods
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: sa-kanister-bp-rabbit
  namespace: rabbitmq-cluster-namespace
...
# RBAC to exec into pods
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: rabbitmq-cluster-namespace-exec
  namespace: rabbitmq-cluster-namespace
rules:
  - apiGroups: [""]
    resources: ["pods"]
    verbs: ["get", "list"]
  - apiGroups: [""]
    resources: ["pods/exec"]
    verbs: ["create"]
...
# RoleBinding for sa-kanister-bp-rabbit
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: rolebind-sa-kanister-bp-rabbit
  namespace: rabbitmq-cluster-namespace
subjects:
  - kind: ServiceAccount
    name: sa-kanister-bp-rabbit
    namespace: rabbitmq-cluster-namespace
    apiGroup: ""
roleRef:
  kind: Role
  name: rabbitmq-cluster-namespace-exec
  apiGroup: rbac.authorization.k8s.io
...

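Assuming the manifests above are saved as rbac.yaml (the file name is just an example), applying them and sanity-checking the exec permission could look like this; the impersonation check requires that your own user is allowed to impersonate service accounts:

kubectl apply -f rbac.yaml
# Verify the ServiceAccount can create pods/exec in the RabbitMQ namespace
kubectl auth can-i create pods/exec \
  --namespace rabbitmq-cluster-namespace \
  --as system:serviceaccount:rabbitmq-cluster-namespace:sa-kanister-bp-rabbit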
 

Rabbitmq-cluster-prehook.yaml

# rabbitmq-cluster-prehook.yaml
---
apiVersion: cr.kanister.io/v1alpha1
kind: Blueprint
metadata:
  name: rabbitmq-cluster-prehook
  namespace: kasten
actions:
  # Use as a pre-hook to quiesce the cluster
  quiesceRabbitCluster:
    outputArtifacts:
      rabbitClusterStatus:
        keyValue:
          rabbitclusteroperational: "{{ .Phases.rabbitClusterStatus.Output.rabbitclusteroperational }}"
    phases:
      # Get the RabbitMQ cluster status to determine if any processes should proceed
      - name: rabbitClusterStatus
        func: KubeTask
        args:
          podOverride:
            serviceAccountName: "sa-kanister-bp-rabbit"
            imagePullSecrets:
              - name: <ImagePullSecret>
          image: git.thinkon.com:5050/compass/global/images/k8s_runner:latest
          namespace: "{{ .StatefulSet.Namespace }}"
          command:
            - bash
            - -c
            - |
              export LOCALFILEREPO="/tmp"
              export RABBITCLUSTERSTATUSFILE="rabbitcluster_status.json"
              export TARGETFILE="$LOCALFILEREPO/$RABBITCLUSTERSTATUSFILE"
              export NAMESPACE={{ .StatefulSet.Namespace }}
              export RABBITPOD={{ index .StatefulSet.Pods 0 }}
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              export KUBECTLEXEC="kubectl exec -n $NAMESPACE $RABBITPOD -c rabbitmq --"
              # Capture the cluster status as JSON
              $KUBECTLEXEC $RABBITSBIN/rabbitmqctl cluster_status --formatter=json > $TARGETFILE
              if [ ! -s $TARGETFILE ]
              then
                echo "RabbitMQ cluster status JSON file is missing" >&2
                exit 1
              fi
              # "true" only if no node is in maintenance mode
              export RABBITCLUSTEROPERATIONAL="$(cat $LOCALFILEREPO/$RABBITCLUSTERSTATUSFILE | jq '.maintenance_status | to_entries | map(.value == "not under maintenance") | all')"
              kando output rabbitclusteroperational $RABBITCLUSTEROPERATIONAL

      # Drain the RabbitMQ nodes (stop all message processing)
      # This will essentially stop your Rabbit cluster
      - name: drainRabbitNodes
        func: KubeExecAll
        args:
          namespace: "{{ .StatefulSet.Namespace }}"
          pods: "{{ range .StatefulSet.Pods }} {{.}}{{end}}"
          containers: "rabbitmq"
          command:
            - bash
            - -c
            - |
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              export RABBITCLUSTEROPERATIONAL={{ index .Phases.rabbitClusterStatus.Output.rabbitclusteroperational | toString }}
              if [[ $RABBITCLUSTEROPERATIONAL == "true" ]]
              then
                $RABBITSBIN/rabbitmq-upgrade --timeout 10 drain
              else
                echo "Kanister will exit: RabbitMQ cluster has nodes in maintenance mode"
                exit 1
              fi

      # Wait for the Rabbit nodes to stop
      - name: quiesceWaitRabbitNodesStop
        func: WaitV2
        args:
          timeout: 240s
          conditions:
            anyOf:
              # Condition if all expected replicas are not ready (StatefulSet = NotReady)
              - condition: '{{if and (eq .status.availableReplicas 0) (not .status.readyReplicas)}}true{{else}}false{{end}}'
                objectReference:
                  apiVersion: "v1"
                  group: "apps"
                  resource: "statefulsets"
                  name: "{{ .Object.metadata.name }}"
                  namespace: "{{ .Object.metadata.namespace }}"

      # Set vhost max-connections to '0' (block any new connections)
      - name: blockVhostConnections
        func: KubeTask
        args:
          podOverride:
            serviceAccountName: "sa-kanister-bp-rabbit"
            imagePullSecrets:
              - name: <ImagePullSecret>
          image: git.thinkon.com:5050/compass/global/images/k8s_runner:latest
          namespace: "{{ .StatefulSet.Namespace }}"
          command:
            - bash
            - -c
            - |
              export LOCALFILEREPO="/tmp"
              export RABBITCLUSTERVHOSTSFILE="rabbitcluster_vhosts.json"
              export TARGETFILE="$LOCALFILEREPO/$RABBITCLUSTERVHOSTSFILE"
              export RABBITPOD={{ index .StatefulSet.Pods 0 }}
              export NAMESPACE={{ .StatefulSet.Namespace }}
              export KUBECTLEXEC="kubectl exec -n $NAMESPACE $RABBITPOD -c rabbitmq --"
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              export VHOSTMAXCONNECTIONS='0'
              $KUBECTLEXEC $RABBITSBIN/rabbitmqctl list_vhosts --formatter=json > $TARGETFILE
              if [ ! -s $TARGETFILE ]
              then
                echo "Kanister will exit: RabbitMQ cluster vhost JSON file is missing" >&2
                exit 1
              fi
              # Apply the connection limit to every vhost
              export RABBITCLUSTERVHOSTS="$(cat $TARGETFILE | jq -r '.[].name')"
              echo "$RABBITCLUSTERVHOSTS" | while IFS= read -r vhost
              do
                $KUBECTLEXEC $RABBITSBIN/rabbitmqctl set_vhost_limits -p $vhost "{\"max-connections\": $VHOSTMAXCONNECTIONS}"
              done

      # Bring the Rabbit nodes back up so Kasten can snapshot them as per normal
      - name: quiesceReviveRabbitNodes
        func: KubeExecAll
        args:
          namespace: "{{ .StatefulSet.Namespace }}"
          pods: "{{ range .StatefulSet.Pods }} {{.}}{{end}}"
          containers: "rabbitmq"
          command:
            - bash
            - -c
            - |
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              $RABBITSBIN/rabbitmq-upgrade --timeout 10 revive

      # Wait for the Rabbit nodes to be Ready
      - name: quiesceWaitRabbitNodesStart
        func: WaitV2
        args:
          timeout: 240s
          conditions:
            anyOf:
              # Condition if all expected replicas are ready (StatefulSet = Ready)
              - condition: '{{ if and (eq .status.replicas .status.readyReplicas) (eq .status.replicas .status.availableReplicas) }}true{{else}}false{{end}}'
                objectReference:
                  apiVersion: "v1"
                  group: "apps"
                  resource: "statefulsets"
                  name: "{{ .Object.metadata.name }}"
                  namespace: "{{ .Object.metadata.namespace }}"
  # Use as a post-hook to put the cluster back into operation
  activateRabbitCluster:
    phases:
      # Remove the vhost connection block (allow message processing again)
      - name: releaseVhostConnections
        func: KubeTask
        args:
          podOverride:
            serviceAccountName: "sa-kanister-bp-rabbit"
            imagePullSecrets:
              - name: <ImagePullSecret>
          image: git.thinkon.com:5050/compass/global/images/k8s_runner:latest
          namespace: "{{ .StatefulSet.Namespace }}"
          command:
            - bash
            - -c
            - |
              export LOCALFILEREPO="/tmp"
              export RABBITCLUSTERVHOSTSFILE="rabbitcluster_vhosts.json"
              export TARGETFILE="$LOCALFILEREPO/$RABBITCLUSTERVHOSTSFILE"
              export RABBITPOD={{ index .StatefulSet.Pods 0 }}
              export NAMESPACE={{ .StatefulSet.Namespace }}
              export KUBECTLEXEC="kubectl exec -n $NAMESPACE $RABBITPOD -c rabbitmq --"
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              export VHOSTMAXCONNECTIONS='-1'
              $KUBECTLEXEC $RABBITSBIN/rabbitmqctl list_vhosts --formatter=json > $TARGETFILE
              if [ ! -s $TARGETFILE ]
              then
                echo "Kanister will exit: RabbitMQ cluster vhost JSON file is missing" >&2
                exit 1
              fi
              # '-1' removes the per-vhost connection limit
              export RABBITCLUSTERVHOSTS="$(cat $TARGETFILE | jq -r '.[].name')"
              echo "$RABBITCLUSTERVHOSTS" | while IFS= read -r vhost
              do
                $KUBECTLEXEC $RABBITSBIN/rabbitmqctl set_vhost_limits -p $vhost "{\"max-connections\": $VHOSTMAXCONNECTIONS}"
              done

  # Use as an error-hook to leave the cluster operational
  revertRabbitCluster:
    phases:
      # Remove the vhost connection block (allow message processing again)
      - name: revertVhostConnections
        func: KubeTask
        args:
          podOverride:
            serviceAccountName: "sa-kanister-bp-rabbit"
            imagePullSecrets:
              - name: <ImagePullSecret>
          image: git.thinkon.com:5050/compass/global/images/k8s_runner:latest
          namespace: "{{ .StatefulSet.Namespace }}"
          command:
            - bash
            - -c
            - |
              export LOCALFILEREPO="/tmp"
              export RABBITCLUSTERVHOSTSFILE="rabbitcluster_vhosts.json"
              export TARGETFILE="$LOCALFILEREPO/$RABBITCLUSTERVHOSTSFILE"
              export RABBITPOD={{ index .StatefulSet.Pods 0 }}
              export NAMESPACE={{ .StatefulSet.Namespace }}
              export KUBECTLEXEC="kubectl exec -n $NAMESPACE $RABBITPOD -c rabbitmq --"
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              export VHOSTMAXCONNECTIONS='-1'
              $KUBECTLEXEC $RABBITSBIN/rabbitmqctl list_vhosts --formatter=json > $TARGETFILE
              if [ ! -s $TARGETFILE ]
              then
                echo "Kanister will exit: RabbitMQ cluster vhost JSON file is missing" >&2
                exit 1
              fi
              export RABBITCLUSTERVHOSTS="$(cat $TARGETFILE | jq -r '.[].name')"
              echo "$RABBITCLUSTERVHOSTS" | while IFS= read -r vhost
              do
                $KUBECTLEXEC $RABBITSBIN/rabbitmqctl set_vhost_limits -p $vhost "{\"max-connections\": $VHOSTMAXCONNECTIONS}"
              done

      # Bring the Rabbit nodes back up
      - name: revertReviveRabbitNodes
        func: KubeExecAll
        args:
          namespace: "{{ .StatefulSet.Namespace }}"
          pods: "{{ range .StatefulSet.Pods }} {{.}}{{end}}"
          containers: "rabbitmq"
          command:
            - bash
            - -c
            - |
              export RABBITSBIN="/opt/bitnami/rabbitmq/sbin"
              $RABBITSBIN/rabbitmq-upgrade --timeout 10 revive

      # Wait for the Rabbit nodes to be Ready
      - name: revertWaitRabbitNodesStart
        func: WaitV2
        args:
          timeout: 240s
          conditions:
            anyOf:
              # Condition if all expected replicas are ready (StatefulSet = Ready)
              - condition: '{{ if and (eq .status.replicas .status.readyReplicas) (eq .status.replicas .status.availableReplicas) }}true{{else}}false{{end}}'
                objectReference:
                  apiVersion: "v1"
                  group: "apps"
                  resource: "statefulsets"
                  name: "{{ .Object.metadata.name }}"
                  namespace: "{{ .Object.metadata.namespace }}"

  # Use as a pre-restore hook to remove the cluster
  # CAUTION - this will delete your entire Rabbit cluster
  # This is required because the RabbitMQ Operator prevents Kasten from scaling down or replacing PVs
  # https://github.com/rabbitmq/cluster-operator/issues/1491
  destroyRabbitCluster:
    phases:
      - name: deleteRabbitCluster
        func: KubeOps
        args:
          operation: delete
          objectReference:
            apiVersion: "v1beta1"
            group: "rabbitmq.com"
            resource: "rabbitmqclusters"
            name: '{{trimSuffix "-server" .StatefulSet.Name}}'
            namespace: "{{ .StatefulSet.Namespace }}"

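If you want to sanity-check the jq filter used in the rabbitClusterStatus phase outside of the blueprint, here is a small sketch. The JSON below is a hand-written, hypothetical trim of what rabbitmqctl cluster_status --formatter=json reports, not captured output:

# Hypothetical sample limited to the field the blueprint inspects
cat <<'EOF' > /tmp/rabbitcluster_status.json
{
  "maintenance_status": {
    "rabbit@rabbitmq-server-0": "not under maintenance",
    "rabbit@rabbitmq-server-1": "being drained"
  }
}
EOF
# Same filter as the blueprint: prints "true" only when no node is in maintenance mode
jq '.maintenance_status | to_entries | map(.value == "not under maintenance") | all' /tmp/rabbitcluster_status.json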
 
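The actions can also be exercised outside of a Kasten policy by creating a Kanister ActionSet directly. A rough sketch, assuming the controller watches the kasten namespace (where the Blueprint lives) and the RabbitMQ StatefulSet is named rabbitmq-cluster-namespace/rabbitmq-server (adjust to your cluster):

kanctl create actionset \
  --namespace kasten \
  --blueprint rabbitmq-cluster-prehook \
  --action quiesceRabbitCluster \
  --statefulset rabbitmq-cluster-namespace/rabbitmq-server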

Very interesting script and workaround.  We use RabbitMQ, so I might pass this along to our team.

