2016-06-01 4 views
2

В этом сегменте кода я создаю трубку и прикрепляю сканер на одном конце и PrintStream на другом, чтобы связываться между несколькими потоками потребителей и производителей. Затем я создаю и запускаю три потока:Java: два потока, сообщающихся через потоки, - компания, три - толпа

  1. Первый поток - потребительский поток. Он проверяет Сканер, чтобы увидеть, доступна ли строка текста для потребления, потребляет, печатает на стандартный вывод, а затем спит в течение нескольких миллисекунд, а затем повторяет. Если вам нечего потреблять, тогда он печатает сообщение об этом, спит и повторяет.

  2. Вторая нить в этом сегменте кода ничего не делает. Подробнее об этом ниже.

2.5 Запускается 3-секундная задержка до запуска третьей нити.

  1. Третья нить является производителем и только что создает текстовые сообщения для потребления первого потока. Он производит сообщение, спит

public static void main(String[] args) throws IOException 
{ 
    PipedInputStream pis = new PipedInputStream(); 
    PipedOutputStream pos = new PipedOutputStream(pis); 
    Scanner scan = new Scanner(pis); 
    PrintStream ps = new PrintStream(pos); 

    new Thread() 
    { 
     public void run() 
     { 
      int x = 0; 
      while (true) 
      { 
       x++; 
       if (scan.hasNextLine()) 
       { 
        System.out.println("pulled: " + scan.nextLine()); 
       } else 
       { 
        if (x % 100 == 0) 
        { 
         System.out.println("no data to pull"); 
        } 
       } 
       try 
       { 
        sleep(10); 
       } catch (InterruptedException ex) { } 
      } 
     } 
    }.start(); 

    new Thread() 
    { 
     public void run() 
     { 
     } 
    }.start(); 

    try 
    { 
     sleep(3000); 
    } catch (InterruptedException ex) { } 

    new Thread() 
    { 
     public void run() 
     { 
      int x = 0; 
      while (true) 
      { 
       x++; 
       ps.println("hello: " + x); 
       try 
       { 
        sleep(1000); 
       } catch (InterruptedException ex) {} 
      } 
     } 
    }.start(); 
} 

Выход (как я ожидал):

pulled: hello: 1 
pulled: hello: 2 
pulled: hello: 3 
pulled: hello: 4 
pulled: hello: 5 
pulled: hello: 6 

обратите внимание также, что scan.nextLine() блокирует (поскольку у нас нет сообщения, указывающие, что данные не доступны ... данные всегда «доступны», даже если они «находятся в пути»).

Теперь, если я заменю тело 2 нити с некоторым кодом, который производит некоторый текст для первого потока, чтобы потреблять:

new Thread() 
{ 
    public void run() 
    { 
     ps.println("Interfere"); 
    } 
}.start(); 

Тогда я начинаю, чтобы вызвать пункт не данных первого потока:

pulled: Interfere 
no data to pull 
no data to pull 
no data to pull 
no data to pull 
no data to pull 
no data to pull 
no data to pull 
no data to pull 

Таким образом, если второй поток начинает использовать объект PrintStream для получения сообщения, что-то пойдет не так в трубе и потребитель нить перестает быть в состоянии найти сообщения на другом конце.

И теперь все становится страннее. Если предотвратить вторую нить от завершения, скажем, бросая его в очень длинный цикл, то труба не получает клейким:

new Thread() 
{ 
    public void run() 
    { 
     ps.println("interfere"); 
     for (long i = 0; i < 10000000000L; i++); 
     System.out.println("done interfering"); 
    } 
}.start(); 

Выход:

pulled: interfere 
pulled: hello: 1 
pulled: hello: 2 
done interfering 
pulled: hello: 3 
pulled: hello: 4 
pulled: hello: 5 
pulled: hello: 6 

Так что я думаю, что если второй поток прекращается до того, как начнется третий поток, тогда первый поток никогда не получит сообщений от третьего потока. Однако, если второй поток удается висеть до тех пор, пока третий поток не начнет производиться, все пройдет так, как ожидалось.

Что здесь происходит? Является ли вторая нить закрывающей трубу/поток (или выполнение какого-либо другого действия на трубе/потоке), когда она завершается? Если да, то почему? И почему это не кажется закрытым (или выполняющим какое-либо действие) на трубе/потоке, если третий поток начинает использовать поток/поток до того, как закончится второй поток?Есть ли способ сделать этот код «работать», как ожидалось (то есть, чтобы первый поток потреблял все, что создается ни одним из обоих потоков производителей), когда второй поток создает сообщения и заканчивается до начала третьего потока?

Фон: Это уплотнение для важных компонентов системы, в которой несколько клиентов будут потреблять сообщения из одного потока производителей. Однако нить производителя не может быть запущена до тех пор, пока все потоки клиентов не сообщают, что они готовы. Для каждого потока клиентов есть еще один поток, который запрашивает, готовы ли они. Как только все потоки клиентов сигнализируют, что они готовы, запускается поток производителя. Я пытаюсь связать потоки через потоки, чтобы позже я мог распространять их на нескольких компьютерах и настраивать каналы с помощью сокетов с минимальным количеством изменений в базовом коде. Не стесняйтесь предлагать альтернативную стратегию решения здесь, но я хотел бы понять, почему решение выше не работает.

+0

Ваши сны бессмысленны. 'hasNextLine()' будет блокироваться, если уже нет строки или конца потока. – EJP

+0

Прошло некоторое время с тех пор, как я просмотрел код, но я думаю, что 'PipedInput/OutputStream' может быть явно написан для использования двумя потоками. Вы считали, что вместо этого используете параллельную очередь блокировки? – erickson

ответ

1

Вашего Scanner экземпляр ударяя исключение в своем методе readInput, который устанавливает его sourceClosed поля для true и предотвращает вас от чтения. Если вы заинтересованы в том, где это происходит на самом деле:

private void readInput() { 
    ... 

    int n = 0; 
    try { 
     n = source.read(buf); 
    } catch (IOException ioe) { 
     lastException = ioe; 
     n = -1; 
    } 

    if (n == -1) { 
     sourceClosed = true; 
     needInput = false; 
    } 

    ... 
} 

Такое поведение не неправильно, вам нужно устранить причину исключения. Вопрос здесь: java.io.IOException: Write end dead. Есть куча ответов и сообщений в блогах, которые могут помочь вам решить эту проблему лучше, чем я могу. Также взгляните на соответствующую тему «Прочитать конец мертвой». Проверьте:

+0

Кстати, вы можете воспроизвести исключение самостоятельно, используя BufferedReader вместо Scanner. Используйте 'BufferedReader bufferedReader = new BufferedReader (новый InputStreamReader (pis));' и 'if ((line = bufferedReader.readLine())! = Null) {' везде, где вы используете сканер. –

0

Связанные потоковые объекты не являются потокобезопасными, поэтому поведение при обращении к ним из разных потоков без синхронизации является непредсказуемым. Я бы предположил, что в этом конкретном случае разница в поведении связана с тем, что на самом деле портится в основную память, но это всего лишь предположение.

Чтобы получить предсказуемое поведение, вам необходимо выполнить правильную синхронизацию между вашими потоками.