0

Я изучаю потоки Spark и пытаясь сохранить свой образец Данные о запасах (только строки, такие как «MSFT: 28.29»), полученные от темы Кафка до Кассандры с использованием искрового потока и разъема Cassandra Spark.Сохранить данные из темы Кафка в Кассандру

Без сохранения в Cassandra мой код отлично работает (получите данные от Kafka и выполните некоторые тривиальные вычисления статистики). Кассандра настроена и установлено соединение.

Но если я пытаюсь добавить строку ниже, чтобы сохранить исходные данные в таблице Кассандры до обработки:

stockParsed.saveToCassandra("dashboard","raw_tick") 

в Спарк потокового интерфейса Я вижу 1 партия висит в «обработке» состоянии, а все остальное - в статусе «Очередь» и никаких данных в Кассандре.

В Спарк консоли я вижу только строки нравится:

16/02/16 10:18:40 INFO JobScheduler: Added jobs for time 1455635920000 ms 
16/02/16 10:18:50 INFO JobScheduler: Added jobs for time 1455635930000 ms 
16/02/16 10:19:00 INFO JobScheduler: Added jobs for time 1455635940000 ms 

Вот мой код:

case class Stock(ticker: String, price: Double) 
// .... 

val conf = new SparkConf().setAppName("KafkaStream").setMaster("local[*]") 
    .set("spark.cassandra.connection.host", "localhost") 
    .set("spark.cassandra.auth.username", "cassandra") 
    .set("spark.cassandra.auth.password", "cassandra") 
    .set("spark.cassandra.connection.keep_alive_ms","60000") 
    .set("spark.cassandra.input.split.size_in_mb","1") 

val ssc = new StreamingContext(conf, Seconds(10)) 

val topicMap = Map("test" -> 1) 

val lines = KafkaUtils.createStream(ssc, "localhost:2181", "test-group", topicMap).map(_._2) 

val stockParsed = lines.map(line => line.split(':')).map(s => Stock(s(0).toString, s(1).toDouble)) 

//Problem here 
stockParsed.saveToCassandra("dashboard","raw_tick",SomeColumns("ticker", "price")) 

//Some processing below 

Мой build.sbt:

import sbt.Keys._ 

name := "KafkaStreamSbt" 

version := "1.0" 

scalaVersion := "2.10.6" 

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" % "provided" 
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-assembly" % "1.6.0" 
libraryDependencies += "com.datastax.spark" %% "spark-cassandra-connector-java" % "1.5.0-RC1" 
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.16" 

Любые идеи, как исправить Это?

+0

У вас есть как минимум 2 ядра? – RussS

+0

2 ядра для чего? Я запускаю Spark локально с опцией «local [*]» – szu

+0

Это должно установить все ядра на машине как доступные. Для запуска приемника требуется, по крайней мере, одно ядро ​​исполнителя (если вы работаете в режиме приемника). Если доступно только одно ядро, вы можете запускать ресивер и фактически обрабатывать данные. – RussS

ответ

0

Вопрос решён: у меня была ошибка в конфигурации ключей в Cassandra. После отдыха ключевого пространства с помощью этого сценария:

CREATE KEYSPACE tutorial WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}; 

код работает нормально.