2014-11-11 3 views
1

У меня есть ситуация, когда мне нужно использовать собственный планировщик для запуска задач (это должны быть задачи), и планировщик не устанавливает контекст синхронизации (поэтому нет ObserveOn, SubscribeOn, SynchronizationContextScheduler и т. Д. Я собираюсь). Вот как я это сделал. Теперь, мне интересно, я не уверен, что это самый подходящий способ делать асинхронные вызовы и ждать их результатов. Это все в порядке или есть более надежный или идиоматический способ?Является ли вообще сомнительным вызов Task.Factory.StartNew (async() => {}) в Subscribe?

var orleansScheduler = TaskScheduler.Current; 
var someObservable = ...; 
someObservable.Subscribe(i => 
{ 
    Task.Factory.StartNew(async() => 
    { 
     return await AsynchronousOperation(i); 
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler);   
}); 

Что делать, если ожидание не понадобится?

< редактировать: Я нашел бетон и упрощенный пример того, что я делаю here. В основном я использую Rx в Орлеане, и приведенный выше код - это голые иллюстрации того, что я делаю. Хотя меня тоже интересует эта ситуация в целом.

Окончательный код Получается, что это было немного сложно в контексте Орлеана. Я не вижу, как я мог бы использовать ObserveOn, что было бы просто тем, что я хотел бы использовать. Проблема в том, что при использовании этого параметра Subscribe никогда не будет вызван. Код:

var orleansScheduler = TaskScheduler.Current; 
var factory = new TaskFactory(orleansScheduler); 
var rxScheduler = new TaskPoolScheduler(factory); 
var someObservable = ...; 
someObservable 
//.ObserveOn(rxScheduler) This doesn't look like useful since... 
.SelectMany(i => 
{ 
    //... we need to set the custom scheduler here explicitly anyway. 
    //See Async SelectMany at http://log.paulbetts.org/rx-and-await-some-notes/. 
    //Doing the "shorthand" form of .SelectMany(async... would call Task.Run, which 
    //in turn runs always on .NET ThreadPool and not on Orleans scheduler and hence 
    //the following .Subscribe wouldn't be called. 
    return Task.Factory.StartNew(async() => 
    { 
     //In reality this is an asynchronous grain call. Doing the "shorthand way" 
     //(and optionally using ObserveOn) would get the grain called, but not the 
     //following .Subscribe. 
     return await AsynchronousOperation(i); 
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler).Unwrap().ToObservable(); 
}) 
.Subscribe(i => 
{ 
    Trace.WriteLine(i); 
}); 

Кроме того, ссылка на соответствующую резьбу в Codeplex Orleans forums.

+2

'ожидание в порядке не понадобится'. Не могли бы вы пояснить, что это значит? –

+3

Если операция асинхронна, почему вы используете для нее поток threadpool? –

+0

Я удалил упоминание «в порядке» и очистил некоторые другие моменты. – Veksi

ответ

4

Я настоятельно рекомендую против StartNew для любого современного кода. У него есть прецедент, но это очень редко.

Если вам нужно использовать персонализированный планировщик задач, я рекомендую использовать ObserveOn с TaskPoolScheduler, построенным из обертки вокруг вашего планировщика. Это полный рот, так вот общая идея:

var factory = new TaskFactory(customScheduler); 
var rxScheduler = new TaskPoolScheduler(factory); 
someObservable.ObserveOn(rxScheduler)... 

Тогда вы могли бы использовать SelectMany, чтобы начать асинхронную операцию для каждого события в потоке источника по мере их поступления.

Альтернативное, менее идеальное решение - использовать async void для подписки «События». Это приемлемо, но вы должны следить за обработкой ошибок. Как правило, не допускайте распространения исключений из метода async void.

Существует третья альтернатива, в которой вы подключаете наблюдаемый в блок потока данных TPL. Блок, такой как , может указать свой планировщик задач, а Dataflow естественно понимает асинхронные обработчики. Обратите внимание, что по умолчанию блоки потока данных будут дросселировать обработку на один элемент за раз.

+0

Это выглядит правильно. Я попытался создать планировщик из '' SynchronizatioContext.Current'', но это '' null'' в коде зерна Орлеана, а затем я не понял другого способа сделать это. Хотя, интересно, теперь я мог бы использовать этот '' rxScheduler'', а также планировщик в '' Observable.Create'' вместо или в дополнение к 'ObserveOn'' (для справки, [Using Scheduler] (http://msdn.microsoft.com/en-us/library/hh242963%28v=vs.103%29.aspx)). Я думаю, было бы интересно обдумать эффекты в контексте Орлеана, но это в другое время. Я буду на моем компьютере через ~ 8 часов, мои извинения. – Veksi

3

Вообще говоря, вместо того, чтобы подписываться на выполнение, лучше/более идиоматично проецировать параметры задачи в выполнение задачи и подписываться только на результаты. Таким образом, вы можете сочинять с дальнейшим Rx вниз по течению.

например. Учитывая случайный задача, как:

static async Task<int> DoubleAsync(int i, Random random) 
{ 
    Console.WriteLine("Started"); 
    await Task.Delay(TimeSpan.FromSeconds(random.Next(10) + 1)); 
    return i * 2; 
} 

Тогда вы могли бы сделать:

void Main() 
{ 
    var random = new Random(); 

    // stream of task parameters 
    var source = Observable.Range(1, 5); 

    // project the task parameters into the task execution, collect and flatten results 
    source.SelectMany(i => DoubleAsync(i, random)) 

      // subscribe just for results, which turn up as they are done 
      // gives you flexibility to continue the rx chain here 
      .Subscribe(result => Console.WriteLine(result), 
        () => Console.WriteLine("All done.")); 
} 
+1

«Это вообще сомнительно ...» Я бы сказал, что это не только сомнительно, но и всегда неправильно ». Джеймс имеет правильную идею здесь –

+0

Cheers @PaulBetts. Я думаю, вам удалось хорошо прорезать мой британский резерв ... :) –

+0

Хе-хе, это не больно спрашивать, если это кажется сомнительным. :) Действительно, я не уверен, как мне удалось замалчивать факт, что SelecMany отлично справляется с задачами. Итак, я вижу, что '' SelectMany' здесь также может быть '' .SelectMany (i => Task.Factory.StartNew (() => {}). ToObservable()) '', и я мог бы добавить в планировщик клиентов там. Приносим извинения за задержку, мне нужно добраться до моего компьютера вечером (~ 8 часов), чтобы провести более тщательное исследование. – Veksi

 Смежные вопросы

  • Нет связанных вопросов^_^