2016-01-11 4 views
3

У меня есть следующие классы:Как использовать метод "removeIf" с помощью предиката в ArrayBlockingQueue

WorkerTask.java

public interface WorkerTask extends Task { 

    // Constants 
    public static final short WORKERTASK_SPIDER = 1; 
    public static final short WORKERTASK_PARSER = 2; 
    public static final short WORKERTASK_PRODUCT = 3; 

    public int getType(); 
} 

WorkerPool.java

class workerPool { 

    private ThreadPoolExecutor executorPool_; 

    //---------------------------------------------------- 

    public WorkerPool(int poolSize) 
    { 
     executorPool_ = new ThreadPoolExecutor(
      poolSize,5,10,TimeUnit.SECONDS, 
      new ArrayBlockingQueue<Runnable>(10000000,false), 
      Executors.defaultThreadFactory() 
    ); 

    //----------------------------------------------------   

    public void assign(WorkerTask workerTask) { 
     executorPool_.execute(new WorkerThread(workerTask)); 
    } 

    //---------------------------------------------------- 

    public void removeTasks(int siteID) { 
     executorPool_.getQueue().removeIf(...);  
    } 
} 

Я хочу вызовите метод removeTasks, чтобы удалить определенное количество ожидающих задач, но я не знаю, как использовать метод removeIf. В нем говорится: Удаляет все элементы этой коллекции, которые удовлетворяют заданному предикату, но я понятия не имею, как создать параметр Predicate. Есть идеи?

ответ

2

Если у вас Queue<WorkerTask>, вы могли бы сделать что-то вроде этого:

queue.removeIf(task -> task.getSiteID() == siteID) 

Есть несколько проблем. Одна из проблем заключается в том, что очередь, которую вы получаете от getQueue(), составляет BlockingQueue<Runnable>, а не Queue<WorkerTask>. Если вы отправляете в пул Runnable экземпляров, очередь может содержать ссылки на ваши фактические задачи; если это так, вы можете опустить их до WorkerTask. Однако это не гарантируется. Кроме того, класс док для ThreadPoolExecutor говорит (в разделе «Очередь обслуживания»):

Метод getQueue() позволяет получить доступ к рабочей очереди для целей мониторинга и отладки. Использование этого метода для любых других целей категорически не рекомендуется. Доступны два предложенных метода: remove(Runnable) и purge(), чтобы помочь в рекультивации хранилища, когда большое количество поставленных задач будет отменено.

Глядя на метод remove(Runnable), его доктор говорит

может не удалять задачи, которые были преобразованы в другие формы перед помещением на внутренней очереди.

Это говорит о том, что вы должны висеть на Runnable экземпляров, которые были представлены для того, чтобы вызвать remove() на них позже. Или позвоните submit(Runnable), чтобы получить Future и сохраните эти экземпляры, чтобы их отменить.

Но есть также вторая проблема, которая, вероятно, делает этот подход неадекватным. Предположим, вы нашли способ удалить или отменить соответствующие задачи из очереди. Другой поток, возможно, решил отправить новую задачу, которая соответствует, но еще не отправила ее. Здесь есть состояние гонки. Возможно, вы сможете отменить установленные задачи, но после этого вы не можете гарантировать, что новые сопоставимые задачи не были отправлены.

Вот альтернативный подход. Предположительно, когда вы отменяете (или что-то еще) идентификатор сайта, есть некоторая логика где-то, чтобы прекратить отправку новых задач, соответствующих идентификатору стороны. Проблема заключается в том, как решать совпадающие задачи, которые «находятся в полете», то есть находятся в очереди или находятся в очереди.

Вместо того, чтобы отменить соответствующие задачи, измените задачу так, чтобы, если ее идентификатор сайта был отменен, задача превращается в no-op. Вы можете записать отмену идентификатора сайта, например, ConcurrentHashMap. Любая задача будет проверять эту карту до начала ее работы, и если идентификатор сайта присутствует, он просто вернется. Добавление идентификатора сайта на карту немедленно повлияет на то, что новая задача на этом идентификаторе сайта не будет начата. (Задачи, которые уже были запущены, будут завершены.) Любые задачи в полете в конечном итоге будут стекать из очереди, не вызывая никакой реальной работы.

0

Предикат - это функция, которая получает вход и возвращает логическое значение.

Если вы используете Java 8 вы можете использовать лямбда-выражения: (elem) -> return elem.id == siteID