В этом сегменте кода я создаю трубку и прикрепляю сканер на одном конце и PrintStream на другом, чтобы связываться между несколькими потоками потребителей и производителей. Затем я создаю и запускаю три потока:Java: два потока, сообщающихся через потоки, - компания, три - толпа
Первый поток - потребительский поток. Он проверяет Сканер, чтобы увидеть, доступна ли строка текста для потребления, потребляет, печатает на стандартный вывод, а затем спит в течение нескольких миллисекунд, а затем повторяет. Если вам нечего потреблять, тогда он печатает сообщение об этом, спит и повторяет.
Вторая нить в этом сегменте кода ничего не делает. Подробнее об этом ниже.
2.5 Запускается 3-секундная задержка до запуска третьей нити.
- Третья нить является производителем и только что создает текстовые сообщения для потребления первого потока. Он производит сообщение, спит
public static void main(String[] args) throws IOException
{
PipedInputStream pis = new PipedInputStream();
PipedOutputStream pos = new PipedOutputStream(pis);
Scanner scan = new Scanner(pis);
PrintStream ps = new PrintStream(pos);
new Thread()
{
public void run()
{
int x = 0;
while (true)
{
x++;
if (scan.hasNextLine())
{
System.out.println("pulled: " + scan.nextLine());
} else
{
if (x % 100 == 0)
{
System.out.println("no data to pull");
}
}
try
{
sleep(10);
} catch (InterruptedException ex) { }
}
}
}.start();
new Thread()
{
public void run()
{
}
}.start();
try
{
sleep(3000);
} catch (InterruptedException ex) { }
new Thread()
{
public void run()
{
int x = 0;
while (true)
{
x++;
ps.println("hello: " + x);
try
{
sleep(1000);
} catch (InterruptedException ex) {}
}
}
}.start();
}
Выход (как я ожидал):
pulled: hello: 1
pulled: hello: 2
pulled: hello: 3
pulled: hello: 4
pulled: hello: 5
pulled: hello: 6
обратите внимание также, что scan.nextLine() блокирует (поскольку у нас нет сообщения, указывающие, что данные не доступны ... данные всегда «доступны», даже если они «находятся в пути»).
Теперь, если я заменю тело 2 нити с некоторым кодом, который производит некоторый текст для первого потока, чтобы потреблять:
new Thread()
{
public void run()
{
ps.println("Interfere");
}
}.start();
Тогда я начинаю, чтобы вызвать пункт не данных первого потока:
pulled: Interfere
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
Таким образом, если второй поток начинает использовать объект PrintStream для получения сообщения, что-то пойдет не так в трубе и потребитель нить перестает быть в состоянии найти сообщения на другом конце.
И теперь все становится страннее. Если предотвратить вторую нить от завершения, скажем, бросая его в очень длинный цикл, то труба не получает клейким:
new Thread()
{
public void run()
{
ps.println("interfere");
for (long i = 0; i < 10000000000L; i++);
System.out.println("done interfering");
}
}.start();
Выход:
pulled: interfere
pulled: hello: 1
pulled: hello: 2
done interfering
pulled: hello: 3
pulled: hello: 4
pulled: hello: 5
pulled: hello: 6
Так что я думаю, что если второй поток прекращается до того, как начнется третий поток, тогда первый поток никогда не получит сообщений от третьего потока. Однако, если второй поток удается висеть до тех пор, пока третий поток не начнет производиться, все пройдет так, как ожидалось.
Что здесь происходит? Является ли вторая нить закрывающей трубу/поток (или выполнение какого-либо другого действия на трубе/потоке), когда она завершается? Если да, то почему? И почему это не кажется закрытым (или выполняющим какое-либо действие) на трубе/потоке, если третий поток начинает использовать поток/поток до того, как закончится второй поток?Есть ли способ сделать этот код «работать», как ожидалось (то есть, чтобы первый поток потреблял все, что создается ни одним из обоих потоков производителей), когда второй поток создает сообщения и заканчивается до начала третьего потока?
Фон: Это уплотнение для важных компонентов системы, в которой несколько клиентов будут потреблять сообщения из одного потока производителей. Однако нить производителя не может быть запущена до тех пор, пока все потоки клиентов не сообщают, что они готовы. Для каждого потока клиентов есть еще один поток, который запрашивает, готовы ли они. Как только все потоки клиентов сигнализируют, что они готовы, запускается поток производителя. Я пытаюсь связать потоки через потоки, чтобы позже я мог распространять их на нескольких компьютерах и настраивать каналы с помощью сокетов с минимальным количеством изменений в базовом коде. Не стесняйтесь предлагать альтернативную стратегию решения здесь, но я хотел бы понять, почему решение выше не работает.
Ваши сны бессмысленны. 'hasNextLine()' будет блокироваться, если уже нет строки или конца потока. – EJP
Прошло некоторое время с тех пор, как я просмотрел код, но я думаю, что 'PipedInput/OutputStream' может быть явно написан для использования двумя потоками. Вы считали, что вместо этого используете параллельную очередь блокировки? – erickson