我有一个场景,我需要从位于不同位置和不同模式的s3桶中读取许多文件(csv或parquet)。
我这样做的目的是从不同的s3位置提取所有元数据信息,并将其保存为数据框架,并将其保存为s3本身的csv文件。这里的问题是,我有很多s3位置来读取文件(分区)。我的示例s3位置类似于
s3://myRawbucket/source1/filename1/year/month/day/16/f1.parquet
s3://myRawbucket/source2/filename2/year/month/day/16/f2.parquet
s3://myRawbucket/source3/filename3/year/month/day/16/f3.parquet
s3://myRawbucket/source100/filename100/year/month/day/16/f100.parquet
s3://myRawbucket/source150/filename150/year/month/day/16/f150.parquet and .......... so on
所有我需要做的是使用火花代码读取这些许多文件(约200),并应用一些转换,如果需要和提取头信息,计数信息,s3位置信息,数据类型。
什么是有效的方式来读取所有这些文件(不同的模式),并使用spark代码(Dataframe)处理它,并将其保存为csv在s3桶?请容忍我,因为我是新的火花世界。我正在使用python (Pyspark)
我认为你想做的是使用一些Python/Pandas逻辑,并使用Spark并行化作业。赋格很适合这样。您可以通过极少的代码更改将逻辑移植到Spark。让我们先考虑用Python和Pandas定义逻辑,然后再把它带到Spark。
首先设置:
import pandas as pd
df = pd.DataFrame({"x": [1,2,3]})
df.to_parquet("/tmp/1.parquet")
df.to_parquet("/tmp/2.parquet")
df.to_parquet("/tmp/3.parquet")
我们需要一个包含所有文件的小DataFrame,以便与Spark协调作业。例如:
file_paths = pd.DataFrame({"path": ["/tmp/1.parquet",
"/tmp/2.parquet",
"/tmp/3.parquet"]})
现在我们可以创建一个函数来保存每个文件的逻辑。请注意,当我们将它引入Spark时,我们将创建1个"job"。每个文件路径。我们的函数一次只需要处理一个文件。
def process(df:pd.DataFrame) -> pd.DataFrame:
path = df.iloc[0]['path']
tmp = pd.read_parquet(path)
# transformation
tmp['y'] = tmp['x'] + 1
# save
tmp.to_parquet(path)
# summary stats
return pd.DataFrame({"path": [path],
'count': [tmp.shape[0]]})
我们可以测试代码:
process(file_paths)
得到:
path count
/tmp/1.parquet 3
现在我们可以用赋格把它带到Spark。我们只需要transform()
函数将逻辑引入Spark。模式是Spark的一个要求。
import fugue.api as fa
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
out = fa.transform(file_paths, process, schema="path:str,count:int", engine=spark)
# out is a Spark DataFrame
out.show()
输出将是:
+--------------+-----+
| path|count|
+--------------+-----+
|/tmp/1.parquet| 3|
|/tmp/2.parquet| 3|
|/tmp/3.parquet| 3|
+--------------+-----+