2

Я запускаю цикл Parallel for, который изначально запускается на время = количество процессоров и выполняет длительную операцию. Каждая задача, когда закончена, проверяет больше задач и, если она найдена, снова вызывает себя.Непредсказуемые результаты при использовании Parallel.For

Вот как мой код выглядит следующим образом:

static void Main(string[] args) 
{ 
    Int32 numberOfProcessors = Environment.ProcessorCount; 

    Parallel.For(0, numberOfProcessors, index => DoSomething(index, sqsQueueURL)); 

} 

private async static Task DoSomething(int index, string queueURL) 
{ 
    var receiveMessageRequest = new ReceiveMessageRequest { QueueUrl = queueURL, WaitTimeSeconds = 20, MaxNumberOfMessages = 1, VisibilityTimeout = 1200 }; 

    AmazonSQSClient sqsClient = new AmazonSQSClient(new AmazonSQSConfig { MaxErrorRetry = 4 }); 

    var receiveMessageResponse = sqsClient.ReceiveMessage(receiveMessageRequest); 

    foreach (var msg in receiveMessageResponse.Messages) 
    { 
     PerformALongRunningTask...... 

     //delete the message 

     DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueURL, msg.ReceiptHandle); 

     AmazonSQSClient sqsDeleteClient = new AmazonSQSClient(); 

     sqsDeleteClient.DeleteMessage(deleteMessageRequest); 

     //Do it again 
     DoSometing(index,queueURL) 

    } 
} 

Я получаю очень непредсказуемые результаты. Он никогда не завершает все задачи. Он выходит, прежде чем завершить все.

Что я здесь делаю неправильно?

Более короткий код:

static Int32 TimesToLoop = 143; 
static void Main(string[] args) 
{ 

    Int32 numberOfProcessors = Environment.ProcessorCount; 

    Parallel.For(0, numberOfProcessors, index => DoSomething(index)); 

    Console.Read(); 
} 

private async static Task DoSomething(int index) 
{ 
    if(TimesToLoop == 0) 
    { 
     return; 
    } 
    Console.WriteLine(index); 
    Interlocked.Decrement(ref TimesToLoop); 
    DoSomething(index++); 
    return; 

} 
+1

Первое, что нужно сделать: уменьшить это до краткого, но полного примера, который вообще не включает веб-службы Amazon. Проблема заключается в том, что вы используете 'Parallel.For'. Создав свой метод, просто напечатайте «start» и индекс, затем ожидайте «Thread.Delay», затем распечатайте «окончание», и индекс продемонстрирует ту же проблему, с меньшим количеством зависимостей и меньшим количеством кода. –

+0

Действительно ли ваш реальный код содержит выражения 'await', кстати? Если это не так, каждый вызов вашего метода «DoSomething» будет выполняться синхронно ... –

+0

Нет. Не знаю, как использовать ожидание в Parallel.For – Asdfg

ответ

1
private async static Task DoSomething(int index, string queueURL) 
{ 
    ... 
    foreach (var msg in receiveMessageResponse.Messages) 
    { 
     ... 
     //Do it again 
     DoSometing(index,queueURL) 

    } 
} 

Вы звоните DoSomething рекурсивно, и нет никаких условий, чтобы сломать/вернуться из него. Это может привести к stackoverflow и завершить работу вашей программы.

+0

это так. просто заметили, что – Asdfg

+1

@carbinecoder есть условие, чтобы сломать/вернуть право? когда нет сообщений в receiveMessageResponse.Messages –

+0

Правда, я пропустил это, я согласен с вами, опрос должен выполняться до получения 'receiveMessageResponse.Messages', но он не должен быть рекурсивным. – CarbineCoder

3

я вижу различные проблемы в данный момент:

  • Parallel.For только начиная задач. Он не будет ждать их завершения. Он будет ожидать возврата вызовов DoSomething, но они возвращают задачи, представляющие асинхронные операции, которые, вероятно, не будут выполняться синхронно.
  • Как отметил CarbineCoder, ваша рекурсия почти наверняка испорчена. Непонятно, чего вы пытаетесь достичь, но вам нужно переосмыслить этот аспект.
  • В вашей рекурсии нет await задач, возвращаемых в любом случае - это почти наверняка должно быть. Это может хочу создать коллекцию всех задач, созданных в цикле foreach, и ждать их всех за один раз, или же он может сразу await. Мы не можем сказать.

Самый простой способ фиксации первой части, вероятно, использовать Task.WaitAll вместо Parallel.For:

var tasks = Enumerable.Range(0, numberOfProcessors) 
         .Select(index => DoSomething(index, sqsQueueURL)) 
         .ToList(); 
Task.WaitAll(tasks); 

В отличие от Task.WhenAll, Task.WaitAll будет блокировать до тех пор, все указанные задачи не будут выполнены. Обратите внимание, что это не безопасно делать, если какая-либо из задач должна быть продолжена в потоке, вызывающем WaitAll, именно потому, что он блокирует, но если это консольное приложение, и вы вызываете это из начального потока, вы будете быть в порядке, поскольку продолжения будут выполняться в пуле потоков в любом случае.

+0

То, что я пытаюсь сделать, - это как только одна задача закончится, она должна начать новую, так как будет много задач. Я пытаюсь ограничить число задач равными числу процессоров. Когда больше не осталось сообщений для обработки, он должен завершиться. – Asdfg

+0

@Asdfg: Я предлагаю вам этого не делать. Вы звоните в веб-службы - вы все равно не будете ограничены ЦП. Даже если у вас только один процессор, вы можете одновременно отправить 100 запросов на веб-службу в полете. У вас уже есть * рабочий * код? Прежде чем вы начнете пытаться точно настроить это, сделайте его как можно более простым., –

+0

Правда, но я запускаю несколько узлов, поэтому я не хочу блокировать сообщение, если он будет ждать завершения других задач. – Asdfg