Я использую Apache's Beam
sdk version 0.2.0-incubating-SNAPSHOT
и пытаюсь вытащить данные на большой столбец с помощью бегуна Dataflow
. К сожалению, я получаю NullPointerException
при выполнении моего потока данных, где я использую BigTableIO.Write
в качестве своей раковины. Уже проверено мое BigtableOptions
и параметры в порядке, в соответствии с моими потребностями.Ошибка NullPointerException при записи в BigTable с использованием потока данных Apache Beam sdk
В основном, я создаю и в какой-то момент моего трубопровода у меня есть шаг, чтобы написать PCollection<KV<ByteString, Iterable<Mutation>>>
к моему желаемому Bigtable:
final BigtableOptions.Builder optionsBuilder =
new BigtableOptions.Builder().setProjectId(System.getProperty("PROJECT_ID"))
.setInstanceId(System.getProperty("BT_INSTANCE_ID"));
// do intermediary steps and create PCollection<KV<ByteString, Iterable<Mutation>>>
// to write to bigtable
// modifiedHits is a PCollection<KV<ByteString, Iterable<Mutation>>>
modifiedHits.apply("writting to big table", BigtableIO.write()
.withBigtableOptions(optionsBuilder).withTableId(System.getProperty("BT_TABLENAME")));
p.run();
При выполнении трубопровода, я получил NullPointerException
, указывая именно на BigtableIO класс на public void processElement(ProcessContext c)
методе:
(6e0ccd8407eed08b): java.lang.NullPointerException at org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.processElement(BigtableIO.java:532)
Я проверил этот метод обработки всех элементов, прежде чем писать на Bigtable, но не знает, почему я получаю такое исключение сверхурочного я выполняю этот pipelin е. Согласно приведенному ниже коду, этот метод использует атрибут bigtableWriter
для обработки каждого c.element()
, но я даже не могу установить точку останова для отладки, где именно находится null
. Какой-нибудь совет или предложение о том, как решить эту проблему?
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
checkForFailures();
Futures.addCallback(
bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element()));
++recordsWritten;
}
Спасибо.
Не могли бы вы пояснить несколько вещей: 1) Какую версию SDK вы используете? 2) Какой бегун вы используете? (прямой бегун, Spark, Flink, Dataflow?) Если это Dataflow, вы можете указать идентификатор задания? – jkff
@jkff благодарит за комментарий. Да, только что отредактировал мой вопрос, включая версии. Итак, да, я использую бегун Dataflow. Его идентификатор работы - 2016-09-13_08_29_14-14276852956124203982 –
Я искал работу и ее путь к классу, и если я не ошибаюсь, похоже, что вы используете версию 0.3.0-инкубацию-SNAPSHOT из beam-sdks-java- {core, io}, но версия 0.2.0-incubating-SNAPSHOT из google-cloud-dataflow-java. Я считаю, что из-за этого проблема - вы должны использовать ту же версию (более подробная информация: BigtableIO в версии 0.3.0 использует методы \ @Setup и \ @Teardown, но runner 0.2.0 еще не поддерживает их). – jkff