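Today I started exploring the rhdfs and rmr2 packages.
The mapreduce() function works as expected on a 1D vector. Here is the code I ran on a one-dimensional vector: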
a1 <- to.dfs(1:20)
a2 <- mapreduce(input=a1, map=function(k,v) keyval(v, v^2))
a3 <- as.data.frame(from.dfs(a2()))
It returns the following data frame:
Key Val
1 1 1
2 10 100
3 11 121
4 12 144
5 13 169
6 14 196
7 15 225
8 16 256
9 17 289
10 18 324
11 19 361
12 2 4
13 20 400
14 3 9
15 4 16
16 5 25
17 6 36
18 7 49
19 8 64
20 9 81
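So far, so good.
However, when I use the mapreduce function on the mtcars dataset, I get the error message below. I have not been able to debug it any further. Please give me some pointers on how to move forward.
My code snippet: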
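As a side note, the row order above (1, 10, 11, ..., 19, 2, 20, 3, ...) is what you get when keys are ordered as text rather than as numbers. The sketch below rebuilds the same table in base R with no Hadoop involved. That the job orders keys as strings is an assumption inferred from the output, not something confirmed here.

```r
# Rebuild the key/value pairs locally: key = v, value = v^2.
v <- 1:20
kv <- data.frame(Key = v, Val = v^2)

# Order rows by the text form of the keys (assumption: this mimics the job's
# output order). method = "radix" sorts in the C locale, so the result does
# not depend on the system locale.
text_order <- order(as.character(kv$Key), method = "radix")
sorted_kv <- kv[text_order, ]
rownames(sorted_kv) <- NULL

print(sorted_kv)
```

If numeric order is wanted, sorting the resulting data frame by Key after from.dfs() should be enough.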
rs1 <- mapreduce(input=mtcars,
map=function(k, v) {
if (mtcars$hp > 150) keyval("Bigger", 1) },
reduce=function(k, v) keyval(k, sum(v))
)
The error message for the code above:
13/09/21 07:24:49 ERROR streaming.StreamJob: Missing required option: input
Usage: $HADOOP_HOME/bin/hadoop jar
$HADOOP_HOME/hadoop-streaming.jar [options]
Options:
-input <path> DFS input file(s) for the Map step
-output <path> DFS output directory for the Reduce step
-mapper <cmd|JavaClassName> The streaming command to run
-combiner <cmd|JavaClassName> The streaming command to run
-reducer <cmd|JavaClassName> The streaming command to run
-file <file> File/dir to be shipped in the Job jar file
-inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.
-outputformat TextOutputFormat(default)|JavaClassName Optional.
-partitioner JavaClassName Optional.
-numReduceTasks <num> Optional.
-inputreader <spec> Optional.
-cmdenv <n>=<v> Optional. Pass env.var to streaming commands
-mapdebug <path> Optional. To run this script when a map task fails
-reducedebug <path> Optional. To run this script when a reduce task fails
-io <identifier> Optional.
-verbose
Generic options supported are
-conf <configuration file> specify an application configuration file
-D <property=value> use value for given property
-fs <local|namenode:port> specify a namenode
-jt <local|jobtracker:port> specify a job tracker
-files <comma separated list of files> specify comma separated files to be copied to the map reduce cluster
-libjars <comma separated list of jars> specify comma separated jar files to include in the classpath.
-archives <comma separated list of archives> specify comma separated archives to be unarchived on the compute machines.
The general command line syntax is
bin/hadoop command [genericOptions] [commandOptions]
For more details about these options:
Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info
Streaming Command Failed!
Error in mr(map = map, reduce = reduce, combine = combine, vectorized.reduce, :
hadoop streaming failed with error code 1
A quick and detailed reply would be highly appreciated...
The data you pass to keyval is treated as a vector, not as a single entity. The code below should make this clearer.
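To illustrate the point about vectors: inside a map function, v arrives as a whole vector of values, so a comparison such as v > 150 yields one TRUE/FALSE per element and cannot drive a plain if. Here is a minimal base R sketch that runs without Hadoop; count_bigger is a hypothetical helper name used only for illustration. Separately, the error in the question complains about a missing input option, which would be consistent with mtcars being passed directly instead of through to.dfs(), though that reading is an assumption.

```r
# mtcars ships with R (datasets package).
hp <- mtcars$hp

# The comparison is element-wise: one logical per car, not a single TRUE/FALSE.
is_bigger <- hp > 150
print(length(is_bigger))

# Hypothetical helper: count and total the values above a threshold
# using logical indexing instead of if().
count_bigger <- function(v, threshold = 150) {
  keep <- v > threshold
  list(count = sum(keep), total = sum(v[keep]))
}

res <- count_bigger(hp)
print(res)
```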
Trying it locally
- Load the data
data(mtcars)
- Look at a few rows of the data
head(mtcars)
hpTest = mtcars$hp  # take the required column
print(hpTest)
- Final sum
sum(hpTest[which(hpTest>150)]) # 2804
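The same total can also be written as key-value pairs, which is the shape a MapReduce job works with. The following is a base R emulation only, not rmr2 itself; map_step and reduce_step are hypothetical names.

```r
# Emulated map step: keep hp values above 150 and emit key 1 for each of them.
map_step <- function(v) {
  keep <- v[v > 150]
  data.frame(key = rep(1, length(keep)), val = keep)
}

# Emulated reduce step: sum the values that share a key.
reduce_step <- function(kv) {
  totals <- tapply(kv$val, kv$key, sum)
  data.frame(key = as.numeric(names(totals)), val = as.vector(totals))
}

kv <- map_step(mtcars$hp)
reduced <- reduce_step(kv)
print(reduced)
```

Because every emitted pair carries the same key, the reduce step collapses them into a single row.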
Running on Hadoop MapReduce
- Export the environment variables
# required
Sys.setenv(HADOOP_HOME="/home/trendwise/apache/hadoop-1.0.4")
Sys.setenv(HADOOP_CMD="/home/trendwise/apache/hadoop-1.0.4/bin/hadoop")
# optional
Sys.setenv(HADOOP_BIN="/home/trendwise/apache/hadoop-1.0.4/bin")
Sys.setenv(HADOOP_CONF_DIR="/home/trendwise/apache/hadoop-1.0.4/conf")
Sys.setenv(HADOOP_STREAMING='/home/trendwise/apache/hadoop-1.0.4/contrib/streaming/hadoop-streaming-1.0.4.jar')
Sys.setenv(LD_LIBRARY_PATH="/lib:/lib/x86_64-linux-gnu:/lib64:/usr/lib:/usr/lib64:/usr/local/lib:/usr/local/lib64:/usr/lib/jvm/jdk1.7.0_10/lib:/usr/lib/jvm/jdk1.7.0_10/jre/lib:/usr/lib/jvm/jdk1.7.0_10/jre/lib/amd64:/usr/lib/jvm/jdk1.7.0_10/jre/lib/amd64/server")
- Load the libraries
library(rmr2)
library(rhdfs)
- Initialize
hdfs.init()
- Put the input into HDFS
hpInput = to.dfs(mtcars$hp)
- Run MapReduce
mapReduceResult <- mapreduce(
  input = hpInput,
  # v is a vector of hp values: keep those above 150 and emit key 1 for each
  map = function(k, v) {
    keyval(rep(1, length(which(v > 150))), v[which(v > 150)])
  },
  reduce = function(k2, v2) {
    keyval(k2, sum(v2))
  }
)
- View the MR output
from.dfs(mapReduceResult)
- Output
$key
[1] 1

$val
[1] 2804
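If the job fails before it gets this far, one thing worth checking is whether the environment variables from the first step point at real files. Here is a small base R sketch; check_hadoop_env is a hypothetical helper, and its default variable list is an assumption that you should adjust to your setup.

```r
# Hypothetical helper: for each variable, report whether it is set and
# whether the path it holds exists on this machine.
check_hadoop_env <- function(vars = c("HADOOP_CMD", "HADOOP_STREAMING")) {
  values <- unname(Sys.getenv(vars, unset = NA))
  is_set <- !is.na(values)
  data.frame(
    variable = vars,
    is_set   = is_set,
    exists   = is_set & file.exists(ifelse(is_set, values, "")),
    row.names = NULL
  )
}

status <- check_hadoop_env()
print(status)
```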
You can use the built-in debugging features in recent versions of RStudio. Just rewrite the code so that it runs with the local MR backend.
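A minimal sketch of what running locally could look like, assuming rmr2 is installed; it uses rmr.options(backend = "local") so that no cluster is needed. The block is skipped when rmr2 is not available, and the names hp_input and local_total are illustrative only.

```r
# Runs only if rmr2 is installed; otherwise local_total stays NA.
local_total <- NA
if (requireNamespace("rmr2", quietly = TRUE)) {
  library(rmr2)
  rmr.options(backend = "local")  # run map and reduce locally, no cluster

  hp_input <- to.dfs(mtcars$hp)
  result <- mapreduce(
    input = hp_input,
    map = function(k, v) {
      keep <- v[v > 150]                  # v is a vector, so filter by indexing
      keyval(rep(1, length(keep)), keep)  # one key per kept value
    },
    reduce = function(k, v) keyval(k, sum(v))
  )
  local_total <- sum(values(from.dfs(result)))
}
print(local_total)
```

With the local backend the map and reduce functions should run inside the current R session, so a browser() call or an RStudio breakpoint placed in them ought to be reachable.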