2017-01-25 7 views
2

У меня есть проблема с остановкой параллели для каждого цикла.stop Параллельный.ForEach немедленно

Я повторяю набор из около 40.000 DataRows, извлеченных из таблицы, и мне нужно немедленно остановить цикл, когда у меня есть 100 элементов в моем наборе результатов. Проблема в том, что когда я запускаю метод Stop на ParallelLoopState, итерация не останавливается немедленно, вызывая несогласованность в моем наборе результатов (либо до нескольких, либо ко многим элементам).

Нет ли способа убедиться, что я убью все потоки, как только я нажму стоп?

List<DataRow> rows = new List<DataRow>(dataTable.Select()); 
    ConcurrentDictionary<string, object> resultSet = new ConcurrentDictionary<string, object>(); 

    rows.EachParallel(delegate (DataRow row, ParallelLoopState state) 
    { 
    if (!state.IsStopped) 
    { 
     using (SqlConnection sqlConnection = new SqlConnection(Global.ConnStr)) 
     { 
     sqlConnection.Open(); 

     //{ 
     // Do some processing....... 
     //}  

     var sourceKey = "key retrieved from processing"; 
     if (!resultSet.ContainsKey(sourceKey)) 
     { 
      object myCustomObj = new object(); 

      resultSet.AddOrUpdate(
      sourceKey, 
      myCustomObj, 
      (key, oldValue) => myCustomObj); 
     } 

     if (resultSet.Values.Count == 100) 
      state.Stop(); 
     } 
    } 
    }); 
+1

* Почему вы пытаетесь выполнить SQL-команды параллельно? Это не приведет к тому, что неудачный запрос будет выполняться быстрее. Какова ваша * актуальная проблема? –

+0

использование блокировки в методе –

+4

BTW 40K строк нет данных вообще. Загрузка всех 40K из них, когда вам нужно всего лишь 100, является ошибкой. Почему бы вам просто не использовать «SELECT TOP 100»? –

ответ

4

Документация страница ParallelLoopState.Stop объясняет, что вызов Stop() будет предотвратить новые итерации от начала. Он не отменяет никаких существующих итераций.

Stop() также устанавливает IsStopped на true. Длинные итерации могут проверять значение IsStopped и, если необходимо, выйти преждевременно.

Это называется совместное аннулирование, которое намного лучше, чем прерывание потоков. Прерывание потока дорогое и затрудняет очистку. Представьте, что произойдет, если исключение ThreadAbort было брошено только тогда, когда вы хотели совершить свою работу.

Cooperative отмена с другой стороны, позволяет задачу выйти грациозно после или прерывания совершала сделки по мере необходимости, закрытие соединения, уборки другого состояния и файлов и т.д.

Кроме того, Parallel использует задачи, а не потоки, для обработки куски данных. Одним из этих потоков является исходный поток, который запустил параллельную операцию. Отмена не будет просто тратить резьбу нити, она также убьет главную нить.

Это не ошибка также - Parallel предназначена для решения проблем параллелизма данных, а не для асинхронного выполнения. В этом случае требуется, чтобы система использовала как можно больше задач для обработки данных и продолжала работу после завершения этой обработки.

+0

«Это называется совместной отменой, которая намного лучше, чем прерывание потоков» - конечно ... но это не всегда делает то же самое. Если тело цикла состоит в основном из одного вызова, который займет много времени (скажем, для загрузки большого файла), ваш токен бесполезен. Если я прекращу, я получу то, что хочу. –

+0

@EdS. что смешивает параллельную обработку с асинхронным IO. Помимо асинхронных методов ввода-вывода * do * поддерживают маркеры отмены и отмены, где это имеет смысл. Вы не можете остановить HTTP-сервер от * отправки * ваших данных, вы можете перестать ждать его прибытия. Вы можете прекратить чтение из потока ответов. Вы можете использовать токен отмены, чтобы очистить соединение после прерывания. Вы не можете ничего сделать с помощью Thread.Abort –

+0

@EdS. кроме того, использование * потока * для блокировки IO было бы расточительным, даже если у вас не было задач. Win32 предлагает обратные вызовы, порты завершения для async IO и события, ожидающие их. –