Я слушаю данные из pub-sub, используя потоковые данные в потоке данных. Затем мне нужно загрузить на хранение, обработать данные и загрузить их в bigquery.Потоковая передача данных в Google Cloud Storage из PubSub с использованием Cloud Dataflow
вот мой код:
public class BotPipline {
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(BlockingDataflowPipelineRunner.class);
options.setProject(MY_PROJECT);
options.setStagingLocation(MY_STAGING_LOCATION);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
PCollection<String> input = pipeline.apply(PubsubIO.Read.maxNumRecords(1).subscription(MY_SUBSCRIBTION));
input.apply(TextIO.Write.to(MY_STORAGE_LOCATION));
input
.apply(someDataProcessing(...)).named("update json"))
.apply(convertToTableRow(...)).named("convert json to table row"))
.apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema)
);
pipeline.run();
}
}
когда я запускаю код комментирует Сочинение на хранение код работает хорошо. , но когда я пытаюсь загрузить его в большой запрос, я получаю эту ошибку (которая, как ожидается, ..):
Write can only be applied to a Bounded PCollection
Я не использую границу, так как мне нужно, чтобы запустить все это время, и мне нужно, чтобы данные были загружены немедленно . Любое решение?
EDIT: это мое желаемое поведение:
Я получаю сообщение через PubSub. Каждое сообщение должно храниться в собственном файле в GCS в качестве грубых данных, выполнить некоторую обработку данных, а затем сохранить его в большом запросе, имея имя файла в данных.
данных следует рассматривать сразу после того, как получил в BQ Например:
data published to pubsub : {a:1, b:2}
data saved to GCS file UUID: A1F432
data processing : {a:1, b:2} ->
{a:11, b: 22} ->
{fileName: A1F432, data: {a:11, b: 22}}
data in BQ : {fileName: A1F432, data: {a:11, b: 22}}
идея о том, что обработанные данные хранятся в BQ, имеющие связь с шероховатыми данными, хранящимися в ГКС
Возможный дубликат [Может ли TextIO записывать префиксы, полученные из окна maxTimestamp?] (Https://stackoverflow.com/questions/33522178/can-textio-write-to-prefixes-derived-from-the-window- maxtimestamp) – jkff