2016-11-16 6 views
1

flowДоступ Json элемент и записать в текстовый файл с помощью питона ExecuteScript процессор

Я новичок в Python и НИФИ.

Мой поток GetFile -> ExecuteScript

В сценарии для каждого JSON, я хочу Accesss конкретный элемент и записать его в текстовый файл построчно.

Я попытался ниже:

import json 
import java.io 
from org.apache.commons.io import IOUtils 
from java.nio.charset import StandardCharsets 
from org.apache.nifi.processor.io import StreamCallback 

class ModJSON(StreamCallback): 
    def __init__(self): 
    pass 
    def process(self, inputStream, outputStream): 
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8) 
    json_content = json.loads(text) 
    try: 
    body = json_content['id']['body'] 
    body_encoded = body.encode('utf-8') 
    except (KeyError,TypeError,ValueError): 
    body_encoded = '' 

    text_file = open ('/tmp/test/testFile.txt', 'w') 
    text_file.write("%s"%body_encoded) 
    text_file.close() 
    outputStream.write(bytearray(json.dumps(body, indent=4).encode('utf-8'))) 

flowFile = session.get() 
if (flowFile != None): 
    flowFile = session.write(flowFile, ModJSON()) 
    flowFile = session.putAttribute(flowFile, "filename", flowFile.getAttribute('filename').split('.')[0]+'_translated.json') 
session.transfer(flowFile, REL_SUCCESS) 

но в testfile.txt, к которому осуществляется доступ тело не записывается.

что мне здесь не хватает?

ответ

3

Тело вашего класса Python не имеет отступов, и ни один из них не является телом метода процесса. Попробуйте отступом на один уровень от def init линии через строку outputStream.write, затем снова отступайте на один уровень от текста = строка IOUtils.toString через строку outputStream.write, это должно дать вам рабочий класс StreamCallback и вызвать скрипт для правильной работы.

Также вам не нужен вызов session.commit(), который будет вызываться для вас, когда скрипт будет завершен.

EDIT (из-за редактирования OP - см. Комментарии): В приведенном выше сценарии по-прежнему не с отступом, тело метода process() должно быть отступом. Вы получаете ошибки или бюллетени на процессоре ExecuteScript? Если файлы входящего потока находятся в очереди до ExecuteScript, то «flowFile = session.get()» не выполняется, или процессор должен вызывать ошибку и размещать бюллетень (красное поле в верхнем правом углу).

Также, поскольку вы собираетесь отправлять один и тот же контент из процессора в файл потока, вам не нужен код «text_file», я предполагаю, что это для отладки?

+0

Спасибо. Я вставил неправильно. Теперь я отредактировал сообщение. Все еще ничего не написано. – vishnu

+0

Есть ли файл потока, который выводится из этого процессора? Если да, то содержит ли его содержимое тело входящего JSON? – mattyb

+0

Он говорит в очереди 28, но никакой поток не поступает из getFile в executeScript. Я также добавил изображение – vishnu