Я пытаюсь написать службу почты Batch, который имеет два метода:Producer/Consumer с партией и скрытой функциональностью
add(Mail mail)
: Письма могут посылаться, вызываются Производителями
flushMailService()
: промывать обслуживание. Потребители должны взять список и вызвать другой (дорогой) метод. Обычно дорогой метод следует вызывать только после достижения размера партии.
Это несколько похоже на этот вопрос: Producer/Consumer - producer adds data to collection without blocking, consumer consumes data from collection in batch
можно сделать с помощью poll()
, которая имеет время ожидания. Но производитель должен иметь возможность очищать почтовую службу, если она не хочет ждать таймаута, но заставляет производителя отправлять любые письма, которые находятся в очереди.
poll(20, TimeUnit.SECONDS)
может быть прерван. Если это прерывается, все письма в очереди должны быть отправлены независимо от того, достигнут ли размер партии до тех пор, пока очередь не будет пуста (с использованием poll()
, которая немедленно возвращает null
, если очередь пуста. После того, как она пуста, письма, которые были отправлены продюсер, который прервал бы быть уже отправлен. затем, производитель должен вызвать то блокирующие версию poll
снова, пока не прерывается любым другим производителем и так далее.
Это похоже на работу с данной реализацией.
Я попытался использовать ExecutorServices с Futures, но кажется, что Будущее может быть прервано только , так как они считается отмененным после первого прерывания. Поэтому я прибегал к Threads, которые можно прерывать несколько раз.
В настоящее время у меня есть следующая реализация, которая, кажется, работает (но использует «сырые» потоки).
Это разумный подход? Или, может быть, можно использовать другой подход?
public class BatchMailService {
private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>();
private CopyOnWriteArrayList<Thread> threads = new CopyOnWriteArrayList<>();
private static Logger LOGGER = LoggerFactory.getLogger(BatchMailService.class);
public void checkMails() {
int batchSize = 100;
int timeout = 20;
int consumerCount = 5;
Runnable runnable =() -> {
boolean wasInterrupted = false;
while (true) {
List<Mail> buffer = new ArrayList<>();
while (buffer.size() < batchSize) {
try {
Mail mail;
wasInterrupted |= Thread.interrupted();
if (wasInterrupted) {
mail = queue.poll(); // non-blocking call
} else {
mail = queue.poll(timeout, TimeUnit.SECONDS); // blocking call
}
if (mail != null) { // mail found immediately, or within timeout
buffer.add(mail);
} else { // no mail in queue, or timeout reached
LOGGER.debug("{} all mails currently in queue have been processed", Thread.currentThread());
wasInterrupted = false;
break;
}
} catch (InterruptedException e) {
LOGGER.info("{} interrupted", Thread.currentThread());
wasInterrupted = true;
break;
}
}
if (!buffer.isEmpty()) {
LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size());
mailService.sendMails(buffer);
}
}
};
LOGGER.info("starting 5 threads ");
for (int i = 0; i < 5; i++) {
Thread thread = new Thread(runnable);
threads.add(thread);
thread.start();
}
}
public void addMail(Mail mail) {
queue.add(mail);
}
public void flushMailService() {
LOGGER.info("flushing BatchMailService");
for (Thread t : threads) {
t.interrupt();
}
}
}
Другой подход без прерывания, но вариант яд таблетки (Mail POISON_PILL = new Mail()
) может быть следующим. Вероятно, лучше всего работает, когда есть одна потребительская нить. По крайней мере, для одной ядовитой таблетки будет продолжаться только один потребитель.
Runnable runnable =() -> {
boolean flush = false;
boolean shutdown = false;
while (!shutdown) {
List<Mail> buffer = new ArrayList<>();
while (buffer.size() < batchSize && !shutdown) {
try {
Mail mail;
if (flush){
mail = queue.poll();
if (mail == null) {
LOGGER.info(Thread.currentThread() + " all mails currently in queue have been processed");
flush = false;
break;
}
}else {
mail = queue.poll(5, TimeUnit.SECONDS); // blocking call
}
if (mail == POISON_PILL){ // flush
LOGGER.info(Thread.currentThread() + " got flush");
flush = true;
}
else if (mail != null){
buffer.add(mail);
}
} catch (InterruptedException e) {
LOGGER.info(Thread.currentThread() + " interrupted");
shutdown = true;
}
}
if (!buffer.isEmpty()) {
LOGGER.info(Thread.currentThread()+"{} sending " + buffer.size()+" mails");
mailService.sendEmails(buffer);
}
}
};
public void flushMailService() {
LOGGER.info("flushing BatchMailService");
queue.add(POISON_PILL);
}
Ну если чеки производителя размера очереди и флешей, когда размер пакета достигается тогда, если у вас есть несколько потребителей, возможно, они не называют несколько раз почтовой службы с меньшим количеством писем? – user140547
Это было бы, но есть блокировка, чтобы это не происходило. Там может быть ошибка, которую я пропускаю, но, как я вижу, она работает правильно. 'addMail()' приобретает 'fluskLock' перед проверкой' queue.size() 'во второй раз. Когда 'checkMails()' awakens from 'await()' он также получает 'flushLock' перед тем, как продолжить флеш. [Ссылка] (https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/locks/Condition.html # awaitNanos (long)) –
Я запустил код с 5 параллельными производителями и 1500 почтой. Там, где нет дубликатов писем, и ни одна из партий не имела в нем меньше элементов BATCH_SIZE, хотя их часто было больше. –