Я пытаюсь загрузить данные в HDFS с использованием Flume 1.7. Я создал следующую конфигурацию:Флуд HDFS-приемник хранит только одну строку источника данных, используя источник netcat
# Starting with: /opt/flume/bin/flume-ng agent -n Agent -c conf -f /opt/flume/conf/test.conf
# Naming the components on the current agent
Agent.sources = Netcat
Agent.channels = MemChannel
Agent.sinks = LoggerSink hdfs-sink LocalOut
# Describing/Configuring the source
Agent.sources.Netcat.type = netcat
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565
# Describing/Configuring the sink
Agent.sinks.LoggerSink.type = logger
# Define a sink that outputs to hdfs.
Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>:8020/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 0
Agent.sinks.hdfs-sink.hdfs.rollInterval = 0
Agent.sinks.hdfs-sink.hdfs.idleTimeout = 0
# Schreibt input into local Filesystem
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
Agent.sinks.LocalOut.type = file_roll
Agent.sinks.LocalOut.sink.directory = /tmp/flume
Agent.sinks.LocalOut.sink.rollInterval = 0
# Describing/Configuring the channel
Agent.channels.MemChannel.type = memory
Agent.channels.MemChannel.capacity = 1000
Agent.channels.MemChannel.transactionCapacity = 100
# Bind the source and sink to the channel
Agent.sources.Netcat.channels = MemChannel
Agent.sinks.LoggerSink.channel = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel
Agent.sinks.LocalOut.channel = MemChannel
После этого я послал следующий файл, используя Netcat на источник:
cat textfile.csv | nc <IP of flume agent> 56565
Файл содержит следующие элементы:
Name1,1
Name2,2
Name3,3
Name4,4
Name5,5
Name6,6
Name7,7
Name8,8
Name9,9
Name10,10
Name11,11
Name12,12
Name13,13
Name14,14
Name15,15
Name16,16
Name17,17
Name18,18
Name19,19
Name20,20
...
Name490,490
Name491,491
Name492,492
Этот вопрос Я столкнулся с тем, что без ошибок дым записывает в hdfs, а только одну строку переданного файла. Если вы начали нажимать файл несколько раз на источник с помощью нектата, то иногда лоток записывает более одного файла в hdfs, включая более одной строки. Но редко все строки.
Я попытался изменить параметры hdfs для rollSize, размера партии и других, но это не изменило поведение на самом деле.
Раковина для локального файла, которая также настроена, работает отлично.
Кто-нибудь знает, как настроить его, чтобы все записи записывались в hdf без потери записей.
Благодарим за помощь.
Update 1.12.2016
Я удалил все раковины, кроме мойки HDFS и изменены некоторые параметры. После этого приемники HDFS работают так, как должно быть.
Вот конфигурация:
# Naming the components on the current agent
Agent.sources = Netcat
Agent.channels = MemChannel
Agent.sinks = hdfs-sink
# Describing/Configuring the source
Agent.sources.Netcat.type = netcat
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565
# Define a sink that outputs to hdfs.
Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 100
# Describing/Configuring the channel
Agent.channels.MemChannel.type = memory
Agent.channels.MemChannel.capacity = 1000
Agent.channels.MemChannel.transactionCapacity = 100
# Bind the source and sink to the channel
Agent.sources.Netcat.channels = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel
Имеет кого-нибудь идея, почему она работает с этой конфигурацией, но с двумя или более приемников он больше не работает?