我正在Hadoop的YARN上运行Spark。这种转换是如何工作的?collect()是否发生在转换之前?
我还需要在每个从属节点上安装Python和R,以便转换工作?我很难找到这方面的文件。
toPandas
(PySpark)/as.data.frame
(SparkR)
在创建本地数据帧之前,必须收集数据。例如,toPandas
方法如下:
def toPandas(self):
import pandas as pd
return pd.DataFrame.from_records(self.collect(), columns=self.columns)
您需要在每个节点上安装具有所有依赖项的Python。
SparkR对应项(as.data.frame
)只是collect
的别名。
在这两种情况下,数据都是collected
到驱动程序节点,并转换为本地数据结构(Python和R中分别为pandas.DataFrame
和base::data.frame
)。
矢量化用户定义函数
由于Spark 2.3.0PySpark还提供了一组pandas_udf
(SCALAR
、GROUPED_MAP
、GROUPED_AGG
),它们对定义的数据块进行并行操作
SCALAR
变体情况下的分区GROUPED_MAP
和GROUPED_AGG
情况下的分组表达式
每个区块由表示
- 在CCD_ 17和CCD_ 18变体的情况下的一个或多个CCD_
- 在
GROUPED_MAP
变体的情况下,单个pandas.core.frame.DataFrame
类似地,由于Spark 2.0.0,SparkR提供了分别在由分区和分组表达式定义的data.frames
上操作的dapply
和gapply
函数。
上述功能:
- 不要向司机收取费用。除非数据仅包含单个分区(即具有
coalesce(1)
)或分组表达式是琐碎的(即groupBy(lit(1))
),否则不存在单个节点瓶颈 - 在相应执行器的内存中加载相应的块。因此,它受到每个执行器上可用的单个块/内存大小的限制