0

Я изучаю параллельное программирование и написал этот concurrentLinkeQueue с использованием AtomicReference.Выполнение ConcurrentLinkQueue вступает в тупик

Следующий пример относится к тупиковой ситуации. Пожалуйста, посмотри.

package concurrent.AtomicE; 

import java.util.concurrent.atomic.AtomicReference; 

public class ConcurrentLinkQueue<V> { 
private AtomicReference<Node> head = new AtomicReference<Node>(); 

public void offer(final V data) { 
    final Node<V> newNode = new Node<V>(data,Thread.currentThread().getName()); 
    System.out.println("*********** NEW "+ newNode); 
    AtomicReference<Node> pointer = head; 
    for(;;){ 
     if(pointer.get() == null){ // Threads wait here for infinite time 
      final boolean success = pointer.compareAndSet(null,newNode); 
      System.out.println(Thread.currentThread().getName() +" " + success); 
      if(success) 
      { 
       System.out.println(Thread.currentThread().getName() +"Returning"); 
       return; 
      }else{ 
       final Node<V> current = pointer.get(); 
       pointer = current.next; 
       System.out.println(Thread.currentThread().getName() +" Next Pointer"); 
      } 
     } 
    } 
} 

public void printQueueData(){ 
    AtomicReference<Node> pointer = head; 
    for(;pointer!=null;){ 
     final Node node = pointer.get(); 
     System.out.println(node); 
     pointer = node.next; 
    } 
} 

private static class Node<V>{ 
    private AtomicReference<Node> next; 
    private volatile V data = null; 
    private String threadName = ""; 

    Node(V data1,String threadName){ 
     this.data = data1; 
     this.threadName = threadName; 
    } 

    @Override 
    public String toString() { 
     return "threadName=" + threadName + 
       ", data=" + data; 
    } 

    private AtomicReference<Node> getNext() { 
     return next; 
    } 

    private void setNext(AtomicReference<Node> next) { 
     this.next = next; 
    } 

    private V getData() { 
     return data; 
    } 

    private void setData(V data) { 
     this.data = data; 
    } 
} 

}

 package concurrent.AtomicE; 

import java.util.concurrent.Executors; 
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; 

public class Main { 
private static final ConcurrentLinkQueue<Integer> clq = new ConcurrentLinkQueue<Integer>(); 

public static void main(String[] args) throws InterruptedException { 
    Task t = new Task(); 
    Thread t1 = new Thread(t); t1.setName("t1"); 
    Thread t2 = new Thread(t); t2.setName("t2"); 
    //Thread t3 = new Thread(t); t3.setName("t3"); 
    //Thread t4 = new Thread(t); t4.setName("t4"); 
    //Thread t5 = new Thread(t); t5.setName("t5"); 

    t1.start(); 
    t2.start(); 
    //t3.start(); 
    //t4.start(); 
    //t5.start(); 

    t1.join(); 
    t2.join(); 
    //t3.join(); 
    //t4.join(); 
    //t5.join(); 


} 

private static class Task implements Runnable{ 

    @Override 
    public void run() { 
     for(int i=0;i<5;++i){ 
      clq.offer(i); 
     } 
    } 
} 

}

после приема нити дамп показывает, что потоки ждать вечно на следующую строчку

if(pointer.get() == null){ // Threads wait here for infinite time 

пожалуйста, вы можете помочь, почему потоки ждать здесь навсегда?

[EDIT] решаемые его --->

public class ConcurrentLinkQueue<V> { 
    private final AtomicReference<Node> firstNodePointer = new AtomicReference<Node>(); 

public void offer(final V data) { 
    final Node<V> newNode = new Node<V>(data,Thread.currentThread().getName()); 
    System.out.println(newNode); 
    final Node<Integer> firstNode = firstNodePointer.get(); 
    if(firstNode == null){ 
     if(firstNodePointer.compareAndSet(null,newNode) == true) 
      return; 
    } 
    boolean success = false; 
    Node<Integer> nodePointer = firstNode; 
    AtomicReference<Node> atomicRefPointer = firstNodePointer; 
    while(!success){ 
     atomicRefPointer = nodePointer.getNext(); 
     if(atomicRefPointer.get() == null){ 
      success = atomicRefPointer.compareAndSet(null,newNode); 
     }else{ 
      nodePointer = atomicRefPointer.get(); 
     } 
    } 
} 

}

Другой решение->

 public void fastOffer(final V data){ 
    final Node<V> newNode = new Node<V>(data,Thread.currentThread().getName()); 
    System.out.println(newNode); 
    AtomicReference<Node> pointer = firstNodePointer; 
    for(;;){ 
      if(pointer.compareAndSet(null,newNode)){ 
       return; 
      } 

     pointer = pointer.get().getNext(); 
    } 
} 
+0

Сколько раз он проходит цикл, прежде чем он застрянет? – tbodt

+0

только 1 нить succeddes остаточный тупик первый раз. и поток, который превзошел stucks 2-й раз – HakunaMatata

+1

Я не думаю, что этот код является потокобезопасным, я подозреваю, что два потока попадают в 'if' с тем же' head', а затем случается, что 'next' больше не указывает на' null' - такой бесконечный цикл ... Если вы хотите выполнить несколько операций атомарно, вам нужно синхронизировать. –

ответ

0

В вашем примере условие pointer.get() == null всегда возвращает false excepts первый случай, когда вы назначьте его head, потому что в Node класс это null , Вы можете присвоить его по умолчанию и удалить нулевую проверку.

Я предлагаю вам изменить класс битого узла, сделать его неизменным:

private static class Node<V> { 
     private final AtomicReference<Node> next = new AtomicReference<>(); 
     private final V data; 
     private final String threadName; 

     Node(V data1, String threadName) { 
      this.data = data1; 
      this.threadName = threadName; 
     } 
    } 

И тогда вы можете просто пройти через все элементы:

private final AtomicReference<Node> head = new AtomicReference<>(); 

@SuppressWarnings("unchecked") 
public void offer(final V data) { 
    // create new Node 
    final Node<V> newNode = new Node<>(data, Thread.currentThread().getName()); 
    // set root element if it's null 
    if (head.compareAndSet(null, newNode)) { 
     return; 
    } 
    // else pass trough all elements and try to set new 
    Node<V> pointer = head.get(); 
    for (;;) { 
     if (pointer.next.compareAndSet(null, newNode)) { 
      break; 
     } 
     pointer = pointer.next.get(); 
    } 
} 

И изменить метод печати:

@SuppressWarnings("unchecked") 
    public void printQueueData() { 
     AtomicReference<Node> pointer = head; 
     while (pointer.get() != null) { 
      System.out.println(pointer.get().data); 
      pointer = pointer.get().next; 
     } 
    } 
+0

Ваша программа кажется правильной, я изменил ее и ее работу отлично. Правильно ли это? public void fastOffer (окончательные данные V) { final Node newNode = новый Узел (данные, Thread.currentThread(). getName()); System.out.println (newNode); AtomicReference pointer = firstNodePointer; for (;;) { if (pointer.get() == null) { if (pointer.compareAndSet (null, newNode)) { return; } } pointer = pointer.get(). GetNext(); } } – HakunaMatata

+0

Да, но я думаю состояние 'если (указатель.get() == null) 'не обязательно, потому что следующий' compareAndSet' делает то же самое. – aim