2015-06-08 2 views
0

Я простую реализацию Слушатель шаблон, который (в сжатом виде) выглядит следующим образом:Что является предметом слушателя в реакторе?

public class EmailProducer implements Runnable { 
    private List<Consumer<EmailMessage>> listeners = new ArrayList<>(); 
    private IdleManager idleManager; 

    public void registerListener(Consumer<EmailMessage> listener) { 
     listeners.add(listener); 
    } 

    public void run() { 
     Session session = Session.getDefaultInstance(new Properties()); 
     idleManager = new IdleManager(session, Executors.newCachedThreadPool()); 
     // use IMAP IDLE to listen for new incoming emails 
     Folder inbox = openInbox(); 
     inbox.addMessageCountListener(new MessageCountAdapter() { 
      public void messagesAdded(MessageCountEvent ev) { 
       Message[] msgs = ev.getMessages(); 
       for (Message msg : msgs) { 
        EmailMessage info = EmailMessage.from(msg); 
        for (Consumer<EmailMessage> listener : listeners) { 
         listener.accept(info); 
        } 
       } 
      } 
     }); 
     try { 
      idleManager.watch(inbox); 
     catch (FolderClosedException|StoreClosedException ex) { 
      run(); // restart, need to re-establish connection 
     } 
    } 
} 

Таким образом, в сущности, это захватывает электронную почту из моего почтового ящика, извлекает информацию из каждого сообщения электронной почты и преобразует их во внутреннее сообщение формат.

После этого обычное Consumer сохранило это сообщение, другое отобразило бы информацию на странице HTML, а еще одна могла бы заставить сторонние системы совершить какое-либо действие. Все это прекрасно.

Теперь я пытаюсь получить дополнительную устойчивость в случае сбоя Consumer. Например, я хочу, чтобы мой EmailMessage работал, если потребитель настойчивости потерпел неудачу (выбросил исключение), закончив обработку. В этом случае я хочу, чтобы обработка была повторена после паузы. Прямо сейчас, если потребитель терпит неудачу, сообщение будет потеряно.

Недавно я узнал о реакторе, и я думаю, что это оркестровка сохранит сообщение и позволит мне делать то, что мне нужно. Тем не менее, я не вижу, как я буду адаптировать мой текущий Runnable в его модель. От documentation кажется, что мне нужно RingBufferProcessor для хранения моих сообщений. Я не могу найти другой путь, чтобы передать процессор моему Runnable, потому что он вызывает метод onNext() вместо метода (ов) отдельного слушателя accept. Я что-то упускаю, или так оно и должно работать?

(Дополнительные куки для показа как мне повторить в условиях отказа потребителя)

ответ

1

Вы можете взглянуть на RingBufferProcessor, который будет позволяет выполнять параллельные задачи с одним EmailMessage. Вы бы установить его что-то вроде:

ReactorProcessor<Message, Message> processor = RingBufferProcessor.share("email-processor", 1024); 
Stream<Message> s = Streams.wrap(processor); 

s.consume(m -> firstTask(m)); 
s.consume(m -> secondTask(m)); 
s.consume(m -> thirdTask(m)); 

это будет распараллелить работу на 3 отдельных нитей (1 на Consumer). Чтобы делать повторы или ошибки захвата, вам нужно либо Stream.retry, либо Stream.retryWhen (для отсрочки), либо просто Stream.when (что не дает вам сообщения, вызвавшего ошибку).

s.retry(t -> t instanceof IllegalStateException) 
.consume(m -> retryMessage(m)); 

s.when(NumberFormatException.class, e -> e.printStackTrace()); 

Затем публиковать сообщения в этом Processor, просто позвоните onNext:

for(EmailMessage msg : msgs) { 
    processor.onNext(msg); 
} 
+0

Наконец нашел время, чтобы осуществить это, спасибо! Я решил оставить «Продюсер» в списке «Потребитель», потому что это отдельный продукт от того, который использует реактор, но простой «производитель.registerListener (processor :: onNext)» должен отлично выполнять эту работу. – mabi