У меня есть прецедент, в котором PCollection содержит пары значений ключа, причем ключ является идентификатором пользователя, а значение является меткой времени, с которой пользователь взаимодействовал с приложением.Top.smallestPerKey() не работает для сортировки timestamp
В цели ETL я хочу создать PCollection, который содержит пары значений ключа, где ключ является идентификатором пользователя, а значение - это метка времени, с которой пользователь впервые взаимодействовал с приложением.
Я использую преобразование Top.smallestPerKey() для получения PCollection уникальных идентификаторов пользователей и самой ранней отметки времени.
фрагмент кода заключается в следующем -
PCollection<KV<String, Timestamp>> keyedUserAndTimestamp =
a.apply(ParDo.named("Getting minimum timestamp for a user.").of(
new DoFn<TableRow, KV<String, Long>>(){
@Override
public void processElement(ProcessContext c) {
c.output(KV.of(
c.element().get("user_id").toString(),
Timestamp.valueOf(c.element().get("time_stamp").toString())));
}
}));
PCollection<KV<String, List<Timestamp>>> minTimestampPerUser =
keyedFromUserAndTimestamp.apply(Top.smallestPerKey(1));
Это, кажется, не будет работать для меня. Я получаю следующее сообщение об ошибке -
The method apply(PTransform<? super PCollection<KV<String,Long>>,OutputT>)
in the type PCollection<KV<String,Long>>
is not applicable for the arguments
(PTransform<
PCollection<KV<Object,Comparable<Comparable<V>>>>,
PCollection<KV<Object,List<Comparable<Comparable<V>>>>>>)
Я новичок в Google облачных потоков данных и Java, так что я мог бы также отсутствовать на что-то очень очевидно.
Пара вопросов, которые я хотел бы понять общины на -
- Является ли это правильный способ нахождения минимального значения временной метки?
- Я использую правильное преобразование? Если нет, то какая здесь была бы лучшая практика?
Спасибо. Это действительно помогло! Любой комментарий о том, является ли это правильным преобразованием для использования? –
Да, это идеальное приложение преобразования. –