2016-11-22 11 views
0

Я пытаюсь (безуспешно) запускать простую программу приветствия мирового уровня в Apache Flink. Код принимает сообщение от Apache Kafka добавляет «.». после каждой буквы и печатает новую строку в stdout. Код правильно получает сообщение от Kafka, но функция карты добавляет «.». выходит из строя. Я пробовал эту функцию в приглашении REPL, и там scala-код работает правильно. Scala код:Новое в Scala и apache flink, почему моя функция карты работает правильно REPL, но не работает в Flink

scala> input = "hello" 
input: String = hello 
scala> val output = input.flatMap(value => value + ".") 
output: String = h.e.l.l.o 

Flink программа: flink code отрезан линия читает

val messageStream = env.addSource(new FlinkKafkaConsumer09("CL", new SimpleStringSchema, properties)) 

Я не могу понять, где я буду неправильно, я попробовал документацию апачской не дали никаких результатов. любая помощь, которую вы могли бы мне дать, была бы хорошо воспринята.

+0

Это поможет, если вы сказали нам, что это ошибка. Вы пытались скомпилировать в cmd-строке? Известно, что среды ID Scala выдают ложные ошибки. – pedrofurla

+0

Кроме того, кажется, что вместо flatMap вы можете использовать карту в messageStream. FlatMap работает в 'input' sample, потому что' value' является символом char и '+". '' Превращает его в s String. – pedrofurla

+0

В сообщении об ошибке отсутствует тип параметра. Я также думал, что это может быть IDE, но я получаю ту же ошибку при попытке скомпилировать код в maven, который мешает мне развернуть код. Если планшет не работает одинаково в обеих средах? просмотр строки по одному символу за раз? –

ответ

0

Прежде всего, я бы рекомендовал изучать некоторые основы о функциональном программировании и операций, как отображение/flatMap/уменьшить и т.д.

Все эти упомянутые функции применяются к коллекциям. В вашем примере, как @pedrofuria лестницы указало, вы применить flatMap функцию String, которая является сбором char s

В FLiNK примера messageStream может быть забранным в виде набора строк, поэтому для выполнения операции вы описали вы должны сделать STH нравится:

val stream = messageStream.map(str => str.mkString(".")) 

Я использовал mkString вместо flatMap из вашего примера, потому что прежний, а не

h.e.l.l.o (как вы написали)

производит

h.e.l.l.o.

Но еще раз действительно начинаем с основ функционального программирования.

0

спасибо за помощь, я понял, проблема, вроде. Хотя функция flatMap работает в приглашении scala, она не работает в Flink, так как Flink требует, чтобы FlatMap передавал новую FlatMapFunction с переопределением. Еще не знаете, почему он работает в командной строке scala, но код теперь компилируется и работает, как ожидалось.

0

Потому что flatMap в приглашении scala не совпадает с flatMap в вашей программе flink.

flatMap в вашем приглашении scala - это просто функция в scala.

Flink flatMap можно использовать во время ввода любит ниже:

val input = benv.fromElements(
      "To be, or not to be,--that is the question:--", 
      "Whether 'tis nobler in the mind to suffer", 
      "The slings and arrows of outrageous fortune", 
      "Or to take arms against a sea of troubles,") 
val counts = input 
      .flatMap { _.toLowerCase.split("\\W+") } 
      .map { (_, 1) }.groupBy(0).sum(1) 

См: scala-shell