在Spark中并行读取来自不同aws S3的多个文件



我有一个场景,我需要从位于不同位置和不同模式的s3桶中读取许多文件(csv或parquet)。

我这样做的目的是从不同的s3位置提取所有元数据信息,并将其保存为数据框架,并将其保存为s3本身的csv文件。这里的问题是,我有很多s3位置来读取文件(分区)。我的示例s3位置类似于

s3://myRawbucket/source1/filename1/year/month/day/16/f1.parquet
s3://myRawbucket/source2/filename2/year/month/day/16/f2.parquet
s3://myRawbucket/source3/filename3/year/month/day/16/f3.parquet
s3://myRawbucket/source100/filename100/year/month/day/16/f100.parquet
s3://myRawbucket/source150/filename150/year/month/day/16/f150.parquet    and .......... so on

所有我需要做的是使用火花代码读取这些许多文件(约200),并应用一些转换,如果需要和提取头信息,计数信息,s3位置信息,数据类型。

什么是有效的方式来读取所有这些文件(不同的模式),并使用spark代码(Dataframe)处理它,并将其保存为csv在s3桶?请容忍我,因为我是新的火花世界。我正在使用python (Pyspark)

我认为你想做的是使用一些Python/Pandas逻辑,并使用Spark并行化作业。赋格很适合这样。您可以通过极少的代码更改将逻辑移植到Spark。让我们先考虑用Python和Pandas定义逻辑,然后再把它带到Spark。

首先设置:

import pandas as pd
df = pd.DataFrame({"x": [1,2,3]})
df.to_parquet("/tmp/1.parquet")
df.to_parquet("/tmp/2.parquet")
df.to_parquet("/tmp/3.parquet")

我们需要一个包含所有文件的小DataFrame,以便与Spark协调作业。例如:

file_paths = pd.DataFrame({"path": ["/tmp/1.parquet",
"/tmp/2.parquet",
"/tmp/3.parquet"]})

现在我们可以创建一个函数来保存每个文件的逻辑。请注意,当我们将它引入Spark时,我们将创建1个"job"。每个文件路径。我们的函数一次只需要处理一个文件。

def process(df:pd.DataFrame) -> pd.DataFrame:
path = df.iloc[0]['path']

tmp = pd.read_parquet(path)

# transformation
tmp['y'] = tmp['x'] + 1

# save
tmp.to_parquet(path)

# summary stats
return pd.DataFrame({"path": [path],
'count': [tmp.shape[0]]})

我们可以测试代码:

process(file_paths)

得到:

path    count
/tmp/1.parquet  3

现在我们可以用赋格把它带到Spark。我们只需要transform()函数将逻辑引入Spark。模式是Spark的一个要求。

import fugue.api as fa
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
out = fa.transform(file_paths, process, schema="path:str,count:int", engine=spark)
# out is a Spark DataFrame
out.show()

输出将是:

+--------------+-----+
|          path|count|
+--------------+-----+
|/tmp/1.parquet|    3|
|/tmp/2.parquet|    3|
|/tmp/3.parquet|    3|
+--------------+-----+

最新更新