2014-06-27 2 views
4

Я пытаюсь перенести пример, написанный в Scala (из проекта Apache Spark) на Java, и запуская некоторые проблемы.Как использовать Spark's .newAPIHadoopRDD() из Java

Код

val casRdd = sc.newAPIHadoopRDD(job.getConfiguration(), 
    classOf[CqlPagingInputFormat], 
    classOf[java.util.Map[String,ByteBuffer]], 
    classOf[java.util.Map[String,ByteBuffer]]) 

из исходного примера Scala строит и работает просто отлично, но

JavaPairRDD rdd = jsc.newAPIHadoopRDD(job.getConfiguration(), 
    CqlPagingInputFormat.class, 
    java.util.Map<String, ByteBuffer>.class, 
    java.util.Map<String, ByteBuffer>.class); 

не допускается в Java (Cannot select from parameterized type).

Изменение

java.util.Map<String, ByteBuffer>.class 

в

Class.forName("java.util.Map<String, ByteBuffer>") 

дает новую ошибку:

Error:(42, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types; 
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V> 
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<capture#1 of ?>,java.lang.Class<capture#2 of ?> 
reason: inferred type does not conform to declared bound(s) 
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
bound(s): org.apache.hadoop.mapreduce.InputFormat<capture#1 of ?,capture#2 of ?> 

Изменение его в просто java.util.Map.class возвращающее подобную ошибку:

Error:(44, 30) java: method newAPIHadoopRDD in class org.apache.spark.api.java.JavaSparkContext cannot be applied to given types; 
required: org.apache.hadoop.conf.Configuration,java.lang.Class<F>,java.lang.Class<K>,java.lang.Class<V> 
found: org.apache.hadoop.conf.Configuration,java.lang.Class<org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat>,java.lang.Class<java.util.Map>,java.lang.Class<java.util.Map> 
reason: inferred type does not conform to declared bound(s) 
inferred: org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat 
bound(s): org.apache.hadoop.mapreduce.InputFormat<java.util.Map,java.util.Map> 

Итак, что такое правильный перевод? Стоит отметить, что функция newAPIHadoopRDD() является другой реализацией для Scala и для Java. Документацию по методам можно найти here для Scala и здесь: http://spark.apache.org/docs/latest/api/java/org/apache/spark/api/java/JavaSparkContext.html#newAPIHadoopRDD(org.apache.hadoop.conf.Configuration, java.lang.Class, java.lang.Class, java.lang.Class) для Java.

Декларация CqlPagingInputFormat выглядит следующим образом

public class CqlPagingInputFormat extends org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat<java.util.Map<java.lang.String,java.nio.ByteBuffer>,java.util.Map<java.lang.String,java.nio.ByteBuffer>> { 
+0

Вы пробовали 'java.util.Map.class' вместо' java.util.Map .class'? –

+0

Да, наверное, я должен был добавить это. Спасибо, я отправлю сообщение об ошибке. – martingms

ответ

2

Наконец я получил это решен после того, как много борьбы. Проблема newHadoopAPI требует класса, который расширяет org.apache.hadoop.mapreduce.InputFormat и org.apache.cassandra.hadoop.cql3.CqlInputFormat не расширяет InputFormat напрямую, вместо этого он расширяет org.apache.cassandra.hadoop.AbstractColumnFamilyInputFormat, который в turn extends InputFormat.

Eclipse использует компилятор groovy, который достаточно умен, чтобы разрешить это, но компилятор Java по умолчанию не может решить эту проблему. Также компилятор Groovy корректно определяет значения K, V, которые java-компилятор считает несовместимыми.

Вам необходимо добавить следующие изменения в файл pom.xml использовать заводной компилятор:

<properties> 
    <groovy-version>1.8.6</groovy-version> 
    <maven-comipler-plugin-version>2.5.1</maven-comipler-plugin-version> 
    <groovy-eclipse-compiler-version>2.7.0-01</groovy-eclipse-compiler-version> 
    <maven-clover2-plugin-version>3.1.7</maven-clover2-plugin-version> 
    <groovy-eclipse-batch-version>1.8.6-01</groovy-eclipse-batch-version> 
</properties> 
  1. Добавить заводной как зависимость

    <dependencies> 
        <dependency> 
         <groupId>org.codehaus.groovy</groupId> 
         <artifactId>groovy-all</artifactId> 
         <version>${groovy-version}</version> 
        </dependency> 
    <dependencies> 
    
  2. Добавить grovvy плагин под билда использовать это как компилятор для нашего кода

    <build> 
        <pluginManagement> 
         <plugins> 
         <plugin> 
          <groupId>org.apache.maven.plugins</groupId> 
          <artifactId>maven-compiler-plugin</artifactId> 
          <version>${maven-comipler-plugin-version}</version> 
          <configuration> 
           <!-- Bind Groovy Eclipse Compiler --> 
           <compilerId>groovy-eclipse-compiler</compilerId> 
           <source>${jdk-version}</source> 
           <target>${jdk-version}</target> 
          </configuration> 
          <dependencies> 
           <!-- Define which Groovy version will be used for build (default is 
            2.0) --> 
           <dependency> 
            <groupId>org.codehaus.groovy</groupId> 
            <artifactId>groovy-eclipse-batch</artifactId> 
            <version>${groovy-eclipse-batch-version}</version> 
           </dependency> 
           <!-- Define dependency to Groovy Eclipse Compiler (as it's referred 
            in compilerId) --> 
           <dependency> 
            <groupId>org.codehaus.groovy</groupId> 
            <artifactId>groovy-eclipse-compiler</artifactId> 
            <version>${groovy-eclipse-compiler-version}</version> 
           </dependency> 
          </dependencies> 
         </plugin> 
         <!-- Define Groovy Eclipse Compiler again and set extensions=true. Thanks 
          to this, plugin will --> 
         <!-- enhance default build life cycle with an extra phase which adds 
          additional Groovy source folders --> 
         <!-- It works fine under Maven 3.x, but we've encountered problems with 
          Maven 2.x --> 
         <plugin> 
          <groupId>org.codehaus.groovy</groupId> 
          <artifactId>groovy-eclipse-compiler</artifactId> 
          <version>${groovy-eclipse-compiler-version}</version> 
          <extensions>true</extensions> 
         </plugin> 
         <!-- Configure Clover for Maven plug-in. Please note that it's not bound 
          to any execution phase, --> 
         <!-- so you'll have to call Clover goals from command line. --> 
         <plugin> 
          <groupId>com.atlassian.maven.plugins</groupId> 
          <artifactId>maven-clover2-plugin</artifactId> 
          <version>${maven-clover2-plugin-version}</version> 
          <configuration> 
           <generateHtml>true</generateHtml> 
           <historyDir>.cloverhistory</historyDir> 
          </configuration> 
         </plugin> 
         </plugins> 
        </pluginManagement> 
    </build> 
    

Это должно решить проблему.

+1

Ницца! Я закончил использование собственной Datastax [cassandra-driver-spark] (https: // github.com/datastax/cassandra-driver-spark) вместо этого (из Scala и обертывания его на Java), который имеет гораздо более удобный API (с поддержкой Java очень скоро). Рад видеть, что ты понял это. – martingms

+0

Привет, У меня возникла проблема, когда я использую JavaSparkContext.newAPIHadoopRDD (conf, XXInputFormat.class, NullWritable.class, Map.class), в котором я использовал XXInputFormat, который напрямую расширяет mapreduce.InputFormat и использует Map как параметры InputFormat. – tobe

+0

Ошибка: \t sc.newAPIHadoopRDD (conf, TestInputFormat.class, NullWritable.class, Map.class); \t ^^^^^^^^^^^^^^^ Связанного несоответствие: Общий метод newAPIHadoopRDD (конфигурация, класс , класс , класс ) типа JavaSparkContext не применяется для аргументов (конфигурация, класс , класс , класс ). Выведенный тип TestInputFormat не является допустимым заменой ограниченного параметра > – tobe