Я простую реализацию Слушатель шаблон, который (в сжатом виде) выглядит следующим образом:Что является предметом слушателя в реакторе?
public class EmailProducer implements Runnable {
private List<Consumer<EmailMessage>> listeners = new ArrayList<>();
private IdleManager idleManager;
public void registerListener(Consumer<EmailMessage> listener) {
listeners.add(listener);
}
public void run() {
Session session = Session.getDefaultInstance(new Properties());
idleManager = new IdleManager(session, Executors.newCachedThreadPool());
// use IMAP IDLE to listen for new incoming emails
Folder inbox = openInbox();
inbox.addMessageCountListener(new MessageCountAdapter() {
public void messagesAdded(MessageCountEvent ev) {
Message[] msgs = ev.getMessages();
for (Message msg : msgs) {
EmailMessage info = EmailMessage.from(msg);
for (Consumer<EmailMessage> listener : listeners) {
listener.accept(info);
}
}
}
});
try {
idleManager.watch(inbox);
catch (FolderClosedException|StoreClosedException ex) {
run(); // restart, need to re-establish connection
}
}
}
Таким образом, в сущности, это захватывает электронную почту из моего почтового ящика, извлекает информацию из каждого сообщения электронной почты и преобразует их во внутреннее сообщение формат.
После этого обычное Consumer
сохранило это сообщение, другое отобразило бы информацию на странице HTML, а еще одна могла бы заставить сторонние системы совершить какое-либо действие. Все это прекрасно.
Теперь я пытаюсь получить дополнительную устойчивость в случае сбоя Consumer
. Например, я хочу, чтобы мой EmailMessage
работал, если потребитель настойчивости потерпел неудачу (выбросил исключение), закончив обработку. В этом случае я хочу, чтобы обработка была повторена после паузы. Прямо сейчас, если потребитель терпит неудачу, сообщение будет потеряно.
Недавно я узнал о реакторе, и я думаю, что это оркестровка сохранит сообщение и позволит мне делать то, что мне нужно. Тем не менее, я не вижу, как я буду адаптировать мой текущий Runnable
в его модель. От documentation кажется, что мне нужно RingBufferProcessor
для хранения моих сообщений. Я не могу найти другой путь, чтобы передать процессор моему Runnable
, потому что он вызывает метод onNext()
вместо метода (ов) отдельного слушателя accept
. Я что-то упускаю, или так оно и должно работать?
(Дополнительные куки для показа как мне повторить в условиях отказа потребителя)
Наконец нашел время, чтобы осуществить это, спасибо! Я решил оставить «Продюсер» в списке «Потребитель», потому что это отдельный продукт от того, который использует реактор, но простой «производитель.registerListener (processor :: onNext)» должен отлично выполнять эту работу. – mabi