2016-01-06 8 views
1

Я хочу, чтобы каждый рабочий-python запускал R-оболочку с помощью rpy2. Могу ли я это сделать на каком-то этапе настройки, аналогично тому, как я предполагаю, что это произойдет, когда вы импортируете модуль Python, который будет использоваться для последующих задач-исполнителей? Например:Могу ли я подключить внешний (R) процесс к каждому работнику pyspark во время установки

import numpy as np 

df.mapPartitions(lambda x: np.zeros(x)) 

В моем случае я хочу, чтобы вместо того, чтобы начать R оболочку на каждом исполнитель и импорт R библиотеке, которая будет выглядеть примерно так:

import rpy2.robjects as robjects 
from rpy2.robjects.packages import importr 
rlibrary = importr('testrlibrary') 

df.mapPartitions(lambda x: rlibrary.rfunc(x)) 

Но я не хочу этого чтобы произойти внутри вызова до mapPartitions, потому что тогда это произойдет на уровне задачи, а не один раз на ядро ​​исполнителя. Этот подход работает и больше похож на приведенный ниже пример, но не полезен для меня.

def model(partition): 
    import rpy2.robjects as robjects 
    from rpy2.robjects.packages import importr 
    rlibrary = importr('testrlibrary') 
    rlibrary.rfunc(partition) 

df.mapPartitions(model) 

ответ

1

Что-то, как это должно работать нормально:

import rpy2.robjects as robjects 
from rpy2.robjects.packages import importr 

def length_(s): 
    stringi = importr("stringi") 
    return stringi.stri_length(s)[0] 

sc.parallelize(["foo", "bar", "foobar"]).map(length_) 

R object, который представляет R интерпретатор, is a singleton так будет инициализирован только один раз и R не реимпорта уже присоединенные библиотеки. Существует некоторые накладные расходы от вызова require несколько раз, но это должно быть незначительным по сравнению со стоимостью прохождения данных и от R.

Если вы хотите что-то более сложные вы можете создать свой собственный singleton module или использовать Borg pattern для обработки импорта, но это может быть перебор.

Я предполагаю, что это произойдет, когда вы импортировать модуль питона, которые будут использоваться для последующих задач ИСПОЛНИТЕЛЬ

Это на самом деле зависит от конфигурации. По умолчанию Spark повторно использует интерпретаторы между задачами, но это поведение может быть изменено.

Я привел несколько примеров в качестве ответа на вопрос In Apache spark, what is the difference between using mapPartitions and combine use of broadcast variable and map. Может быть, вы найдете их полезными.

+0

См. Соответствующий вопрос здесь: [Как я могу разделить pyspark RDDs, удерживающие функции R] (http://stackoverflow.com/questions/34669751/) – retrocookie