2016-04-08 4 views
1

Возможно ли инкапсулировать повторяющиеся отправки/ответы на один и тот же изолят дротика в рамках одной асинхронной функции?инкапсулировать повторенную отправку/ответы на один и тот же Dart-изолятор в пределах одной асинхронной функции

фона:

Для того, чтобы разработать удобный API, я хотел бы иметь функцию асинхронно возвращает результат, генерируемый изолята, например,

var ans = await askIsolate(isolateArgs); 

Это прекрасно работает, если я непосредственно использовать ответ, генерируемый при вызове spawnUri, например

Future<String> askIsolate(Map<String,dynamic> isolateArgs) { 

ReceivePort response = new ReceivePort(); 
var uri = Uri.parse(ISOLATE_URI); 

Future<Isolate> remote = Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort); 
return remote.then((i) => response.first) 
       .catchError((e) { print("Failed to spawn isolate"); }) 
       .then((msg) => msg.toString()); 
} 

Недостатком описанного выше подхода, однако, заключается в том, что если мне нужно повторно позвонить askIsolate, в изолят должен появляться каждый раз.

Я бы хотел общаться с работающим изолятом, что, безусловно, возможно, если изоляция возвращает sendPort вызывающему. Но я считаю, что с 2013 Isolate refactoring это требует, чтобы вызывающий абонент прослушивал последующие сообщения на приемном устройстве, что делало невозможным инкапсуляцию внутри одной асинхронной функции.

Есть ли какой-то механизм для достижения этого, которого я не хватает?

+1

Это в то время как я играл с изолятов. Предполагается, что https://pub.dartlang.org/packages/isolate предоставит хороший API для изоляции. Я предполагаю, что стоит поближе познакомиться. –

+1

'IsolateRunner' в' package: isolate' предназначен для вызова функции в изолированном изоляторе более одного раза. Я думаю, что это сработает для этой проблемы: 'var runner = await IsolateRunner.spawn(); for (var arg in something)} {... await runner.run (queryFunction, arg); ...} await runner.close(); ' – lrn

+1

Другой вариант заключается в том, что выполняется изоляция службы, но вместо того, чтобы каждый раз возвращать результат на один и тот же порт, каждый запрос может отправлять с ним свой собственный« SendPort ». Затем каждый запрос может создать «ResponsePort» и вернуть «first» getter для ответа: «Future askIsolate (isolateArgs)» {var p = new ReceivePort(); runningIsolatePort.send ([isolateArgs, p.sendPort]); return p.first; } '. – lrn

ответ

1

Ниже приведен быстрый рабочий пример, основанный на комментарии lrn выше. Пример инициализирует изоляцию через spawnURI, а затем связывается с изолятом, передавая новый ReceivePort, на котором ожидается ответ. Это позволяет askIsolate напрямую возвращать ответ из изолятора runwnURI.

Замечание об обработке ошибок для ясности было опущено.

код изолят:

import 'dart:isolate'; 
import 'dart:convert' show JSON; 

main(List<String> initArgs, SendPort replyTo) async { 
    ReceivePort receivePort = new ReceivePort(); 
    replyTo.send(receivePort.sendPort); 

    receivePort.listen((List<dynamic> callArgs) async { 
    SendPort thisResponsePort = callArgs.removeLast(); //last arg must be the offered sendport 
    thisResponsePort.send("Map values: " + JSON.decode(callArgs[0]).values.join(",")); 
    }); 
} 

телефонный код:

import 'dart:async'; 
import 'dart:isolate'; 
import 'dart:convert'; 


const String ISOLATE_URI = "http://localhost/isolates/test_iso.dart"; 
SendPort isolateSendPort = null; 

Future<SendPort> initIsolate(Uri uri) async { 
    ReceivePort response = new ReceivePort(); 
    await Isolate.spawnUri(uri, [], response.sendPort, errorsAreFatal: true); 
    print("Isolate spawned from $ISOLATE_URI"); 
    return await response.first; 
} 


Future<dynamic> askIsolate(Map<String,String> args) async { 
    if (isolateSendPort == null) { 
    print("ERROR: Isolate has not yet been spawned"); 
    isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI)); //try again 
    } 

    //Send args to the isolate, along with a receiveport upon which we listen for first response 
    ReceivePort response = new ReceivePort(); 
    isolateSendPort.send([JSON.encode(args), response.sendPort]); 
    return await response.first; 
} 

main() async { 
    isolateSendPort = await initIsolate(Uri.parse(ISOLATE_URI)); 

    askIsolate({ 'foo':'bar', 'biz':'baz'}).then(print); 
    askIsolate({ 'zab':'zib', 'rab':'oof'}).then(print); 
    askIsolate({ 'One':'Thanks', 'Two':'lrn'}).then(print); 
} 

Выход

Isolate spawned from http://localhost/isolates/test_iso.dart 
Map values: bar,baz 
Map values: zib,oof 
Map values: Thanks,lrn 
2

Ответ зависит от того, как вы собираетесь использовать изолят

  • Вы намерены сохранить это работает на неопределенный срок, посылая его входы и ожидает получить ответы асинхронно?

  • Вы хотите отправить изолят многих (но конечных) входов сразу, ожидать получать ответы асинхронно, а затем закрыть изоляцию?

Я предполагаю, что последний, и ваша askIsolate() функция должна немедленно вернуть Future, чем завершается, когда он получает ответы на все вопросы.

Цикл await for может использоваться для прослушивания потока и потребления событий до его закрытия.

Я не знаком с изолятами, поэтому надеюсь, что все в порядке, я его не тестировал. Я предположил, что изоляция завершается, и реакция закрывается.

String askIsolate(Map<String,dynamic> isolateArgs) async { 

    ReceivePort response = new ReceivePort(); 
    var uri = Uri.parse(ISOLATE_URI); 

    Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort) 
    .catchError((e)) { 
    throw ...; 
    }); 

    List<String> answers = new List<String>; 

    await for(var answer in response) { 
    out.add(answer.toString()); 
    } 

    return answers; 
} 

Примечание:

  • response это поток вы слушаете ответы. Создано до, создавая изолят, поэтому вам не нужно (и, вероятно, не стоит) ждать завершения изолята, прежде чем слушать его.

  • Я сделал askIsolate() асинхр потому, что делает его очень легко сразу возвращать будущее, которое завершает, когда функция возвращает - не все, что утомительно отводом о с .then(...) цепями, которые я лично считаю запутанным и трудно читать.

BTW, исходный код then(...).catchError(...) стиля будет лучше написано так:

Isolate.spawnUri(uri, [JSON.encode(isolateArgs)], response.sendPort) 
    .catchError((e) { ... }); 

    return response.first) 
    .then((msg) => msg.toString()); 

Я считаю, что отсрочка присоединения обработчика catchError к линии после создания изолят, возможно, поможет в будущем укомплектовать ошибка до обработчик на месте.

См. https://www.dartlang.org/articles/futures-and-error-handling/#potential-problem-failing-to-register-error-handlers-early.

+0

Собственно, первый в ваших двух пулях - это то, что я хотел бы. Спасибо за уловку тогдашнего (...) вопроса catchError (...)! – ilikerobots

+0

Вы хотите реализовать функцию, которая отправляет вход в изоляцию, и возвращает будущее, которое завершается, когда изолятор отправляет результат, который вы можете вызывать повторно? –

2

Я также рекомендую посмотреть на IsolateRunner в package:isolate, он предназначен для решения таких проблем, как это - вызов функции в одних и тех же изолят несколько раз, а не только один раз, когда создается изолят.

Если вы не хотите этого, есть и другие, более примитивные, варианты

функции Async могут ждать фьючерсов или потоков и ReceivePort представляет собой поток. Для быстрого взлома вы можете что-то сделать с await for в потоке ответа, но это будет не очень удобно.

Обертывание ReceivePort в StreamQueue от package:async - лучший выбор. Это позволяет конвертировать отдельные события в фьючерсы. Что-то вроде:

myFunc() async { 
    var responses = new ReceivePort(); 
    var queue = new StreamQueue(responses); 
    // queryFunction sends its own SendPort on the port you pass to it. 
    var isolate = await isolate.spawn(queryFunction, [], responses.sendPort); 
    var queryPort = await queue.next(); 
    for (var something in somethingToDo) { 
    queryPort.send(something); 
    var response = await queue.next(); 
    doSomethingWithIt(response); 
    } 
    queryPort.send("shutdown command"); 
    // or isolate.kill(), but it's better to shut down cleanly. 
    responses.close(); // Don't forget to close the receive port. 
}