1

Я хотел бы работать в общей сложности nРезультаты исследования = 25 моделей Abaqus, каждый из которых использует X количество ядер, и я могу работать одновременно nParallelLoops = 5 этих моделей. Если один из текущих 5 анализов заканчивается, тогда еще один анализ должен начаться до тех пор, пока все nAnalysis завершены.Использование Concurrent.Futures.ProcessPoolExecutor запустить одновременно и независимцы модели ABAQUS

Я реализовал код ниже на основе решений, размещенных в и . Тем не менее, я что-то упустил, потому что все nAnalysis попытаться начать с «один раз», кодовые взаимоблокировки и анализ никогда не завершатся, так как многие из них могут захотеть использовать те же ядра, что и используемый начальный анализ.

  1. Using Python's Multiprocessing module to execute simultaneous and separate SEAWAT/MODFLOW model runs
  2. How to parallelize this nested loop in Python that calls Abaqus
def runABQfile(*args):  
    import subprocess 
    import os 

    inpFile,path,jobVars = args 

    prcStr1 = (path+'/runJob.sh') 

    process = subprocess.check_call(prcStr1, stdin=None, stdout=None, stderr=None, shell=True, cwd=path) 

def safeABQrun(*args): 
    import os 

    try: 
     runABQfile(*args) 
    except Exception as e: 
     print("Tread Error: %s runABQfile(*%r)" % (e, args)) 

def errFunction(ppos, *args): 
    import os 
    from concurrent.futures import ProcessPoolExecutor 
    from concurrent.futures import as_completed 
    from concurrent.futures import wait 

    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor: 
     future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,nAnalysis)) # 5Nodes 
     wait(future_to_file,timeout=None,return_when='ALL_COMPLETED') 

Единственный способ до сих пор я могу работать, что если я изменить errFunction использовать именно 5 анализа в то время, как показано ниже. Однако этот подход иногда приводит к тому, что один из анализов занимает гораздо больше времени, чем остальные 4 в каждой группе (каждый звонок ProcessPoolExecutor), и поэтому следующая группа из 5 не будет запускаться, несмотря на доступность ресурсов (ядер). В конечном итоге это дает больше времени для завершения всех 25 моделей.

def errFunction(ppos, *args): 
    import os 
    from concurrent.futures import ProcessPoolExecutor 
    from concurrent.futures import as_completed 
    from concurrent.futures import wait  

    # Group 1 
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor: 
     future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(0,5)) # 5Nodes   
     wait(future_to_file,timeout=None,return_when='ALL_COMPLETED') 

    # Group 2 
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor: 
     future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(5,10)) # 5Nodes   
     wait(future_to_file,timeout=None,return_when='ALL_COMPLETED') 

    # Group 3 
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor: 
     future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(10,15)) # 5Nodes   
     wait(future_to_file,timeout=None,return_when='ALL_COMPLETED') 

    # Group 4 
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor: 
     future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(15,20)) # 5Nodes   
     wait(future_to_file,timeout=None,return_when='ALL_COMPLETED') 

    # Group 5 
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor: 
     future_to_file = dict((executor.submit(safeABQrun, inpFiles[k], aPath[k], jobVars), k) for k in range(20,25)) # 5Nodes   
     wait(future_to_file,timeout=None,return_when='ALL_COMPLETED') 

Я попытался с помощью функции as_completed, но, кажется, не работает.

Пожалуйста, вы можете помочь выяснить правильное распараллеливание, так что я могу запустить nРезультатов исследования, с всегда nParallelLoops работает одновременно? Ваша помощь приветствуется. Я использую Python 2.7

BESTS, David P.


UPDATE Июль 30/2016:

Я ввел петлю в safeABQrun и управлял 5 различных «очередей ». Цикл необходим, чтобы избежать случая анализа, пытающегося запустить в узле, пока еще один работает. Анализ предварительно настроен для запуска в одном из запрошенных узлов перед началом любого фактического анализа.

def safeABQrun(*list_args): 
    import os 

    inpFiles,paths,jobVars = list_args 

    nA = len(inpFiles) 
    for k in range(0,nA): 
     args = (inpFiles[k],paths[k],jobVars[k]) 
     try: 
      runABQfile(*args) # Actual Run Function 
     except Exception as e: 
      print("Tread Error: %s runABQfile(*%r)" % (e, args)) 

def errFunction(ppos, *args): 
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor: 
     futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args) # 5Nodes 

     for f in as_completed(futures): 
      print("|=== Finish Process Train %d ===|" % futures[f]) 
      if f.exception() is not None: 
       print('%r generated an exception: %s' % (futures[f], f.exception())) 

ответ

0

Это выглядит хорошо для меня, но я не могу запустить ваш код как есть. Как насчет того, чтобы попробовать что-то намного проще, затем добавить вещи к нему, пока не появится «проблема»? Например, показывает ли следующее поведение, какое вы хотите? Он работает на моей машине, но я запускаю Python 3.5.2. Вы говорите, что у вас 2,7, но concurrent.futures не существует в Python 2 - так что если вы используете 2.7, вы должны запускать чей-то бэкпорт библиотеки, и, возможно, проблема в этом. Попытка следующее должно помочь ответить на этот вопрос, является ли случай:

from concurrent.futures import ProcessPoolExecutor, wait, as_completed 

def worker(i): 
    from time import sleep 
    from random import randrange 
    s = randrange(1, 10) 
    print("%d started and sleeping for %d" % (i, s)) 
    sleep(s) 

if __name__ == "__main__": 
    nAnalysis = 25 
    nParallelLoops = 5 
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor: 
     futures = dict((executor.submit(worker, k), k) for k in range(nAnalysis)) 
     for f in as_completed(futures): 
      print("got %d" % futures[f]) 

Типичный выход:

0 started and sleeping for 4 
1 started and sleeping for 1 
2 started and sleeping for 1 
3 started and sleeping for 6 
4 started and sleeping for 5 
5 started and sleeping for 9 
got 1 
6 started and sleeping for 5 
got 2 
7 started and sleeping for 6 
got 0 
8 started and sleeping for 6 
got 4 
9 started and sleeping for 8 
got 6 
10 started and sleeping for 9 
got 3 
11 started and sleeping for 6 
got 7 
12 started and sleeping for 9 
got 5 
... 
0

Я ввел петлю в safeABQrun и управлял 5 различных «очереди». Цикл необходим, чтобы избежать случая анализа, пытающегося запустить в узле, пока еще один работает. Анализ предварительно настроен для запуска в одном из запрошенных узлов перед началом любого фактического анализа.

def safeABQrun(*list_args): 
    import os 

    inpFiles,paths,jobVars = list_args 

    nA = len(inpFiles) 
    for k in range(0,nA): 
     args = (inpFiles[k],paths[k],jobVars[k]) 
     try: 
      runABQfile(*args) # Actual Run Function 
     except Exception as e: 
      print("Tread Error: %s runABQfile(*%r)" % (e, args)) 

def errFunction(ppos, *args): 
    with ProcessPoolExecutor(max_workers=nParallelLoops) as executor: 
     futures = dict((executor.submit(safeABQrun, inpF, aPth, jVrs), k) for inpF, aPth, jVrs, k in list_args) # 5Nodes 

     for f in as_completed(futures): 
      print("|=== Finish Process Train %d ===|" % futures[f]) 
      if f.exception() is not None: 
       print('%r generated an exception: %s' % (futures[f], f.exception()))