如何使用 Flume 在源代码上执行预处理并在 hdfs sink 中保留真实文件名



我是使用Apache Flume的新手,我很难理解它是如何工作的。为了解释我的问题,所以我解释了我的需求和我做了什么。

我想在 csv 文件目录(这些文件每 5 分钟构建一次)和 HDFS 集群之间配置一个流。

我确定"假脱机目录"源和HDFS接收器是我需要的。那是给我这个 flume.conf 文件

agent.sources = seqGenSrc
agent.channels = memoryChannel
agent.sinks = hdfsSink
# For each one of the sources, the type is defined
agent.sources.seqGenSrc.type = spooldir
agent.sources.seqGenSrc.spoolDir = /home/user/data
# The channel can be defined as follows.
agent.sources.seqGenSrc.channels = memoryChannel
# Each sink's type must be defined
agent.sinks.hdfsSink.type = hdfs
agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data
agent.sinks.hdfsSink.hdfs.fileType = DataStream
agent.sinks.hdfsSink.hdfs.writeFormat=Text    
#Specify the channel the sink should use
agent.sinks.hdfsSink.channel = memoryChannel
# Each channel's type is defined.
agent.channels.memoryChannel.type = memory
# Other config values specific to each type of channel(sink or source)
# can be defined as well
# In this case, it specifies the capacity of the memory channel
agent.channels.memoryChannel.capacity = 100

结果是输入文件在我的本地文件系统上用".complete"重命名,数据上传到HDFS上,新名称我猜是独一无二的,由Flume生成。

这几乎是我需要的。

但是在上传之前,我想进行一些特定于文件的操作(删除标题,转义逗号..)。我不知道该怎么做,我想使用拦截器。但是,当数据在水槽中时,它会在事件中转换并流式传输。在他这一点上,对文件一无所知。

否则,原始时间事件

将写入文件名中,因此我希望此时间与我的事件相关联,而不是当前日期。

我还想在hdfs中保留原始文件名(其中有一些有用的信息)。

有人有建议来帮助我吗?

如果指定,原始文件名可以保留为标头

agent.sources.seqGenSrc.fileHeader=true 

然后可以在水槽中检索。

如果要操作文件中的数据,请使用拦截器。您应该知道,事件基本上是假脱机目录中文件中的一行。

最后但并非最不重要的一点是,您需要使用 fileHeader 属性将事件通过管道返回到正确的文件。这可以通过在接收器中指定路径来实现,如下所示:

agent.sinks.hdfsSink.hdfs.path = hdfs://localhost/Flume/data/%{file}

您可以使用前缀和后缀进一步配置文件名:

hdfs.filePrefix FlumeData   Name prefixed to files created by Flume in hdfs directory
hdfs.fileSuffix –   Suffix to append to file (eg .avro - NOTE: period is not automatically added)

最新更新