2017-01-20 13 views
0

У меня есть прецедент, в котором PCollection содержит пары значений ключа, причем ключ является идентификатором пользователя, а значение является меткой времени, с которой пользователь взаимодействовал с приложением.Top.smallestPerKey() не работает для сортировки timestamp

В цели ETL я хочу создать PCollection, который содержит пары значений ключа, где ключ является идентификатором пользователя, а значение - это метка времени, с которой пользователь впервые взаимодействовал с приложением.

Я использую преобразование Top.smallestPerKey() для получения PCollection уникальных идентификаторов пользователей и самой ранней отметки времени.

фрагмент кода заключается в следующем -

PCollection<KV<String, Timestamp>> keyedUserAndTimestamp = 
    a.apply(ParDo.named("Getting minimum timestamp for a user.").of(
     new DoFn<TableRow, KV<String, Long>>(){ 
      @Override 
      public void processElement(ProcessContext c) { 
      c.output(KV.of(
       c.element().get("user_id").toString(), 
       Timestamp.valueOf(c.element().get("time_stamp").toString()))); 
      } 
     })); 

PCollection<KV<String, List<Timestamp>>> minTimestampPerUser = 
    keyedFromUserAndTimestamp.apply(Top.smallestPerKey(1)); 

Это, кажется, не будет работать для меня. Я получаю следующее сообщение об ошибке -

The method apply(PTransform<? super PCollection<KV<String,Long>>,OutputT>) 
in the type PCollection<KV<String,Long>> 
is not applicable for the arguments 
(PTransform< 
    PCollection<KV<Object,Comparable<Comparable<V>>>>, 
    PCollection<KV<Object,List<Comparable<Comparable<V>>>>>>) 

Я новичок в Google облачных потоков данных и Java, так что я мог бы также отсутствовать на что-то очень очевидно.

Пара вопросов, которые я хотел бы понять общины на -

  1. Является ли это правильный способ нахождения минимального значения временной метки?
  2. Я использую правильное преобразование? Если нет, то какая здесь была бы лучшая практика?

ответ

0

Это ошибка типа Java, не особенно специфичная для Dataflow. В этом случае я считаю, что вас укушают ограничения вывода типа Java, и вы должны предоставить явные параметры типа Top.smallestPerKey, в частности Top.<String, Long>smallestPerKey(1).

Поскольку вы упоминаете, что вы новичок в Java, я расскажу подробнее. Подпись Top.smallestPerKey является:

public static <K, V extends Comparable<V>> 
    PTransform<PCollection<KV<K, V>>, PCollection<KV<K, List<V>>>> 
    smallestPerKey(int count) 

где K и V являются типы определяются при вызове функции. Java не смогла сделать вывод, что K и V должны быть для вас. В этой ситуации, Java падает обратно на верхние пределы на типы:

  • K не имеет верхнего предела, поэтому выбирается Object.
  • V имеет верхнюю грань Comparable<V> (несмотря на кажущуюся самоссылки, это вполне разумно и общая вещь), поэтому Java выбирает Comparable<V>, даже если V даже не является допустимым типом в контексте, в котором ваше сообщение об ошибке имеет место.
+0

Спасибо. Это действительно помогло! Любой комментарий о том, является ли это правильным преобразованием для использования? –

+0

Да, это идеальное приложение преобразования. –

 Смежные вопросы

  • Нет связанных вопросов^_^