2016-12-08 6 views
2

Я хотел бы сделать декартовую продукцию из двух PCollections. Ни PCollection не может поместиться в память, так что боковой вход невозможен.Как сделать декартовое произведение из двух PCollections в потоке данных?

Моя цель такова: у меня есть два набора данных. Один из них - много элементов небольшого размера. Другой - немного (~ 10) очень больших размеров. Я хотел бы взять продукт этих двух элементов, а затем создать объекты с ключом.

ответ

2

Я думаю CoGroupByKey может работать в вашей ситуации:

https://cloud.google.com/dataflow/model/group-by-key#join

Вот что я сделал для подобного сценария использования. Хотя мой, вероятно, не были ограничены в памяти (вы пробовали больший кластер с более крупными машинами?):

PCollection<KV<String, TableRow>> inputClassifiedKeyed = inputClassified 
      .apply(ParDo.named("Actuals : Keys").of(new ActualsRowToKeyedRow())); 

    PCollection<KV<String, Iterable<Map<String, String>>>> groupedCategories = p 
    [...] 
    .apply(GroupByKey.create()); 

Так коллекции сконструированы таким же ключом.

Тогда я объявил Тег:

final TupleTag<Iterable<Map<String, String>>> categoryTag = new TupleTag<>(); 
final TupleTag<TableRow> actualsTag = new TupleTag<>(); 

Комбинированных их:

PCollection<KV<String, CoGbkResult>> actualCategoriesCombined = 
      KeyedPCollectionTuple.of(actualsTag, inputClassifiedKeyed) 
        .and(categoryTag, groupedCategories) 
        .apply(CoGroupByKey.create()); 

И в моем случае последнего шага - переформатирование результаты (из отмеченных групп в непрерывном потоке:

actualCategoriesCombined 
      .apply(
        ParDo.named("Actuals : Formatting") 
          .of(
            new DoFn<KV<String, CoGbkResult>, TableRow>() { 
             @Override 
             public void processElement(ProcessContext c) throws Exception { 

              KV<String, CoGbkResult> e = c.element(); 

              Iterable<TableRow> actualTableRows = e.getValue().getAll(actualsTag); 
              Iterable<Iterable<Map<String, String>>> categoriesAll = e.getValue().getAll(categoryTag); 

              for (TableRow row : actualTableRows) { 

               // Some of the actuals do not have categories 
               if (categoriesAll.iterator().hasNext()) { 
                row.put("advertiser", categoriesAll.iterator().next()); 
               } 

               c.output(row); 
              } 
             } 
            } 
          ) 
      ) 

Надеюсь, что это поможет. Опять же - не уверен в ограничениях памяти. результаты, если вы попробуете этот подход.

+0

Из-за моей нехватки опыта как в Java, так и в потоке данных, для меня это занимает много времени. Если возможно, можете ли вы опубликовать простой пример в python? Я думаю, будет лучше расширить этот вопрос вместо меня, задав новый вопрос. – KobeJohn

+0

Для справки, я исхожу из Spark, где это очень просто: 'collection_a.cartesian (collection_b)' – KobeJohn

+0

Я бы сделал это с удовольствием, не против ли вы связаться в частном порядке? –

 Смежные вопросы

  • Нет связанных вопросов^_^