2016-10-27 5 views
0

У меня есть 1 основной поток, который запускает n дочерние потоки. Каждый из этих дочерних потоков постоянно создает новое событие и добавляет его в общую очередь. Это событие представляет собой последнее состояние сложного вычисления для дочернего потока.Параллельная очередь на Java, которая сохраняет только последний элемент каждого дочернего потока

Главный поток потребляет эту общую очередь, но не заинтересован в обработке всех событий, все еще находящихся в очереди: если один дочерний поток отправил 3 очереди в очередь, основной поток интересует только последний. Более старые события этого дочернего потока можно отбросить (как только дочерний поток добавит новое событие).

Например:

childThread A adds event A1 
mainThread removes event A1 

childThread B adds event B1 
childThread B adds event B2 // => B1 should be discarded 
childThread B adds event B3 // => B2 should be discarded 
mainThread removes event B3 

childThread A adds event A2 
childThread A adds event A3 // => A2 should be discarded 
childThread B adds event B4 
mainThread removes event A3 
mainThread removes event B4 

childThread B adds event B5 
childThread A adds event A4 
childThread A adds event A5 // => A4 should be discarded 
childThread B adds event B6 // => B5 should be discarded 
childThread A adds event A6 // => A5 should be discarded 
mainThread removes event B6 // do B6 first because B5 was before A4 
mainThread removes event A6 

Дополнительное требование: Основной поток действительно хочет круговой системе как можно более событий дочерних потоков, но по-прежнему блокировать, если ни один из дочерних потоков не производят.

ответ

1

Я хотел бы использовать два DataStructures: 1 BlockingQueue для «триггеры» и 1 Карта для ссылки на последние события.

EventSource будет:

  1. На новом событии сделать запись в карте (обновление более старой, когда ключ уже существует) Key = "это" (EventSource), Value = Последние события
  2. На новый Событие делает запись в BlockingQueue с «this» (EventSource), но только если там еще нет.

Последняя деталь довольно необязательна. Это просто уменьшает безуспешные поиски на карте. Вы могли бы просто добавить триггеры для каждого нового события. Каждый раз, когда поиск отрицателен, если ключ на карте просто игнорирует его.

Потребитель:

  1. Надейся на очереди блокировки для не пусто.
  2. Удалить верхнюю часть, использовать ее в качестве ключа в Event-Map ...
  3. Ключ не найден: продолжить со следующей строкой.
  4. Key найдено: удалить запись и значение процесса (который является событие)
  5. Loop к 1.
1

Используйте обычный BlockingQueue, но объект-узел, содержащийся в очереди, имеет синхронизированный флаг с именем wasConsumed. Он начинается с false, и основной потребитель устанавливает его на true всякий раз, когда он начинает работать.

В каждом потоке хранится последний элемент, который был установлен в очередь. Если поток имеет обновление, а последний элемент не был использован, он обновляет элемент, помещенный в очередь, используя ту же синхронизацию, что и раньше. В противном случае он завершает новую работу.

class UpdateableNode<T> 
{ 
    private boolean wasConsumed; 
    private T task; 

    // main consumer calls this BEFORE processing the task 
    synchronized void startConsuming() 
    { 
    wasConsumed=true; 
    } 

    // producer tries to update the task if it wasn't consumed 
    synchronized boolean tryUpdate(T newTask) 
    { 
    if (wasConsumed) 
     return false; 
    task = newTask; 
    return true; 
    } 
} 

UPDATE:

После понимания предыдущей концепции, то же самое можно управлять с помощью AtomicReference вместо добытой UpdatableNode

потребителей код:

  1. Поп в AtomicReference<Task> ref от BlockingQueue
  2. Получить задание, вызвав Task t = ref.getAndSet(null), это очистит задачу от ref и вернет ее атомным способом (без блокировки на большинстве платформ). Код

Производитель:

  1. Держите последнюю задачу Task lastTask
  2. Держите последний реф AtomicReference<Task> lastRef
  3. Когда новая задача прибывает попытаться обновить lastRef.compareAndSet(lastTask, newTask) он будет работать только тогда, когда старая задача по-прежнему в ref и не потреблялось.
  4. Если это не удается, создайте новый AtomicReference<Task>, запишите его и сохраните в lastRef.
  5. В обоих случаях, за исключением newTask в lastTask
+0

Не должно быть 'wasConsumed = true', а удаление этого узла из очереди должно быть в одном и том же« синхронизированном »блоке (так что добавление не может пересекаться в состоянии гонки)? –

+0

Нет. Даже если он был удален из очереди, он все равно может быть обновлен, если потребитель не начал его потреблять. – Shloim

0

Вы можете использовать CorrurentMap, который принимает в качестве ключа Нить, и в качестве значения этого события. Поскольку для Карт у вас может быть только одно значение для каждого ключа, это именно то, что вы ищете. И поскольку он разделяется многими Threads, он идеально подходит для одновременной версии.

Проверьте документы на одном из CorrurentMap реализаций ConcurrentHashMap: http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ConcurrentHashMap.html

EDIT:

Подумав немного, я придумал возможным решением. это просто, но я надеюсь, что это поможет:

public class EventHandler implements Runnable { 

    private final BlockingQueue<Thread> blockingQueue; 
    private final ConcurrentMap<Thread, Event> eventMap; 
    private final AtomicReference<Boolean> machineState; 

    public EventHandler() { 
     int threadNumber = 3; 
     blockingQueue = new ArrayBlockingQueue<Thread>(threadNumber); 
     eventMap = new ConcurrentHashMap<Thread, Event>(); 
     machineState = new AtomicReference(Boolean.TRUE); 
    } 

    @Override 
    public void run() { 
     while (machineState.get()) { 
      Event toConsume = null; 

      //We don't want Threads to medle in while we are removing it from the map AND the queue; 
      synchronized (this) { 
       Thread eventParent = null; 
       try { 
        eventParent = blockingQueue.take(); 
       } 
       catch (InterruptedException ex) { 
        //Exception Handling; 
       } 

       toConsume = eventMap.remove(eventParent); 
      } 

      runEvent(toConsume); 
     } 
    } 

    private void runEvent(Event toConsume) { 
     //Event Handling 
    } 

    //Notice that this method is syncronized 
    public synchronized void addEvent(Event event) { 
     Thread thisThread = Thread.currentThread(); 
     eventMap.put(thisThread, event); 

     //Now checking and removing old unpoped Events, if any 

     for (Iterator<Thread> iterator = blockingQueue.iterator() ; iterator.hasNext() ;) { 
      Thread next = iterator.next(); 
      if(next == thisThread) { 
       iterator.remove(); 
       break; 
      } 
     } 
     blockingQueue.offer(thisThread); 
    } 

    //Other methods... 

} 

Надеюсь, я помог.

Имейте славный день. :)

+1

как вы используете эту карту? –

+0

Что вы подразумеваете под потреблением? Очистить карту? Просто используйте 'map.clear();'. –

+1

Я имею в виду, как вы получаете элементы с этой карты, когда есть доступные элементы, пока они не заняты, когда их нет. Также проверьте ответ Fildor. –

0

Вы можете использовать очередь атомных ссылок. Ссылки содержат последний элемент. Когда потребитель потребляет элемент, он также очистит исходную информацию через getAndSet. Следовательно, производящий поток, который сохраняет ссылку, может проверить, был ли предмет потреблен. Если нет, он обновит ссылку через compareAndSet, в противном случае она будет помещена в ссылку.

Как

ExecutorService es=Executors.newCachedThreadPool(); 
BlockingQueue<AtomicReference<String>> queue=new ArrayBlockingQueue<>(20); 
es.execute(() -> { // consumer 
    while(!Thread.interrupted()) try { 
     String item=queue.take().getAndSet(null); 
     System.out.println("consuming "+item); 
     Thread.sleep(10);// simulate workload 
    } catch(InterruptedException ex) { break; } 
}); 
for(int i=0; i<20; i++) { // schedule multiple producers 
    String name="T"+i; 
    es.execute(() -> { // producer 
     AtomicReference<String> lastItem=new AtomicReference<>(); 
     for(int item=0; item<100; item++) try { 
      String current=name+" - item "+item; 
      String last=lastItem.get(); 
      if(last==null || !lastItem.compareAndSet(last, current)) { 
       lastItem.set(current); 
       queue.put(lastItem); 
      } 
      Thread.sleep(5);// simulate workload 
     } 
     catch(InterruptedException ex) { 
      System.err.println(name+" interrupted"); 
      break; 
     } 
    }); 
} 

это имеет некоторую степень «справедливости», так как после того, как один атомарный ссылка израсходован, он будет поставлен в конце очереди. Если вы построите очередь с fair параметром, установленным на true, он действительно будет демонстрировать цикличное поведение, подобное тому, как только все потоки в первый раз обрели свою ссылку. Если, однако, поток начинается так медленно, что ему не удается вставить свой первый элемент перед тем, как другой завершит свой последний элемент, это будет не совсем справедливо, но я полагаю, вы не хотите принуждать «справедливость» в соответствии с такие условия в любом случае.