2016-10-17 3 views
1

я загрузить мой CSV, используя DataFrame тогда я преобразовал в DataSet, но это показывает, как этогоКак работать с DataSet в Spark с помощью scala?

нескольких маркеров в этой строке:
- Невозможно найти кодер типа, хранящегося в Dataset. Примитивные типы (Int, String и т. Д.) И типы продуктов (классы case) поддерживаются путем импорта
spark.implicits._ Поддержка сериализации других типов будет добавлена ​​в будущих версиях.
- недостаточно аргументов для метода as: (неявные доказательства $ 2:
org.apache.spark.sql.Encoder [DataSet.spark.aacsv]) org.apache.spark.sql.Dataset [DataSet.spark.aacsv] , Неопределенный показатель параметра стоимости $ 2

Как это решить? Мой код -

case class aaCSV(
    a: String, 
    b: String 
    ) 

object WorkShop { 

    def main(args: Array[String]) = { 
    val conf = new SparkConf() 
     .setAppName("readCSV") 
     .setMaster("local") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 
    val customSchema = StructType(Array(
     StructField("a", StringType, true), 
     StructField("b", StringType, true))) 

    val df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").schema(customSchema).load("/xx/vv/ss.csv") 
    df.printSchema() 
    df.show() 
    val googleDS = df.as[aaCSV] 
    googleDS.show() 

    } 

} 

Теперь я изменил основную функцию, как это -

def main(args: Array[String]) = { 
    val conf = new SparkConf() 
     .setAppName("readCSV") 
     .setMaster("local") 
    val sc = new SparkContext(conf) 
    val sqlContext = new SQLContext(sc) 
import sqlContext.implicits._; 
    val sa = sqlContext.read.csv("/xx/vv/ss.csv").as[aaCSV] 
    sa.printSchema() 
    sa.show() 
} 

Но он бросает ошибку - Исключение в потоке "основного" org.apache.spark.sql.AnalysisException: не может Разрешить 'Adj_Close' с входными столбцами: [_c1, _c2, _c5, _c4, _c6, _c3, _c0]; строка 1 поз. 7. Что мне делать ?

Теперь я выполняю свой метод, используя на основе заданного интервала времени, используя искрогасификатор. Но я ссылаюсь на эту ссылку - https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. Пожалуйста, помогите нам.

+1

«не хватает аргументов для метода» ... Какой метод? Где ваш код? –

+0

Хм. Пожалуйста, не используйте комментарии для кода. [править] ваш вопрос и отформатировать его соответствующим образом. Спасибо –

+0

@Sarathkumar Vulchi: Можете ли вы попробовать добавить эту строку 'sqlContext.implicits._', прежде чем конвертировать df в ds. – Shankar

ответ

0

У вас есть заголовок (имена столбцов) в ваших CSV-файлов? Если да, попробуйте добавить .option("header","true") в инструкции чтения. Пример: sqlContext.read.option("header","true").csv("/xx/vv/ss.csv").as[aaCSV].

Ниже блога есть различные примеры для Dataframes и Dataset: http://technippet.blogspot.in/2016/10/different-ways-of-creating.html

+0

Спасибо, друг. Его рабочий штраф. –

0

Попробуйте добавить импорт, прежде чем конвертировать DF в DS.

sc.implicits._ 

ИЛИ

sqlContext.implicits._ 

Для получения дополнительной информации о работе с DataSet https://spark.apache.org/docs/latest/sql-programming-guide.html#creating-datasets

+0

Спасибо большое приятель. Я попробовал другой подход: val sa = sqlContext.read.csv («/ home/kenla/Spark_Samples/google.csv»). [GoogleCSV] –

+0

Я пробовал использовать другой подход: val sa = sqlContext.read.csv (" /home/kenla/Spark_Samples/google.csv").as[googleCSV], но выдает ошибку «Исключение в потоке» main «org.apache.spark.sql.AnalysisException: не может разрешить« Date »с учетом входных столбцов: [_c3 , _c4, _c0, _c1, _c5, _c6, _c2]; ". Пожалуйста, помогите нам. –

 Смежные вопросы

  • Нет связанных вопросов^_^