Flink трубопровода заключается в следующем:Использование Grok в Флинка потокового
- читать сообщения (строка) от Кафки темы.
- шаблон соответствия через grok преобразования в формате json.
- Агрегации над временным окном над извлеченным полем из json.
Ниже приведен код для сопоставления образцов с использованием grok.
SingleOutputStreamOperator<JSONObject> mainStream = messageStream.rebalance()
.map(new MapFunction<String, JSONObject>() {
private static final long serialVersionUID = 6;
@Override
public JSONObject map(String value) throws Exception {
JSONObject logJson = new JSONObject();
grok.compile(pattern); //pattern is some pattern defined in the class
Match gm = grok.match(value);
gm.captures();
logJson.putAll(gm.toMap());
return logJson;
}})
В приведенном выше коде писать grok.compile(pattern)
внутри функции карты работает отлично. Не делать это дает следующую ошибку
Реализация этого MapFunction не сериализуемым
Вызванный: java.io.NotSerializableException: com.google.code.regexp.Pattern
ли там любой способ, которым я мог бы удалить grok.compile вне карты. По моему мнению, компиляция шаблона с каждым сообщением не требуется и может создать узкое место, если нет. сообщений становится довольно большим.
PS: Я импортировал пакет oi.thekraken.grok.api.Grok
EDIT:
Я просмотрел реализации ГРОК и класс Grok реализует Serializable. https://github.com/thekrakken/java-grok/blob/master/src/main/java/io/thekraken/grok/api/Grok.java
'В принципе, вы можете зарегистрировать Kryo сериалайзер для пользовательских типов или реализации (де-) Сериализация себя, если вам нужно членов оператора, которые не являются непосредственно сериализуемыми. '--- Я немного запутался в том, как сделать то же самое. – user3351750