Я пытаюсь перенести пример, написанный в Scala (из проекта Apache Spark) на Java, и запуская некоторые проблемы.Как использовать Spark's .newAPIHadoopRDD() из Java
Код
val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(),
classOf[CqlPagingInputFormat],
classOf[java.util.Map[String,ByteBuffer]],
classOf[java.util.Map[String,ByteBuffer]])
из исходного примера Scala строит и работает просто отлично, но
JavaPairRDD rdd = jsc.newAPIHadoopRDD(job.getConfiguration(),
CqlPagingInputFormat.class,
java.util.Map<String, ByteBuffer>.class,
java.util.Map<String, ByteBuffer>.class);
не допускается в Java (Cannot select from parameterized type
).
Изменение
java.util.Map<String, ByteBuffer>.class
в
Class.forName("java.util.Map<String, ByteBuffer>")
дает новую ошибку:
Error:(42, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<capture#1 of ?>,java.lang.Class<capture#2 of ?>
reason: inferred type does not conform to declared bound(s)
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
bound(s): org.apache.hadoop.mapreduce.InputFormat<capture#1 of ?,capture#2 of ?>
Изменение его в просто java.util.Map.class
возвращающее подобную ошибку:
Error:(44, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types;
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V>
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<java.util.Map>,java.lang.Class<java.util.Map>
reason: inferred type does not conform to declared bound(s)
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat
bound(s): org.apache.hadoop.mapreduce.InputFormat<java.util.Map,java.util.Map>
Итак, что такое правильный перевод? Стоит отметить, что функция newAPIHadoopRDD()
является другой реализацией для Scala и для Java. Документацию по методам можно найти here для Scala и здесь: http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#newAPIHadoopRDD(org.apache.hadoop.conf.Configuration, java.lang.Class, java.lang.Class, java.lang.Class) для Java.
Декларация CqlPagingInputFormat
выглядит следующим образом
public class CqlPagingInputFormat extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat<java.util.Map<java.lang.String,java.nio.ByteBuffer>,java.util.Map<java.lang.String,java.nio.ByteBuffer>> {
Вы пробовали 'java.util.Map.class' вместо' java.util.Map .class'? –
Да, наверное, я должен был добавить это. Спасибо, я отправлю сообщение об ошибке. – martingms