我正在努力创建多个映射器和化简器。两个映射器和化简器完成它的工作,并将输出保存在HDFS输出文件夹中。现在,我需要从第三个化简器从HDFS中读取这两个简化的文件,但不知道该怎么做。我正在使用Rhipe进行地图缩减。代码如下:
reduce.test <- (expression(
pre = {
total <- 0
#read file from previous reducers -- this didn't work
pos.mod <- readLines("/path/to/file/in/hdfs")
#error received: Error in file(con, "r"); cannot open connection
},
reduce = { total <- sum(total, unlist(reduce.values)) },
post = { rhcollect(reduce.keys, total) }
))
好吧,这只是使用 R 处理大数据集的另一种选择,而不是使用 Rhipe
,我有一些使用 Hadoop流的经验,这对我来说效果非常好。
简而言之,您可以使用任何语言来编写使用Hadoop Streaming的mapreduce作业。
这是我之前编写的一些代码,只是为了让您简要了解它在 R 中的外观。
您可以自定义自己的myfunction
它应该几乎就在那里。
#!/usr/bin/Rscript
library(dplyr)
f <- file("stdin")
open(f, open="r")
mydelimiter <- 't'
mydelimiter_col <- rawToChar(as.raw(1))
myfunction <- function(inputarray){
...
return(result)
}
# FIRST LINE
firstline <- readLines(f, n=1)
fields <- unlist(strsplit(firstline, split=mydelimiter))
mykey_current <- fields[1]
myvalue_array <- array(fields[2])
while( length(line<-readLines(f, n=1)) > 0){
fields <- unlist(strsplit(line, split=mydelimiter))
mykey_new <- fields[1]
myvalue_new <- fields[2]
if (identical(mykey_new, mykey_current)) {
# Same Key: append new value to existing value array
myvalue_array <- c(myvalue_array, myvalue_new)
} else {
# Different Key
# (1): process existing values
result <- myfunction(myvalue_array)
cat(mykey_current); cat(mydelimiter)
cat(result); cat('n')
# (2): reset key, value
mykey_current <- mykey_new
myvalue_array <- array(myvalue_new)
}
}
# LAST LINE
result <- myfunction(myvalue_array)
cat(mykey_current); cat(mydelimiter)
cat(result); cat('n')
运行 hadoop 流式处理作业。我写了一个看起来像这样的小bash:
#!/bin/bash
hdfs dfs -rmr <hdfspath_output>
hadoop jar
/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.3.0-mr1-cdh5.1.0.jar
-files mapper.py,reducer.R
-input <hdfspath_input>
-output <hdfspath_output>
-mapper mapper.py
-reducer reducer.R