Я пытаюсь сделать мой код более эффективным, так как мне приходится обрабатывать миллиарды строк данных в кассандре. В настоящее время я использую цикл JAVA в Datastax Cassandra Spark Connector чтобы вытащить данные и поместить их в формат, который мне знаком с (multimap), чтобы получить искру, чтобы сделать манипуляции. Я хотел бы иметь возможность заменить этот цикл Multimap прямой искровой манипуляцией таблицы cassandra на сэкономить время и сделать все более эффективным, я бы очень признателен за любые предложения коды для достижения этой цели Вот мой существующий код:..Как заменить JAVA-контур с помощью Direct Spark Cassandra Table Data Manipulation
Statement stmt = new SimpleStatement("SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
stmt.setFetchSize(2000000);
ResultSet results = session.execute(stmt);
// Get the Variables from each Row of Cassandra Data
Multimap<Double, Float> data = LinkedListMultimap.create();
for (Row row : results){
// Column Names in Cassandra (Case Sensitive)
start_frequency = row.getDouble("Start_Frequency");
power = row.getFloat("Power");
bandwidth = row.getDouble("Bandwidth");
// Create Channel Power Buckets
for(channel = 1.6000E8; channel <= channel_end; ){
if((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {
data.put(channel, power);
} // end if
channel+=increment;
} // end for
} // end "row" for
// Create Spark List for DataFrame
List<Value> values = data.asMap().entrySet()
.stream()
.flatMap(x -> x.getValue()
.stream()
.map(y -> new Value(x.getKey(), y)))
.collect(Collectors.toList());
// Create DataFrame and Calculate Results
sqlContext.createDataFrame(sc.parallelize(values), Value.class).groupBy(col("channel"))
.agg(min("power"), max("power"), avg("power"))
.write().mode(SaveMode.Append)
.option("table", "results")
.option("keyspace", "model")
.format("org.apache.spark.sql.cassandra").save();
} // end session
} // End Compute
Просмотрите этот пример https://github.com/rssvihla/datastax_work/blob/master/spark_commons/examples/spark_bulk_operations/src/main/scala/pro/foundev/scala/BulkUpgrade.scala – phact
Мне жаль, что я не вижу как это похоже ??? – mithrix
Дело в том, что вы не должны читать драйвер java напрямую, а затем обрабатывать с помощью искры. Используйте метод искрового контекста cassandraTable и пусть искра делает чтение для вас в параллельном, разделенном узле локально. – phact