r-SparkR dubt和Broken管道异常



嗨,我正在使用纱簇以分布式模式开发SparkR。

我有两个问题:

1)如果我制作了一个包含R行代码和SparkR行代码的脚本,它将只分发SparkR代码还是也分发简单的R

这就是剧本。我读了一个csv,只获得了10万的第一个记录。我清理了它(使用R函数),删除了NA值,并创建了一个SparkR数据帧
这就是它的作用:foreach Lineset取该Lineset出现的每个TimeInterval,并对一些属性(数字属性)求和,然后将它们全部放入矩阵中。

这是带有R和SparkR代码的脚本。独立模式需要7小时,分布式模式需要60小时(被java.net.SocketException:Broken Pipe杀死)

LineSmsInt<-fread("/home/sentiment/Scrivania/LineSmsInt.csv")
Short<-LineSmsInt[1:100000,]
Short[is.na(Short)] <- 0
Short$TimeInterval<-Short$TimeInterval/1000
ShortDF<-createDataFrame(sqlContext,Short)
UniqueLineSet<-unique(Short$LINESET)
UniqueTime<-unique(Short$TimeInterval)
UniqueTime<-as.numeric(UniqueTime)
Row<-length(UniqueLineSet)*length(UniqueTime)
IntTemp<-matrix(nrow =Row,ncol=7)
k<-1
colnames(IntTemp)<-c("LINESET","TimeInterval","SmsIN","SmsOut","CallIn","CallOut","Internet")
Sys.time()
for(i in 1:length(UniqueLineSet)){
  SubSetID<-filter(ShortDF,ShortDF$LINESET==UniqueLineSet[i])
  for(j in 1:length(UniqueTime)){
    SubTime<-filter(SubSetID,SubSetID$TimeInterval==UniqueTime[j])       
    IntTemp[k,1]<-UniqueLineSet[i]
    IntTemp[k,2]<-as.numeric(UniqueTime[j])
    k3<-collect(select(SubTime,sum(SubTime$SmsIn)))
    IntTemp[k,3]<-k3[1,1]
    k4<-collect(select(SubTime,sum(SubTime$SmsOut)))
    IntTemp[k,4]<-k4[1,1]
    k5<-collect(select(SubTime,sum(SubTime$CallIn)))
    IntTemp[k,5]<-k5[1,1]
    k6<-collect(select(SubTime,sum(SubTime$CallOut)))
    IntTemp[k,6]<-k6[1,1]
    k7<-collect(select(SubTime,sum(SubTime$Internet)))
    IntTemp[k,7]<-k7[1,1]
    k<-k+1
  }
  print(UniqueLineSet[i])
  print(i)
}

这是脚本R,唯一改变的是子集函数,当然是一个正常的R数据帧,而不是SparkR数据帧。在独立模式下需要1.30分钟
为什么它在R中如此之快,而在SparkR中却如此之慢?

for(i in 1:length(UniqueLineSet)){
  SubSetID<-subset.data.frame(LineSmsInt,LINESET==UniqueLineSet[i])
  for(j in 1:length(UniqueTime)){
    SubTime<-subset.data.frame(SubSetID,TimeInterval==UniqueTime[j])
    IntTemp[k,1]<-UniqueLineSet[i]
    IntTemp[k,2]<-as.numeric(UniqueTime[j])
    IntTemp[k,3]<-sum(SubTime$SmsIn,na.rm = TRUE)
    IntTemp[k,4]<-sum(SubTime$SmsOut,na.rm = TRUE)
    IntTemp[k,5]<-sum(SubTime$CallIn,na.rm = TRUE)
    IntTemp[k,6]<-sum(SubTime$CallOut,na.rm = TRUE)
    IntTemp[k,7]<-sum(SubTime$Internet,na.rm=TRUE)
    k<-k+1
  }
  print(UniqueLineSet[i])
  print(i)
}

2)第一个脚本在分发模式下被:杀死

java.net.SocketException:断线管道

这种情况有时也会出现:

java.net.SocketTimeoutException:Accept超时

它可能来自错误的配置?建议

谢谢。

不要误解这一点,但这不是一段写得特别好的代码。使用core R已经很低效了,在等式中添加SparkR会使情况变得更糟。

如果我制作了一个包含R行代码和SparkR行代码的脚本,它将只分发SparkR代码还是也分发简单的R?

除非您使用的是在这些结构上操作的分布式数据结构和函数,否则它只是在主机上的单个线程中执行的普通R代码。

为什么它在R中如此之快,而在SparkR中却如此之慢?

对于初学者,您可以为LINESETUniqueTime和列的每个组合执行一个作业。每次Spark扫描所有记录并将数据提取到驱动程序。

此外,使用Spark处理可以在单机内存中轻松处理的数据根本没有意义。在这种情况下运行作业的成本通常远高于实际处理的成本。

建议?

如果你真的想使用SparkR,只需groupByagg:

group_by(Short, Short$LINESET, Short$TimeInterval) %>% agg(
  sum(Short$SmsIn), sum(Short$SmsOut), sum(Short$CallIn),
  sum(Short$CallOut), sum(Short$Internet))

如果您关心丢失的(LINESETTimeInterval)对,请使用joinunionAll填充这些对。

在实践中,它只需使用data.table并在本地聚合:

Short[, lapply(.SD, sum, na.rm=TRUE), by=.(LINESET, TimeInterval)]

最新更新