使用Flume(spool目录作为源)将csv文件加载到HDFS中



我正在尝试将csv文件(6MB)加载到HDFS中,使用flume和spoodir作为源,HDFS作为汇,这是我的配置文件:

# Initialize agent's source, channel and sink
agent.sources = TwitterExampleDir
agent.channels = memoryChannel
agent.sinks = flumeHDFS
# Setting the source to spool directory where the file exists
agent.sources.TwitterExampleDir.type = spooldir
agent.sources.TwitterExampleDir.spoolDir = /usr/local/word_count_input
# Setting the channel to memory
agent.channels.memoryChannel.type = memory
# Max number of events stored in the memory channel
agent.channels.memoryChannel.capacity = 10000
# agent.channels.memoryChannel.batchSize = 15000
agent.channels.memoryChannel.transactioncapacity = 1000000
# Setting the sink to HDFS
agent.sinks.flumeHDFS.type = hdfs
agent.sinks.flumeHDFS.hdfs.path = hdfs://192.168.220.128:8000/spool5
agent.sinks.flumeHDFS.hdfs.fileType = DataStream
# Write format can be text or writable
agent.sinks.flumeHDFS.hdfs.writeFormat = Text
# use a single csv file at a time
agent.sinks.flumeHDFS.hdfs.maxOpenFiles = 1
# rollover file based on maximum size of 10 MB
agent.sinks.flumeHDFS.hdfs.rollCount = 0
agent.sinks.flumeHDFS.hdfs.rollInterval = 0
agent.sinks.flumeHDFS.hdfs.rollSize = 1000
agent.sinks.flumeHDFS.hdfs.batchSize = 100
# never rollover based on the number of events
agent.sinks.flumeHDFS.hdfs.rollCount = 0
# rollover file based on max time of 1 min
#agent.sinks.flumeHDFS.hdfs.rollInterval = 0
# agent.sinks.flumeHDFS.hdfs.idleTimeout = 600
# Connect source and sink with channel
agent.sources.TwitterExampleDir.channels = memoryChannel
agent.sinks.flumeHDFS.channel = memoryChannel

在那之后,我得到了这些错误,不知道为什么:

2015-02-05 09:01:01,036 [SinkRunner-PollingRunner-DefaultSinkProcessor] ERROR org.apache.flume.sink.hdfs.HDFSEventSink (HDFSEventSink.java:466) - process failed
java.lang.UnsupportedOperationException: Not implemented by the DistributedFileSystem FileSystem implementation
        at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:216)
        at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2564)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:274)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:266)
        at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:722)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:187)
        at org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59)
        at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:719)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
2015-02-05 09:01:01,062 [SinkRunner-PollingRunner-DefaultSinkProcessor] ERROR org.apache.flume.SinkRunner (SinkRunner.java:160) - Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.UnsupportedOperationException: Not implemented by the DistributedFileSystem FileSystem implementation
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:470)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.UnsupportedOperationException: Not implemented by the DistributedFileSystem FileSystem implementation
        at org.apache.hadoop.fs.FileSystem.getScheme(FileSystem.java:216)
        at org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2564)
        at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
        at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:274)
        at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:266)
        at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:722)
        at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:187)
        at org.apache.flume.sink.hdfs.BucketWriter.access$1700(BucketWriter.java:59)
        at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:719)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        ... 1 more

有人能在这个问题上帮我吗?

我使用的是hortonworks sandbox v2.2,经过长时间的调试,我发现我手动安装的spark版本"v1.2"和hortonworks沙盒库之间存在一些冲突,所以我决定使用cloudera quickstart 5.3.0,现在一切都很好

相关内容

  • 没有找到相关文章

最新更新