2014-09-01 1 views
2

Я использую Datastax Enterprise 4.5. Надеюсь, я правильно настроил конфигурацию, я сделал это, как на веб-сайте datastax. Я могу написать в Cassandra DB с помощью Windowsservice, это работает, но я не могу запросить Spark, используя функцию where.Datastax DSE Cassandra, Spark, Shark, автономная программа

Я запускаю узел Cassandra (есть только один для тестовой цели) с помощью «./dse cassandra -k -t» (в папке/bin), так что hasoop и искра работают одновременно. Я могу написать в Кассандру без проблем.

Таким образом, вы не можете использовать предложение «где» в запросе Cassandra, когда «где» не является RowKey. Поэтому мне нужно использовать Spark/Shark. Я могу начать и использовать все запросы, которые мне нужны с акулой (./dse shark), но мне нужно написать автономную программу в Scala или Java.

Так что я попробовал эту ссылку: https://github.com/datastax/spark-cassandra-connector

И я могу запросить простое заявление, как:

val conf = new SparkConf(true) 
    .set("spark.cassandra.connection.host", "MY_IP") 
    .setMaster("spark://MY_IP:7077") 
    .setAppName("SparkTest") 

// Connect to the Spark cluster: 
lazy val sc = new SparkContext(conf) 

val rdd = sc.cassandraTable("keyspace", "tablename") 
println(rdd.first) 

и это работает хорошо, но если бы я попросить больше строки или считать:

println(rdd.count) 
rdd.toArray.foreach(println) 

, тогда я получаю это исключение:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up. 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1049) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1033) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1031) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1031) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:635) 
at scala.Option.foreach(Option.scala:236) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:635) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1234) 
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Когда я пробую это в Java, у меня такая же проблема. Кто-нибудь знает эту проблему? Я не знаю, правильно ли настроен DB или работает ли scala/Javaprogram. Возможно, некоторые порты заблокированы, но 7077 и 4040 открыты.

Sidenote: Если я начинаю искру на Кассандре БД, я могу сделать запросы, как:

sc.cassandraTable("test","words").select("word").toArray.foreach(println) 

Но если я использую «где» пункт как:

sc.cassandraTable("test","words").select("word").where("word = ?","foo").toArray.foreach(println) 

Я получаю это исключение :

java.io.IOException: Exception during query execution: SELECT "word" FROM "test"."words" WHERE token("word") > 0 AND word = ? ALLOW FILTERING 

У вас есть идея, почему? Я думал, что могу использовать там предложения в искры?

Спасибо!

ответ

0

Пока это мое решение. Это не ответ на все мои вопросы, но он работает для меня, и я хочу поделиться им с вами.

Я использую драйвер hive jdbc для доступа к SharkServer с Java.Как это работает:

Начало sharkserver: bin/dse shark --service sharkserver -p <port>

Зависимости для Maven:

<dependency> 
    <groupId>org.apache.hive</groupId> 
    <artifactId>hive-jdbc</artifactId> 
    <version>0.13.1</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.hadoop</groupId> 
    <artifactId>hadoop-core</artifactId> 
    <version>0.20.2</version> 
</dependency> 

Java код:

import java.sql.Connection; 
import java.sql.DriverManager; 
import java.sql.ResultSet; 
import java.sql.SQLException; 
import java.sql.Statement; 

public class HiveJdbcClient { 
    private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver"; 

    public static void main(String[] args) throws SQLException { 
    try { 
     Class.forName(driverName); 
    } catch (ClassNotFoundException e) { 
     e.printStackTrace(); 
     System.exit(1); 
    } 
    Connection con = DriverManager.getConnection("jdbc:hive://YOUR_IP:YOUR_PORT/default", "", ""); 
    Statement stmt = con.createStatement(); 
    String sql; 
    ResultSet res; 



    sql = "SELECT * FROM keyspace.colFam WHERE name = 'John'"; 
    res = stmt.executeQuery(sql); 
    while (res.next()) { 
     System.out.println(res.getString("name")); 
    } 
} 
} 
2
All masters are unresponsive! 

Подразумевает, что IP-адрес, к которому вы пытаетесь подключиться, фактически не связан искрой. Таким образом, это в основном ошибка конфигурации сети. Сканируйте, какие интерфейсы прослушивают 7077 и убедитесь, что вы подключаетесь к правильному интерфейсу.

Что касается второго вопроса, то оператор where подразумевает, что вы намерены выполнить предикат в этом разделе. В настоящее время вы не можете делать это с помощью первичных ключей. Если вы хотите, чтобы where на одном первичном ключе, вы можете сделать filter, чтобы выполнить это, но вы не увидите большой производительности, так как это будет выполнять сканирование всей таблицы.

+1

Версия 1.1 разъема будет иметь поддержку для первичных ключей в 'где '. Патч для этого уже был привязан к основной ветке. –

+0

Но когда 'Все мастера не реагируют! ', Почему он работает над одним запросом типа' .first'. Это подразумевает, что соединение и порт-порт работают нормально, не так ли? Или есть ошибка? Работает ли он по-разному (используйте другие порты и т. Д.), Если используется '.first'? – richie676

+0

Это означает, что соединение и порт передачи работают нормально, не так ли? Или есть ошибка? Работает ли он по-разному (используйте другие порты и т. Д.), Если используется '.first'? Я видел функцию '.filter', но это загружает все данные в программу и делает фильтрацию там, но это определенно должно замедляться. Я просто хочу иметь функциональность, такую ​​же, как акула, в моей автономной программе, я думал, что искра это сделает. Если нет, скажите, пожалуйста, что использовать. – richie676

 Смежные вопросы

  • Нет связанных вопросов^_^