шаги a.- (сохранить в Приказе БД) и B.- (опубликовать сообщение) должно быть выполняется в транзакции, атомарно. Как я могу это достичь?
В настоящее время Kafka не поддерживает транзакции (и, следовательно, также откаты или фиксации), которые вам необходимо синхронизировать с этим. Короче говоря: вы не можете делать то, что хотите. Это изменится в ближайшем будущем, когда KIP-98 будет объединено, но это может занять некоторое время. Кроме того, даже при транзакциях в Кафке атомная транзакция через две системы - это очень трудная задача, все, что следует, будет улучшено только благодаря поддержке транзакций в Kafka, это все равно не решит вашу проблему. Для этого вам нужно будет изучить некоторые формы two phase commit через ваши системы.
Вы можете получить несколько близко настройке свойств производителей, но в конце концов вам придется выбирать между по крайней мере один раз или не более одного раза для одного из ваших систем (MariaDB или Кафки).
Начнем с того, что вы можете сделать в Кафке, чтобы обеспечить доставку сообщения, и дальше мы погрузимся в ваши варианты для общего потока процессов и каковы его последствия.
Гарантированная доставка
Вы можете настроить количество брокеров, чтобы подтвердить получение ваших сообщений, прежде чем запрос возвращается к вам с параметром ACKs: установив это все вы рассказать брокер, чтобы ждать, пока все реплики не подтвердят ваше сообщение, прежде чем возвращать вам ответ. Это по-прежнему не гарантирует 100%, что ваше сообщение не будет потеряно, поскольку оно еще только написано в кэше страниц, и есть теоретические сценарии, когда брокер терпит неудачу, прежде чем он останется на диске, где сообщение все равно может быть потеряно. Но это такая же хорошая гарантия, как вы собираетесь получить. Вы можете еще больше уменьшить риск потери данных, снизив интервал, при котором брокеры вынуждают fsync на диск (подчеркнули текст и/или flush.ms), но, пожалуйста, имейте в виду, что эти значения могут принести с собой высокую производительность штрафы.
В дополнение к этим настройкам вам нужно будет дождаться, когда ваш производитель Kafka вернет вам ответ на ваш запрос и проверит, произошло ли исключение. Это связано со второй частью вашего вопроса, поэтому я пойду дальше. Если ответ чист, вы можете быть как можно увереннее, чтобы ваши данные попали в Kafka и начали беспокоиться о MariaDB.
Все, что мы рассмотрели до сих пор, касается только того, как обеспечить, чтобы Kafka получал ваши сообщения, но вам также необходимо записывать данные в MariaDB, и это может также потерпеть неудачу, что потребовало бы вызвать сообщение, которое вы, возможно, уже отправлено в Кафку - и этого вы не можете сделать.
Поэтому в основном вы должны выбрать одну систему, в которой вы лучше иметь дело с дубликатами/отсутствующих значений (в зависимости от наличия или отсутствия повторно вы частичные неудачи), и это будет влиять на порядок вы делаете вещи.
Вариант 1
в этой опции вы инициализировать транзакцию в MariaDB, а затем отправить сообщение Кафки, ждать ответа, и если отправить был успешным, вы совершить сделку в MariaDB. Если отправка в Kafka не удалась, вы можете отменить транзакцию в MariaDB, и все будет денди. Если, однако, отправка в Kafka успешна, и ваша фиксация MariaDB по какой-то причине не срабатывает, тогда нет способа вернуть сообщение от Kafka. Таким образом, вы будете либо пропускать сообщение в MariaDB, либо иметь дублирующее сообщение в Kafka, если вы повторно отправите его позже.
Вариант 2
Это в значительной степени только наоборот, но вы, вероятно, лучше, чтобы удалить сообщение, которое было написано в MariaDB, в зависимости от модели данных.
Конечно, вы можете смягчить оба подхода, отслеживая неудачные попытки отправки и повторив их только позже, но все это больше связано с более сложной проблемой.
Лично я бы пошел с подходом 1, так как вероятность сбоя фиксации должна быть несколько меньше, чем сама передача, и выполнить какую-то проверку обмана на другой стороне Кафки.
Это связано с предыдущим: Я отправить сообщение с:. orderSource.output() отправить (MessageBuilder.withPayload (заказ) .build()); Эта операция асинхронна и ВСЕГДА возвращает значение true, независимо от того, брокера Kafka не работает. Как я могу узнать, что сообщение достигло брокера Kafka?
Теперь, прежде всего, я признаю, что я не знаком с Spring, поэтому это может быть вам не полезно, но следующий фрагмент кода иллюстрирует один из способов проверки ответов на получение исключений. Вызов флеша блокируется до тех пор, пока все посылки не закончились (а также не сработали или не удались), а затем проверьте результаты.
Producer<String, String> producer = new KafkaProducer<>(myConfig);
final ArrayList<Exception> exceptionList = new ArrayList<>();
for(MessageType message : messages){
producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
exceptionList.add(exception);
}
}
});
}
producer.flush();
if (!exceptionList.isEmpty()) {
// do stuff
}
Я редактировал вопрос с новым подходом я следующим, однако, учитывая, что вы ответили очень разъяснение я возвращаясь его к оригиналу и запустите новую версию с отредактированной версией. Я вернусь с отзывами о вашем ответе. Благодаря! – codependent
Sönke, все понятно, я ценю подробное объяснение. Для тех, кто заинтересован в том, как обеспечить доставку сообщения с помощью Spring Cloud Stream: https://github.com/spring-cloud/spring-cloud-stream/issues/795 – codependent
@codependent Как получить доступ к шлюзу api или любой другой отправке сначала кафка, а затем два микросервиса подписываются на сообщения кафки .. разве это невозможно? или вы не делаете этого, потому что хотите иметь согласованные данные, а не последовательно? –