3

Я новичок в Spark и Cassandra. При попытке представить искровое задание, я получаю сообщение об ошибке при подключении к Кассандре.Разъем Spark-Cassandra: Не удалось открыть собственное подключение к Cassandra

Деталь:

Версия:

Spark : 1.3.1 (build for hadoop 2.6 or later : spark-1.3.1-bin-hadoop2.6) 
Cassandra : 2.0 
Spark-Cassandra-Connector: 1.3.0-M1 
scala : 2.10.5 

искровых и Кассандра находится на виртуальных кластер деталей боеприпасов:

Spark Master : 192.168.101.13 
Spark Slaves : 192.168.101.11 and 192.168.101.12 
Cassandra Nodes: 192.168.101.11 (seed node) and 192.168.101.12 

Я пытаюсь представить работу через клиентскую машину (ноутбук) - 172.16.0.6. После того, как я запустил эту ошибку, я убедился, что могу выполнить ping все машины на кластере с клиентской машины: искровой мастер/slaves и узлы cassandra, а также отключить брандмауэр на всех машинах. Но я все еще борется с этой ошибкой.

Cassandra.yaml

listen_address: 192.168.101.11 (192.168.101.12 on other cassandra node) 
start_native_transport: true 
native_transport_port: 9042 
start_rpc: true 
rpc_address: 192.168.101.11 (192.168.101.12 on other cassandra node) 
rpc_port: 9160 

Я пытаюсь запустить минимальный пример работы

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.rdd.RDD 
import com.datastax.spark.connector._ 

val rdd = sc.cassandraTable("test", "words") 
rdd.toArray.foreach(println) 

Чтобы отправить задание, я использую искровой скорлупу (: вставьте код в свече оболочки):

spark-shell --jars "/home/ameya/.m2/repository/com/datastax/spark/spark-cassandra-connector_2.10/1.3.0-M1/spark-cassandra-connector_2.10-1.3.0-M1.jar","/home/ameya/.m2/repository/com/datastax/cassandra/cassandra-driver-core/2.1.5/cassandra-driver-core-2.1.5.jar","/home/ameya/.m2/repository/com/google/collections/google-collections/1.0/google-collections-1.0.jar","/home/ameya/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar","/home/ameya/.m2/repository/com/google/guava/guava/14.0.1/guava-14.0.1.jar","/home/ameya/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.0/metrics-core-3.1.0.jar","/home/ameya/.m2/repository/org/slf4j/slf4j-api/1.7.10/slf4j-api-1.7.10.jar","/home/ameya/.m2/repository/com/google/collections/google-collections/1.0/google-collections-1.0.jar","/home/ameya/.m2/repository/io/netty/netty/3.8.0.Final/netty-3.8.0.Final.jar","/home/ameya/.m2/repository/com/google/guava/guava/14.0.1/guava-14.0.1.jar","/home/ameya/.m2/repository/org/apache/cassandra/cassandra-clientutil/2.1.5/cassandra-clientutil-2.1.5.jar","/home/ameya/.m2/repository/joda-time/joda-time/2.3/joda-time-2.3.jar","/home/ameya/.m2/repository/org/apache/cassandra/cassandra-thrift/2.1.3/cassandra-thrift-2.1.3.jar","/home/ameya/.m2/repository/org/joda/joda-convert/1.2/joda-convert-1.2.jar","/home/ameya/.m2/repository/org/apache/thrift/libthrift/0.9.2/libthrift-0.9.2.jar","/home/ameya/.m2/repository/org/apache/thrift/libthrift/0.9.2/libthrift-0.9.2.jar" --master spark://192.168.101.13:7077 --conf spark.cassandra.connection.host=192.168.101.11 --conf spark.cassandra.auth.username=cassandra --conf spark.cassandra.auth.password=cassandra 

ошибка я получаю:

warning: there were 1 deprecation warning(s); re-run with -deprecation for details 
**java.io.IOException: Failed to open native connection to Cassandra at {192.168.101.11}:9042** 
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:181) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:167) 
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:167) 
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31) 
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56) 
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:76) 
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:104) 
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:115) 
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:243) 
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.tableDef(CassandraTableRowReaderProvider.scala:49) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef$lzycompute(CassandraTableScanRDD.scala:59) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.tableDef(CassandraTableScanRDD.scala:59) 
    at com.datastax.spark.connector.rdd.CassandraTableRowReaderProvider$class.verify(CassandraTableRowReaderProvider.scala:148) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.verify(CassandraTableScanRDD.scala:59) 
    at com.datastax.spark.connector.rdd.CassandraTableScanRDD.getPartitions(CassandraTableScanRDD.scala:118) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1512) 
    at org.apache.spark.rdd.RDD.collect(RDD.scala:813) 
    at org.apache.spark.rdd.RDD.toArray(RDD.scala:833) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:33) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:52) 
    at $iwC$$iwC$$iwC.<init>(<console>:54) 
    at $iwC$$iwC.<init>(<console>:56) 
    at $iwC.<init>(<console>:58) 
    at <init>(<console>:60) 
    at .<init>(<console>:64) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) 
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) 
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) 
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058) 
    at org.apache.spark.repl.Main$.main(Main.scala:31) 
    at org.apache.spark.repl.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
**Caused by: com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: /192.168.101.11:9042 (com.datastax.driver.core.TransportException: [/192.168.101.11:9042] Connection has been closed))** 
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:223) 
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78) 
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1236) 
    at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:333) 
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:174) 
    ... 71 more 

Может ли кто-нибудь указать, что я делаю неправильно здесь?

ответ

3

Вы не указали spark.cassandra.connection.host по умолчанию spark предположим, что хост cassandra такой же, как и главный узел искры.

var sc:SparkContext=_ 
val conf = new SparkConf().setAppName("Cassandra Demo").setMaster(master) 
.set("spark.cassandra.connection.host", "192.168.101.11") 
c=new SparkContext(conf) 

val rdd = sc.cassandraTable("test", "words") 
rdd.toArray.foreach(println) 

он должен работать, если вы правильно установить семя nodein cassandra.yaml

+0

Мой плохой я забыл упомянуть об этой конфигурации. Я сделал это. Однако проблема решена. Это была проблема зависимости. Когда я поставил жирную банку с зависимостями, она сработала. – ameyamm

2

этот вопрос решен. Это было связано с некоторыми беспорядками с зависимостями. Я построил банку с зависимостями и передал ее, чтобы исправить-submit, вместо указания зависимых банок отдельно.

+1

У меня такая же проблема. Можете ли вы поделиться именем банки, с которой вы работали? Или как вы построили его с зависимостями? – Nadine

+1

У меня такая же проблема. Не могли бы вы описать, какие банки были необходимы? –

0

Это проблема с версией зависимостей банки-кассира-драйвера-ядра.

The provided cassandra's version is 2.0 
The provided cassandra-driver-core jar's version is 2.1.5 

Банкомат должен быть таким же, как и версия хода кассандры.

In this case, the included jar file should be cassandra-driver-core-2.0.0.jar 
3

Я боролся с этой проблемой в одночасье и, наконец, получил комбинацию, которая работает. Я пишу это для тех, кто может столкнуться с подобной проблемой.

Прежде всего, это проблема зависимости cassandra-kernel от версии. Но для отслеживания точной комбинации, которая работает, у меня довольно много времени.

Во-вторых, это комбинация, которая работает для меня.

  1. Искры 1.6.2 с Hadoop 2.6, Cassandra 2.1.5 (Ubuntu 14.04, Java, 1.8),
  2. В built.sbt (SBT сборки, scalaVersion: = "2.10.5"), использовать

"com.datastax.spark" %% "spark-cassandra-connector" % "1.4.0", "com.datastax.cassandra" % "cassandra-driver-core" % "2.1.5"

В-третьих, позвольте мне уточнить мои разочарования. С разъемом spark-cassandra 1.5.0 я могу запустить сборку с помощью spark-submit с -master «local [2]» на той же машине с удаленным подключением cassandra без каких-либо проблем. Любая комбинация разъемов 1.5.0, 1.6.0 с Cassandra 2.0, 2.1, 2.2, 3,4 работает хорошо. Но если я попытаюсь отправить задание кластеру с того же компьютера (NodeManager) с кластером -master yarn --deploy-mode, тогда я всегда буду сталкиваться с проблемой: Не удалось открыть собственное подключение к Cassandra по адресу {192.168. 122.12}: 9042

Что здесь происходит? Любой из DataStarX может взглянуть на эту проблему? Я могу только догадываться, что это имеет какое-то отношение к «cqlversion», которое должно соответствовать версии кластера Cassandra.

Кто-нибудь знает лучшее решение? [Cassandra], [апач искровой]

1

Он работал, наконец:

шагов:

  1. набора listen_address частной IP экземпляра EC2.
  2. не устанавливает какой-либо broadcast_address
  3. набора rpc_address к 0.0.0.0
  4. множество broadcast_rpc_address общественной ф экземпляра EC2.