2016-11-15 10 views
1

Flink трубопровода заключается в следующем:Использование Grok в Флинка потокового

  1. читать сообщения (строка) от Кафки темы.
  2. шаблон соответствия через grok преобразования в формате json.
  3. Агрегации над временным окном над извлеченным полем из json.

Ниже приведен код для сопоставления образцов с использованием grok.

SingleOutputStreamOperator<JSONObject> mainStream = messageStream.rebalance() 
        .map(new MapFunction<String, JSONObject>() {  
         private static final long serialVersionUID = 6; 

         @Override 
         public JSONObject map(String value) throws Exception { 
          JSONObject logJson = new JSONObject(); 
          grok.compile(pattern); //pattern is some pattern defined in the class 
          Match gm = grok.match(value); 
          gm.captures(); 
          logJson.putAll(gm.toMap()); 
          return logJson; 
         }}) 

В приведенном выше коде писать grok.compile(pattern) внутри функции карты работает отлично. Не делать это дает следующую ошибку

Реализация этого MapFunction не сериализуемым

Вызванный: java.io.NotSerializableException: com.google.code.regexp.Pattern

ли там любой способ, которым я мог бы удалить grok.compile вне карты. По моему мнению, компиляция шаблона с каждым сообщением не требуется и может создать узкое место, если нет. сообщений становится довольно большим.

PS: Я импортировал пакет oi.thekraken.grok.api.Grok

EDIT:

Я просмотрел реализации ГРОК и класс Grok реализует Serializable. https://github.com/thekrakken/java-grok/blob/master/src/main/java/io/thekraken/grok/api/Grok.java

ответ

0

Ваш код не отображается, когда локальная переменная Grok приходит, но:

Flink требует от всех операторов быть Serializable, потому что они могут быть перемещены в кластере. Это также справедливо для всех членов операторов. Можете ли вы опубликовать полный нерабочий пример? Это может облегчить просмотр, где может завершиться сериализация.

Более подробной информации о FLiNK сериализации можно ound в документации FLiNK на https://flink.apache.org/faq.html#why-am-i-getting-a-nonserializableexception- и https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html

В принципе, вы можете зарегистрировать Kryo сериалайзер для пользовательских типов или реализаций (де-) сериализаций себя, если вам нужны члены оператора, что не являются непосредственно сериализуемыми.

Btw .: Я думаю, что вы правы в попытке уменьшить количество раз шаблон скомпилированных

+0

'В принципе, вы можете зарегистрировать Kryo сериалайзер для пользовательских типов или реализации (де-) Сериализация себя, если вам нужно членов оператора, которые не являются непосредственно сериализуемыми. '--- Я немного запутался в том, как сделать то же самое. – user3351750