Transactional inbox



С помощью transactional outbox мы умеем обеспечивать надежную доставку сообщений до брокера



Но теперь возникает другая проблема — консюмеру нужно ровно один раз обработать сообщение



В случае наивного решения



processMessage() {

databaseTx {



}

message.commit()

}




Может случиться ситуация, что databaseTx закоммитилась, но message.commit() не отработал. Это приведет к тому, что при следующем чтении мы обработаем сообщение еще раз



И здесь нам поможет transactional inbox, у которого я выделяю два вида



1) По-прежнему сначала обрабатываем сообщение, потом коммитим. Но добавляем дедупликацию



processMessage() {

databaseTx {

if (!tryInsert(msgKey)) {

message.commit()

return

}



}

message.commit()

}




В таком случае даже если databaseTx закоммитилась, но message.commit() не отработал, то при повторном чтении мы увидим сохраненный ключ сообщения, и сразу его закоммитим



2) Сохраняем сообщение в таблицу, и фоновые воркеры достают сообщения из таблицы и обрабатывают



processMessage() {

databaseTx {

tryInsert(message)

}

message.commit()

}




Несмотря на то, что такой подход решает ту же проблему, еще и при этом добавляет latency, у него есть весомый плюс — консюмер теперь может балансировать нагрузку на себя



Причем это работает в обе стороны:



1) Например, если сообщения в нас отправляют по http со слишком высоким рейтом, то мы просто сохраняем их в таблицу и процессим с доступной нам скоростью



2) И наоборот: если сообщения мы сами читаем из топика, но у топика слишком мало партиций, и существующие консюмеры не успевают обрабатывать приходящие сообщения, то можно также их просто сохранить в таблицу, и далее нужным количеством воркеров разгребать эту таблицу