2015-04-07 5 views
2

Я написал потоковый конвейер, используя Google Cloud Dataflow SDK, но хочу проверить свои конвейеры локально. Мой конвейер принимает входные данные из Google Pub/Sub.Google Cloud Dataflow: доступ к Google Cloud Pub/Sub в конвейере с помощью DirectPipelineRunner (локальное задание)?

Возможно ли запустить задания, которые получают доступ к Pub/Sub (pubsubIO) с помощью DirectPipelineRunner (локальное выполнение, а не в Google Cloud)?

Я столкнулся с проблемами с правами доступа, когда вы вошли в систему как обычная учетная запись пользователя. Я являюсь владельцем проекта с пабом/подтемой, к которой я пытаюсь получить доступ.

+0

Под «попыткой доступа», что вы сделали точно? Каждая операция должна работать, только если вы являетесь владельцем проекта. –

ответ

3

PubsubIO в настоящее время не поддерживается в DirectPipelineRunner. При использовании локально вы получите сообщение об ошибке «Нет оценщика, зарегистрированного для PubsubIO.Read».

Вероятно, что ваши разрешения возникают из какого-то другого источника.

+0

Кстати, есть ли веская причина не поддерживать его? Думаю, это не очень сложно поддержать. –

+0

См. Более новый ответ ниже - InProcessPipelineRunner - это новая версия DirectPipelineRunner, которая может обрабатывать неограниченные параметры PCollections. – Frances

-1

Это действительно возможно, но DirectPipelineRunner не поддерживает неограниченные источники данных. Поэтому вы должны установить maxReadTime или maxNumRecords так:

PubsubIO.Read.topic("projects/<project-id>/topics/<topic>").maxNumRecords(1000); 

От PubSub documentation:

PTransform, который непрерывно считывает данные из облака Pub/Sub потока и возвращает PCollection строк, содержащих элементы из потока. При работе с PipelineRunner, который поддерживает только ограниченные PCllections (например, DirectPipelineRunner), только ограниченная часть может обрабатываться входной поток Pub/Sub. Таким образом, либо PubsubIO.Read.Bound.maxNumRecords (int), либо PubsubIO.Read.Bound.maxReadTime (Duration) должен быть установлен.

3

InProcessPipelineRunner новая версия DirectPipelineRunner ввести в Dataflow SDK for Java 1.6.0, которая включает поддержку неограниченных PCollections.

(Примечание. В Apache Beam эта функциональность уже добавлена ​​в DirectRunner, но в SDK Dataflow для Java мы не можем этого сделать до 2.0, поскольку лучшая проверка модели может привести к дополнительным ошибкам тестирования, который мы рассматриваем обратно несовместимым изменением. Следовательно, добавление компаньона InProcessPipelineRunner пока).

Существует также отличное новое support для тестирования данных в конце и вне порядка.

0

Просто, чтобы помочь любому, кто будет искать это,

С последней версии, вы можете сделать это. Если вы хотите запустить конвейер локально, используйте «DirectRunner», чтобы запустить его на локальном. Используйте «DataflowRunner» для запуска этого в облаке.

установить место размещения и бегун, как показано ниже.

streamingOption.setStagingLocation(PipelineConstants.PUBSUB_STAGING_LOCATION); 

streamingOption.setRunner(DataflowRunner.class); 

или передать его в качестве аргументов.

Можете ли вы подробнее рассказать о проблеме разрешения, с которой вы столкнулись?

 Смежные вопросы

  • Нет связанных вопросов^_^