Я использую Datastax Enterprise 4.5. Надеюсь, я правильно настроил конфигурацию, я сделал это, как на веб-сайте datastax. Я могу написать в Cassandra DB с помощью Windowsservice, это работает, но я не могу запросить Spark, используя функцию where.Datastax DSE Cassandra, Spark, Shark, автономная программа
Я запускаю узел Cassandra (есть только один для тестовой цели) с помощью «./dse cassandra -k -t» (в папке/bin), так что hasoop и искра работают одновременно. Я могу написать в Кассандру без проблем.
Таким образом, вы не можете использовать предложение «где» в запросе Cassandra, когда «где» не является RowKey. Поэтому мне нужно использовать Spark/Shark. Я могу начать и использовать все запросы, которые мне нужны с акулой (./dse shark), но мне нужно написать автономную программу в Scala или Java.
Так что я попробовал эту ссылку: https://github.com/datastax/spark-cassandra-connector
И я могу запросить простое заявление, как:
val conf = new SparkConf(true)
.set("spark.cassandra.connection.host", "MY_IP")
.setMaster("spark://MY_IP:7077")
.setAppName("SparkTest")
// Connect to the Spark cluster:
lazy val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("keyspace", "tablename")
println(rdd.first)
и это работает хорошо, но если бы я попросить больше строки или считать:
println(rdd.count)
rdd.toArray.foreach(println)
, тогда я получаю это исключение:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Когда я пробую это в Java, у меня такая же проблема. Кто-нибудь знает эту проблему? Я не знаю, правильно ли настроен DB или работает ли scala/Javaprogram. Возможно, некоторые порты заблокированы, но 7077 и 4040 открыты.
Sidenote: Если я начинаю искру на Кассандре БД, я могу сделать запросы, как:
sc.cassandraTable("test","words").select("word").toArray.foreach(println)
Но если я использую «где» пункт как:
sc.cassandraTable("test","words").select("word").where("word = ?","foo").toArray.foreach(println)
Я получаю это исключение :
java.io.IOException: Exception during query execution: SELECT "word" FROM "test"."words" WHERE token("word") > 0 AND word = ? ALLOW FILTERING
У вас есть идея, почему? Я думал, что могу использовать там предложения в искры?
Спасибо!
Версия 1.1 разъема будет иметь поддержку для первичных ключей в 'где '. Патч для этого уже был привязан к основной ветке. –
Но когда 'Все мастера не реагируют! ', Почему он работает над одним запросом типа' .first'. Это подразумевает, что соединение и порт-порт работают нормально, не так ли? Или есть ошибка? Работает ли он по-разному (используйте другие порты и т. Д.), Если используется '.first'? – richie676
Это означает, что соединение и порт передачи работают нормально, не так ли? Или есть ошибка? Работает ли он по-разному (используйте другие порты и т. Д.), Если используется '.first'? Я видел функцию '.filter', но это загружает все данные в программу и делает фильтрацию там, но это определенно должно замедляться. Я просто хочу иметь функциональность, такую же, как акула, в моей автономной программе, я думал, что искра это сделает. Если нет, скажите, пожалуйста, что использовать. – richie676