如何使用Rhipe (R)从Reducer中的HDFS读取文件



我正在努力创建多个映射器和化简器。两个映射器和化简器完成它的工作,并将输出保存在HDFS输出文件夹中。现在,我需要从第三个化简器从HDFS中读取这两个简化的文件,但不知道该怎么做。我正在使用Rhipe进行地图缩减。代码如下:

reduce.test <- (expression(
  pre = { 
     total <- 0 
     #read file from previous reducers -- this didn't work
     pos.mod <- readLines("/path/to/file/in/hdfs")  
    #error received: Error in file(con, "r"); cannot open connection
  }, 
  reduce = { total <- sum(total, unlist(reduce.values)) }, 
  post = { rhcollect(reduce.keys, total) }
))

好吧,这只是使用 R 处理大数据集的另一种选择,而不是使用 Rhipe ,我有一些使用 Hadoop流的经验,这对我来说效果非常好。

简而言之,您可以使用任何语言来编写使用Hadoop Streaming的mapreduce作业。

这是我之前编写的一些代码,只是为了让您简要了解它在 R 中的外观。

您可以自定义自己的myfunction它应该几乎就在那里。

#!/usr/bin/Rscript
library(dplyr)
f <- file("stdin")
open(f, open="r")
mydelimiter <- 't'
mydelimiter_col <- rawToChar(as.raw(1))
myfunction <- function(inputarray){
    ...
    return(result)
}
# FIRST LINE
firstline <- readLines(f, n=1)
fields <- unlist(strsplit(firstline, split=mydelimiter))
mykey_current <- fields[1]
myvalue_array <- array(fields[2])
while( length(line<-readLines(f, n=1)) > 0){
    fields <- unlist(strsplit(line, split=mydelimiter))
    mykey_new <- fields[1]
    myvalue_new <- fields[2]
    if (identical(mykey_new, mykey_current)) {
        # Same Key: append new value to existing value array
        myvalue_array <- c(myvalue_array, myvalue_new)
    } else {
        # Different Key
        # (1): process existing values
        result <- myfunction(myvalue_array)
        cat(mykey_current); cat(mydelimiter)
        cat(result); cat('n')
        # (2): reset key, value
        mykey_current <- mykey_new
        myvalue_array <- array(myvalue_new)
    }
}
# LAST LINE
result <- myfunction(myvalue_array)
cat(mykey_current); cat(mydelimiter)
cat(result); cat('n')

运行 hadoop 流式处理作业。我写了一个看起来像这样的小bash:

#!/bin/bash
hdfs dfs -rmr <hdfspath_output>
hadoop jar 
/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.3.0-mr1-cdh5.1.0.jar 
-files mapper.py,reducer.R 
-input <hdfspath_input> 
-output <hdfspath_output> 
-mapper mapper.py 
-reducer reducer.R

相关内容

  • 没有找到相关文章

最新更新