Я изучаю потоки Spark и пытаясь сохранить свой образец Данные о запасах (только строки, такие как «MSFT: 28.29»), полученные от темы Кафка до Кассандры с использованием искрового потока и разъема Cassandra Spark.Сохранить данные из темы Кафка в Кассандру
Без сохранения в Cassandra мой код отлично работает (получите данные от Kafka и выполните некоторые тривиальные вычисления статистики). Кассандра настроена и установлено соединение.
Но если я пытаюсь добавить строку ниже, чтобы сохранить исходные данные в таблице Кассандры до обработки:
stockParsed.saveToCassandra("dashboard","raw_tick")
в Спарк потокового интерфейса Я вижу 1 партия висит в «обработке» состоянии, а все остальное - в статусе «Очередь» и никаких данных в Кассандре.
В Спарк консоли я вижу только строки нравится:
16/02/16 10:18:40 INFO JobScheduler: Added jobs for time 1455635920000 ms
16/02/16 10:18:50 INFO JobScheduler: Added jobs for time 1455635930000 ms
16/02/16 10:19:00 INFO JobScheduler: Added jobs for time 1455635940000 ms
Вот мой код:
case class Stock(ticker: String, price: Double)
// ....
val conf = new SparkConf().setAppName("KafkaStream").setMaster("local[*]")
.set("spark.cassandra.connection.host", "localhost")
.set("spark.cassandra.auth.username", "cassandra")
.set("spark.cassandra.auth.password", "cassandra")
.set("spark.cassandra.connection.keep_alive_ms","60000")
.set("spark.cassandra.input.split.size_in_mb","1")
val ssc = new StreamingContext(conf, Seconds(10))
val topicMap = Map("test" -> 1)
val lines = KafkaUtils.createStream(ssc, "localhost:2181", "test-group", topicMap).map(_._2)
val stockParsed = lines.map(line => line.split(':')).map(s => Stock(s(0).toString, s(1).toDouble))
//Problem here
stockParsed.saveToCassandra("dashboard","raw_tick",SomeColumns("ticker", "price"))
//Some processing below
Мой build.sbt:
import sbt.Keys._
name := "KafkaStreamSbt"
version := "1.0"
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-assembly" % "1.6.0"
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector-java" % "1.5.0-RC1"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.16"
Любые идеи, как исправить Это?
У вас есть как минимум 2 ядра? – RussS
2 ядра для чего? Я запускаю Spark локально с опцией «local [*]» – szu
Это должно установить все ядра на машине как доступные. Для запуска приемника требуется, по крайней мере, одно ядро исполнителя (если вы работаете в режиме приемника). Если доступно только одно ядро, вы можете запускать ресивер и фактически обрабатывать данные. – RussS