2015-02-13 1 views
0

Я пытаюсь записать текст в HDFS удаленной машины, используя лоток. Но мои попытки потерпели неудачу.Невозможно записать с удаленного HDFS с использованием флюма

Я использую cloudera quickstart VM в качестве удаленной машины. Вот мои шаги:

  1. Я запускаемые желоб как:

    sudo init.d/flume-ng-agent start 
    
  2. Edited лотковой конфигурации в менеджере Cloudera

    # Please paste flume.conf here. Example: 
    
    # Sources, channels, and sinks are defined per 
    # agent name, in this case 'tier1'. 
    tier1.sources = source1 
    tier1.channels = channel1 
    tier1.sinks = sink1 
    
    # For each source, channel, and sink, set 
    # standard properties. 
    tier1.sources.source1.type  = avro 
    tier1.sources.source1.bind  = 172.24.***.*** # address of remote machine (cloudera quickstart VM) 
    tier1.sources.source1.port  = 41414 
    tier1.sources.source1.channels = channel1 
    
    tier1.channels.channel1.type = memory 
    
    tier1.sinks.sink1.type   = hdfs 
    tier1.sinks.sink1.channel  = channel1 
    tier1.sinks.sink1.hdfs.path  = /tmp/%y-%m-%d/%H%M/%S 
    tier1.sinks.sink1.hdfs.fileType = DataStream 
    
    #Format to be written 
    tier1.sinks.sink1.hdfs.writeFormat = Text 
    
    tier1.sinks.sink1.hdfs.maxOpenFiles = 10 
    # rollover file based on maximum size of 10 MB 
    tier1.sinks.sink1.hdfs.rollSize = 10485760 
    
    # never rollover based on the number of events 
    tier1.sinks.sink1.hdfs.rollCount = 0 
    
    # rollover file based on max time of 1 mi 
    tier1.sinks.sink1.hdfs.rollInterval = 60 
    
    #Specify the channel the sink should use 
    tier1.sinks.sink1.channel = memoryChannel 
    
    # Other properties are specific to each type of 
    # source, channel, or sink. In this case, we 
    # specify the capacity of the memory channel. 
    tier1.channels.channel1.capacity = 100 
    
  3. Вот мой код, который должен отправлять сообщения удаленная машина

    import org.apache.flume.Event; 
    import org.apache.flume.EventDeliveryException; 
    import org.apache.flume.api.RpcClient; 
    import org.apache.flume.api.RpcClientFactory; 
    import org.apache.flume.event.EventBuilder; 
    import java.nio.charset.Charset; 
    
    public class FlumeTransport { 
        public static void main(String[] args) { 
         MyRpcClientFacade client = new MyRpcClientFacade(); 
         // Initialize client with the remote Flume agent's host and port 
         client.init("172.24.***.***", 41414); 
    
         // Send 10 events to the remote Flume agent. That agent should be 
         // configured to listen with an AvroSource. 
         String sampleData = "Hello Flume!"; 
         for (int i = 0; i < 10; i++) { 
          client.sendDataToFlume(sampleData); 
         } 
    
         client.cleanUp(); 
        } 
    } 
    
    class MyRpcClientFacade { 
        private RpcClient client; 
        private String hostname; 
        private int port; 
    
        public void init(String hostname, int port) { 
         // Setup the RPC connection 
         this.hostname = hostname; 
         this.port = port; 
         this.client = RpcClientFactory.getDefaultInstance(hostname, port); 
         // Use the following method to create a thrift client (instead of the above line): 
         // this.client = RpcClientFactory.getThriftInstance(hostname, port); 
        } 
    
        public void sendDataToFlume(String data) { 
         // Create a Flume Event object that encapsulates the sample data 
         Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); 
    
         // Send the event 
         try { 
          client.append(event); 
         } catch (EventDeliveryException e) { 
          // clean up and recreate the client 
          System.out.println(e.getMessage()); 
          client.close(); 
          client = null; 
          client = RpcClientFactory.getDefaultInstance(hostname, port); 
          // Use the following method to create a thrift client (instead of the above line): 
          // this.client = RpcClientFactory.getThriftInstance(hostname, port); 
         } 
        } 
    
        public void cleanUp() { 
         // Close the RPC connection 
         client.close(); 
        } 
    
    } 
    

Когда я бегу приложение я получаю исключение

13/2-14:57:14,202 WARN : o.a.f.a.NettyAvroRpcClient - Invalid value for batchSize: 0; Using default value. 
13/2-14:57:14,209 WARN : o.a.f.a.NettyAvroRpcClient - Using default maxIOWorkers 
NettyAvroRpcClient { host: quickstart.cloudera.*******.com.ua, port: 41414 }: Failed to send event 

ответ

0

Ваш клиент Avro RPC не может подключиться к водопропускной агенту. Проверьте файлы журнала в /var/log/flume-ng/flume.log, чтобы узнать, что произошло. Вероятно, ваш агент не мог привязываться к интерфейсу. Рассмотрите замену

tier1.sources.source1.bind  = 172.24.***.*** 

с

tier1.sources.source1.bind  = 0.0.0.0 

, который эффективно связывается со всеми интерфейсами. Попробуйте установить telnet на 41414 локально, чтобы проверить, действительно ли порт отвечает.

 Смежные вопросы

  • Нет связанных вопросов^_^