2016-10-20 2 views
0

У меня есть rdd, читая коллекцию mongodb, и теперь я хочу изменить некоторое значение и обновить/загрузить эти данные в те же или другие коллекции.применить функцию карты на pyspark RDD

mr1 = sc.mongoRDD('mongodb://localhost:27017/test_database.test2') 
type(mr1) #<class 'pyspark.rdd.PipelinedRDD'> 
mr1.collect() 
#[{u'_id': ObjectId('58089490d7531cd8b071f48c'), u'name': u'ravi', u'sal': u'2000'}, {u'_id': ObjectId('58089491d7531cd8b071f48d'), u'name': u'ravi', u'sal': u'3000'}] 
#I want to change the name 'ravi' to 'Satya' 
mr2 = mr1.map(lambda x: x['name'].replace('ravi','SATYA')) 
#o/p: [u'SATYA', u'SATYA'] ##not all values 
#Expected: [{u'_id': ObjectId('58089490d7531cd8b071f48c'), u'name': u'SATYA', u'sal': u'2000'}, {u'_id': ObjectId('58089491d7531cd8b071f48d'), u'name': u'SATYA', u'sal': u'3000'}] 

Пожалуйста, помогите, как применять функцию карты здесь, чтобы получить обратно ту же РДД MR1 с именами заменены.

Спасибо.

ответ

3

Try:

def replace(x, key, fr, to): 
    d = x.copy() 
    if key in d: 
     d[key] = d[key].replace('ravi','SATYA') 
    return d 

mr1.map(lambda x: replace(x, 'name', 'ravi','SATYA) 
2

Понял worked-

def rep(x): 
    if x['name'] == 'ravi': 
     x['name']='SATYA' 
    return x 
mr2 = mr1.map(lambda x: rep(x))