2016-02-19 1 views
4

Я пытаюсь сделать мой код более эффективным, так как мне приходится обрабатывать миллиарды строк данных в кассандре. В настоящее время я использую цикл JAVA в Datastax Cassandra Spark Connector чтобы вытащить данные и поместить их в формат, который мне знаком с (multimap), чтобы получить искру, чтобы сделать манипуляции. Я хотел бы иметь возможность заменить этот цикл Multimap прямой искровой манипуляцией таблицы cassandra на сэкономить время и сделать все более эффективным, я бы очень признателен за любые предложения коды для достижения этой цели Вот мой существующий код:..Как заменить JAVA-контур с помощью Direct Spark Cassandra Table Data Manipulation

 Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";"); 
     stmt.setFetchSize(2000000); 
     ResultSet results = session.execute(stmt); 

// Get the Variables from each Row of Cassandra Data   
     Multimap<Double, Float> data = LinkedListMultimap.create(); 
     for (Row row : results){  
      // Column Names in Cassandra (Case Sensitive) 
      start_frequency = row.getDouble("Start_Frequency"); 
      power = row.getFloat("Power"); 
      bandwidth = row.getDouble("Bandwidth"); 

// Create Channel Power Buckets  
       for(channel = 1.6000E8; channel <= channel_end; ){ 
        if((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {  
        data.put(channel, power); 
        } // end if 
        channel+=increment; 
       } // end for  
     } // end "row" for 

// Create Spark List for DataFrame   
     List<Value> values = data.asMap().entrySet() 
      .stream() 
      .flatMap(x -> x.getValue() 
        .stream() 
        .map(y -> new Value(x.getKey(), y))) 
      .collect(Collectors.toList()); 

// Create DataFrame and Calculate Results 
    sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel")) 
     .agg(min("power"), max("power"), avg("power")) 
     .write().mode(SaveMode.Append)  
     .option("table", "results") 
     .option("keyspace", "model") 
     .format("org.apache.spark.sql.cassandra").save(); 

    } // end session 
} // End Compute 
+0

Просмотрите этот пример https://github.com/rssvihla/datastax_work/blob/master/spark_commons/examples/spark_bulk_operations/src/main/scala/pro/foundev/scala/BulkUpgrade.scala – phact

+0

Мне жаль, что я не вижу как это похоже ??? – mithrix

+0

Дело в том, что вы не должны читать драйвер java напрямую, а затем обрабатывать с помощью искры. Используйте метод искрового контекста cassandraTable и пусть искра делает чтение для вас в параллельном, разделенном узле локально. – phact

ответ

1
JavaRDD<MeasuredValue> rdd = javaFunctions(sc).cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo(MeasuredValue.class)); 
JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>(){ 
@Override 
public Iterable<Value> call(MeasuredValue row) throws Exception { 
double start_frequency = row.getStart_frequency(); 
float power = row.getPower(); 
double bandwidth = row.getBandwidth(); 

// Define Variable 
double channel,channel_end, increment; 

// Initialize Variables 
channel_end = 1.6159E8; 
increment = 5000; 

List<Value> list = new ArrayList<Value>(); 
// Create Channel Power Buckets 
for(channel = 1.6000E8; channel <= channel_end;){ 
if((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) { 
list.add(new Value(channel, power)); 
} // end if 
channel+=increment; 
} // end for 

return list; 
}  
    }); 

    sqlContext.createDataFrame(valueRdd, Value.class).groupBy(col("channel")) 
    .agg(min("power"), max("power"), avg("power")) 
    .write().mode(SaveMode.Append)  
    .option("table", "results") 
    .option("keyspace", "model") 
    .format("org.apache.spark.sql.cassandra").save(); 

} // end session 

public static class MeasuredValue implements Serializable { 

     public MeasuredValue() { } 

     private double start_frequency; 
     public double getStart_frequency() { return start_frequency; } 
     public void setStart_frequency(double start_frequency) { this.start_frequency = start_frequency; } 

     private double bandwidth ; 
     public double getBandwidth() { return bandwidth; } 
     public void setBandwidth(double bandwidth) { this.bandwidth = bandwidth; } 

     private float power;  
     public float getPower() { return power; } 
     public void setPower(float power) { this.power = power; } 

    } 
+0

Не нужно ли устанавливать схему в классе MeasuredValue с помощью геттеров и сеттеров? Кроме того, 'rdd.flatMap (строка ->' lambda вызывает ошибки, требующие «FlatMapFunction ' ', но найденные (строки) -> {d [...];}}' и в цикле для канала жалуется, что «локальные переменные, на которые ссылается выражение lambda, должны быть окончательными или эффективными окончательными». Думаю, мне также нужно было бы сделать «mapRowTo» в исходном выражении «JavaRDD», не так ли? – mithrix

+0

Да, эти вещи нужно было сделать Код, который я написал, просто для справки, он нуждается в некоторых модификациях. –

+0

Я пробовал, но мне не нравится использовать строку lambda на JavaRDD. – mithrix

 Смежные вопросы

  • Нет связанных вопросов^_^