2016-11-27 14 views
0

Я пытаюсь загрузить данные в HDFS с использованием Flume 1.7. Я создал следующую конфигурацию:Флуд HDFS-приемник хранит только одну строку источника данных, используя источник netcat

# Starting with: /opt/flume/bin/flume-ng agent -n Agent -c conf -f /opt/flume/conf/test.conf 
# Naming the components on the current agent 
Agent.sources = Netcat 
Agent.channels = MemChannel 
Agent.sinks = LoggerSink hdfs-sink LocalOut 

# Describing/Configuring the source 
Agent.sources.Netcat.type = netcat 
Agent.sources.Netcat.bind = 0.0.0.0 
Agent.sources.Netcat.port = 56565 

# Describing/Configuring the sink 
Agent.sinks.LoggerSink.type = logger 

# Define a sink that outputs to hdfs. 
Agent.sinks.hdfs-sink.type = hdfs 
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>:8020/user/admin/flume_folder/%y-%m-%d/%H%M/ 
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true 
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream 
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text 
Agent.sinks.hdfs-sink.hdfs.batchSize = 100 
Agent.sinks.hdfs-sink.hdfs.rollSize = 0 
Agent.sinks.hdfs-sink.hdfs.rollCount = 0 
Agent.sinks.hdfs-sink.hdfs.rollInterval = 0 
Agent.sinks.hdfs-sink.hdfs.idleTimeout = 0 

# Schreibt input into local Filesystem 
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink 
Agent.sinks.LocalOut.type = file_roll 
Agent.sinks.LocalOut.sink.directory = /tmp/flume 
Agent.sinks.LocalOut.sink.rollInterval = 0 


# Describing/Configuring the channel 
Agent.channels.MemChannel.type = memory 
Agent.channels.MemChannel.capacity = 1000 
Agent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel 
Agent.sources.Netcat.channels = MemChannel 
Agent.sinks.LoggerSink.channel = MemChannel 
Agent.sinks.hdfs-sink.channel = MemChannel 
Agent.sinks.LocalOut.channel = MemChannel 

После этого я послал следующий файл, используя Netcat на источник:

cat textfile.csv | nc <IP of flume agent> 56565 

Файл содержит следующие элементы:

Name1,1 
Name2,2 
Name3,3 
Name4,4 
Name5,5 
Name6,6 
Name7,7 
Name8,8 
Name9,9 
Name10,10 
Name11,11 
Name12,12 
Name13,13 
Name14,14 
Name15,15 
Name16,16 
Name17,17 
Name18,18 
Name19,19 
Name20,20 
... 
Name490,490 
Name491,491 
Name492,492 

Этот вопрос Я столкнулся с тем, что без ошибок дым записывает в hdfs, а только одну строку переданного файла. Если вы начали нажимать файл несколько раз на источник с помощью нектата, то иногда лоток записывает более одного файла в hdfs, включая более одной строки. Но редко все строки.

Я попытался изменить параметры hdfs для rollSize, размера партии и других, но это не изменило поведение на самом деле.

Раковина для локального файла, которая также настроена, работает отлично.

Кто-нибудь знает, как настроить его, чтобы все записи записывались в hdf без потери записей.

Благодарим за помощь.


Update 1.12.2016

Я удалил все раковины, кроме мойки HDFS и изменены некоторые параметры. После этого приемники HDFS работают так, как должно быть.

Вот конфигурация:

# Naming the components on the current agent 
Agent.sources = Netcat 
Agent.channels = MemChannel 
Agent.sinks = hdfs-sink 

# Describing/Configuring the source 
Agent.sources.Netcat.type = netcat 
Agent.sources.Netcat.bind = 0.0.0.0 
Agent.sources.Netcat.port = 56565 


# Define a sink that outputs to hdfs. 
Agent.sinks.hdfs-sink.type = hdfs 
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>/user/admin/flume_folder/%y-%m-%d/%H%M/ 
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true 
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream 
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text 
Agent.sinks.hdfs-sink.hdfs.batchSize = 100 
Agent.sinks.hdfs-sink.hdfs.rollSize = 0 
Agent.sinks.hdfs-sink.hdfs.rollCount = 100 


# Describing/Configuring the channel 
Agent.channels.MemChannel.type = memory 
Agent.channels.MemChannel.capacity = 1000 
Agent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel 
Agent.sources.Netcat.channels = MemChannel 
Agent.sinks.hdfs-sink.channel = MemChannel 

Имеет кого-нибудь идея, почему она работает с этой конфигурацией, но с двумя или более приемников он больше не работает?

ответ

0

Я нашел решение самостоятельно. Насколько я понимаю, я использовал один и тот же канал для обоих приемников. Поэтому приемник, который быстрее выполнял все записи, и только некоторые из записей были переданы в приемник hdfs.

После того, как с помощью различных каналов, и в том числе нагнетание для источников с параметром

Agent.sources.Netcat.selector.type = replicating 

Флюм записывает в локальный файл и HDFS, как ожидалось.

 Смежные вопросы

  • Нет связанных вопросов^_^