2015-09-08 6 views
7

При использовании Apache Flink со следующим кодом:

DataStream<List<String>> result = source.window(Time.of(1, TimeUnit.SECONDS)).mapWindow(new WindowMapFunction<String, List<String>>() { 

    @Override 
    public void mapWindow(Iterable<String> iterable, Collector<List<String>> collector) throws Exception { 
     List<String> top5 = Ordering.natural().greatestOf(iterable, 5); 
     collector.collect(top5); 
    } 
}).flatten(); 

Я получил это исключение

Caused by: java.lang.UnsupportedOperationException 
    at java.util.Collections$UnmodifiableCollection.add(Collections.java:1055) 
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116) 
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22) 
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761) 
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:211) 
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:110) 
    at org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer.deserialize(StreamRecordSerializer.java:41) 
    at org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate.read(NonReusingDeserializationDelegate.java:55) 
    at org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.getNextRecord(SpillingAdaptiveSpanningRecordDeserializer.java:125) 
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:127) 
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:56) 
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:172) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581) 
    at java.lang.Thread.run(Thread.java:745) 

Как и^в UnmodifiableCollection с Флинка?

ответ

9

Проблема заключается в том, что по умолчанию CollectionSerializer компании Kryo не может десериализовать коллекцию повторно, поскольку ее невозможно модифицировать (вызов .add() не удается).

Для решения проблемы мы можем использовать UnmodifiableCollectionsSerializer от проекта kryo-serializers. Flink транзитивно зависит от проекта, поэтому нет необходимости добавлять его в качестве зависимости.

Далее, мы должны зарегистрировать сериализатор с экземплярами Floy's Kryo.

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment(); 
Class<?> unmodColl = Class.forName("java.util.Collections$UnmodifiableCollection"); 
see.getConfig().addDefaultKryoSerializer(unmodColl, UnmodifiableCollectionsSerializer.class); 

Как правило, мы не должны вызывать Class.forName() для регистрации сериалайзера, но в этом случае, java.util.Collections$UnmodifiableCollection виден пакет, поэтому мы не можем непосредственно получить доступ к классу.

+4

спасибо за этот проницательный и быстрый ответ. Ваша скорость ответа ошеломляет :-) –