Опубликованы фото новейших российских роботов «Рысь-БП» и «Аватар» || Рысь робот

Еще раз о чтении сообщений

Клиент, который хочет прочитать сообщения, управляет именованным указателем, называемым группа консюмеров (consumer group), который указывает на смещение (offset) сообщения в партиции. Смещение — это позиция с возрастающим номером, которая начинается с 0 в начале партиции. Эта группа консюмеров, на которую ссылаются в API через определяемый пользователем идентификатор group_id, соответствует одному логическому потребителю или системе.

Большинство систем, использующих обмен сообщениями, читают данные из адресата посредством нескольких экземпляров и потоков для параллельной обработки сообщений. Таким образом, обычно будет много экземпляров консюмеров, совместно использующих одну и ту же группу консюмеров.Проблему чтения можно представить следующим образом:

  • Топик имеет несколько партиций
  • Использовать топик может одновременно множество групп консюмеров
  • Группа консюмеров может иметь несколько отдельных экземпляров

Это нетривиальная проблема «многие ко многим». Чтобы понять, как Kafka обращается с отношениями между группами консюмеров, экземплярами консюмеров и партициями, рассмотрим ряд постепенно усложняющихся сценариев чтения.

Давайте возьмем в качестве отправной точки топик с одной партицией (Figure 3-2).

Figure 3-2. Консюмер читает из партиции

Когда экземпляр консюмера подключается со своим собственным group_id к этому топику, ему назначается партиция для чтения и смещение в этой партиции. Положение этого смещения конфигурируется в клиенте, как указатель на самую последнюю позицию (самое новое сообщение) или самую раннюю позицию (самое старое сообщение).

Консюмер запрашивает (polls) сообщения из топика, что приводит к их последовательному чтению из журнала.Позиция смещения регулярно коммитится обратно в Kafka и сохраняется, как сообщения во внутреннем топике _consumer_offsets. Прочитанные сообщения все равно не удаляются, в отличие от обычного брокера, и клиент может перемотать (rewind) смещение, чтобы повторно обработать уже просмотренные сообщения.

Смотрите про коптеры:  Мойщик окон робот рейтинг ТОП-8 – интересный обзор, где купить отзывы

Опубликованы фото новейших российских роботов «Рысь-БП» и «Аватар» || Рысь робот

Когда подключается второй логический консюмер, используя другой group_id, он управляет вторым указателем, который не зависит от первого (Figure 3-3). Таким образом, топик Kafka действует как очередь, в которой существует один консюмер и, как обычный топик pub-sub, на который подписаны несколько консюмеров, с дополнительным преимуществом, что все сообщения сохраняются и могут обрабатываться несколько раз.

Figure 3-3. Два консюмера в разных группах консюмеров читают из одной партиции

Когда один экземпляр консюмера читает данные из партиции, он полностью контролирует указатель и обрабатывает сообщения, как описано в предыдущем разделе.Если несколько экземпляров консюмеров были подключены с одним и тем же group_id к топику с одной партицией, то экземпляру, который подключился последним, будет передан контроль над указателем и с этого момента он будет получать все сообщения (Figure 3-4).

Figure 3-4. Два консюмера в одной и той же группе консюмеров читают из одной партиции

Этот режим обработки, в котором количество экземпляров консюмеров превышает число партиций, можно рассматривать как разновидность монопольного потребителя. Это может быть полезно, если вам нужна «активно-пассивная» (или «горячая-теплая») кластеризация ваших экземпляров консюмеров, хотя параллельная работа нескольких консюмеров («активно-активная» или «горячая-горячая») намного более типична, чем консюмеры в режиме ожидания.

Чаще всего, когда мы создаем несколько экземпляров консюмеров, мы делаем это либо для параллельной обработки сообщений, либо для увеличения скорости чтения, либо для повышения устойчивости процесса чтения. Поскольку читать данные из партиции может одновременно только один экземпляр консюмера, то как это достигается в Kafka?

Один из способов сделать это — использовать один экземпляр консюмера, чтобы прочитать все сообщения и передать их в пул потоков. Хотя этот подход увеличивает пропускную способность обработки, он увеличивает сложность логики консюмеров и ничего не делает для повышения устойчивости системы чтения. Если один экземпляр консюмера отключается из-за сбоя питания или аналогичного события, то вычитка прекращается.Каноническим способом решения этой проблемы в Kafka является использование бОльшего количества партиций.

Партиционирование

Партиции являются основным механизмом распараллеливания чтения и масштабирования топика за пределы пропускной способности одного экземпляра брокера. Чтобы лучше понять это, давайте рассмотрим ситуацию, когда существует топик с двумя партициями и на этот топик подписывается один консюмер (Figure 3-5).

Опубликованы фото новейших российских роботов «Рысь-БП» и «Аватар» || Рысь робот

Figure 3-5. Один консюмер читает из нескольких партиций

В этом сценарии консюмеру дается контроль над указателями, соответствующими его group_id в обоих партициях, и начинается чтение сообщений из обеих партиций.Когда в этот топик добавляется дополнительный консюмер для того же group_id, Kafka переназначает (reallocate) одну из партиций с первого на второй консюмер.

После чего каждый экземпляр консюмера будет вычитывать из одной партиции топика (Figure 3-6).Чтобы обеспечить обработку сообщений параллельно в 20 потоков, вам потребуется как минимум 20 партиций. Если партиций будет меньше, у вас останутся консюмеры, которым не над чем работать, что описано ранее в нашем обсуждении монопольных консюмеров.

Figure 3-6. Два консюмера в одной и той же группе консюмеров читают из разных партиций

Эта схема значительно снижает сложность работы брокера Kafka по сравнению с распределением сообщений, необходимым для поддержки очереди JMS. Здесь не нужно заботится о следующих моментах:

  • Какой консюмер должен получить следующее сообщение, основываясь на циклическом (round-robin) распределении, текущей емкости буферов предварительной выборки или предыдущих сообщениях (как для групп сообщений JMS).
  • Какие сообщения отправлены каким консюмерам и должны ли они быть доставлены повторно в случае сбоя.

Все, что должен сделать брокер Kafka — это последовательно передавать сообщения консюмеру, когда последний запрашивает их.Однако, требования к распараллеливанию вычитки и повторной отправке неудачных сообщений никуда не деваются — ответственность за них просто переходит от брокера к клиенту. Это означает, что они должны быть учтены в вашем коде.

ConsumerRecords {amp}lt; K, V {amp}gt; poll(long timeout);

Возвращаемое значение метода — это контейнерная структура, содержащая несколько объектов ConsumerRecord из потенциально нескольких партиций. ConsumerRecord сам по себе является объектом-холдером для пары ключ-значение с соответствующими метаданными, такими, как партиция, из которой он получен.Как обсуждалось в Главе 2, мы должны постоянно помнить, что происходит с сообщениями после их успешной или неуспешной обработки, например, если клиент не может обработать сообщение или если он прерывает работу.

В JMS это обрабатывалось через режим подтверждения (acknowledgement mode). Брокер либо удалит успешно обработанное сообщение, либо повторно доставит необработанное или зафейленное (при условии, что были использованы транзакции). Kafka работает совсем по-другому. Сообщения не удаляются в брокере после вычитки и ответственность за то, что происходит при сбое, лежит на самом вычитывающем коде.

Как мы уже говорили, группа консюмеров связана со смещением в журнале. Позиция в журнале, связанная с этим смещением, соответствует следующему сообщению, которое будет выдано в ответ на poll (). Решающее значение при чтении имеет момент времени, когда это смещение увеличивается.Возвращаясь к модели чтения, рассмотренной ранее, обработка сообщения состоит из трех этапов:

  1. Извлечь сообщение для чтения.
  2. Обработать сообщение.
  3. Подтвердить сообщение.

Отправка сообщений

Опубликованы фото новейших российских роботов «Рысь-БП» и «Аватар» || Рысь робот

Ответственность за решение, в какую партицию отправить сообщение, возлагается на продюсер этого сообщения. Чтобы понять механизм, с помощью которого это делается, нам сначала нужно рассмотреть, что именно мы на самом деле отправляем.В то время, как в JMS мы используем структуру сообщения с метаданными (заголовками и свойствами) и телом, содержащим полезную нагрузку (payload), в Kafka сообщение является парой «ключ-значение».

Полезная нагрузка сообщения отправляется, как значение (value). Ключ, с другой стороны, используется главным образом для партиционирования и должен содержать специфичный для бизнес-логики ключ, чтобы поместить связанные сообщений в ту же партицию.В Главе 2 мы обсуждали сценарий онлайн-ставок, когда связанные события должны обрабатываться по порядку одним консюмером:

  1. Учетная запись пользователя настроена.
  2. Деньги зачисляются на счет.
  3. Делается ставка, которая выводит деньги со счета.
interface Partitioner { int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster); }

Реализация Partitioner для определения партиции использует по-умолчанию алгоритм хеширования ключа (general-purpose hashing algorithm over the key) или циклический перебор (round-robin), если ключ не указан. Это значение по-умолчанию работает хорошо в большинстве случаев. Однако, в будущем вы захотите написать свой собственный.

{ "signature": "541661622185851c248b41bf0cea7ad0", "accountId": "10007865234" }

Поскольку значение подписи будет варьироваться в зависимости от полезной нагрузки, дефолтная стратегия хеширования интерфейса Partitioner не будет надежно группировать связанные сообщения. Поэтому нам нужно будет написать свою собственную стратегию, которая будет анализировать этот ключ и разделять (partition) значение accountId.

Пользовательская стратегия партиционирования должна гарантировать, что все связанные сообщения окажутся в одной партиции. Хотя это кажется простым, но требование может быть усложнено из-за важности упорядочивания связанных сообщений и того, насколько фиксировано количество партиций в топике.Количество партиций в топике может изменяться со временем, так как их можно добавить, если трафик выходит за пределы первоначальных ожиданий.

Таким образом, ключи сообщений могут быть связаны с партицией, в которую они были первоначально отправлены, подразумевая часть состояния, которое должно быть распределено между экземплярами продюсера.Другим фактором, который следует учитывать, является равномерность распределения сообщений между партициями.

Как правило, ключи не распределяются равномерно по сообщениям, и хеш-функции не гарантируют справедливое распределение сообщений для небольшого набора ключей.Важно отметить, что, как бы вы ни решили разделить сообщения, сам разделитель, возможно, придется использовать повторно.Рассмотрим требование репликации данных между кластерами Kafka в разных географических расположениях.

Для этой цели Kafka поставляется с инструментом командной строки под названием MirrorMaker, который используется для чтения сообщений из одного кластера и передачи их в другой.MirrorMaker должен понимать ключи реплицируемого топика, чтобы поддерживать относительный порядок между сообщениями при репликации между кластерами, поскольку количество партиций для этого топика может не совпадать в двух кластерах.

Пользовательские стратегии партиционирования встречаются относительно редко, так как дефолтные хеширование или циклический перебор успешно работают в большинстве сценариев. Однако, если вам требуются строгие гарантии упорядочивания или вам необходимо извлечь метаданные из полезных нагрузок, то партиционирование — это то, на что вам следует взглянуть более подробно.

Соглашения по продюсеру

Future {amp}lt; RecordMetadata {amp}gt; send(ProducerRecord {amp}lt; K, V {amp}gt; record); Future {amp}lt; RecordMetadata {amp}gt; send(ProducerRecord {amp}lt; K, V {amp}gt; record, Callback callback);
RecordMetadata metadata = producer.send(record).get();

Высокая доступность (High Availability)

Опубликованы фото новейших российских роботов «Рысь-БП» и «Аватар» || Рысь робот

Подход Kafka в отношении высокой доступности существенно отличается от подхода ActiveMQ. Kafka разработана на базе горизонтально масштабируемых кластеров, в которых все экземпляры брокера принимают и раздают сообщения одновременно.Кластер Kafka состоит из нескольких экземпляров брокера, работающих на разных серверах.

Kafka была разработана для работы на обычном автономном железе, где каждый узел имеет свое собственное выделенное хранилище. Использование сетевых хранилищ (SAN) не рекомендуется, поскольку множественные вычислительные узлы могут конкурировать за временнЫе интервалы хранилища и создавать конфликты.

Kafka — это постоянно включенная система. Многие крупные пользователи Kafka никогда не гасят свои кластеры и программное обеспечение всегда обеспечивает обновление путем последовательного рестарта. Это достигается за счет гарантирования совместимости с предыдущей версией для сообщений и взаимодействий между брокерами.

Брокеры подключены к кластеру серверов ZooKeeper, который действует, как реестр конфигурационных данный и используется для координации ролей каждого брокера. ZooKeeper сам является распределенной системой, которая обеспечивает высокую доступность посредством репликации информации путем установления кворума.В базовом случае топик создается в кластере Kafka со следующими свойствами:

  • Количество партиций. Как обсуждалось ранее, точное значение, используемое здесь, зависит от желаемого уровня параллельного чтения.
  • Коэффициент (фактор) репликации определяет, сколько экземпляров брокера в кластере должны содержать журналы для этой партиции.

Используя ZooKeepers для координации, Kafka пытается справедливо распределить новые партиции между брокерами в кластере. Это делается одним экземпляром, который выполняет роль Контроллера.В рантайме для каждой партиции топикаКонтроллер назначает брокеру роли лидера (leader, master, ведущего) и последователей (followers, slaves, подчиненных).

Брокер, выступающий в качестве лидера для данной партиции, отвечает за прием всех сообщений, отправленных ему продюсерами, и распространение сообщений по консюмерам. При отправке сообщений в партицию топика они реплицируются на все узлы брокера, выступающие в качестве последователей для этой партиции.

Каждый узел, содержащий журналы для партиции, называется репликой. Брокер может выступать в качестве лидера для одних партиций и в качестве последователя для других.Последователь, содержащий все сообщения, хранящиеся у лидера, называется синхронизированной репликой (репликой, находящейся в синхронизированном состоянии, in-sync replica).

Если брокер, выступающий в качестве лидера для партиции, отключается, любой брокер, который находится в актуализированном или синхронизированном состоянии для этой партиции, может взять на себя роль лидера. Это невероятно устойчивый дизайн.Частью конфигурации продюсера является параметр acks, который определяет, сколько реплик должно подтвердить (acknowledge) получение сообщения, прежде чем поток приложения продолжит отправку: 0, 1 или все.

Если задано значение all, то при получении сообщения лидер отправит подтверждение (confirmation) обратно продюсеру, как только получит подтверждение (acknowledgements) записи от нескольких реплик (включая саму себя), определенных настройкой топика min.insync.replicas (по умолчанию 1). Если сообщение не может быть успешно реплицировано, то продюсер вызовет исключение для приложения (NotEnoughReplicas или NotEnoughReplicasAfterAppend).

Опубликованы фото новейших российских роботов «Рысь-БП» и «Аватар» || Рысь робот

В типичной конфигурации создается топик с коэффициентом репликации 3 (1 лидер, 2 последователя для каждой партиции) и параметр min.insync.replicas устанавливается в значение 2. В этом случае, кластер будет допускать, чтобы один из брокеров, управляющих партицией топика, мог отключаться без влияния на клиентские приложения.

Это возвращает нас к уже знакомому компромиссу между производительностью и надежностью. Репликация происходит за счет дополнительного времени ожидания подтверждений (acknowledgments) от последователей. Хотя, поскольку она выполняется параллельно, репликация, как минимум на три узла, имеет такую же производительность, как и на два (игнорируя увеличение использования пропускной способности сети).

Используя эту схему репликации, Kafka ловко избегает необходимости обеспечивать физическую запись каждого сообщения на диск с помощью операции sync (). Каждое сообщение, отправленное продюсером, будет записано в журнал партиции, но, как обсуждалось в Главе 2, запись в файл первоначально выполняется в буфер операционной системы.

Если это сообщение реплицировано на другой экземпляр Kafka и находится в его памяти, потеря лидера не означает, что само сообщение было потеряно — его может взять на себя синхронизированная реплика.Отказ от необходимости выполнять операцию sync () означает, что Kafka может принимать сообщения со скоростью, с которой она может записывать их в память.

И наоборот, чем дольше можно избежать сброса (flushing) памяти на диск, тем лучше. По этой причине нередки случаи, когда брокерам Kafka выделяется 64 Гб памяти или более. Такое использование памяти означает, что один экземпляр Kafka может легко работать на скоростях во много тысяч раз быстрее, чем традиционный брокер сообщений.

Kafka также можно настроить для применения операции sync () к пакетам сообщений. Поскольку всё в Kafka ориентировано на работу с пакетами, это на самом деле работает довольно хорошо для многих сценариев использования и является полезным инструментом для пользователей, которые требуют очень сильных гарантий.

Большая часть чистой производительности Kafka связана с сообщениями, которые отправляются брокеру в виде пакетов, и с тем, что эти сообщения считываются из брокера последовательными блоками с помощью zero-copy операций (операциями, в ходе которых не выполняется задача копирования данных из одной области памяти в другую).

Последнее является большим выигрышем с точки зрения производительности и ресурсов и возможно только благодаря использованию лежащей в основе структуры данных журнала, определяющей схему партиции.В кластере Kafka возможна гораздо более высокая производительность, чем при использовании одного брокера Kafka, поскольку партиции топика могут горизонтально масштабироваться на множестве отдельных машин.

Итоги

В этой главе мы рассмотрели, как архитектура Kafka переосмысливает отношения между клиентами и брокерами, чтобы обеспечить невероятно устойчивый конвейер обмена сообщениями, с пропускной способностью во много раз большей, чем у обычного брокера сообщений. Мы обсудили функциональность, которую она использует для достижения этой цели, и кратко рассмотрели архитектуру приложений, обеспечивающих эту функциональность.

В следующей главе мы рассмотрим общие проблемы, которые необходимо решать приложениям на основе обмена сообщениями, и обсудим стратегии их решения. Мы завершим главу, обрисовав, как рассуждать о технологиях обмена сообщениями в целом, чтобы вы могли оценить их пригодность для ваших сценариев использования.

Предыдущая переведенная часть: Понимание брокеров сообщений. Изучение механики обмена сообщениями посредством ActiveMQ и Kafka. Глава 1

Перевод выполнен: tele.gg/middle_java

Продолжение следует…

Оцените статью
Радиокоптер.ру
Добавить комментарий