2016-10-21 4 views
1

Я использую DSE Spark Job Server. Задача, которую я пытаюсь выполнить, заключается в следующем:Java-программа в Spark Job Server бросает scala.MatchError

Ожидается, что искра, которую я создал на Java, выберет некоторые данные из db cassandra, и это будет развернуто в кластере DSE Analytics.

код следующим образом:

package com.symantec.nsp.analytics; 

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions; 
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo; 

import java.io.Serializable; 
import java.util.List; 
import java.util.UUID; 

import org.apache.commons.lang.StringUtils; 
import org.apache.spark.SparkContext; 
import org.apache.spark.api.java.JavaSparkContext; 

import spark.jobserver.JavaSparkJob; 
import spark.jobserver.SparkJobInvalid; 
import spark.jobserver.SparkJobValid$; 
import spark.jobserver.SparkJobValidation; 

import com.symantec.nsp.analytics.model.Bucket; 
import com.typesafe.config.Config; 

public class JavaSparkJobBasicQuery extends JavaSparkJob { 

    public String runJob(JavaSparkContext jsc, Config config) { 
     try { 
      List<UUID> bucketRecords = javaFunctions(jsc).cassandraTable("nsp_storage", "bucket", mapRowTo(Bucket.class)) 
        .select("id", "deleted").filter(s -> s.getDeleted()).map(s -> s.getId()).collect(); 

      System.out.println(">>>>>>>> Total Buckets getting scanned by Spark :" + bucketRecords.size()); 
      return bucketRecords.toString(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
      return null; 
     } 
    } 

    public SparkJobValidation validate(SparkContext sc, Config config) { 
     return null; 
    } 

    public String invalidate(JavaSparkContext jsc, Config config) { 
     return null; 
    } 
} 

Выпуск:

При осуществлении этого кода я получаю ниже вопрос:

"status": "ERROR", 
    "result": 
    "message": "null", 
    "errorClass": "scala.MatchError", 
    "stack": ["spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:244)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)", "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)", "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)", "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)", "java.lang.Thread.run(Thread.java:745)"] 

Может кто-то разрешающую вопрос. Примечание: Я пытался очистить папку /tmp несколько раз. Не удалось решить эту проблему. Версия DSE, которую я использую, - 4.8.10.

ответ

0

Я не совсем уверен, не хотите ли вы вернуть null на исключение. Я оставил бы его для распространения.

0

Я попытался удалить нулевую инструкцию. Тем не менее проблема сохраняется. Я не вижу реальных образцов искры Java, которые сканируют таблицы cassandra. Может ли кто-нибудь подтвердить, правильна ли структура этой работы (например, переопределить runJob) и т. Д.? Если у кого-то есть какие-либо образцы искры Java, которые касаются сканирования таблицы cassandra db, это было бы полезно.