Я пытаюсь записать текст в HDFS удалённой машины с помощью Flume, но мои попытки не увенчались успехом. Не удаётся записать данные в удалённый HDFS с помощью Flume.
Я использую cloudera quickstart VM в качестве удаленной машины. Вот мои шаги:
Я запускаю Flume так:
# Start the Flume agent through its init script, as root.
# NOTE(review): the script path is relative, so this presumably is run from /etc — confirm.
sudo init.d/flume-ng-agent start
Отредактировал конфигурацию Flume в Cloudera Manager:
# Please paste flume.conf here. Example:

# Sources, channels, and sinks are defined per
# agent name, in this case 'tier1'.
tier1.sources  = source1
tier1.channels = channel1
tier1.sinks    = sink1

# For each source, channel, and sink, set
# standard properties.

# Avro source the RPC client connects to.
# bind: address of remote machine (cloudera quickstart VM)
tier1.sources.source1.type     = avro
tier1.sources.source1.bind     = 172.24.***.***
tier1.sources.source1.port     = 41414
tier1.sources.source1.channels = channel1

tier1.channels.channel1.type = memory

tier1.sinks.sink1.type = hdfs
# Specify the channel the sink should use. It must be a channel declared in
# tier1.channels; this key is set exactly once (a second assignment to the
# undeclared name 'memoryChannel' used to override it).
tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.hdfs.path = /tmp/%y-%m-%d/%H%M/%S
# The time escapes in hdfs.path need a timestamp. The client sends events
# without a 'timestamp' header, so resolve the escapes from the agent's clock.
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.fileType = DataStream
# Format to be written
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.hdfs.maxOpenFiles = 10
# rollover file based on maximum size of 10 MB
tier1.sinks.sink1.hdfs.rollSize = 10485760
# never rollover based on the number of events
tier1.sinks.sink1.hdfs.rollCount = 0
# rollover file based on max time of 1 minute
tier1.sinks.sink1.hdfs.rollInterval = 60

# Other properties are specific to each type of
# source, channel, or sink. In this case, we
# specify the capacity of the memory channel.
tier1.channels.channel1.capacity = 100
Вот мой код, который должен отправлять сообщения на удалённую машину:
import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.api.RpcClient; import org.apache.flume.api.RpcClientFactory; import org.apache.flume.event.EventBuilder; import java.nio.charset.Charset; public class FlumeTransport { public static void main(String[] args) { MyRpcClientFacade client = new MyRpcClientFacade(); // Initialize client with the remote Flume agent's host and port client.init("172.24.***.***", 41414); // Send 10 events to the remote Flume agent. That agent should be // configured to listen with an AvroSource. String sampleData = "Hello Flume!"; for (int i = 0; i < 10; i++) { client.sendDataToFlume(sampleData); } client.cleanUp(); } } class MyRpcClientFacade { private RpcClient client; private String hostname; private int port; public void init(String hostname, int port) { // Setup the RPC connection this.hostname = hostname; this.port = port; this.client = RpcClientFactory.getDefaultInstance(hostname, port); // Use the following method to create a thrift client (instead of the above line): // this.client = RpcClientFactory.getThriftInstance(hostname, port); } public void sendDataToFlume(String data) { // Create a Flume Event object that encapsulates the sample data Event event = EventBuilder.withBody(data, Charset.forName("UTF-8")); // Send the event try { client.append(event); } catch (EventDeliveryException e) { // clean up and recreate the client System.out.println(e.getMessage()); client.close(); client = null; client = RpcClientFactory.getDefaultInstance(hostname, port); // Use the following method to create a thrift client (instead of the above line): // this.client = RpcClientFactory.getThriftInstance(hostname, port); } } public void cleanUp() { // Close the RPC connection client.close(); } }
Когда я запускаю приложение, я получаю исключение:
13/2-14:57:14,202 WARN : o.a.f.a.NettyAvroRpcClient - Invalid value for batchSize: 0; Using default value.
13/2-14:57:14,209 WARN : o.a.f.a.NettyAvroRpcClient - Using default maxIOWorkers
NettyAvroRpcClient { host: quickstart.cloudera.*******.com.ua, port: 41414 }: Failed to send event