2016-07-03 4 views
0

Я пытаюсь запустить конвейер в Google Cloud DataFlow в режиме «Потоковая передача». Конвейер должен читать из темы PubSub, однако на самом деле он не читается из темы до тех пор, пока я не удалю ее, не заново создаю и не опубликую все сообщения в теме ПОСЛЕ начала работы конвейера.Google Cloud DataFlow PubSubIO не читает из полной темы

Есть ли какой-либо способ, чтобы трубопровод читал уже опубликованные сообщения?

ответ

1

Создайте пользовательскую подписку в pub sub с помощью облачной консоли. В коде попробуйте что-то вроде этого.

PCollection<TableRow> datastream = p.apply(PubsubIO.Read.named("Read device iot data from PubSub") 

      .subscription(String.format("projects/%s/subscriptions/%s",<ProjectId>,<Subscriptionname>)) 

      .timestampLabel("ts") 
      .withCoder(TableRowJsonCoder.of())); 

Обратите внимание, что при подписании подписки вы можете подписаться на тему или название подписки.

В приведенном выше коде я подписался на подписку, которую я создал явно в паб-консоли. Преимущество перехода на явную подписку заключается в том, что оно хранит данные, вытащенные из паба, даже когда ваш код потока данных отключен. Такие данные не будут потеряны.

+0

большой. это было. благодаря – Forepick

1

Похоже, что предоставление подписки на Pub/Sub (более подробная информация в Pub/Sub I/O documentation) решит вашу проблему. Сообщения будут буферизованы после создания подписки, что позволяет читать их при запуске конвейера.