2016-09-27 2 views
0

Ниже приведен скрипт python, который я использую для записи в HDFS. RDD - это пара RDD. Скрипт отлично работает, но он создает запись как кортеж в HDFS. Можно удалить кортеж и просто создать записи с разделенной запятой в HDFS.Сохраните файл в HDFS с пары RDD

import sys 
from pyspark import SparkContext 

if len(sys.argv) < 2: 
    print 'Insufficient arguments' 
    sys.exit() 

sc = SparkContext() 
initialrdd1 = sc.textFile(sys.argv[1]) 
finalRDD1 = initialrdd1.map(lambda x:x.split(',')).map(lambda x :(x[1],x[0])).sortByKey() 
print finalRDD1.getNumPartitions() 
finalRDD1.saveAsTextFile('/export_dir/result3/') 

файл, хранящий в HDFS в формате ниже

(u'Alpha', u'E03') 
(u'Beta', u'E02') 
(u'Gamma', u'E05') 
(u'Delta', u'E09') 

ответ

1

Почему не первая карта кортеж в строку, а затем сохранить его -

finalRDD1.map(lambda x: ','.join(str(s) for s in x)).saveAsTextFile('/export_dir/result3/') 
+0

Хотя этот код может помочь ответить на вопрос, добавление некоторых объяснений поможет сделать ответ более полезным, особенно если он появится в поиске. – paisanco

+0

Я думал, что это было незначительное и самоочевидное предложение, но, конечно, хорошо. Обновлено. –

0
finalRDD1 = initialrdd1.map(lambda x:x.split(',')).map(lambda x :(x[1],x[0])).sortByKey() 

понимаю ваш код. В вашем исходном RDD вы сопоставляете каждую запись с кортежем. карта (лямбда-х: (х [1], х [0]))

finalRDD1.saveAsTextFile('/export_dir/result3/') 

После операции sortByKey, вы непосредственно перейти к сохранить RDD в текстовом файле.

Для того, чтобы сохранить записи как CSV, вы должны указать его в явном виде, как так -

def csv_format(data): 
    return ','.join(str(d) for d in data) 

# Rest of the code ... 

finalRDD1.map(csv_format).saveAsTextFile('/export_dir/result3/') 
0

У меня был подобный вопрос. Проблема с

map(lambda x: ','.join(str(s) for s in x)).saveAsTextFile(....) 

является то, что это позволит сэкономить «присоединиться» в одну строку и скроет запятой и может быть головная боль, если вы планируете использовать для анализа, как загрузка, как панд ФР. Таким образом, вы строка будет выглядеть следующим образом

[ 'Alpha, E03', 'Beta, E02',....] 

простым решением было бы вставить другой раскол карты перед тем saveAsTextFile()

.map(lambda x: x.split(',')).saveAsTextFile(....) 

Так окончательный код будет выглядеть следующим образом

finalRDD1.map(csv_format).map(lambda x: ','.join(str(s) for s in x)).map(lambda x: x.split(',')).saveAsTextFile('/export_dir/result3/') 

Теперь ваш csv будет выглядеть следующим образом:

[ 'Alpha', 'E03'] 
['Beta', 'E02'] 
..... 
.....