2013-04-23 1 views
2

Я хочу перебрать некоторые асинхронные результаты с параллельной карты ipython по мере их поступления. Единственный способ, который я могу найти для этого, - перебрать объект результатов. Однако, если одна из задач вызывает исключение, итерация завершается. Есть какой-либо способ сделать это? См. Следующий код, итерация завершается, когда второе задание вызывает исключение.Исключения для обработки, ожидая следующего результата параллельной карты ipython

from IPython import parallel 

def throw_even(i): 
    if i % 2 == 0: 
     raise RuntimeError('ERROR: %d' % i) 
    return i 

rc = parallel.Client() 
lview = rc.load_balanced_view() # default load-balanced view 

# map onto the engines. 
args = range(1, 5) 
print args 
async_results = lview.map_async(throw_even, range(1, 5), ordered=True) 

# get results 
args_iter = iter(args) 
results_iter = iter(async_results) 
while True: 
    try: 
     arg = args_iter.next() 
     result = results_iter.next() 
     print 'Job %s completed: %d' % (arg, result)    
    except StopIteration: 
     print 'Finished iteration' 
     break 
    except Exception as e: 
     print '%s: Job %d: %s' % (type(e), arg, e) 

дает следующий результат, который останавливается перед тем заданиями 3 и 4, как сообщает

[1, 2, 3, 4] 
Job 1 completed: 1 
<class 'IPython.parallel.error.RemoteError'>: Job 2: RuntimeError(ERROR: 2) 
Finished iteration 

Есть ли способ сделать это?

+0

Я понял, что идиома карты не является подходящим способом для этого. Мне лучше просто использовать lview.apply и обрабатывать каждый результат индивидуально. – John

ответ

0

Возможно, это question. Я действительно не понимаю, почему вы хотите выбросить исключение из удаленного механизма. Хотя, если вы действительно хотите это сделать, я думаю, вы можете сделать это так же, как я ответил на упомянутый вопрос. Который я теперь вижу, что вы уже поняли в своих комментариях, но это все равно должно это сделать.

def throw_even(i): 
    if i%2: 
     return i 
    raise(RuntimeError('Error %d'%i) 

params = range(1,5) 

n_cores = len(c.ids) 
for n,p in enumerate(params): 
    core = c.ids[n%n_cores] 
    calls.append(c[core].apply_async(throw_even, p)) 

#then you get the results 

while calls != []: 
    for c in calls: 
     try: 
      result = c.get(1e-3) 
      print(result[0]) 
      calls.remove(c) 
      #in the case your call failed, you can apply_async again. 
      # and append the call to calls. 
     except parallel.TimeoutError: 
      pass 
     except Exception as e: 
      knock_yourself_out(e) 
+1

Не обязательно, чтобы вы _want_ поднимали исключения на удаленном движке, это то, что ваш код/​​данные находят новые и интересные способы взломать удаленные механизмы;), и это очень раздражает, когда вы не можете получить более 500 результатов потому что у 7 из них были жуткие данные. – tacaswell

+0

Несомненно, поэтому создание другого представления для каждого параметра должно содержать исключение, заключенное в капсулу. –

0

Подлый способ обойти это, чтобы достичь во внутреннюю часть AsyncMapResult и захватить _result который представляет собой список результатов. Это не поможет вам напрямую, но только после того, как:

tt = async_results._results 
fail_indx = [j for j, r in enumerate(tt) if isinstance(r, IPython.parallel.error.RemoteError)] 
good_indx = [j for j, r in enumerate(tt) if not isinstance(r, IPython.parallel.error.RemoteError)] 

just_the_results = [r for r in tt if not isinstance(r, IPython.parallel.error.RemoteError)]