, поэтому у меня есть искробезопасный код, который извлекает некоторые документы из mongodb, выполняет некоторые преобразования и пытается сохранить его обратно в mongodb.Как сохранить список scala для mongodb, используя искру
Проблема происходит, когда я пытаюсь упорствовать объект List, используя следующие функции:
Сначала я генерировать некоторые кортежи, используя эту функцию:
val usersRDD = rdd.flatMap(breakoutFileById).distinct().groupByKey().mapValues(_.toList)
Затем я конвертировать поля кортежей документов с использованием пользовательские функции mapToDocument
и вызовите функцию saveToMongoDB
:
usersRDD.map(mapToDocument).saveToMongoDB()
Я получаю следующее сообщение об ошибке Сообща е:
org.bson.codecs.configuration.CodecConfigurationException: Can't find a codec for class scala.collection.immutable.$colon$colon.
at org.bson.codecs.configuration.CodecCache.getOrThrow(CodecCache.java:46)
at org.bson.codecs.configuration.ProvidersCodecRegistry.get(ProvidersCodecRegistry.java:63)
at org.bson.codecs.configuration.ChildCodecRegistry.get(ChildCodecRegistry.java:51)
at org.bson.codecs.DocumentCodec.writeValue(DocumentCodec.java:174)
at org.bson.codecs.DocumentCodec.writeMap(DocumentCodec.java:189)
at org.bson.codecs.DocumentCodec.encode(DocumentCodec.java:131)
at org.bson.codecs.DocumentCodec.encode(DocumentCodec.java:45)
at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:63)
at org.bson.codecs.BsonDocumentWrapperCodec.encode(BsonDocumentWrapperCodec.java:29)
at com.mongodb.connection.InsertCommandMessage.writeTheWrites(InsertCommandMessage.java:101)
at com.mongodb.connection.InsertCommandMessage.writeTheWrites(InsertCommandMessage.java:43)
at com.mongodb.connection.BaseWriteCommandMessage.encodeMessageBodyWithMetadata(BaseWriteCommandMessage.java:129)
at com.mongodb.connection.RequestMessage.encodeWithMetadata(RequestMessage.java:160)
at com.mongodb.connection.WriteCommandProtocol.sendMessage(WriteCommandProtocol.java:212)
at com.mongodb.connection.WriteCommandProtocol.execute(WriteCommandProtocol.java:101)
at com.mongodb.connection.InsertCommandProtocol.execute(InsertCommandProtocol.java:67)
at com.mongodb.connection.InsertCommandProtocol.execute(InsertCommandProtocol.java:37)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:159)
at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:286)
at com.mongodb.connection.DefaultServerConnection.insertCommand(DefaultServerConnection.java:115)
at com.mongodb.operation.MixedBulkWriteOperation$Run$2.executeWriteCommandProtocol(MixedBulkWriteOperation.java:455)
at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:646)
at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:401)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:179)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:230)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:221)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
at com.mongodb.Mongo.execute(Mongo.java:781)
at com.mongodb.Mongo$2.execute(Mongo.java:764)
at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:323)
at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:311)
at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:132)
at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:132)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:132)
at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:131)
at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:131)
at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:130)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Если удалить список (не ставят в качестве поля в документе) в функции mapToDocument
, все работает. Я уже искал через Интернет для подобных проблем, и я не мог найти решение, которое подходит. У кого-нибудь есть ключ к тому, как его решить?
Заранее спасибо