使用flume从kafka到HDFS的avro事件



我有一个kafka集群,从生产者那里接收avro事件。

我想使用flume来消费这些事件,并把它们作为avro文件在HDFS

这对flume来说可能吗?

有没有人有一个配置文件的例子来演示如何做到这一点?

尤西

这确实是可能的。

如果你想从Kafka消费,那么你需要设置一个Kafka源和一个使用Avro的HDFS sink。

下面是Kafka源代码配置选项的链接:http://flume.apache.org/FlumeUserGuide.html#kafka-source

设置源代码配置非常简单。当然,您需要对此进行测试,以验证您选择的设置在您的系统中是否运行良好。

要用Avro设置HDFS,你需要设置一个HDFS接收器,你很幸运,这个网站描述了如何做:http://thisdataguy.com/2014/07/28/avro-end-to-end-in-hdfs-part-2-flume-setup/

最后,您需要配置一个通道。我有使用默认设置Flume的内存通道的经验(我相信…现在无法检查),它工作得很好。

我建议您花时间阅读Flume文档:http://flume.apache.org/FlumeUserGuide.html,因为所有这些信息都包含在那里。在设置Flume代理来处理数据之前,了解正在使用的系统是很重要的。

考虑这个场景。对于来自kafka的avro事件(只有二进制数据,没有模式),下面是为我工作的代理。

架构将使用下面的代理在汇聚端添加。

#source
MY_AGENT.sources.my-source.type = org.apache.flume.source.kafka.KafkaSource
MY_AGENT.sources.my-source.channels = my-channel
MY_AGENT.sources.my-source.batchSize = 10000
MY_AGENT.sources.my-source.useFlumeEventFormat = false
MY_AGENT.sources.my-source.batchDurationMillis = 5000
MY_AGENT.sources.my-source.kafka.bootstrap.servers =${BOOTSTRAP_SERVERS}
MY_AGENT.sources.my-source.kafka.topics = my-topic
MY_AGENT.sources.my-source.kafka.consumer.group.id = my-topic_grp
MY_AGENT.sources.my-source.kafka.consumer.client.id = my-topic_clnt
MY_AGENT.sources.my-source.kafka.compressed.topics = my-topic
MY_AGENT.sources.my-source.kafka.auto.commit.enable = false
MY_AGENT.sources.my-source.kafka.consumer.session.timeout.ms=100000
MY_AGENT.sources.my-source.kafka.consumer.request.timeout.ms=120000
MY_AGENT.sources.my-source.kafka.consumer.max.partition.fetch.bytes=704857
MY_AGENT.sources.my-source.kafka.consumer.auto.offset.reset=latest
#channel
MY_AGENT.channels.my-channel.type = memory
MY_AGENT.channels.my-channel.capacity = 100000000
MY_AGENT.channels.my-channel.transactionCapacity = 100000
MY_AGENT.channels.my-channel.parseAsFlumeEvent = false
#Sink
MY_AGENT.sinks.my-sink.channel = my-channel
MY_AGENT.sinks.my-sink.type = hdfs
MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text
MY_AGENT.sinks.my-sink.hdfs.fileType = DataStream
MY_AGENT.sinks.my-sink.hdfs.kerberosPrincipal =${user}
MY_AGENT.sinks.my-sink.hdfs.kerberosKeytab =${keytab}
MY_AGENT.sinks.my-sink.hdfs.useLocalTimeStamp = true
MY_AGENT.sinks.my-sink.hdfs.path = hdfs://nameservice1/my_hdfs/my_table1/timestamp=%Y%m%d
MY_AGENT.sinks.my-sink.hdfs.rollCount=0
MY_AGENT.sinks.my-sink.hdfs.rollSize=0
MY_AGENT.sinks.my-sink.hdfs.batchSize=100000
MY_AGENT.sinks.my-sink.hdfs.maxOpenFiles=2000
MY_AGENT.sinks.my-sink.hdfs.callTimeout=50000
MY_AGENT.sinks.my-sink.hdfs.fileSuffix=.avro
MY_AGENT.sinks.my-sink.serializer = org.apache.flume.sink.hdfs.AvroEventSerializer$Builder
MY_AGENT.sinks.my-sink.serializer.schemaURL = hdfs://nameservice1/my_hdfs/avro_schemas/${AVSC_FILE}

我想强调几件事。

MY_AGENT.sinks.my-sink.hdfs.writeFormat= Text ..有助于只转储来自Flume事件的数据(忽略Flume事件头....)

MY_AGENT.sinks.my-sink.serializer.schemaURL = hdfs://nameservice1/my_hdfs/avro_schemas/${AVSC_FILE}…需要传递适当的模式(它将被添加到avro文件中的二进制数据中)。hdfs中的最终输出文件将包含schema + data。

将数据存储到HDFS后,使用合适的avro模式创建了hive表,我可以像预期的那样访问数据。

最新更新