У меня есть сценарий для анализа дампов BSON, однако он работает только с несжатыми файлами. Я получаю пустой RDD при чтении gz-файлов bson.PySpark: пустой RDD при чтении gzipped файлов BSON
pyspark_location = 'lib/pymongo_spark.py'
HDFS_HOME = 'hdfs://1.1.1.1/'
INPUT_FILE = 'big_bson.gz'
class BsonEncoder(JSONEncoder):
def default(self, obj):
if isinstance(obj, ObjectId):
return str(obj)
elif isinstance(obj, datetime):
return obj.isoformat()
return JSONEncoder.default(self, obj)
def setup_spark_with_pymongo(app_name='App'):
conf = SparkConf().setAppName(app_name)
sc = SparkContext(conf=conf)
sc.addPyFile(pyspark_location)
return sc
def main():
spark_context = setup_spark_with_pymongo('PysparkApp')
filename = HDFS_HOME + INPUT_FILE
import pymongo_spark
pymongo_spark.activate()
rdd = spark_context.BSONFileRDD(filename)
print(rdd.first()) #Raises ValueError("RDD is empty")
Я использую Монго-Java-драйвер-3.2.2.jar, Монго-Hadoop-искровым-1.5.2.jar, PyMongo-3.2.2-py2.7-Linux-x86_64 и pymongo_spark в наряду с искрами-submit. Версия развертки Spark - 1.6.1 вместе с Hadoop 2.6.4.
Я знаю, что библиотека не поддерживает разделение сжатых файлов BSON, однако это должно быть с одним разделом. У меня есть сотни сжатых файлов BSON для анализа и дефлирования каждого из них, похоже, не является жизнеспособным вариантом.
Любая идея, как мне идти дальше? Спасибо заранее!