嗨,我正在使用纱簇以分布式模式开发SparkR。
我有两个问题:
1)如果我制作了一个包含R行代码和SparkR行代码的脚本,它将只分发SparkR代码还是也分发简单的R
这就是剧本。我读了一个csv,只获得了10万的第一个记录。我清理了它(使用R函数),删除了NA值,并创建了一个SparkR数据帧
这就是它的作用:foreach Lineset取该Lineset出现的每个TimeInterval,并对一些属性(数字属性)求和,然后将它们全部放入矩阵中。
这是带有R和SparkR代码的脚本。独立模式需要7小时,分布式模式需要60小时(被java.net.SocketException
:Broken Pipe杀死)
LineSmsInt<-fread("/home/sentiment/Scrivania/LineSmsInt.csv")
Short<-LineSmsInt[1:100000,]
Short[is.na(Short)] <- 0
Short$TimeInterval<-Short$TimeInterval/1000
ShortDF<-createDataFrame(sqlContext,Short)
UniqueLineSet<-unique(Short$LINESET)
UniqueTime<-unique(Short$TimeInterval)
UniqueTime<-as.numeric(UniqueTime)
Row<-length(UniqueLineSet)*length(UniqueTime)
IntTemp<-matrix(nrow =Row,ncol=7)
k<-1
colnames(IntTemp)<-c("LINESET","TimeInterval","SmsIN","SmsOut","CallIn","CallOut","Internet")
Sys.time()
for(i in 1:length(UniqueLineSet)){
SubSetID<-filter(ShortDF,ShortDF$LINESET==UniqueLineSet[i])
for(j in 1:length(UniqueTime)){
SubTime<-filter(SubSetID,SubSetID$TimeInterval==UniqueTime[j])
IntTemp[k,1]<-UniqueLineSet[i]
IntTemp[k,2]<-as.numeric(UniqueTime[j])
k3<-collect(select(SubTime,sum(SubTime$SmsIn)))
IntTemp[k,3]<-k3[1,1]
k4<-collect(select(SubTime,sum(SubTime$SmsOut)))
IntTemp[k,4]<-k4[1,1]
k5<-collect(select(SubTime,sum(SubTime$CallIn)))
IntTemp[k,5]<-k5[1,1]
k6<-collect(select(SubTime,sum(SubTime$CallOut)))
IntTemp[k,6]<-k6[1,1]
k7<-collect(select(SubTime,sum(SubTime$Internet)))
IntTemp[k,7]<-k7[1,1]
k<-k+1
}
print(UniqueLineSet[i])
print(i)
}
这是脚本R,唯一改变的是子集函数,当然是一个正常的R数据帧,而不是SparkR数据帧。在独立模式下需要1.30分钟
为什么它在R中如此之快,而在SparkR中却如此之慢?
for(i in 1:length(UniqueLineSet)){
SubSetID<-subset.data.frame(LineSmsInt,LINESET==UniqueLineSet[i])
for(j in 1:length(UniqueTime)){
SubTime<-subset.data.frame(SubSetID,TimeInterval==UniqueTime[j])
IntTemp[k,1]<-UniqueLineSet[i]
IntTemp[k,2]<-as.numeric(UniqueTime[j])
IntTemp[k,3]<-sum(SubTime$SmsIn,na.rm = TRUE)
IntTemp[k,4]<-sum(SubTime$SmsOut,na.rm = TRUE)
IntTemp[k,5]<-sum(SubTime$CallIn,na.rm = TRUE)
IntTemp[k,6]<-sum(SubTime$CallOut,na.rm = TRUE)
IntTemp[k,7]<-sum(SubTime$Internet,na.rm=TRUE)
k<-k+1
}
print(UniqueLineSet[i])
print(i)
}
2)第一个脚本在分发模式下被:杀死
java.net.SocketException:断线管道
这种情况有时也会出现:
java.net.SocketTimeoutException:Accept超时
它可能来自错误的配置?建议
谢谢。
不要误解这一点,但这不是一段写得特别好的代码。使用core R已经很低效了,在等式中添加SparkR会使情况变得更糟。
如果我制作了一个包含R行代码和SparkR行代码的脚本,它将只分发SparkR代码还是也分发简单的R?
除非您使用的是在这些结构上操作的分布式数据结构和函数,否则它只是在主机上的单个线程中执行的普通R代码。
为什么它在R中如此之快,而在SparkR中却如此之慢?
对于初学者,您可以为LINESET
、UniqueTime
和列的每个组合执行一个作业。每次Spark扫描所有记录并将数据提取到驱动程序。
此外,使用Spark处理可以在单机内存中轻松处理的数据根本没有意义。在这种情况下运行作业的成本通常远高于实际处理的成本。
建议?
如果你真的想使用SparkR,只需groupBy
和agg
:
group_by(Short, Short$LINESET, Short$TimeInterval) %>% agg(
sum(Short$SmsIn), sum(Short$SmsOut), sum(Short$CallIn),
sum(Short$CallOut), sum(Short$Internet))
如果您关心丢失的(LINESET
,TimeInterval
)对,请使用join
或unionAll
填充这些对。
在实践中,它只需使用data.table
并在本地聚合:
Short[, lapply(.SD, sum, na.rm=TRUE), by=.(LINESET, TimeInterval)]