Rabbitmq для початківців, ajaxblog

сервер черг RabbitMQ

Іноді в веб-додатках з'являється необхідність виконати складні ресурсомісткі завдання, які не можуть бути уміщені в короткому часовому інтервалі HTTP запиту. У цьому випадку на допомогу приходять черги. Основна ідея черг - уникнути виконання ресурсоємних завдань безпосередньо після відправки запиту. Замість цього завдання ставиться в чергу для подальшого виконання в асинхронному режимі. Тобто при отриманні запиту від клієнта ми інкапсуліруем завдання як повідомлення і відправляємо його в чергу, а вже обробник черги дістає повідомлення в порядку їх слідування і обробляє належним чином. Забігаючи вперед, скажу, що можливий режим роботи черг, коли при наявності декількох копій обробника, наступна завдань буде надходити на вільний обробник. Таким чином досягається розпаралелювання виконання завдань.

В даному розділі розглядається робота з чергами, які використовують сервер повідомлень RabbitMQ. Сервер RabbitMQ по суті є менеджером черг, який має такі переваги:

У туторіали будуть наведені приклади для всіх перерахованих вище варіантів. За основу взято туторіали з офіційного сайту, доповнені і реалізовані на PHP для RabbitMQ.

RabbitMQ испозуют протокол AMQP. Щоб іспользовть RabbitMQ необхідно поставити клієнтську і серверну частини.

установка сервера

Для установки розширення AMQP для PHP необхідно спочатку встановити RabbitMQ Server

Додамо следующии рядок в файл /etc/apt/sources.list

установка клієнта

базові поняття

У RabbitMQ використовуються наступний позначення. Продюсер - програма, яка посилає повідомлення. Будемо позначати його так

Брокер (черга) - власне просто буфер в пам'яті без будь-яких обмежень на кількість збережених повідомлень. В одну і ту ж чергу можуть відсилати повідомлення кілька продюсерів, так само як кілька Консьюмер можуть намагатися отримати повідомлення з однієї і тієї ж черги. Черга буде позначена так (зверху вказано ім'я черги)

Консьюмер (одержувач) - програма, яка приймає повідомлення з черги. Будемо позначати його так

Тут важливо зазначити, що продюсер, консьюмер і брокер можуть бути розташовані на різних машинах, більш того, в більшості випадків це саме так.

Перший скрипт роботи з чергою, свого роду "Hello world", посилатиме текстове повідомлення з клієнта, приймати його на сервері і виводити на екран.

Тобто схема роботи наступна: Перше, що треба зробити, це встановити з'єднання з сервером RabbitMQ. З'єднання встановлюється командами

Використовуючи коннект можна отримати об'єкт для каналу

На основі отриманого каналу створюємо обмінник

і, власне, саму чергу

Коли обмінник і чергу готові, їх можна пов'язати з ключу

Після того як повідомлення відіслано, коннект можна розірвати.

Одержувач також повинен виконати ту ж послідовність - пріконнектіться до сервера повідомлень; - створити канал; - оголосити обмінник; - оголосити чергу; - зв'язати чергу з обмінником по ключу Останні два дії, як згадувалося вище, не обов'язкові. Тепер можна почати прослуховувати чергу

Тут методу get в качетсве параметра передається константа ARMQ_AUTOACK, яка сповіщає сервер повідомлень про те, що дане повідомлення отримано. Це найпростіший спосіб видалити повідомлення з черги. Однак в даному випадку в разі невдалої обробки повідомлення, повернути повторно його в чергу не можна.

Таким чином, отримуємо два скрипта

розподілені черги

Rabbitmq для початківців, ajaxblog

Оповіщення (acknowledgment)

Деякі завдання можуть виконуватися досить довго. І невідомо, що може статися з сервером в цей момент: сервер може перезавантажитися, або завдання може зависнути або завершиться фатальною помилкою. У першому туторіали оповіщення було відключено шляхом передачі параметра AMQP_AUTOACK в метод get (). У цьому випадку повідомлення буде видалено з пам'яті відразу після виконання методу get і в разі помилки, що сталася під час обробки, не повернуться в чергу. Щоб уникнути цього, не будемо передавати константу AMQP_AUTOACK в метод get. Замість цього по завершенню обробки викличемо метод ack (), який повідомить брокер про те, що повідомлення успішно оброблено і його можна видалити з пам'яті. В іншому випадку RabbitMQ розуміє, що сполучення не обратботано і перенаправляє його іншому вільному Консьюмер. Однак тут варто відзначити один важливий момент. Переслані повідомлення не будуть оброблятися до того поки консьюмер НЕ отконнектітся і пріконнектітся заново до брокера. Якщо необхідно заново обробити повідомлення в рамках того ж конекту до сервера повідомлень, то необхідно викликати метод nack () з прапором AMQP_REQUEUE, який поставить невдало оброблену завдання назад в чергу і повідомить брокер про те, що ця задача повинна бути знову оброблена.

Распростанение помилка - при включеному сповіщення не підтверджувати коректно оброблені завдання (повідомлення). В цьому випадку при кожному новому коннекте, все вже оброблені завдання будуть надходити заново на обробку. Процес буде виглядати як безладна повторна відпарювання повідомлень, що в кінцевому підсумку призведе до переповнення пам'яті. Відстежити таку ситуацію можна шляхом використання нативного інструменту сервера повідомлень rabbitmqctl

Життєздатність повідомлень (durability)

У попередньому параграфі ми розглянули як не втратити повідомлення в черзі шляхом повторної відправки його в чергу. Проте повідомлення може бути втрачено в разі якщо сервер повідомлень був несподівано зупинений. Щоб цього уникнути, чергу повинна бути створена з прапором AMQP_DURABLE.

Якщо чергу 'hello' вже була оголошена, то даний код викличе помилку, оскільки один раз оголошену чергу не можна оголосити повторно з іншими параметрами. З цієї ситуації є два виходи, або обнулити всі черги як сказано тут, або створити нову чергу з невживаних ім'ям. Подивитися список черг можна спопособом згаданим в попередньому параграфі. Установка прапора AMQP_DURABLE не гарантує стовідсоткову безпеку повідомлень в черзі. Незважаючи на те, що таким спопосбом ми вказуємо RabbitMQ зберігати повідомлення на диску, існує мертва зона після отримання соощенія, коли воно вже в пам'яті, але ще не збережено на диску. У цей момент, в разі не передбаченої ситуації, воно може бути втрачено з пам'яті. Для нашого простого прикладу таких гарантій досить, але якщо необхідно домогтися високих гарантій отримання повідомлення, то слід використовувати транзакції.

Всі разом

Для прикладу розподілу повідомлень між чергами нам знадобиться функція, що імітує завантаженість системи. Для цього ми використовуємо звичайний таймер

Для виконання зв'язку з шалбону обмінник повинен мати тип topic, який визначається константою AMQP_EX_TYPE_TOPIC. Ключі routingKey складаються з слова, які прямують через точку, наприклад, "logs.devices.kernel.notice", "logs.devices.cron". Максимальна довжина такого ключа може становити 255 символів. Логіка доставки повідомлень по ключу схожа з логікою для обмінників з типом direct - повідомлення з певним ключем будуть доставлені в черзі з відповідним ключем. Але є одна велика різниця. Ключі, що використовуються для зв'язку за шаблоном, можуть містити два спеціальних символу:

  • *. відповідає строго одному слову;
  • #. відповідає будь-якій кількості слів, в тому числі і відсутності слів;

Наприклад, маємо такі зв'язку
* .orange. *
*. *. Rabbit
lazy. #

Rabbitmq для початківців, ajaxblog

Перше слово описує швидкість, друге - колір і третє - вид тварини, тобто [Speed] [color] [species]. Ми створили три зв'язку: чергу Q1 зв'язали по ключу ".orange. "І чергу Q2 - по ключам". .rabbit "і" lazy. # ". Таким чином, можна сказати, що черга Q1 розглядає всіх помаранчевих тварин, а чергу Q2 - всіх зайців і всіх повільних тварин.

Розглянемо кілька прикладів:

  • "Quick.orange.rabbit" - в обидві черги
  • "Lazy.orange.elephant" - в обидві черги
  • "Quick.orange.fox" - тільки в 1-у
  • "Lazy.brown.fox" - тільки у 2-у
  • "Quick.brown.fox" - буде відкинута
  • "Quick.orange.male.fox" - буде відкинута
  • "Lazy.orange.male.fox" - тільки у 2-у

Обмінник з типом topic може повторювати поведінку обмінника з типом fanout, якщо з ним зв'язати чергу по ключу "#". Якщо в ключ не испльзовать спеціальних символів, то такий обмінник буде відповідати обменнику з типом direct.

Відправлення повідомлень

Для відправки повідомлень за шаблоном обмінник повинен бути створений з типом topic, який сооветствует константі AMQP_EX_TYPE_TOPIC.

Реалізація RPC шаблону

У другому уроці була реалізована чергу, яка розподіляла навантаження між усіма наявними Консьюмер. Але, що якщо нам потрібно отримати результат від обробника черзі. Такий підхід відомий як виклик віддалених процедур або RPC (remote procedure call). У цьому уроці буде реалізована модель RPC з використанням черги повідомлень RabbitMQ. Звичайно, такий підхід передбачає, що обробка не повинна займати багато часу. Для реалізації прикладу наша функція обробник буде змінювати повідомлення "message before" на "message after".

В цілому, реалізація RPC за допомогою RabbitMQ досить проста. Клієнт відправляє повідомлення, а сервері відповідає. Для обробки відповіді сервера, необхідно створити callback чергу. Щоб дізнатися яка callback чергу очікує відповіді, ми повинні в запиті послати її ім'я. Для цього на продюсера створюється анонімна чергу і її ім'я додається в параметри запиту

Зверніть увагу, що callback чергу створюється з прапором AMQP_EXCLUSIVE, що означає, що тільки один консьюмер може слухати цю чергу.

Correlation ID

У методі, представленому вище, ми припускаємо створювати callback чергу для кожного RPC запиту. Оскільки не можна однозначно на ім'я черги визначити якому запиту належить відповідь, в запит також додається параметр correlationId, який має унікальне значення для кожного запиту. Пізніше, коли ми отримаємо відповідь, ми зможемо порівняти його correlationId зі значенням, переданим разом із запитом. І в разі їх розбіжності просто відкинути отриману відповідь.

Підсумковий план дій

  • клієнт створює анонімну ексклюзивну callback чергу
  • клієнт відсилає запит з двома параметрами: replyTo - ім'я callback черзі corralationId - унікальне значення для кожного запиту
  • запит відправляється в іменовану чергу, наприклад, з ім'ям rpc_queue
  • RPC Воркер (RPC сервер) чекає запит від цієї черги і коли запит з'являється, обробляє його і шле відповідь назад клієнту, використовуючи ім'я callback черзі в якості роутер-ключа
  • клієнт слухає callback чергу і коли повідомлення з'являється, звіряє correlationId. Якщо значення цієї властивості з отриманого повідомлення відповідає раніше сформованому значенням, відповідь обробляється додатком.

Всі разом

Функція обробки повідомлення на стороні сервера виглядає наступним чином

Схожі статті