~/
~/

Contents

Миграция с classic на quorum queue в RabbitMQ

В этой статье мы будем мигрировать с классических очередей на кворум очереди в RabbitMQ

Начиная с версии 4.0 классические очереди будут удалены, а depricated они были еще в версии 3.9, весь текст ниже относится к версии RabbitMQ 3.12

image

  • Очереди кворума обладают более высоким уровнем пропускной способности почти во всех случаях использования. Очередь кворума может поддерживать пропускную способность 30000 сообщений (при использовании сообщений размером 1 кб), обеспечивая при этом высокий уровень безопасности данных и реплицируя их на все 3 узла в кластере. Классические зеркальные очереди обеспечивают лишь треть от этой пропускной способности и гораздо более низкий уровень безопасности данных
  • Очереди кворума более надежны, быстрее для большинства рабочих нагрузок и не требуют особого обслуживания.

Не все параметры классических очередей на 100% совместимы с кворум очередями, ниже табличка с параметрами, мы пройдемся по самым критичным и разберемся с тем, что нам грозит если на них просто забить

image

Warning

Матрица немного отличается в зависимости от того с какой версии на какую вы переезжаете, учитывайте это

durable означает, что очередь будет сохранена после рестарта брокера, non-durable более не поддерживается, если это кому-то было нужно…

Эксклюзивная очередь может использоваться только подключением объявившим эту очередь, такая очередь по своей природе является временной, потому что не может пережить “свое подключение”, более не поддерживается

В сценарии когда консьюмер подключен к очереди в которую попадают приоритетные сообщения, такие сообщения не ожидают когда консьюмер подтвердит их получение, все сообщения доставляются немедленно

Так же есть интересная особенность таких сообщений:

Tip

Скажем у нас есть консьюмер qos которого равен 10, при попадании в очередь 10 сообщений без приоритета все они будут отправлены консьюмеру на обработку и qos будет заполнен Если тут же в очередь пападут еще 5 сообщений с приоритетом, им придется ждать обработки первых 10 сообщений, поскольку qos заполнен Так же на все это тратится большее колличесвто ресурсов вашего кластера

Такое поведение как минимум странно, и лучше всего использовать prefetch для ваших консьюмеров, явно устанавливая колличество сообщений которое они могут принять, в совокупности с явным подтверждением получения сообщений

Объявляется при подключении к очереди и означает, что у очереди может быть только один эксклюзивный консьюмер, сработает только если в очереди еще нет консьюмеров, более не поддерживается

Про QoS мы говорили в message priority, но QoS бывает разный, глобальный и не глобальный

  • Не глобальный это QoS per consumer когда колличество сообщений которое консьюмер может принять устанавливается самим консьюмером

  • Глобальный устанавливается на весь канал, для всех консьюмеров использующих этот канал

Имена очередей сгенерированные сервером автоматически, если имя не передано явно

  • Blue-Green Deployment Перезд между blue и green кластером, в этом случае необходимо подготовить полностью новый кластер “рядом” и переехать на него, мы выберем не этот путь, но подходы будут похожи

  • Миграция между виртуальными хостами в рамках одного кластера (то, что нам нужно)

  • Миграция в рамках одного кластера и одного виртуального хоста Отключаем всех продюсеров и консьюмеров, пересоздаем вхост, подключаем всех обратно (такой метод едва ли подойдет кому то в продакшн среде)

Note

Все манипуляции будут выполнены на тестовом кластере kubernetes и rabbitmq, но максимально приближены к реальным условиям и основаны на реальных событиях

У нас имеется:

  • Классический вхост
  • Немного тестовых очередей
  • Немного тестовых эксченджей

image

image

Все новые очереди наследуют свой тип от вхоста, если этого не указать явно

image

Имеем classic vhost (с него мы будем мигрировать), и quorum vhost (на него мы поедем)

image

Проще всего проделать это в консоли любого пода RabbitMQ

rabbitmqctl set_parameter federation-upstream quorum-migration-upstream --vhost quorum '{"uri":"amqp:///classic", "trust-user-id":true}'

Таким образом федерация создается на quorum вхосте, а апстримом федерации является classic вхост

image

Что бы все заработало так как задумано и сообщения отправлялись из classic в quorum vhost нужна соответствующая политика

Virtual host: quorum
Name: federate-to-quorum
Pattern: .*
Apply to: queues
Priority: 1
Definition: federation-upstream-set = all

image

Сообщения из старого (classic) вхоста будут попадать в новый (quorum) вхост только при условии что и там и там созданы одинаковые очереди и эксченджи Что бы не переносить все руками (очередй могут быть тысячи), воспользуемся встроенным механизмом, доступным из коробки, Export definitions / Import definitions

Выполним export из classic vhost

image

Теперь import полученного json в quorum vhost

image

Tip

Очереди перенеслись как quorum, унаследовав тип от вхоста
federate-to-quorum говорит о том, что политика была настроенна правильно и наша федерация работает корректно

image

Note

Так же убедиться, что федерация работает и очереди федерируются можно на страничке Federation status

image

На этом этапе представим, что все происходит на “боевом” кластере работающим в проде
Мы подготовили все, что было нужно, теперь переключим консьюмеров на использование нового quorum вхоста (продюсеры пока остаются на старом classic вхосте)

Запустим консьюмера на go, подключим его к новому quorum вхосту и будем слушать очередь example-queue-1

package main

import (
	"fmt"
	"github.com/streadway/amqp"
)

func main() {

	fmt.Println("RabbitMQ in Golang: Getting started tutorial")

	connection, err := amqp.Dial("amqp://default_user_Y7DEkXxrZCvuGmG_qac:uLoYngeeyIwboKlUCYpeoV-p0SIKxSaY@192.168.2.109:31131/quorum")

	if err != nil {
		panic(err)
	}

	defer connection.Close()
	
	fmt.Println("Successfully connected to RabbitMQ instance")

	// opening a channel over the connection established to interact with RabbitMQ

	channel, err := connection.Channel()
	if err != nil {
		panic(err)
	}

	defer channel.Close()

	// declaring consumer with its properties over channel opened

	msgs, err := channel.Consume(
		"example-queue-1", // queue
		"", // consumer
		true, // auto ack
		false, // exclusive
		false, // no local
		false, // no wait
		nil, //args
	)

	if err != nil {
		panic(err)
	}

	// print consumed messages from queue

	forever := make(chan bool)

	go func() {

		for msg := range msgs {
			fmt.Printf("Received Message: %s\n", msg.Body)
		}

	}()

	fmt.Println("Waiting for messages...")

	<-forever

}

image

Попробуем что-то отправить в эту очередь, запустим продюссера и подлючим его к classic вхосту

package main

import (
	"fmt"
	"github.com/streadway/amqp"
)

func main() {

	fmt.Println("RabbitMQ in Golang: Getting started tutorial")
	connection, err := amqp.Dial("amqp://default_user_Y7DEkXxrZCvuGmG_qac:uLoYngeeyIwboKlUCYpeoV-p0SIKxSaY@192.168.2.109:31131/classic")

	if err != nil {
		panic(err)
	}

	defer connection.Close()

	fmt.Println("Successfully connected to RabbitMQ instance")

	// opening a channel over the connection established to interact with RabbitMQ

	channel, err := connection.Channel()

	if err != nil {
		panic(err)
	}

	defer channel.Close()

	// declaring queue with its properties over the the channel opened

	queue, err := channel.QueueDeclare(
		"example-queue-1", // name
		true,  // durable
		false, // auto delete
		false, // exclusive
		false, // no wait
		nil,   // args
	)

	if err != nil {
		panic(err)
	}

	// publishing a message

	err = channel.Publish(

		"",                // exchange
		"example-queue-1", // key
		false,             // mandatory
		false,             // immediate

		amqp.Publishing{
			ContentType: "text/plain",
			Body: []byte("Test Message from classic vhost"),
		},
	)

	if err != nil {
		panic(err)
	}

	fmt.Println("Queue status:", queue)
	fmt.Println("Successfully published message")

}

image

Успех, мы получили сообщение отправленное в классическую очередь
На этом этапе все наши консьюмеры уже переключены на новые очереди и получают сообщения от продюсеров работающих на старых очередях

Остается переключить продюсеров, миграция завершена, самое время открывать пиво :)

Note

В случае с Blue-Green нужно будет проделать все то же самое, только федерация будет настраиваться не между вхостами, а между кластерами