2016-11-12 6 views
1

У меня есть поток, в котором обрабатываю файлы csv (максимальный размер файла ~ 700 мБ). Я разделял эти файлы в файлах потока из 1 строки, затем выполняю некоторую обработку, и, наконец, я использую [Extended] MergeContent процессор в качестве механизма синхронизации. Дело в том, что если я обрабатываю каждый файл csv, когда он находится в очереди MergeContent, он блокирует экземпляр nifi.nifi-1.0.0 блоки при слиянии файлов

Он блокируется, когда у меня более 1,5 миллионов файлов.

Nifi работает на сервере (32 ядра, ~ 200 ГБ ОЗУ). Nipi heap size 64 или 128 возникает одна и та же проблема.

Любая идея, почему это происходит?

Если я обрабатываю 5 файлов csv, которые являются небольшими, нет проблем, процессор работает нормально.

[Extended] MergeContent - это процессор, который наследует MergeContent и ожидает, что все файлы будут упакованы, а затем отправит сигнал на следующий процессор. Он не возвращает весь файл. Например, если мой CSV-файл состоит из 1 000 000 файлов потока из 1 строки, то [Extended] MergeContent отправит только 1 из 1 000 000 файлов потока в качестве сигнала к следующему процессору.

+0

Я не уверен, я понимаю ваш сценарий - вместо того, чтобы на самом деле объединений нескольких flowfiles обратно в один flowfile, этот новый процессор «EMC» просто ждет все flowfiles, чтобы закончить, но затем отправляет только один (произвольный) файл потока в следующий компонент? Существует открытый [Apache Jira - NIFI-1926] (https://issues.apache.org/jira/browse/NIFI-1926) для компонента агрегатора общего назначения и [patch] (https: //issues.apache .org/jira/browse/NIFI-2590), которые могут выполнить то, что вам нужно. – Andy

+0

Если он блокируется из-за большого объема файловых файлов, вы должны проверить, как этот процессор соответствует параметрам конфигурации «Продолжительность срока действия» и «Стратегия планирования». Если этот процессор фактически не выполняет какую-либо логику, кроме ожидания файловых файлов 'x', у нее, вероятно, должна быть« Стратегия планирования »для« Event Driven », поэтому она не будет работать без необходимости. Возможно, вам удастся использовать процессор 'ExecuteScript', который просто оценивает количество полученных файлов потоков и выполняет до тех пор, пока не достигнет установленного порога. – Andy

+0

да, это именно то, что он делает. – bsd

ответ

0

Я имел дело с подобной ситуацией и боролся так же, как пытаться работать с NiFi на уровне детализации с таким количеством файлов.

В моем конкретном случае это накладывало слишком много давления на сборщик мусора, независимо от того, сколько у меня объема памяти или сколько времени я потратил на настройку GC, G1 периодически запускал полные GC. Вы можете проверить, является ли это проблемой, связанной с сборщиком мусора, в том, что касается итогового раздела в верхнем правом углу, а затем для диагностики системы для отображения статистики gc.

Я могу предложить вам перепроектировать потоковые и периодические линии вместе, чтобы в конечном итоге было меньше файлов потока. Другим вариантом было бы ограничить количество потоков в потоке в любой момент времени, применяя обратное давление в очередях, но поскольку вы используете EMC-процессор в качестве механизма синхронизации, я не знаю, возможно ли это.

Надеется, что это помогло

+0

Большое спасибо! Я устранил EMC, я написал еще один процессор, который работает гораздо меньше, чем EMC. – bsd

 Смежные вопросы

  • Нет связанных вопросов^_^