⚡Transactional inbox
С помощью transactional outbox мы умеем обеспечивать надежную доставку сообщений до брокера
Но теперь возникает другая проблема — консюмеру нужно ровно один раз обработать сообщение
В случае наивного решения
Может случиться ситуация, что databaseTx закоммитилась, но message.commit() не отработал. Это приведет к тому, что при следующем чтении мы обработаем сообщение еще раз
И здесь нам поможет transactional inbox, у которого я выделяю два вида
1) По-прежнему сначала обрабатываем сообщение, потом коммитим. Но добавляем дедупликацию
В таком случае даже если databaseTx закоммитилась, но message.commit() не отработал, то при повторном чтении мы увидим сохраненный ключ сообщения, и сразу его закоммитим
2) Сохраняем сообщение в таблицу, и фоновые воркеры достают сообщения из таблицы и обрабатывают
Несмотря на то, что такой подход решает ту же проблему, еще и при этом добавляет latency, у него есть весомый плюс — консюмер теперь может балансировать нагрузку на себя
Причем это работает в обе стороны:
1) Например, если сообщения в нас отправляют по http со слишком высоким рейтом, то мы просто сохраняем их в таблицу и процессим с доступной нам скоростью
2) И наоборот: если сообщения мы сами читаем из топика, но у топика слишком мало партиций, и существующие консюмеры не успевают обрабатывать приходящие сообщения, то можно также их просто сохранить в таблицу, и далее нужным количеством воркеров разгребать эту таблицу
С помощью transactional outbox мы умеем обеспечивать надежную доставку сообщений до брокера
Но теперь возникает другая проблема — консюмеру нужно ровно один раз обработать сообщение
В случае наивного решения
processMessage() {
databaseTx {
…
}
message.commit()
}
Может случиться ситуация, что databaseTx закоммитилась, но message.commit() не отработал. Это приведет к тому, что при следующем чтении мы обработаем сообщение еще раз
И здесь нам поможет transactional inbox, у которого я выделяю два вида
1) По-прежнему сначала обрабатываем сообщение, потом коммитим. Но добавляем дедупликацию
processMessage() {
databaseTx {
if (!tryInsert(msgKey)) {
message.commit()
return
}
…
}
message.commit()
}
В таком случае даже если databaseTx закоммитилась, но message.commit() не отработал, то при повторном чтении мы увидим сохраненный ключ сообщения, и сразу его закоммитим
2) Сохраняем сообщение в таблицу, и фоновые воркеры достают сообщения из таблицы и обрабатывают
processMessage() {
databaseTx {
tryInsert(message)
}
message.commit()
}
Несмотря на то, что такой подход решает ту же проблему, еще и при этом добавляет latency, у него есть весомый плюс — консюмер теперь может балансировать нагрузку на себя
Причем это работает в обе стороны:
1) Например, если сообщения в нас отправляют по http со слишком высоким рейтом, то мы просто сохраняем их в таблицу и процессим с доступной нам скоростью
2) И наоборот: если сообщения мы сами читаем из топика, но у топика слишком мало партиций, и существующие консюмеры не успевают обрабатывать приходящие сообщения, то можно также их просто сохранить в таблицу, и далее нужным количеством воркеров разгребать эту таблицу