0

У меня есть блок кода, который обрабатывает StoreProducts, а затем добавляет или обновляет их в базе данных в a для каждого цикла. Но это медленно. Когда я конвертирую блок Parallel.ForEach в код, то те же самые продукты одновременно добавляются и обновляются. Я не мог понять, как безопасно использовать для следующих функций, любая помощь будет оценена по достоинству.Parallel.ForEach Список источников с Где Условие

var validProducts = storeProducts.Where(p => p.Price2 > 0 
                && !string.IsNullOrEmpty(p.ProductAtt08Desc.Trim()) 
                && !string.IsNullOrEmpty(p.Barcode.Trim()) 
      ).ToList(); 

var processedProductCodes = new List<string>(); 

var po = new ParallelOptions() 
     { 
      MaxDegreeOfParallelism = 4 
     }; 

Parallel.ForEach(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)), po, 
      (product) => 
{ 
      lock (_lockThis) 
      { 
       processedProductCodes.Add(product.ProductCode); 
      } 

    // Check if Product Exists in Db 

    // if product is not in Db Add to Db 

    // if product is in Db Update product in Db 

} 

Дело здесь есть, список validProducts может иметь более чем одну такую ​​же ProductCode, поэтому они являются вариантами, и я должен управлять, что даже один из них находится в процессе обработки он не должен быть обработан снова.

Так где условие, которое находится в параллельном Еогеасп «validProducts.Where (р =>! ProcessedProductCodes.Contains (p.ProductCode)» не работает, как ожидалось, как в обычном для каждого.

ответ

0

Parallel.ForEach буферные элементы внутри для каждого потока, один вариант, вы можете сделать, это перейти к секционирования, который не использует буферизацию

var pat = Partitioner.Create(validProducts.Where(p => !processedProductCodes.Contains(p.ProductCode)) 
          ,EnumerablePartitionerOptions.NoBuffering); 

Parallel.ForEach(pat, po, (product) => ... 

это поможет вам ближе, но вы все еще есть гонки условия, в которых два из того же объекта могут быть обработаны, потому что вы не выходите из цикла, если найдете дубликат.

Лучшим вариантом является переключатель processedProductCodes в HashSet<string> и изменить свой код

var processedProductCodes = new HashSet<string>(); 

var po = new ParallelOptions() 
     { 
      MaxDegreeOfParallelism = 4 
     }; 

Parallel.ForEach(validProducts, po, 
      (product) => 
{ 
      //You can safely lock on processedProductCodes 
      lock (processedProductCodes) 
      { 
       if(!processedProductCodes.Add(product.ProductCode)) 
       { 
        //Add returns false if the code is already in the collection. 
        return; 
       } 
      } 

    // Check if Product Exists in Db 

    // if product is not in Db Add to Db 

    // if product is in Db Update product in Db 

} 

HashSet имеет гораздо более быстрый поиск и построен в функции Add.

+1

Вы также можете захотеть выполнить запрос «вставить или обновить» вместо проверки своего списка в памяти. В базах данных, которые не поддерживают это напрямую, вы можете обычно выполнять обновление, проверять # затронутых записей и вставлять, если 0 всего в одном запросе. Но я был бы очень обеспокоен блокировкой/блокировкой БД во время этого процесса (см. Мой ответ выше), особенно если выполняются другие рабочие нагрузки. –

+0

@Scott Chamberlain благодарит вас за ваше время и ответ, я использовал разделитель, hashset и return (как вы указали, и я думаю, что это сделал трюк) все прошло гладко. – Buyukcaglar

1

Основная часть моего ответа меньше, поэтому ответ на ваш вопрос и другие рекомендации - если бы вы предоставили несколько технических подробностей, я, возможно, помогу более точно.

A Parallel.ForEach, вероятно, не лучшее решение здесь - особенно если у вас есть общий список или занятый сервер.

Вы блокируете запись, но не читаете из этого общего списка. Поэтому я удивлен, что он не метался во время Where. Поверните List<string> в ConcurrentDictionary<string, bool> (просто для создания простой параллельной хеш-таблицы), тогда вы получите лучшую пропускную способность записи и не будете бросать во время чтения.

Но у вас будет проблемы с контентом базы данных (при использовании нескольких соединений), потому что для вашей вставки, вероятно, по-прежнему потребуются блокировки. Даже если вы просто разделите рабочую нагрузку, вы столкнетесь с этим. Эта блокировка БД может привести к блокировкам/взаимоблокировкам, чтобы они могли оказаться медленнее, чем оригинал. Если вы используете одно соединение, вы обычно не можете распараллелить команды.

Я хотел бы попробовать обертывание большинство вставок в операции, содержащей партии, скажем, 1000 вставок или поместить всю нагрузку в одну массовой вставки. Затем база данных будет хранить данные в памяти и завершать всю вещь на диске по завершении (вместо одной записи за раз).

В зависимости от вашей типичной рабочей нагрузки вы можете попробовать различные решения для хранения. Базы данных обычно плохо для вставки больших объемов записей ... вы, вероятно, увидите гораздо лучшую производительность с альтернативными решениями (такими как хранилища ключей).Или поместите данные в нечто вроде Redis и медленно сохраняйте базу данных в фоновом режиме.

+0

благодарим вас за ваш драгоценный вклад. так как я не изменяю список источников, но список, который я сравниваю, не происходит никаких изменений, связанных с изменением коллекции. С другой стороны, я не могу обновить db на массовых средствах, как вы предполагали, поскольку я использую структуру сущностей с модулем efcache, поэтому внешние транзакции ef не обсуждаются. – Buyukcaglar

+0

В этом случае я рекомендую просмотреть https://efbulkinsert.codeplex.com/ –

 Смежные вопросы

  • Нет связанных вопросов^_^