2017-01-09 10 views
2

Я делаю небольшой проект для университета, используя Apache NiFi и Apache Spark. Я хочу создать рабочий процесс с NiFi, который читает TSV-файлы из HDFS и использует Spark Streaming. Я могу обрабатывать файлы и хранить необходимую мне информацию в MySQL. Я уже создал свой рабочий процесс в NiFi, и часть хранения уже работает. Проблема в том, что я не могу разобрать пакет NiFi, чтобы я мог их использовать.Parse NiFi Data Packet с использованием искры

файлы содержат строки, как это:

linea1File1 TheReceptionist 653 Entertainment 424 13021 4.34 1305 744 DjdA-5oKYFQ NxTDlnOuybo c-8VuICzXtU 

Где каждое пространство вкладка ("\ т")

Это мой код в Спарк с помощью Scala:

val ssc = new StreamingContext(config, Seconds(10)) 
val packet = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY)) 
val file = packet.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8)) 

До сих пор я могу получить весь файл (7000+ строк) в одной строке ... к сожалению, я не могу разбить эту строку в строках. Мне нужно получить весь файл в строках, поэтому я могу разобрать его в объекте, применить некоторые операции к нему и сохранить то, что хочу

Любой может мне помочь?

ответ

3

Каждый пакет данных будет содержимым одного файла потока из NiFi, поэтому, если NiFi собирает один TSV-файл из HDFS, который имеет много строк, все эти строки будут в одном пакете данных.

Трудно сказать, не видя поток NiFi, но вы, вероятно, можете использовать SplitText с количеством строк 1, чтобы разделить ваш TSV в NiFi, прежде чем он начнет испускать потоки.

+0

Большое спасибо ... это полностью разрешило мою проблему ... Я никогда не думал, чтобы решить проблему с NiFi .. я был сосредоточен на Spark ... –