Я пытаюсь выполнить задание конвейера потока данных, которое будет выполнять одну функцию на N записей одновременно из хранилища данных. В моем случае эта функция отправляет пакет из 100 записей в некоторую службу REST в качестве полезной нагрузки. Это означает, что я хочу пройти через все записи из одного объекта хранилища данных и отправить 100 собранных записей сразу в некоторые службы REST.Может ли вход данных хранилища данных в конвейере потока данных Google обрабатываться в партии из N записей за раз?
Мое текущее решение
- Читайте вход от датасторе
- Создать столько ключей, сколько рабочих, указанных в параметрах трубопровода (1 работник = 1 ключ).
- Группа по ключу, так что мы получаем итератор в качестве выходного (вход итератора на шаге 4)
- Программно пакетные пользователи во временном списке и отправляют их в виде партии в конечную точку REST.
выше описанного сценария в псевдокоде (без учета деталей):
final int BATCH_SIZE = 100;
// 1. Read input from datastore
pipeline.apply(DatastoreIO.readFrom(datasetId, query))
// 2. create keys to be used in group by so we get iterator in next task
.apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
String key = generateKey(c);
EntryPOJO entry = processEntity(c);
c.output(KV.of(key, entry));
}
}))
// 3. Group by key
.apply(GroupByKey.create())
// 4. Programatically batch users
.apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>() {
@Override
public void processElement(ProcessContext c) throws Exception {
List<EntryPOJO> batchedEntries = new ArrayList<>();
for (EntryPOJO entry : c.element().getValue()) {
if (batchedEntries.size() >= BATCH_SIZE) {
sendToRESTEndpoint(batchedEntries);
batchedEntries = new ArrayList<>();
}
batchedEntries.add(entry);
}
sendToRESTEndpoint(batchedEntries);
}
}));
Главная проблема с моим текущим решением
GroupByKey блокирует выполнение последнего Пардо (блоки шаг № 4) пока все записи не будут присвоены ключу.
Решение в целом работает, но я хотел бы сделать все параллельно (отправить партию из 100 записей в конечной точке REST сразу после того, как они загружаются из хранилища данных), что невозможно с моим текущим решением, так как GroupByKey Безразлично 't выводить любые данные, пока каждая запись из базы данных не будет извлечена и не вставлена в пару ключ-значение. Так выполнение фактически в 2 этапа: 1. Извлечь все данные из хранилища данных и присвоить ему ключ, 2. Записи процесса, как партии
Вопрос
Так что я хотел бы знать, если есть некоторые существующие функции, чтобы иметь возможность сделать это. Или, по крайней мере, чтобы получить Iterable без шага GroupByKey, чтобы задача функции пакетной обработки не нуждалась в ожидании удаления данных.
Только то, что я искал, решило мою проблему. Спасибо за быстрый ответ. Я пытался реализовать его таким образом сначала, но я упустил метод «finishBundle», что имеет решающее значение, если вы хотите обработать элементы, оставшиеся в аккумуляторе. –
Вы также должны сбросить список в файле finishBundle (или, по крайней мере, startBundle), поскольку DoFns можно повторно использовать (см. Https://issues.apache.org/jira/browse/BEAM-38) – robertwb