Я хочу запустить задание потока данных на нескольких входах из облачного хранилища Google, но пути, которые я хочу передать в задание, не могут указываться только с помощью оператора glob *
.Как указать несколько путей ввода к заданию потока данных
Рассмотрим эти пути:
gs://bucket/some/path/20160208/input1
gs://bucket/some/path/20160208/input2
gs://bucket/some/path/20160209/input1
gs://bucket/some/path/20160209/input2
gs://bucket/some/path/20160210/input1
gs://bucket/some/path/20160210/input2
gs://bucket/some/path/20160211/input1
gs://bucket/some/path/20160211/input2
gs://bucket/some/path/20160212/input1
gs://bucket/some/path/20160212/input2
Я хочу, чтобы моя работа, чтобы работать над файлами в 20160209
, 20160210
и 20160211
каталогов, но не на 20160208
(первый) и 20160212
(последний). На самом деле существует намного больше дат, и я хочу иметь возможность указать произвольный диапазон дат для моей работы.
The docs for TextIO.Read
говорят:
Стандартный Java Filesystem GLOB модели ("*", "[..]" "?") Поддерживаются.
Но я не могу заставить это работать. Есть ссылка на Java Filesystem glob patterns, которая, в свою очередь, ссылается на getPathMatcher(String), в которой перечислены все варианты подстановки. Один из них - {a,b,c}
, который выглядит точно так же, как мне нужно, если я передаю gs://bucket/some/path/201602{09,10,11}/*
в TextIO.Read#from
. Я получаю «Невозможно развернуть шаблон файла».
Может быть, документы означают, что только*
, ?
и […]
поддерживаются, и если это так, то как я могу построить Glob, что Dataflow будет принимать и что может соответствовать произвольный диапазон дат, как один я описываю выше?
Update: Я понял, что я могу написать кусок кода, так что я могу передать в пути префиксы, через запятую, создать входной сигнал от каждого и использовать
Это просто вызвало тонну других проблем, я постараюсь заставить его работать, но он чувствует себя как тупик из-за первоначальной перезаписи.Flatten
преобразования, но это похоже на очень неэффективный способ сделать это. Похоже, что первый шаг читает все входные файлы и сразу же записывает их снова во временное место на GCS. Только когда все входы были прочитаны и записаны, начинается фактическая обработка. Этот шаг совершенно не нужен в работе, которую я пишу. Я хочу, чтобы задание читало первый файл, начинал его обрабатывать и читал дальше, и так далее.
для опции 3 ... Можете ли вы помочь мне с примером о том, как поместить список файлов в PCollection, а затем в ParDo? – CCC