在Pyspark中执行foreachPartition后获得空数据框



我在PySpark有点新,我试图在我的数据框中执行foreachPartition函数,然后我想用相同的数据框执行另一个函数。问题是,在使用foreachPartition函数之后,我的数据框变为空,所以我不能对它做任何其他事情。我的代码如下所示:

def my_random_function(partition, parameters):
#performs something with the dataframe
#does not return anything
my_py_spark_dataframe.foreachPartition(
lambda partition: my_random_function(partition, parameters))

有人能告诉我我怎么能执行这个foreachPartition,也使用相同的数据帧来执行其他功能?

我看到一些用户在谈论使用df.toPandas().copy()复制数据框,但在我的情况下,这会导致一些执行问题,所以我想使用相同的数据框,而不是创建一个新的。

提前感谢!

不清楚您正在尝试的操作;下面是foreachPartition的一个示例用法:

样本数据是来自三大洲的国家列表:

+---------+-------+
|Continent|Country|
+---------+-------+
|       NA|    USA|
|       NA| Canada|
|       NA| Mexico|
|       EU|England|
|       EU| France|
|       EU|Germany|
|     ASIA|  India|
|     ASIA|  China|
|     ASIA|  Japan|
+---------+-------+

以下代码按"Continent"对数据进行分区,使用foreachPartition迭代每个分区,并写入";Country"该特定分区(即大陆)的每个文件的名称。

df = spark.createDataFrame(data=[["NA", "USA"], ["NA", "Canada"], ["NA", "Mexico"], ["EU", "England"], ["EU", "France"], ["EU", "Germany"], ["ASIA", "India"], ["ASIA", "China"], ["ASIA", "Japan"]], schema=["Continent", "Country"])
df.withColumn("partition_id", F.spark_partition_id()).show()
df = df.repartition(F.col("Continent"))
df.withColumn("partition_id", F.spark_partition_id()).show()
def write_to_file(rows):
for row in rows:
with open(f"/content/sample_data/{row.Continent}.txt", "a+") as f:
f.write(f"{row.Country}n")
df.foreachPartition(write_to_file)

输出:

三个文件:每个分区一个。

!ls -1 /content/sample_data/
ASIA.txt
EU.txt
NA.txt

每个文件都有该洲(分区)的国家名称:

!cat /content/sample_data/ASIA.txt
India
China
Japan
!cat /content/sample_data/EU.txt
England
France
Germany
!cat /content/sample_data/NA.txt
USA
Canada
Mexico

最新更新