6

Я пытаюсь реализовать архитектуру, управляемую событиями, для обработки распределенных транзакций. Каждая служба имеет свою собственную базу данных и использует Kafka для отправки сообщений для информирования других микросервисов об операциях.Как реализовать микросервисную архитектуру Event Driven с Spring Cloud Stream Kafka и Database for service

Пример:

Order service -------> | Kafka |------->Payment Service 
     |          | 
Orders MariaDB DB     Payment MariaDB Database 

Заказ принимает запрос заказа. Он должен сохранить новый заказ в своей БД и опубликовать сообщение, чтобы Платежная служба осознала, что она должна взимать плату за товар:

частный заказБизнес-заказБизнес;

@PostMapping 
public Order createOrder(@RequestBody Order order){ 
    logger.debug("createOrder()"); 
    //a.- Save the order in the DB 
    orderBusiness.createOrder(order); 
    //b. Publish in the topic so that Payment Service charges for the item. 
    try{ 
     orderSource.output().send(MessageBuilder.withPayload(order).build()); 
    }catch(Exception e){ 
     logger.error("{}", e); 
    } 
    return order; 
} 

Это мои сомнения:

  1. шаги a.- (сохранить в Приказе БД) и B.- (опубликовать сообщение) должны быть выполнены в транзакции, атомарно. Как я могу это достичь?
  2. Это связано с предыдущим: я отправляю сообщение с помощью: orderSource.output(). Send (MessageBuilder.withPayload (order) .build()); Эти операции являются асинхронными, и ВСЕГДА возвращает true, независимо от того, работает ли брокер Kafka. Как я узнаю, что сообщение дошло до брокера Kafka?

ответ

8

шаги a.- (сохранить в Приказе БД) и B.- (опубликовать сообщение) должно быть выполняется в транзакции, атомарно. Как я могу это достичь?

В настоящее время Kafka не поддерживает транзакции (и, следовательно, также откаты или фиксации), которые вам необходимо синхронизировать с этим. Короче говоря: вы не можете делать то, что хотите. Это изменится в ближайшем будущем, когда KIP-98 будет объединено, но это может занять некоторое время. Кроме того, даже при транзакциях в Кафке атомная транзакция через две системы - это очень трудная задача, все, что следует, будет улучшено только благодаря поддержке транзакций в Kafka, это все равно не решит вашу проблему. Для этого вам нужно будет изучить некоторые формы two phase commit через ваши системы.

Вы можете получить несколько близко настройке свойств производителей, но в конце концов вам придется выбирать между по крайней мере один раз или не более одного раза для одного из ваших систем (MariaDB или Кафки).

Начнем с того, что вы можете сделать в Кафке, чтобы обеспечить доставку сообщения, и дальше мы погрузимся в ваши варианты для общего потока процессов и каковы его последствия.

Гарантированная доставка

Вы можете настроить количество брокеров, чтобы подтвердить получение ваших сообщений, прежде чем запрос возвращается к вам с параметром ACKs: установив это все вы рассказать брокер, чтобы ждать, пока все реплики не подтвердят ваше сообщение, прежде чем возвращать вам ответ. Это по-прежнему не гарантирует 100%, что ваше сообщение не будет потеряно, поскольку оно еще только написано в кэше страниц, и есть теоретические сценарии, когда брокер терпит неудачу, прежде чем он останется на диске, где сообщение все равно может быть потеряно. Но это такая же хорошая гарантия, как вы собираетесь получить. Вы можете еще больше уменьшить риск потери данных, снизив интервал, при котором брокеры вынуждают fsync на диск (подчеркнули текст и/или flush.ms), но, пожалуйста, имейте в виду, что эти значения могут принести с собой высокую производительность штрафы.

В дополнение к этим настройкам вам нужно будет дождаться, когда ваш производитель Kafka вернет вам ответ на ваш запрос и проверит, произошло ли исключение. Это связано со второй частью вашего вопроса, поэтому я пойду дальше. Если ответ чист, вы можете быть как можно увереннее, чтобы ваши данные попали в Kafka и начали беспокоиться о MariaDB.

Все, что мы рассмотрели до сих пор, касается только того, как обеспечить, чтобы Kafka получал ваши сообщения, но вам также необходимо записывать данные в MariaDB, и это может также потерпеть неудачу, что потребовало бы вызвать сообщение, которое вы, возможно, уже отправлено в Кафку - и этого вы не можете сделать.

Поэтому в основном вы должны выбрать одну систему, в которой вы лучше иметь дело с дубликатами/отсутствующих значений (в зависимости от наличия или отсутствия повторно вы частичные неудачи), и это будет влиять на порядок вы делаете вещи.

Вариант 1

Kafka first

в этой опции вы инициализировать транзакцию в MariaDB, а затем отправить сообщение Кафки, ждать ответа, и если отправить был успешным, вы совершить сделку в MariaDB. Если отправка в Kafka не удалась, вы можете отменить транзакцию в MariaDB, и все будет денди. Если, однако, отправка в Kafka успешна, и ваша фиксация MariaDB по какой-то причине не срабатывает, тогда нет способа вернуть сообщение от Kafka. Таким образом, вы будете либо пропускать сообщение в MariaDB, либо иметь дублирующее сообщение в Kafka, если вы повторно отправите его позже.

Вариант 2

MariaDB first

Это в значительной степени только наоборот, но вы, вероятно, лучше, чтобы удалить сообщение, которое было написано в MariaDB, в зависимости от модели данных.

Конечно, вы можете смягчить оба подхода, отслеживая неудачные попытки отправки и повторив их только позже, но все это больше связано с более сложной проблемой.

Лично я бы пошел с подходом 1, так как вероятность сбоя фиксации должна быть несколько меньше, чем сама передача, и выполнить какую-то проверку обмана на другой стороне Кафки.


Это связано с предыдущим: Я отправить сообщение с:. orderSource.output() отправить (MessageBuilder.withPayload (заказ) .build()); Эта операция асинхронна и ВСЕГДА возвращает значение true, независимо от того, брокера Kafka не работает. Как я могу узнать, что сообщение достигло брокера Kafka?

Теперь, прежде всего, я признаю, что я не знаком с Spring, поэтому это может быть вам не полезно, но следующий фрагмент кода иллюстрирует один из способов проверки ответов на получение исключений. Вызов флеша блокируется до тех пор, пока все посылки не закончились (а также не сработали или не удались), а затем проверьте результаты.

Producer<String, String> producer = new KafkaProducer<>(myConfig); 
final ArrayList<Exception> exceptionList = new ArrayList<>(); 

for(MessageType message : messages){ 
    producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() { 
    @Override 
    public void onCompletion(RecordMetadata metadata, Exception exception) { 
     if (exception != null) { 
     exceptionList.add(exception); 
     } 
    } 
    }); 
} 

producer.flush(); 

if (!exceptionList.isEmpty()) { 
    // do stuff 
} 
+0

Я редактировал вопрос с новым подходом я следующим, однако, учитывая, что вы ответили очень разъяснение я возвращаясь его к оригиналу и запустите новую версию с отредактированной версией. Я вернусь с отзывами о вашем ответе. Благодаря! – codependent

+0

Sönke, все понятно, я ценю подробное объяснение. Для тех, кто заинтересован в том, как обеспечить доставку сообщения с помощью Spring Cloud Stream: https://github.com/spring-cloud/spring-cloud-stream/issues/795 – codependent

+0

@codependent Как получить доступ к шлюзу api или любой другой отправке сначала кафка, а затем два микросервиса подписываются на сообщения кафки .. разве это невозможно? или вы не делаете этого, потому что хотите иметь согласованные данные, а не последовательно? –

2

Я думаю, что правильный путь для реализации Event Sourcing является наличием Кафка заполняются непосредственно из событий подтолкнули плагин, который читает из РСУБДА Двоичных например с помощью Confluent бутилированной воды (https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/) или более активного Debezium (http://debezium.io/). Затем потребление микросервисов может прослушивать эти события, потреблять их и действовать в соответствующих базах данных, что в конечном итоге соответствует базе данных РСУБД.

Посмотрите здесь, чтобы мой полный ответ на рекомендацию: https://stackoverflow.com/a/43607887/986160

 Смежные вопросы

  • Нет связанных вопросов^_^