Я думаю CoGroupByKey может работать в вашей ситуации:
https://cloud.google.com/dataflow/model/group-by-key#join
Вот что я сделал для подобного сценария использования. Хотя мой, вероятно, не были ограничены в памяти (вы пробовали больший кластер с более крупными машинами?):
PCollection<KV<String, TableRow>> inputClassifiedKeyed = inputClassified
.apply(ParDo.named("Actuals : Keys").of(new ActualsRowToKeyedRow()));
PCollection<KV<String, Iterable<Map<String, String>>>> groupedCategories = p
[...]
.apply(GroupByKey.create());
Так коллекции сконструированы таким же ключом.
Тогда я объявил Тег:
final TupleTag<Iterable<Map<String, String>>> categoryTag = new TupleTag<>();
final TupleTag<TableRow> actualsTag = new TupleTag<>();
Комбинированных их:
PCollection<KV<String, CoGbkResult>> actualCategoriesCombined =
KeyedPCollectionTuple.of(actualsTag, inputClassifiedKeyed)
.and(categoryTag, groupedCategories)
.apply(CoGroupByKey.create());
И в моем случае последнего шага - переформатирование результаты (из отмеченных групп в непрерывном потоке:
actualCategoriesCombined
.apply(
ParDo.named("Actuals : Formatting")
.of(
new DoFn<KV<String, CoGbkResult>, TableRow>() {
@Override
public void processElement(ProcessContext c) throws Exception {
KV<String, CoGbkResult> e = c.element();
Iterable<TableRow> actualTableRows = e.getValue().getAll(actualsTag);
Iterable<Iterable<Map<String, String>>> categoriesAll = e.getValue().getAll(categoryTag);
for (TableRow row : actualTableRows) {
// Some of the actuals do not have categories
if (categoriesAll.iterator().hasNext()) {
row.put("advertiser", categoriesAll.iterator().next());
}
c.output(row);
}
}
}
)
)
Надеюсь, что это поможет. Опять же - не уверен в ограничениях памяти. результаты, если вы попробуете этот подход.
Из-за моей нехватки опыта как в Java, так и в потоке данных, для меня это занимает много времени. Если возможно, можете ли вы опубликовать простой пример в python? Я думаю, будет лучше расширить этот вопрос вместо меня, задав новый вопрос. – KobeJohn
Для справки, я исхожу из Spark, где это очень просто: 'collection_a.cartesian (collection_b)' – KobeJohn
Я бы сделал это с удовольствием, не против ли вы связаться в частном порядке? –