2009-09-30 3 views
8
 
(fileNameToCharStream "bigfile" 
|>> fuse [length; 
      splitBy (fun x -> x = ' ' || x = '\n') removeEmpty |>> length; 
      splitBy (fun x -> x = '\n') keepEmpty |>> length; 
     ]) 
    (*fuse "fuses" the three functions to run concurrently*) 
|> run 2 (*forces to run in parallel on two threads*) 
|> (fun [num_chars; num_words; num_lines] -> 
     printfn "%d %d %d" 
      num_chars num_words, num_lines)) 

Я хочу, чтобы этот код работал следующим образом: разделил исходный поток на два точно посередине; то для каждой половины запускает отдельное вычисление, которое вычисляет 3 вещи: длину (т. е. количество символов), количество слов, количество строк. Однако я не хочу иметь проблемы, если Я ошибочно разбил слово. Это должно быть позаботился. Файл следует читать только один раз.Параллельная конвейерная обработка

Как программировать указанные функции и оператор | >>? Возможно ли это?

+0

Это может быть то, что США еще не проснулся, но в ожидании того, вы можете захотеть взглянуть на ключевое слово «асинхр», чтобы получить лучшее представление о том, что возможно. – Benjol

+0

Какие подписи вы представляете себе плавким, бегущим и | >>? Например, где ваш список из трех элементов превращается в 3-кортеж? – Gabriel

+0

Правильно, я имею в виду: |> (fun [num_chars; num_words; num_lines] -> –

ответ

8

Похоже, что вы просите совсем немного. Я оставлю это для вас, чтобы понять манипуляции с строкой, но я покажу вам, как определить оператор, который выполняет параллельную серию операций.

Шаг 1: Написать fuse функция

Ваша функция предохранитель появляется на карте один вход с использованием множества функций, которые достаточно легко написать следующим образом:

//val fuse : seq<('a -> 'b)> -> 'a -> 'b list 
let fuse functionList input = [ for f in functionList -> f input] 

Обратите внимание, что все ваши функции отображения должны иметь один и тот же тип.

Шаг 2: Определение оператора для выполнения функций параллельно

Стандартная параллельная функция карты может быть записана следующим образом:

//val pmap : ('a -> 'b) -> seq<'a> -> 'b array 
let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

Насколько мне известно, Async.Parallel будет выполнять операции асинхронных параллельно, где количество параллельных задач, выполняемых в любой момент времени, равно числу ядер на машине (кто-то может исправить меня, если я ошибаюсь). Таким образом, на двухъядерной машине на этой машине должно выполняться не более двух потоков, когда эта функция вызывается. Это хорошо, так как мы не ожидаем ускорения за счет запуска более одного потока на ядро ​​(на самом деле дополнительное переключение контекста может замедлить работу).

Мы можем определить оператор |>> в терминах pmap и fuse:

//val (|>>) : seq<'a> -> seq<('a -> 'b)> -> 'b list array 
let (|>>) input functionList = pmap (fuse functionList) input 

Так оператор |>> занимает кучу входов и отображает их, используя множество различных выходов. До сих пор, если мы помещаем все это вместе, мы получаем следующее (в FSI):

> let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'];; 

val countOccurrences : 'a -> seq<'a> -> int 
val length : string -> int 
val testData : string [] = 
    [|"Juliet is awesome"; "Someone should give her a medal"|] 
val testOutput : int list array = [|[17; 1; 1]; [31; 0; 3]|] 

testOutput содержит два элемента, оба из которых были вычислены параллельно.

Шаг 3: Совокупные элементы в единый выходной

Хорошо, так что теперь у нас есть частичные результаты, представленных каждый элемент в массиве, и мы хотим объединить наши частичные результаты в единый агрегат. Я предполагаю, что каждый элемент массива должен быть объединен с одной и той же функцией, поскольку каждый элемент на входе имеет один и тот же тип данных.

Вот действительно некрасиво функция, которую я написал для работы:

> let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]);; 

val reduceMany : ('a -> 'a -> 'a) -> seq<'a list> -> 'a list 

> reduceMany (+) testOutput;; 
val it : int list = [48; 1; 4] 

reduceMany принимает последовательность последовательностей п длины, и он возвращает массив п длины в качестве выходного сигнала. Если вы можете придумать лучший способ, чтобы написать эту функцию, быть моим гостем :)

Чтобы декодировать результат выше:

  • 48 = сумма длин двух моих входных строк. Обратите внимание, что исходная строка была 49 символов, но разделение ее на «|» съели один символ за «|».
  • 1 = сумма всех экземпляров «J» на моем входе
  • 4 = сумма всех экземпляров «O».

Шаг 4: Положите все вместе

let pmap f l = 
    seq [for a in l -> async { return f a } ] 
    |> Async.Parallel 
    |> Async.RunSynchronously 

let fuse functionList input = [ for f in functionList -> f input] 

let (|>>) input functionList = pmap (fuse functionList) input 

let reduceMany f input = 
    input 
    |> Seq.reduce (fun acc x -> [for (a, b) in Seq.zip acc x -> f a b ]) 

let countOccurrences compareChar source = 
    source |> Seq.sumBy(fun c -> if c = compareChar then 1 else 0) 

let length (s : string) = s.Length 

let testData = "Juliet is awesome|Someone should give her a medal".Split('|') 
let testOutput = 
    testData 
    |>> [length; countOccurrences 'J'; countOccurrences 'o'] 
    |> reduceMany (+)