0

Я использую Apache's Beam sdk version 0.2.0-incubating-SNAPSHOT и пытаюсь вытащить данные на большой столбец с помощью бегуна Dataflow. К сожалению, я получаю NullPointerException при выполнении моего потока данных, где я использую BigTableIO.Write в качестве своей раковины. Уже проверено мое BigtableOptions и параметры в порядке, в соответствии с моими потребностями.Ошибка NullPointerException при записи в BigTable с использованием потока данных Apache Beam sdk

В основном, я создаю и в какой-то момент моего трубопровода у меня есть шаг, чтобы написать PCollection<KV<ByteString, Iterable<Mutation>>> к моему желаемому Bigtable:

final BigtableOptions.Builder optionsBuilder = 
    new BigtableOptions.Builder().setProjectId(System.getProperty("PROJECT_ID")) 
     .setInstanceId(System.getProperty("BT_INSTANCE_ID")); 

// do intermediary steps and create PCollection<KV<ByteString, Iterable<Mutation>>> 
// to write to bigtable 

// modifiedHits is a PCollection<KV<ByteString, Iterable<Mutation>>> 
modifiedHits.apply("writting to big table", BigtableIO.write() 
    .withBigtableOptions(optionsBuilder).withTableId(System.getProperty("BT_TABLENAME"))); 

p.run(); 

При выполнении трубопровода, я получил NullPointerException, указывая именно на BigtableIO класс на public void processElement(ProcessContext c) методе:

(6e0ccd8407eed08b): java.lang.NullPointerException at org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.processElement(BigtableIO.java:532) 

Я проверил этот метод обработки всех элементов, прежде чем писать на Bigtable, но не знает, почему я получаю такое исключение сверхурочного я выполняю этот pipelin е. Согласно приведенному ниже коду, этот метод использует атрибут bigtableWriter для обработки каждого c.element(), но я даже не могу установить точку останова для отладки, где именно находится null. Какой-нибудь совет или предложение о том, как решить эту проблему?

@ProcessElement 
    public void processElement(ProcessContext c) throws Exception { 
    checkForFailures(); 
    Futures.addCallback(
     bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element())); 
    ++recordsWritten; 
    } 

Спасибо.

+0

Не могли бы вы пояснить несколько вещей: 1) Какую версию SDK вы используете? 2) Какой бегун вы используете? (прямой бегун, Spark, Flink, Dataflow?) Если это Dataflow, вы можете указать идентификатор задания? – jkff

+0

@jkff благодарит за комментарий. Да, только что отредактировал мой вопрос, включая версии. Итак, да, я использую бегун Dataflow. Его идентификатор работы - 2016-09-13_08_29_14-14276852956124203982 –

+0

Я искал работу и ее путь к классу, и если я не ошибаюсь, похоже, что вы используете версию 0.3.0-инкубацию-SNAPSHOT из beam-sdks-java- {core, io}, но версия 0.2.0-incubating-SNAPSHOT из google-cloud-dataflow-java. Я считаю, что из-за этого проблема - вы должны использовать ту же версию (более подробная информация: BigtableIO в версии 0.3.0 использует методы \ @Setup и \ @Teardown, но runner 0.2.0 еще не поддерживает их). – jkff

ответ

2

Я посмотрел на работу и свой путь к классам, и если я не ошибаюсь, это выглядит, как вы используете версию 0.3.0-incubating-SNAPSHOT из beam-sdks-java-{core,io}, но version 0.2.0-incubating-SNAPSHOT из google-cloud-dataflow-java.

Я считаю, что проблема из-за этого - вы должны использовать ту же версию (более подробную информацию: BigtableIO в версии 0.3.0 использует @Setup и @Teardown методы, но бегун 0.2.0 не поддерживает их еще нет).

 Смежные вопросы

  • Нет связанных вопросов^_^