2017-02-02 15 views
1

Я пытаюсь использовать Spark для извлечения префиксов zip-кода, а сгенерированный Spark код не может скомпилироваться из-за попытки инициализировать java.lang.Double с org.apache.spark. unsafe.types.UTF8String как аргумент. Для меня это не совсем понятно, если это проблема с Spark или как я ее использую. Я использую Java 1.8 и spark-mllib_2.10 в локальном режиме. Неисправный код:Spark CompileException в dataset.groupByKey()

public static void read(Dataset<ZipCodeLatLon> zipCodes) { 
    zipCodes.groupByKey(new MapFunction<ZipCodeLatLon, String>() { 
     @Override 
     public String call(ZipCodeLatLon value) throws Exception { 
      return value.getZip().substring(0, 3); 
     } 
    }, Encoders.STRING()).keys().show(); 
} 

Результаты в

вызвано следующими причинами: org.codehaus.commons.compiler.CompileException: Файл 'generated.java', строка 50, столбец 58: Нет применимый конструктор/метод найдено фактические параметры "org.apache.spark.unsafe.types.UTF8String"; кандидаты: «java.lang.Double (double)», «java.lang.Double (java.lang.String)»

Сгенерированный код довольно длинный, поэтому я не буду размещать здесь всю вещь, но ключевой раздел, который вызывает его потерпеть неудачу:

private UTF8String argValue; 
final alex.floyd.lc.geocode.ZipCodeLatLon value1 = false ? null : new alex.floyd.lc.geocode.ZipCodeLatLon(); 
... 
public java.lang.Object apply(java.lang.Object _i) { 
... 
    resultIsNull = false; 
    if (!resultIsNull) { 
     boolean isNull3 = i.isNullAt(1); 
     UTF8String value3 = isNull3 ? null : (i.getUTF8String(1)); 
     resultIsNull = isNull3; 
     argValue = value3; 
    } 

    final java.lang.Double value2 = resultIsNull ? null : new java.lang.Double(argValue); 
    javaBean.setLat(value2); 
... 
} 

ошибка кажется не зависит от типа возвращаемой функции groupByKey (я пытался Integer и боб Java вместо строки). Однако, если я изменил тип входного набора данных на что-то еще, например String, вместо ZipCodeLatLon, этот код работает. Однако из того, что я могу сказать, ZipCodeLatLon, похоже, выполняет все необходимые соглашения Java-компонента, поэтому я не уверен, что мне нужно будет сделать, чтобы изменить его. Я также использовал Spark для чтения в ZipCodeLatLon из CSV, поэтому Spark может обрабатывать класс, просто не в контексте метода groupByKey.

public class ZipCodeLatLon implements Serializable{ 
private String zip; 
private Double lat; 
private Double lng; 
public String getZip() { 
    return zip; 
} 
public void setZip(String zip) { 
    this.zip = zip; 
} 
public Double getLat() { 
    return lat; 
} 
public void setLat(Double lat) { 
    this.lat = lat; 
} 
public Double getLng() { 
    return lng; 
} 
public void setLng(Double lng) { 
    this.lng = lng; 
} 
} 

Некоторая дополнительная информация: Это, кажется, связано с тем, как ZipCodeLatLon считывается из CSV. При ручном создании набора данных код работает нормально.

Полностью нормально:

ZipCodeLatLon l = new ZipCodeLatLon(); 
l.setZip("12345"); 
l.setLat(0.0); 
l.setLng(0.0); 
read(spark.createDataset(Lists.newArrayList(l, l), Encoders.bean(ZipCodeLatLon.class))); 

неработоспособной:

Dataset<ZipCodeLatLon> dataset = spark.read() 
    .option("header", true) 
    .csv(zipCodeData.getAbsolutePath()) 
    .as(Encoders.bean(ZipCodeLatLon.class)); 
dataset.show(); // works - reading in the CSV succeeds 
read(dataset); // fails on groupByKey 

ответ

1

Фигурные его. Вам нужно создать схему для чтения csv. Я предположил, что кодировщик предоставит схему, но это не так. Желательно, чтобы сообщение об ошибке было более полезным!

До:

public static Dataset<ZipCodeLatLon> read(SparkSession spark) { 
    return spark.read() 
      .option("header", true) 
      .csv(ZIP_CODE_DATA.getAbsolutePath()) 
      .as(Encoders.bean(ZipCodeLatLon.class)); 
} 

После:

public static Dataset<ZipCodeLatLon> read(SparkSession spark) { 
    return spark.read() 
      .option("header", true) 
      .option("inferSchema", "true") 
      .csv(ZIP_CODE_DATA.getAbsolutePath()) 
      .as(Encoders.bean(ZipCodeLatLon.class)); 
}