Я новичок в искры, и у меня есть проблема. Я обрабатываю RDD, сгенерированный с помощью textFile(), который является файлом csv. Для каждой строки я хочу вернуть несколько строк в новый RDD (один, не один). Это мой код:Spark RDD map 1 to many
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}).cache();
Что я делаю здесь, чтобы фильтровать начальную CSV, чтобы получить только LinearAccelerationEvent, то я хочу, чтобы отобразить эти объекты в класс LinearAccelerationEvent и генерировать новый RDD объектов LinearAccelerationEvent. Для каждой строки исходного файла csv мне нужно создать несколько объектов LinearAccelerometerEvent, но я не знаю, как это сделать. Причина, почему я хочу сделать это в том, что в конце этого РДД будет выталкиваться к Кассандре, как это:
javaFunctions(linearAccelerationEventJavaRDD).writerBuilder("d300ea832fe462598f473f76939452283de495a1", "linearaccelerationevent", mapToRow(LinearAccelerationEvent.class)).saveToCassandra();
Так что идеальным решением будет что-то вроде:
JavaRDD<LinearAccelerationEvent> linearAccelerationEventJavaRDD = csvFile.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("LinearAccelerationEvent");
}
}).map(
new Function<String, LinearAccelerationEvent>() {
public LinearAccelerationEvent call(String line) throws Exception {
String[] fields = line.split(",");
for() {
LinearAccelerationEvent linearAccelerationEvent = new LinearAccelerationEvent(Long.valueOf(fields[4]), Float.valueOf(fields[1]), Float.valueOf(fields[2]), Float.valueOf(fields[3]));
return linearAccelerationEvent;
}
}
}).cache();
я могу использовать функцию foreachPartition()
и нажимать каждое событие цикла for на Cassandra, но я видел, что этот подход намного медленнее. Возможно ли, чтобы пользователь не выполнял то, что я хочу сделать? Спасибо
Да flatMap - это ответ, спасибо человеку! Я отправлю решение в свой код. – phcaze