2016-06-09 4 views
1

Я пытаюсь написать службу почты Batch, который имеет два метода:Producer/Consumer с партией и скрытой функциональностью

add(Mail mail): Письма могут посылаться, вызываются Производителями

flushMailService(): промывать обслуживание. Потребители должны взять список и вызвать другой (дорогой) метод. Обычно дорогой метод следует вызывать только после достижения размера партии.

Это несколько похоже на этот вопрос: Producer/Consumer - producer adds data to collection without blocking, consumer consumes data from collection in batch

можно сделать с помощью poll(), которая имеет время ожидания. Но производитель должен иметь возможность очищать почтовую службу, если она не хочет ждать таймаута, но заставляет производителя отправлять любые письма, которые находятся в очереди.

poll(20, TimeUnit.SECONDS) может быть прерван. Если это прерывается, все письма в очереди должны быть отправлены независимо от того, достигнут ли размер партии до тех пор, пока очередь не будет пуста (с использованием poll(), которая немедленно возвращает null, если очередь пуста. После того, как она пуста, письма, которые были отправлены продюсер, который прервал бы быть уже отправлен. затем, производитель должен вызвать то блокирующие версию poll снова, пока не прерывается любым другим производителем и так далее.

Это похоже на работу с данной реализацией.

Я попытался использовать ExecutorServices с Futures, но кажется, что Будущее может быть прервано только , так как они считается отмененным после первого прерывания. Поэтому я прибегал к Threads, которые можно прерывать несколько раз.

В настоящее время у меня есть следующая реализация, которая, кажется, работает (но использует «сырые» потоки).

Это разумный подход? Или, может быть, можно использовать другой подход?

public class BatchMailService { 
    private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>(); 
    private CopyOnWriteArrayList<Thread> threads = new CopyOnWriteArrayList<>(); 
    private static Logger LOGGER = LoggerFactory.getLogger(BatchMailService.class); 

    public void checkMails() { 

     int batchSize = 100; 
     int timeout = 20; 
     int consumerCount = 5; 

     Runnable runnable =() -> { 
      boolean wasInterrupted = false; 

      while (true) { 
       List<Mail> buffer = new ArrayList<>(); 
       while (buffer.size() < batchSize) { 
        try { 
         Mail mail; 
         wasInterrupted |= Thread.interrupted(); 
         if (wasInterrupted) { 
          mail = queue.poll(); // non-blocking call 
         } else { 
          mail = queue.poll(timeout, TimeUnit.SECONDS); // blocking call 
         } 
         if (mail != null) { // mail found immediately, or within timeout 
          buffer.add(mail); 
         } else { // no mail in queue, or timeout reached 
          LOGGER.debug("{} all mails currently in queue have been processed", Thread.currentThread()); 
          wasInterrupted = false; 
          break; 
         } 
        } catch (InterruptedException e) { 
         LOGGER.info("{} interrupted", Thread.currentThread()); 
         wasInterrupted = true; 
         break; 
        } 
       } 
       if (!buffer.isEmpty()) { 
        LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size()); 
        mailService.sendMails(buffer); 
       } 
      } 
     }; 

     LOGGER.info("starting 5 threads "); 
     for (int i = 0; i < 5; i++) { 
      Thread thread = new Thread(runnable); 
      threads.add(thread); 
      thread.start(); 
     } 

    } 

    public void addMail(Mail mail) { 
     queue.add(mail); 
    } 

    public void flushMailService() { 
     LOGGER.info("flushing BatchMailService"); 
     for (Thread t : threads) { 
      t.interrupt(); 
     } 
    } 
} 

Другой подход без прерывания, но вариант яд таблетки (Mail POISON_PILL = new Mail()) может быть следующим. Вероятно, лучше всего работает, когда есть одна потребительская нить. По крайней мере, для одной ядовитой таблетки будет продолжаться только один потребитель.

Runnable runnable =() -> { 
     boolean flush = false; 
     boolean shutdown = false; 

     while (!shutdown) { 
      List<Mail> buffer = new ArrayList<>(); 
      while (buffer.size() < batchSize && !shutdown) { 
       try { 
        Mail mail; 
        if (flush){ 
         mail = queue.poll(); 
         if (mail == null) { 
          LOGGER.info(Thread.currentThread() + " all mails currently in queue have been processed"); 
          flush = false; 
          break; 
         } 
        }else { 
         mail = queue.poll(5, TimeUnit.SECONDS); // blocking call 
        } 

        if (mail == POISON_PILL){ // flush 
         LOGGER.info(Thread.currentThread() + " got flush"); 
         flush = true; 
        } 
        else if (mail != null){ 
         buffer.add(mail); 
        } 
       } catch (InterruptedException e) { 
        LOGGER.info(Thread.currentThread() + " interrupted"); 
        shutdown = true; 
       } 
      } 
      if (!buffer.isEmpty()) { 
       LOGGER.info(Thread.currentThread()+"{} sending " + buffer.size()+" mails"); 
       mailService.sendEmails(buffer); 
      } 
     } 
    }; 

public void flushMailService() { 
    LOGGER.info("flushing BatchMailService"); 
    queue.add(POISON_PILL); 
} 

ответ

1

Как насчет использования сигнала и ожидания вместо прерывания?

Производители помещают почту и сигнал, если ее необходимо очистить. Диспетчер ждет сигнала или таймаута и продолжает отправлять электронные письма в потоках потребителей.

import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.ExecutorService; 
import java.util.concurrent.Executors; 
import java.util.concurrent.LinkedBlockingQueue; 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.Lock; 
import java.util.concurrent.locks.ReentrantLock; 

public class BatchMailService { 

    private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>(); 

    public static final int BATCH_SIZE = 100; 
    public static final int TIMEOUT = 20; 
    public static final int CONSUMER_COUNT = 5; 

    private final Lock flushLock = new ReentrantLock(); 
    private final Condition flushCondition = flushLock.newCondition(); 

    MailService mailService = new MailService(); 

    public void checkMails() { 

     ExecutorService consumerExecutor = Executors.newFixedThreadPool(CONSUMER_COUNT); 

     while (true) { 

      try { 
       // wait for timeout or for signal to come 
       flushLock.lock(); 
       flushCondition.await(TIMEOUT, TimeUnit.SECONDS); 

       // flush all present emails 
       final List<Mail> toFLush = new ArrayList<>(); 
       queue.drainTo(toFLush); 

       if (!toFLush.isEmpty()) { 
        consumerExecutor.submit(() -> { 
         LOGGER.info("{} sending {} mails", Thread.currentThread(), toFLush.size()); 
         mailService.sendEmails(toFLush); 
        }); 
       } 

      } catch (InterruptedException e) { 
       Thread.currentThread().interrupt(); 
       break; // terminate execution in case of external interrupt 
      } finally { 
       flushLock.unlock(); 
      } 
     } 

    } 

    public void addMail(Mail mail) { 

     queue.add(mail); 

     // check batch size and flush if necessary 
     if (queue.size() >= BATCH_SIZE) { 

      try { 
       flushLock.lock(); 
       if (queue.size() >= BATCH_SIZE) { 
        flushMailService(); 
       } 
      } finally { 
       flushLock.unlock(); 
      } 
     } 
    } 

    public void flushMailService() { 
     LOGGER.info("flushing BatchMailService"); 
     try { 
      flushLock.lock(); 
      flushCondition.signal(); 
     } finally { 
      flushLock.unlock(); 
     } 
    } 

} 
+0

Ну если чеки производителя размера очереди и флешей, когда размер пакета достигается тогда, если у вас есть несколько потребителей, возможно, они не называют несколько раз почтовой службы с меньшим количеством писем? – user140547

+0

Это было бы, но есть блокировка, чтобы это не происходило. Там может быть ошибка, которую я пропускаю, но, как я вижу, она работает правильно. 'addMail()' приобретает 'fluskLock' перед проверкой' queue.size() 'во второй раз. Когда 'checkMails()' awakens from 'await()' он также получает 'flushLock' перед тем, как продолжить флеш. [Ссылка] (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html # awaitNanos (long)) –

+0

Я запустил код с 5 параллельными производителями и 1500 почтой. Там, где нет дубликатов писем, и ни одна из партий не имела в нем меньше элементов BATCH_SIZE, хотя их часто было больше. –

 Смежные вопросы

  • Нет связанных вопросов^_^