Стендап Сьогодні 📢 Канал в Telegram @stendap_sogodni
🤖🚫 AI-free content. This post is 100% written by a human, as is everything on my blog. Enjoy!Пости з тегом #Kafka
07.06.2023
Kafka
Тиждень тому я писав про важливість перенесення навантаження з критичного шляху. А також про те, що це вирішується чергами. Черга надає можливість передати роботу з одного сервісу до іншого. Але є проблема — черга це труба о двох кінцях. Нам обовʼязково доведеться створювати споживача “в пару”. Черга створює жорстку звʼязку сервісів.
Apache Kafka розв’язує цю проблему. Це дещо середнє між чергою та базою даних. Як в черзі, дані організовані в послідовність повідомлень. Як в базі даних, ці повідомлення надійно зберігаються миттєво після приймання, та не видаляються після їх прочитання та обробки. Категорія сервісів, до яких належить Kafka, називається брокер повідомлень. Один сервіс (або сервіси) повідомлення пише, інший (або інші) читають - все з надійним посередником посередині.
Як на мене, то в першу чергу Kafka корисна тим, що дозволяє високонавантаженому сервісу максимально швидко писати дані, без того, щоб відразу розробляти архітектуру їх обробки. Це й спрощує розробку, й підвищує надійність, і взагалі в наш час Big Data розкриває нові можливості збирати те, що незрозуміло як використати.
Але є й інші переваги. Kafka може роздавати одній й ті самі повідомлення декільком споживачам — наприклад, сьогодні вони потрібні тільки в базі, а завтра, може, зʼявиться сервіс аналітики.
Також є прості, але можливості обробляти дані всередині Кафки, наприклад, рахувати їх, або обчислювати кінцевий стан сутності по журналу змін - Kafka Streams. Та Kafka Connect - інструмент для завантаження даних з Кафки до звичайної бази даних або ще кудись.
Єдине, що мені не подобається — все воно написане на Java, з усіма особливостями конфігурації та розміщення сервісів на Java.
03.07.2023
Враження від Kafka
Минув місяць моїх випробувань Кафки. Швидкість, з якою вона здатна приймати дані, реально вражає. Мільйон записів на секунду — цілком реальна цифра. Так, звісно, сама Kafka нічого з ними не зробить — ані індексації, ані агрегації. Проте цінність її не в тому. Кафку ставлять тому, що вона здатна розділити навантаження виробника та споживача подій. Та, навіть якщо мільйон на секунду вам не потрібний постійно, таке навантаження може виникнути, наприклад, на піці попиту, та Кафка спокійно його проковтне.
Так само Кафка розділяє й ризики виробника та споживача. Споживач подій — напевно, складна система, з внутрішньою логікою. Якщо виробник пише прямо в неї, то треба піклуватися й про високу надійність споживача — а чим більше логіки, тим це важче. Тому знову Кафка приходить на допомогу та підхопить всі дані, що були видобуті виробником, навіть якщо споживач зараз не може їх прийняти. (При цьому сама Кафка — досить проста та надійна система, з можливістю високого ступеня надмірності.)
Що мені не подобається — то, безумовно, складнощі в налаштуванні та інтеграції. Ну, з налаштуванням можна взяти керований сервіс, такий як Confluent. Але ще доведеться нормально розбиратись, як же з нею читати та писати. Та які саме компоненти потрібні. От є такі речі, як Zookeeper - сервіс керування кластером — який вже типу не потрібний, бо Кафка вміє й без нього. А ще є Schema Registry - вона для початку теж, це тільки для зрілих систем. А ще є Kafka Connect, який дуже боляче налаштовувати, але насправді вимоги такої немає, можна й без нього.
Поруч з Кафкою, AWS Kinesis Firehose вже не виглядає таким незамінним сервісом. Бо Кафка точно розвʼязує задачу “безрозмірного збору подій.” Тільки Firehose - сервіс вищого рівня, хоч і заточений під одну задачу. Його точно простіше налаштовувати та інтегрувати. З боку запису, Firehose надає простий API, а з боку збереження даних, він доставить їх туди, куди потрібно (до речі, перелік призначень нещодавно розширився.) Плюс, Firehose коштуватиме дешевше, до деякого обсягу. Тільки треба чітко розуміти, що Firehose зробить те, що вам від нього потрібно.
13.07.2023
Батчинг в Кафці та інші подробиці
Сьогодні працював над споживачем даних з Kafka та особливої уваги потребував батчинг — тобто збір вхідних повідомлень в пачки такого розміру, який є економічним для подальшої обробки. Батчинг — взагалі складна справа, бо треба не тільки набирати пачку, але й стежити за таймером. А ще думати про безпечну зупинку всього цього сервісу.
Тому, я був трохи здивований побачити, що в Кафці батчинг вже вбудований. А саме, Кафка віддаватиме дані споживачу пачками заданого розміру та з заданою максимальною затримкою. На першому ознайомленні я ці налаштування проігнорував, та розглядав споживача як джерело поодиноких пакетів. Взагалі це ще одна дуже корисна функція для брокера повідомлень.
До речі, щодо клієнтів для Golang. Їх є немало, але знайти хороший не так легко. Баланс проходить як раз за віссю “проста абстракція — відповідність архітектурі Kafka”
-
Sarama - мега складний. Goka - простіша абстракція для Sarama, але я впевнений, що без розуміння Sarama туди краще не лізти,
-
Confluent-Kafka-Go - обгортка над CGO, тому туди я навіть не дивлюся.
-
Kafka-Go - це був мій перший вибір. В цілому непогана бібліотека, але на мою думку надто абстрагована. До того ж в ній не виявилось методу
Flush()
, тому я вирішив шукати щось інше. А ще в них надто дивно влаштований інтерфейс підключень та клієнтів. -
Franz-Go - це те що в мене зараз, та баланс абстракції тут кращий. Наприклад, треба знати, що насправді кожний топік в Кафці поділений на розділи для паралелізації, та кожний споживач є підписаним на частину розділів. Тому коли отримуєш повідомлення, то в Franz-Go вони приходять поділені на розділи (хоча можна взяти й масивом.) З боку постачальника повідомлень мені теж більше подобається, бо є окремі функції для синхронного та асинхронного надсилання, між іншими.
02.08.2023
Як Kafka гарантує послідовність повідомлень
Ну, годі про формальну верифікацію. Я нещодавно був вражений простим, але дієвим підходом Kafka до послідовності повідомлень.
Для побудови простої в розумінні системи бажано, щоб послідовність подій на виході була така сама, як на вході. Втім, зазвичай зі збільшенням масштабу гарантія послідовності втрачається. Це відбувається тому, що повідомлення можуть бути отримані різними вузлами в кластері. А при читанні послідовність обробки вузлів також не визначена, тому може перемішатись ще й там.
Розглянемо приклад. де нам треба обробляти журнал у форматі “запит”-“відповідь”. Через втрату гарантій послідовності ми не можемо очікувати, що побачимо повідомлення “запит” раніше, ніж “відповідь”, та нам доведеться додатково продумати, що робити з відповіддю, яка прийшла раніше свого запита.
Звісно, рішення існують — наприклад, в AWS SQS є режим FIFO - хоч і суворо обмежений - 300 повідомлень в секунду проти мільйонів у Кафки.
Що робить Кафка? У Кафки одиниця масштабування — це розділ. Кожна “черга” (topic
) ділиться на розділи (partition
). На рівні розділу послідовність повідомлень гарантована. Кожний розділ призначений до конкретного вузла, який приймає всі записи в нього. Зі збільшенням масштабу кількість розділів та вузлів може зростати стільки, скільки треба, зі збереженням гарантій.
Ключовим є те, що виробники можуть самі вирішувати, в який розділ писати кожне повідомлення. Наприклад, якщо ми пишемо журнал подій, та їх треба групувати за субʼєктом, можна зробити ID субʼєкта ключем розподілу. А в системі “запит”-“відповідь” - просто стежити, щоб відповідь була записана в той самий розділ. (Єдине розумне обмеження — щоб розподіл був рівномірним та не створював “гарячих розділів”.)
А з боку споживача існує механізм групи споживачів. Група споживачів ділить між собою всі розділі топіка — кожний розділ дістається тільки одному клієнту з групи. Таким чином, кожний споживач отримує “скибочку” повідомлень. Послідовність в скибочці гарантована. як і відсутність конфліктів. Коли клієнти підключаються чи відключаються, призначення у групі балансуються наново.
Така от неочевидна архітектурна перевага (а я спочатку думав — навіщо ті розділи?)
14.08.2023
Kafka проти черг: події проти задач
Як цікаво: з першого погляду Kafka майже не відрізняється від черг, таких як SQS або RabbitMQ. Проте відмінності, на мою думку, починаються з того, як кожна система дивиться на повідомлення всередині.
Задача в черзі — це наказ на роботу. Значить, кожна задача витратить нетривіальний проміжок часу. А це, своєю чергою, значить, що задач буде не так вже й багато (порівняно з подіями.) Задачі цікавлять нас поодинці: кожну задачу ми робимо окремо, а коли зробили — підтверджуємо, що зроблено.
Подія в Кафці — це запис про зміну стану. Подій може бути астрономічно багато — якщо це показники акселерометра з флоту автівок. З подій ми збираємо картину світу. Події нам потрібні всі відразу, або принаймні якнайбільшими пачками — бо логістичні витрати на пачку менше. Одинична подія для нас нічого не значить — нас цікавить остаточний стан.
Така різниця інформує API Кафки: воно розроблене, щоб якнайшвидше отримувати події, яких буде багато. Реально багато — хоч мільйон на секунду. Коли отримали та обробили, відмічаємо події як оброблені. Не кожну окремо (бо кому то цікаво робити з мільйоном записів), а всі разом — за зсувом.
Отже, Кафку має сенс ставити там, де вам потрібно обробляти значний обсяг подій та продукувати стан, або якось реагувати на зміни стану. А про Кафку як чергу гарно написали на StackOverflow.
16.08.2023
Чому Кафка обмежує розмір пачки в байтах?
API споживача Кафки відрізняється від типового API черги одним цікавим нюансом. Ти не можеш отримати одне повідомлення, або навіть задану кількість повідомлень. Єдине доступне налаштування — це розмір в байтах. Захотілося знайти цьому пояснення.
Причина найпростіша. Kafka не розглядає записи як окремі сутності. Всередині вони зберігаються саме як послідовність байтів. Коли ти робиш fetch
, то Кафка відрізає шматок потрібного розміру та віддає. В протоколі навіть написано, що в кінці шматка може бути неповне повідомлення, яке треба проігнорувати. Тобто сервер навіть не шукає, де закінчується останнє повідомлення в пачці.
(Це неповне повідомлення — не така страшна справа. По-перше, поки йде активне споживання, дані не повинні накопичуватись, тож вони не сягнуть порогу пачки та не будуть обрізані. По-друге, якщо вже будуть, то пачка міститиме тисячі повідомлень чи більше — та швидше один раз відкинути решту, ніж тисячу раз додавати, поки не наберемо потрібний розмір.)
Все, зрозуміло, заради продуктивності. Буквально немає швидшої операції, ніж зчитувати сиру двійкову послідовність. Так Kafka й досягає пропускної здатності в мільйони записів на секунду.
Рекомендую для прочитання опис протоколу Kafka, щоб краще зрозуміти ідеї, на яких вона побудована.
Примітка: є ще опція max.poll.records та можна подумати, що вона є альтернативою для fetch.max.bytes. Проте ні — обмеження за кількістю записів відбувається вже на боці клієнта, після того, як вони отримані. Така опція є зручною абстракцією для нашого коду, не більше.
18.09.2023
Не все так просто з Кафкою: стан
В продовження теми про вивантаження всього змісту з Кафки. При перевірці виявилося що обставини трохи складніше, ніж я думав.
Що треба зрозуміти — в семантиці Kafka дані течуть, як річка, та стан цієї річки складно вловлюється. Та друге — топіки розділяються на розділи. Кожний розділ — це окремий потік даних. Ми обовʼязково маємо переконатись, що кожний розділ оброблений. Дані з розділів надходять в невизначеному порядку. Навіть якщо ми ще не отримували з деякого розділу жодного запису, це не значить, що їх там немає.
Ключем до розвʼязку є зміщення даних в розділі. А саме. зміщення останнього запису з кожного розділу — доступне через API ListOffset. Залишається один раз зберігти поточне значення, а потім споживати, аж допоки не дійдеш до зміщення, яке записав на початку операції. Далі — вже тільки нові записи.
От тільки якщо в одному з розділів неспожитих даних немає, то доведеться чекати нових, щоб зрозуміти, що старих немає — та, можливо, взагалі не дочекатись. Щоб цього уникнути, потрібно також перевіряти зміщення, яке вже спожила наша група споживання — через API Offset Fetch. (Такі чудові назви API, не переплутаєш.) Якщо спожите зміщення дорівнює кінцевому, то на цей розділ можна не дивитись.
Розроблений таким чином алгоритм реально працює. Попри меншу передбачуваність, ніж я звик, все ж можна написати тести, які наповнюють топік, а потім з нього читають. Та це чудово, бо перевіряти таке вручну було б проблематично.
31.10.2023
Оптимізація Kafka для локального запуску
Звісно, зазвичай Kafka як і інші технології оптимізують під велике навантаження. Але, щоб інтеграційні тести швидко працювали локально або на CI, потрібні інші параметри оптимізації, та деколи варто витратити час, щоб їх знайти.
Як я до того дійшов? Вирішив зрозуміти, чому пакет тестів довго працює. На Go якщо запустити тести з ключем -cpuprofile
, то буде записаний профіль виконання. Але профіль тільки показав, що більшість часу тести проводять в очікуванні даних, тобто в IO. В таких випадках профілювання по CPU дає зовсім некорисні результати, бо місце очікування не повʼязане з конкретним рядком програми, отже, незрозуміло, де саме вона гальмує.
Тоді є інший інструмент - -trace
. Трасування запише деревоподібний журнал виконання програми, в якому в тому числі видні й очікування IO, проміж іншими видами блокування. Оскільки я вже знаю, що проблема в IO, то конвертую трасувальний журнал у профіль (go tool trace -pprof=net
) та вивчаю його. Виявилось, що тести багато чекають на Кафку. Вдалося трохи це виправити налаштуваннями.
Чого не треба робити: зменшувати параметр fetch.max.wait
. При мінімальних його значеннях цикл обробки буде безперестанно звертатися до Кафки. Це спричиняє зайві витрати CPU. Якщо ми очікуємо від Кафки хоч якісь дані, то fetch.max.wait
спокійно можна ставити вище.
Натомість треба переконатись, що fetch.min.bytes
встановлений в 1 (мінімальне значення, яке стоїть за замовчуванням.) Тоді ми дійсно отримуватимемо записи як тільки вони зʼявляться
Є ще linger.ms
- це час буферизації продюсера. Його краще виставити більше ніж 0
, що за замовчуванням, бо тоді відбудеться менше запитів до Кафки. Якщо середній тест триває 100 мс — то можна робити linger.ms=100
.
Нарешті, за замовчуванням Кафка створює топіки на декілька розділів — а локально та з єдиним споживачем це тільки сповільнює отримання даних. Раджу перевірити, що топіки створюються лише з одним розділом.
Завтра ще є ідеї паралелізувати деякі кроки підготовки та попрацювати з викликами внутрішніх API.
02.05.2024
Kafka як барʼєр від стресу
Я вже писав як Kafka розʼєднує сервіси за навантаженням: тобто звільняє джерело подій від турбот про навантаження на споживача. Проте є й інший аспект турбот: надійність. З ненадійними споживачами доводиться перестраховуватись, щоб уникнути каскадних збоїв.
Та ось в чому річ. За останній рік Kafka була, без перебільшень, найбільш стабільним компонентом з тих, з якими мені доводиться працювати. Краще ніж “хмарні” амазонівські сервіси на кшталт SQS та Firehose. Тому Kafka виступає надійним буфером між сервісами, який завжди готовий прийняти події… та притримати їх стільки, скільки це потрібно.
Є декілька обʼєктивних причин такої надійності. В першу чергу, це простота - Kafka нічого не робить з даними. В ній немає схеми — а значить, схему не можна порушити. Скільки разів було таке, що міграції в базі розійшлися з клієнтом, та це зупинило запис? З Кафкою такого ніколи не буде.
Також в Кафки гарний принцип надмірності. Кафка зазвичай побудована кластером. Та клієнти знають не одну вхідну адресу сервера, а багато. При збої одного з серверів клієнти просто переходять на інший. Це стає до нагоди й при оновленнях ПЗ.
Якщо ви стикнулися з ситуацією, де отримувач даних ненадійний, та доводиться вводити повторні спроби та інші механізми захисту — подивіться на Кафку.
25.10.2024
Kafka: неочевидне з досвіду
Неочевидна сильна сторона Kafka: кожний потік можна обробляти будь скільки разів. Тобто я звик думати, що черга це: один зайшов — один вийшов. А з Кафкою не так. В Кафці записи живуть визначений час — наприклад, декілька днів — та протягом цього часу доступні для читання скільки завгодно разів. Причому якщо під час читання оголосити ідентифікатор — тобто створити групу споживачів (consumer group) - то Кафка відстежуватиме її позицію в потоці записів.
Тобто з Kafka маємо незнайому парадигму, де головною сутністю є не окремі записи, а їхня послідовність — тема — яку ми можемо наповнювати та споживати, як це потрібно. Наприклад, ми можемо використовувати послідовність подій як джерело правди для нашої бази даних, але також сканувати ту ж саму послідовність на присутність цікавих нам маркерів, а ще обчислювати за ними статистику в реальному часі та записувати в іншу тему ще для чогось.
Неочевидна слабка сторона Kafka: відсутня можливість відкласти запис на майбутнє. Для черги це типова функція, але ж Кафка це не черга (а журнал!) Одна з головних характеристик Кафки — записи читаються в тому самому порядку, як їх записали. Так само й статус прочитання призначається послідовно. А щоб відкласти на майбутнє, нам потрібно порушити цей порядок.
Тобто вбудованого механізму немає, але ж можна придумати свій. Наприклад, є потреба повторно обробляти невдалі записи — та відомо, що кожна наступна повторна спроба буде пізніше за попередні. Тоді можна зробити окрему тему для повторних спроб, а також процес, який вичитує наступну спробу, чекає потрібний час, та надсилає назад в головну тему.
А от можливість відкласти отримання на невизначений проміжок взагалі не вкладається в модель Кафки, та краще вже брати нормальну чергу.
10.12.2024
Дев-адвент 10: редагування тегів... та AWS Lambda
Сьогодні не те щоб найцікавіші функції, яких в кожному проєкті вистачає. (Хоча гарно, що простий CRUD на Swift UI робиться без зайвих перешкод.) Тому розкажу про іншу цікаву знахідку.
На вихідних одна AWSLambda накрилася та почала замість 15 секунд тривати 15 хвилин, тобто вилітати за браком часу. Лямбда ця запускається за розкладом та перекачує дані з Kafka ще кудись. Ніяких підстав бути такою повільною в неї не було.
В Кафки особливий підхід до споживачів, а саме: щоб уникнути повторної обробки даних, споживачі в групі ділять між собою потік задач. (Саме для того потоки — теми - topic діляться на розділи - partition.) А це своєю чергою значить, що коли споживач уходить, або зʼявляється новий, Кафка негайно виконує перебалансування групи. А якщо споживач просто відвалився, то щоб переконатися в його відсутності потрібний ще й тайм-аут. (Тому важливо завжди виходити з групи ввічливо.)
(До речі, це значить, що лямбди — не краще поєднання з Кафкою; сталий сервіс однозначно природніше. Втім, в моєму випадку на цілий сервіс задач було мало.)
Що, гадаю вийшло: в якийсь момент чергова лямбда “спіткнулася” - можливо, в Кафці йшли роботи, які самі викликають перебалансування — та зависла. Після того її наздогнала наступна лямбда, та вони почали ділити між собою групу в Кафці. Далі почався каскадний ефект: поки старі лямбди запускалися наново, зʼявлялися ще й нові та викликали нове перебалансування, причому кожна з них мала необмежену кількість спроб, тому вони нікуди не зникали, та проблема тривала аж до ручного втручання.
Виявилося, що запуск лямбд за розкладом — не така тривіальна справа. 1)тайм-аут лямбди повинен бути менше за інтервал розкладу - це, думаю, очевидно. 2) в налаштуваннях рівночасності треба вказати 1 рівночасне виконання, щоб уникнути набігання. 3) в налаштуваннях надійності вказати 0 повторних спроб, бо лямбда все одно запуститься за наступним розкладом. 4) там же ж вказати мінімальний вік подій, бо знов-таки немає сенсу обробляти старий розклад, коли завжди буде новий.