Debugging the mapreduce() function in R



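Today I started exploring the rhdfs and rmr2 packages.

The mapreduce() function works as expected on a one-dimensional vector. Here is the code I ran on a 1D vector: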

a1 <- to.dfs(1:20)
a2 <- mapreduce(input=a1, map=function(k,v) keyval(v, v^2))
a3 <- as.data.frame(from.dfs(a2()))

It returns the following data frame:

    Key  Val
1     1    1
2    10  100
3    11  121
4    12  144
5    13  169
6    14  196
7    15  225
8    16  256
9    17  289
10   18  324
11   19  361
12    2    4
13   20  400
14    3    9
15    4   16
16    5   25
17    6   36
18    7   49
19    8   64
20    9   81

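So far, so good.

However, when I use the mapreduce function on the mtcars dataset, I get the error message below. I can't debug it any further. Please give me some pointers for moving forward.

My code snippet: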

rs1 <- mapreduce(input=mtcars, 
                  map=function(k, v) {
                      if (mtcars$hp > 150) keyval("Bigger", 1) },
                  reduce=function(k, v)  keyval(k, sum(v))
                  )

Error message for the snippet above:

13/09/21 07:24:49 ERROR streaming.StreamJob: Missing required option: input
Usage: $HADOOP_HOME/bin/hadoop jar 
          $HADOOP_HOME/hadoop-streaming.jar [options]
Options:
  -input    <path>     DFS input file(s) for the Map step
  -output   <path>     DFS output directory for the Reduce step
  -mapper   <cmd|JavaClassName>      The streaming command to run
  -combiner <cmd|JavaClassName> The streaming command to run
  -reducer  <cmd|JavaClassName>      The streaming command to run
  -file     <file>     File/dir to be shipped in the Job jar file
  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
  -outputformat TextOutputFormat(default)|JavaClassName  Optional.
  -partitioner JavaClassName  Optional.
  -numReduceTasks <num>  Optional.
  -inputreader <spec>  Optional.
  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands
  -mapdebug <path>  Optional. To run this script when a map task fails 
  -reducedebug <path>  Optional. To run this script when a reduce task fails 
  -io <identifier>  Optional.
  -verbose
Generic options supported are
-conf <configuration file>     specify an application configuration file
-D <property=value>            use value for given property
-fs <local|namenode:port>      specify a namenode
-jt <local|jobtracker:port>    specify a job tracker
-files <comma separated list of files>    specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars>    specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives>    specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]

For more details about these options:
Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info
Streaming Command Failed!
Error in mr(map = map, reduce = reduce, combine = combine, vectorized.reduce,  : 
  hadoop streaming failed with error code 1

A quick and detailed reply would be greatly appreciated...

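The data you pass to keyval is treated as a vector, not as a single entity. The code below should help explain this. Note also that it first writes the data to HDFS with to.dfs instead of passing the R object directly as input.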
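To make the "vector, not a single entity" point concrete, here is a small base R sketch (no Hadoop or rmr2 involved). It only illustrates why a condition such as mtcars$hp > 150 does not fit inside if(); the variable names cond, n_bigger and hp_bigger are made up for this illustration.

```r
# Base R only; no Hadoop or rmr2 needed for this illustration.
data(mtcars)

cond <- mtcars$hp > 150      # one TRUE/FALSE per car, not a single value
length(cond)                 # 32, the number of rows in mtcars

# if() needs a single TRUE/FALSE. Given this vector, recent R versions
# (4.2.0 and later) stop with an error, while older versions only warn
# and silently use the first element.

# Vectorized alternatives operate on the whole vector at once:
n_bigger  <- sum(cond)           # number of cars with hp > 150
hp_bigger <- mtcars$hp[cond]     # their horsepower values
```

Inside a map function the same idea applies to the v argument: filter or count with vectorized operations rather than testing the whole vector in a single if().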

Trying it locally

  • Load the data

    data(mtcars)
    
  • Look at a few rows of the data

    head(mtcars)
    hpTest=mtcars$hp # taking required data
    print(hpTest)
    
  • Final sum

    sum(hpTest[which(hpTest>150)])  # 2804
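Before moving to Hadoop, the map and reduce logic can also be rehearsed in plain R. This is only a rough sketch: map_fn and reduce_fn are hypothetical stand-ins written for this illustration (they are not rmr2 functions), and the grouping step with split() merely imitates what the framework does between map and reduce.

```r
# Plain-R stand-ins for the map and reduce steps (hypothetical helper names;
# these are not part of rmr2).
map_fn <- function(k, v) {
  big <- v[v > 150]                           # keep only values above 150
  list(key = rep(1, length(big)), val = big)  # same key for every kept value
}

reduce_fn <- function(k, v) {
  list(key = k, val = sum(v))                 # collapse one key's values
}

mapped <- map_fn(NULL, mtcars$hp)

# Group the mapped values by key, then reduce each group.
groups  <- split(mapped$val, mapped$key)
reduced <- lapply(names(groups), function(k) reduce_fn(k, groups[[k]]))

reduced[[1]]$val   # 2804, matching the direct sum above
```

Because everything runs in the current R session, each step can be inspected with print() or the debugger before any cluster is involved.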
    

Running on Hadoop MapReduce

  • Export environment variables

    # required
    Sys.setenv(HADOOP_HOME="/home/trendwise/apache/hadoop-1.0.4");
    Sys.setenv(HADOOP_CMD="/home/trendwise/apache/hadoop-1.0.4/bin/hadoop");
    #optional
    Sys.setenv(HADOOP_BIN="/home/trendwise/apache/hadoop-1.0.4/bin");
    Sys.setenv(HADOOP_CONF_DIR="/home/trendwise/apache/hadoop-1.0.4/conf");
    Sys.setenv(HADOOP_STREAMING='/home/trendwise/apache/hadoop-1.0.4/contrib/streaming/hadoop-streaming-1.0.4.jar')
    Sys.setenv(LD_LIBRARY_PATH="/lib:/lib/x86_64-linux-gnu:/lib64:/usr/lib:/usr/lib64:/usr/local/lib:/usr/local/lib64:/usr/lib/jvm/jdk1.7.0_10/lib:/usr/lib/jvm/jdk1.7.0_10/jre/lib:/usr/lib/jvm/jdk1.7.0_10/jre/lib/amd64:/usr/lib/jvm/jdk1.7.0_10/jre/lib/amd64/server");
    
  • Load the libraries

    library(rmr2)
    library(rhdfs)
    
  • Initialize

    hdfs.init()
    
  • Put the input into HDFS

    hpInput = to.dfs(mtcars$hp)
    
  • Run MapReduce

    mapReduceResult <- mapreduce(input=hpInput, 
              map=function(k, v) { keyval( rep(1,length(which(v > 150))) ,v[which(v>150)] )} ,
              reduce=function(k2, v2){  keyval(k2, sum(v2))})
    
  • View the MR output

    from.dfs(mapReduceResult)
    
  • Output

    $key
    [1] 1
    $val
    [1] 2804    
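If the job fails before it even starts, one thing worth ruling out is a wrong path in the environment variables set in the steps above. The helper below is a hypothetical convenience function written for this answer (it is not part of rmr2 or rhdfs); it only reports whether each variable is set and points at something that exists on disk.

```r
# Hypothetical helper (not part of rmr2 or rhdfs): report which of the
# Hadoop-related environment variables point at an existing path.
check_hadoop_env <- function(vars = c("HADOOP_HOME", "HADOOP_CMD",
                                      "HADOOP_STREAMING")) {
  paths  <- unname(Sys.getenv(vars, unset = NA))  # NA when a variable is unset
  is_set <- !is.na(paths)
  data.frame(
    variable = vars,
    path     = paths,
    exists   = is_set & file.exists(ifelse(is_set, paths, "")),
    stringsAsFactors = FALSE
  )
}

check_hadoop_env()
```

A FALSE in the exists column suggests checking that path first; a TRUE does not prove the installation works, only that the path is there.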
    

You can use the built-in debugging features of recent RStudio versions. Just rewrite the code so that the MapReduce job runs locally.
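As a rough sketch of what running locally could look like, rmr2 offers a local backend that is selected with rmr.options(backend = "local"). The example assumes rmr2 is installed (the block is skipped otherwise); the names mapFn and localTotal are made up for this illustration.

```r
# Assumes the rmr2 package is installed; the block is skipped otherwise.
if (requireNamespace("rmr2", quietly = TRUE)) {
  library(rmr2)
  rmr.options(backend = "local")   # run map and reduce inside this R session

  hpInput <- to.dfs(mtcars$hp)

  mapFn <- function(k, v) {
    # browser()                    # uncomment to step through interactively
    big <- v[v > 150]
    keyval(rep(1, length(big)), big)
  }

  result <- mapreduce(
    input  = hpInput,
    map    = mapFn,
    reduce = function(k, v) keyval(k, sum(v))
  )

  localTotal <- from.dfs(result)$val
  print(localTotal)
}
```

With the local backend the map and reduce functions are ordinary R calls in the same process, so breakpoints and browser() behave as they would in any other R code. Once the logic is right, switching the backend back to "hadoop" runs the same job on the cluster.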
