2015-11-29 1 views
0

Я новичок в искры, и у меня есть проблема. Я обрабатываю RDD, сгенерированный с помощью textFile(), который является файлом csv. Для каждой строки я хочу вернуть несколько строк в новый RDD (один, не один). Это мой код:Spark RDD map 1 to many

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
      new Function<String, Boolean>() { 
       public Boolean call(String line) { 
        return line.contains("LinearAccelerationEvent"); 
       } 
      }).map(
      new Function<String, LinearAccelerationEvent>() { 
       public LinearAccelerationEvent call(String line) throws Exception { 
        String[] fields = line.split(","); 
        LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3])); 
        return linearAccelerationEvent; 
       } 
      }).cache(); 

Что я делаю здесь, чтобы фильтровать начальную CSV, чтобы получить только LinearAccelerationEvent, то я хочу, чтобы отобразить эти объекты в класс LinearAccelerationEvent и генерировать новый RDD объектов LinearAccelerationEvent. Для каждой строки исходного файла csv мне нужно создать несколько объектов LinearAccelerometerEvent, но я не знаю, как это сделать. Причина, почему я хочу сделать это в том, что в конце этого РДД будет выталкиваться к Кассандре, как это:

javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra(); 

Так что идеальным решением будет что-то вроде:

JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
       new Function<String, Boolean>() { 
        public Boolean call(String line) { 
         return line.contains("LinearAccelerationEvent"); 
        } 
       }).map(
       new Function<String, LinearAccelerationEvent>() { 
        public LinearAccelerationEvent call(String line) throws Exception { 
         String[] fields = line.split(","); 
         for() { 
          LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3])); 
          return linearAccelerationEvent; 
         } 
       } 
      }).cache(); 

я могу использовать функцию foreachPartition() и нажимать каждое событие цикла for на Cassandra, но я видел, что этот подход намного медленнее. Возможно ли, чтобы пользователь не выполнял то, что я хочу сделать? Спасибо

ответ

1

Если я правильно понимаю вас, верните коллекцию (например, список) LinearAccelerationEvent и позвоните по номеру flatMap вместо map. Это приведет к получению значения в результате RDD для каждого события ускорения.

flatMap - это то же самое, что и вызывающая карта, за которым следует сглаживание. Если вы знакомы с Hive, то он похож на использование DTE разлома, доступного в HiveQL.

+0

Да flatMap - это ответ, спасибо человеку! Я отправлю решение в свой код. – phcaze