2015-03-23 1 views
0

Я использую HTTP-источник для размещения файлов JSON в HDFS (Single SANDBOX).Создание файла в HDFS, но не добавление какого-либо содержимого

Файл создан в правильной директории, однако в файл ничего не добавлено. Не могли бы вы проверить мой flume.conf, прежде чем начать отладку HTTP-источника?

################################################################# 
# Name the components on this agent 
################################################################# 

hdfs-agent.sources = httpsource 
hdfs-agent.sinks = hdfssink 
hdfs-agent.channels = channel1 

################################################################# 
# Describe source 
################################################################# 

# Source node 
hdfs-agent.sources.httpsource.type = http 
hdfs-agent.sources.httpsource.port = 5140 
hdfs-agent.sources.httpsource.handler = org.apache.flume.source.http.JSONHandler 

################################################################# 
# Describe Sink 
################################################################# 

# Sink hdfs 
hdfs-agent.sinks.hdfssink.type = hdfs 
hdfs-agent.sinks.hdfssink.hdfs.path = hdfs://sandbox:8020/user/flume/node 
hdfs-agent.sinks.hdfssink.hdfs.fileType = DataStream 
hdfs-agent.sinks.hdfssink.hdfs.batchSize = 1 
hdfs-agent.sinks.hdfssink.hdfs.rollSize = 0 
hdfs-agent.sinks.hdfssink.hdfs.rollCount = 0 

################################################################# 
# Describe channel 
################################################################# 

# Channel memory 
hdfs-agent.channels.channel1.type = memory 
hdfs-agent.channels.channel1.capacity = 1000 
hdfs-agent.channels.channel1.transactionCapacity = 100 


################################################################# 
# Bind the source and sink to the channel 
################################################################# 

hdfs-agent.sources.httpsource.channels = channel1 
hdfs-agent.sinks.hdfssink.channel = channel1 

Сейчас я просто пытаюсь проверить это, начиная с малого:

[{"text": "Hi Flume this Node"}] 

Так что я имею в виду мой BATCHSIZE/rollSize/rollCount может быть проблема здесь?

ответ

2

batchSize, rollSize, rollCount значения прекрасны. Настройка rollSize и rollCount на 0 приведет к отключению функции перемотки файла.

HDFS-agent.sources.httpsource.type должен быть установлен в org.apache.flume.source.http.HTTPSource

Формат данных, отправляемых на источник HTTP должен быть

[{"headers" : {"a":"b", "c":"d"},"body": "random_body"}, {"headers" : {"e": "f"},"body": "random_body2"}].

Я протестированные отправки с использованием данных вы использовали ([{"text": "Привет, Flume этот узел"}]). Ничто не добавлялось к моему файлу, так как нет атрибута «body». Но когда я опубликовал следующее, данные были добавлены в мой файл.

curl -X POST -H 'Content-Type: application/json; charset=UTF-8' -d '[{ "headers" : {   "timestamp" : "434324343", "host" :"random_host.example.com", "field1" : "val1"   }, "body" : "random_body" }]' http://localhost:5140. 

Надеется, что это помогает

1

Как arathim отметил org.apache.flume.source.http.JSONHandler ожидает формат Flume событий. Если вы хотите создать свой собственный JSON, вам нужно создать собственный обработчик. Это пример обработчика, который принимает любой JSON:

public class GenericJSONInputHandler implements HTTPSourceHandler { 

    protected static final String TIMESTAMP = "timestamp"; 
    private static final Logger LOG = LoggerFactory.getLogger(GenericJSONInputHandler.class); 
    protected static final String TYPE = "type"; 


    public GenericJSONInputHandler() { 
    } 

    /** 
    * {@inheritDoc} 
    */ 
    @Override 
    public List<Event> getEvents(HttpServletRequest request) throws Exception { 
     BufferedReader reader = request.getReader(); 
     String charset = request.getCharacterEncoding(); 
     // UTF-8 is default for JSON. If no charset is specified, UTF-8 is to 
     // be assumed. 
     if (charset == null) { 
      LOG.debug("Charset is null, default charset of UTF-8 should be used."); 
     } 

     List<Event> eventList = new ArrayList<Event>(0); 
     try { 
      String json = reader.readLine(); 
      LOG.debug("Received line with size " + json.length()); 
      while (json != null) { 
       List<Event> e = createEvents(json); 
       if (e !=null) { 
        eventList.addAll(e); 
       } 
       json = reader.readLine(); 
      } 
     } 
     catch (Exception ex) { 
      throw new HTTPBadRequestException("Request has invalid JSON Syntax.", ex); 
     } 

     return eventList; 
    } 

    protected List<Event> createEvents(String json) { 
     try { 
      if (isValidJSON(json)) { 
       Map<String, String> headers = new HashMap<>(); 
       headers.put(TIMESTAMP, String.valueOf(System.currentTimeMillis())); 
       headers.put(TYPE, "default"); 
       return Arrays.asList(EventBuilder.withBody(json.getBytes(), headers)); 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 
     return null; 
    } 

    public boolean isValidJSON(final String json) { 
     boolean valid = false; 
     try { 
      final JsonParser parser = new ObjectMapper().getFactory() 
       .createParser(json); 
      while (parser.nextToken() != null) { 
      } 
      valid = true; 
     } 
     catch (JsonParseException jpe) { 
      jpe.printStackTrace(); 
     } 
     catch (IOException ioe) { 
      ioe.printStackTrace(); 
     } 

     return valid; 
    } 

    @Override 
    public void configure(Context context) { 
    } 

} 
+0

Является ли этот пользовательский обработчик, только что помещенный в каталог flume? Как его поднять? – pele88

+1

Скомпилируйте его и поместите в банку, эту банку нужно поместить в ваш каталог FLUME_HOME/lib. Изменение hdfs-agent.sources.httpsource.handler указывает на вашу собственную реализацию. –