2017-02-20 20 views
0

Когда я применил ParDo.of(new ParDoFn()) к PCollection по имени textInput, программа выбрасывает это исключение. Но программа нормальная, когда я удаляю .apply(ParDo.of(new ParDoFn())).AssertionError: утверждение не выполнено: copyAndReset должен вернуть копию нулевого значения

// SparkRunner

private static void testHadoop(Pipeline pipeline){ 
    Class<? extends FileInputFormat<LongWritable, Text>> inputFormatClass = 
      (Class<? extends FileInputFormat<LongWritable, Text>>) 
        (Class<?>) TextInputFormat.class; 
    @SuppressWarnings("unchecked") //hdfs://localhost:9000 
      HadoopIO.Read.Bound<LongWritable, Text> readPTransfom_1 = HadoopIO.Read.from("hdfs://localhost:9000/tmp/kinglear.txt", 
      inputFormatClass, 
      LongWritable.class, 
      Text.class); 
    PCollection<KV<LongWritable, Text>> textInput = pipeline.apply(readPTransfom_1) 
      .setCoder(KvCoder.of(WritableCoder.of(LongWritable.class), WritableCoder.of(Text.class))); 

    //OutputFormat 
    @SuppressWarnings("unchecked") 
    Class<? extends FileOutputFormat<LongWritable, Text>> outputFormatClass = 
      (Class<? extends FileOutputFormat<LongWritable, Text>>) 
        (Class<?>) TemplatedTextOutputFormat.class; 

    @SuppressWarnings("unchecked") 
    HadoopIO.Write.Bound<LongWritable, Text> writePTransform = HadoopIO.Write.to("hdfs://localhost:9000/tmp/output", outputFormatClass, LongWritable.class, Text.class); 

    textInput.apply(ParDo.of(new ParDoFn())).apply(writePTransform.withoutSharding()); 

    pipeline.run().waitUntilFinish(); 

} 
+0

Можете ли вы включить в свой вопрос полную трассировку стека исключений? Это поможет сузить проблему точно. Кроме того, вы можете попытаться следовать стилю в примерах Apache Beam - используемые вами трансформации используются один раз; вы можете захотеть встроить их, и ваш код будет намного читабельнее. –

ответ

3

Какие Спарк версии вы работаете на вершине? По моему опыту ошибка, которую вы получаете, выбрасывается Spark 2.x AccumulatorV2, Spark runner в настоящее время поддерживает Spark 1.6.

+0

Вы правы! – zifanpan

+0

Я уже решил проблему с Spark 1.6. – zifanpan

+0

@zifanpan Вы можете объяснить, как вы это исправили. У меня такая же версия зависимости, как вы предложили, то есть 1.6.3, я не могу это исправить. пожалуйста, предложите – Abhishek

1

У меня возникла аналогичная проблема, когда я создал пользовательский аккумулятор, который расширяет org.apache.spark.util.AccumulatorV2. Причиной была неправильная логика в методе override def isZero: Boolean. Поэтому в основном, когда вы вызываете copyAndReset, он вызывает copy(), тогда reset() ваш isZero() должен возвращать true. Если вы посмотрите на исходном коде AccumulatorV2, что где является проверка:

// Called by Java when serializing an object 
final protected def writeReplace(): Any = { 
if (atDriverSide) { 
    if (!isRegistered) { 
    throw new UnsupportedOperationException(
     "Accumulator must be registered before send to executor") 
    } 
    val copyAcc = copyAndReset() 
    assert(copyAcc.isZero, "copyAndReset must return a zero value copy") 
    copyAcc.metadata = metadata 
    copyAcc 
} else { 
    this 
} 
} 

конкретно этой часть

val copyAcc = copyAndReset() 
assert(copyAcc.isZero, "copyAndReset must return a zero value copy") 

Надеется, что это помогает. Счастливые искры!