3

Я пытаюсь выполнить задание конвейера потока данных, которое будет выполнять одну функцию на N записей одновременно из хранилища данных. В моем случае эта функция отправляет пакет из 100 записей в некоторую службу REST в качестве полезной нагрузки. Это означает, что я хочу пройти через все записи из одного объекта хранилища данных и отправить 100 собранных записей сразу в некоторые службы REST.Может ли вход данных хранилища данных в конвейере потока данных Google обрабатываться в партии из N записей за раз?

Мое текущее решение

  1. Читайте вход от датасторе
  2. Создать столько ключей, сколько рабочих, указанных в параметрах трубопровода (1 работник = 1 ключ).
  3. Группа по ключу, так что мы получаем итератор в качестве выходного (вход итератора на шаге 4)
  4. Программно пакетные пользователи во временном списке и отправляют их в виде партии в конечную точку REST.

выше описанного сценария в псевдокоде (без учета деталей):

final int BATCH_SIZE = 100; 

// 1. Read input from datastore 
pipeline.apply(DatastoreIO.readFrom(datasetId, query)) 

    // 2. create keys to be used in group by so we get iterator in next task 
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, KV<String, EntryPOJO>>() { 
     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      String key = generateKey(c); 
      EntryPOJO entry = processEntity(c); 
      c.output(KV.of(key, entry)); 
     } 
    })) 

    // 3. Group by key 
    .apply(GroupByKey.create()) 

    // 4. Programatically batch users 
    .apply(ParDo.of(new DoFn<KV<String, Iterable<EntryPOJO>>() { 
     @Override 
     public void processElement(ProcessContext c) throws Exception { 
      List<EntryPOJO> batchedEntries = new ArrayList<>(); 
      for (EntryPOJO entry : c.element().getValue()) { 
       if (batchedEntries.size() >= BATCH_SIZE) { 
        sendToRESTEndpoint(batchedEntries); 
        batchedEntries = new ArrayList<>(); 
       } 
       batchedEntries.add(entry); 
      } 
      sendToRESTEndpoint(batchedEntries); 
     } 
    })); 

Главная проблема с моим текущим решением

GroupByKey блокирует выполнение последнего Пардо (блоки шаг № 4) пока все записи не будут присвоены ключу.

Решение в целом работает, но я хотел бы сделать все параллельно (отправить партию из 100 записей в конечной точке REST сразу после того, как они загружаются из хранилища данных), что невозможно с моим текущим решением, так как GroupByKey Безразлично 't выводить любые данные, пока каждая запись из базы данных не будет извлечена и не вставлена ​​в пару ключ-значение. Так выполнение фактически в 2 этапа: 1. Извлечь все данные из хранилища данных и присвоить ему ключ, 2. Записи процесса, как партии

Вопрос

Так что я хотел бы знать, если есть некоторые существующие функции, чтобы иметь возможность сделать это. Или, по крайней мере, чтобы получить Iterable без шага GroupByKey, чтобы задача функции пакетной обработки не нуждалась в ожидании удаления данных.

ответ

4

Эти элементы можно выполнить в пределах вашего DoFn. Например:

final int BATCH_SIZE = 100; 

pipeline 
    // 1. Read input from datastore 
    .apply(DatastoreIO.readFrom(datasetId, query)) 

    // 2. Programatically batch users 
    .apply(ParDo.of(new DoFn<DatastoreV1.Entity, Iterable<EntryPOJO>>() { 

    private final List<EntryPOJO> accumulator = new ArrayList<>(BATCH_SIZE); 

    @Override 
    public void processElement(ProcessContext c) throws Exception { 
     EntryPOJO entry = processEntity(c); 
     accumulator.add(c); 
     if (accumulator.size() >= BATCH_SIZE) { 
     c.output(accumulator); 
     accumulator = new ArrayList<>(BATCH_SIZE); 
     } 
    } 

    @Override 
    public void finishBundle(Context c) throws Exception { 
     if (accumulator.size() > 0) { 
     c.output(accumulator); 
     } 
    } 
    }); 

    // 3. Consume those bundles 
    .apply(ParDo.of(new DoFn<Iterable<EntryPOJO>, Object>() { 
    @Override 
    public void processElement(ProcessContext c) throws Exception { 
     sendToRESTEndpoint(batchedEntries); 
    } 
    })); 

Вы также могли бы объединить шаги 2 и 3 в одном DoFn, если вы не хотите, отдельный «пакетирования» шаг.

+0

Только то, что я искал, решило мою проблему. Спасибо за быстрый ответ. Я пытался реализовать его таким образом сначала, но я упустил метод «finishBundle», что имеет решающее значение, если вы хотите обработать элементы, оставшиеся в аккумуляторе. –

+0

Вы также должны сбросить список в файле finishBundle (или, по крайней мере, startBundle), поскольку DoFns можно повторно использовать (см. Https://issues.apache.org/jira/browse/BEAM-38) – robertwb