Spark:根据s3文件中的字段动态生成查询



过度简化场景:在s3文件中生成月度数据的流程。每个月运行的字段数量可能不同。基于s3中的这些数据,我们将数据加载到一个表中,然后手动运行SQL(因为每次运行中字段的数量可能会随着几列的添加或删除而变化(。对这些数据有更多的计算/转换,但为了让启动器Im呈现用例的更简单版本。

方法:考虑到无模式的性质,由于s3文件中的字段数量在每次运行中可能会有所不同,添加/删除的字段很少,这需要每次在SQL中手动更改,因此我计划探索Spark/Scala,这样我们就可以直接从s3中读取数据,并根据字段动态生成SQL。

查询:如何在scala/spark SQL/dataframe中实现这一点?s3文件只包含每次运行所需的字段。因此,从s3中读取动态字段没有问题,因为它由数据帧处理。问题是我们如何生成要处理的SQL数据帧-API/spark-SQL代码。

我可以通过数据帧读取s3文件,并将数据帧注册为createOrReplaceTempView来编写SQL,但我认为在下次运行时在s3中添加新字段时手动更改spark SQL没有帮助。动态生成sql的最佳方法是什么/处理问题的更好方法是什么?

用例1:

  • 首次运行

dataframe:customer,1st_month_count(这里dataframe直接指向s3,它只有必需的属性(

--sample code
SELECT customer,sum(month_1_count)
FROM dataframe
GROUP BY customer
--Dataframe API/SparkSQL
dataframe.groupBy("customer").sum("month_1_count").show()
  • 第二次运行-增加了一列

dataframe:customer,month_1_count,month_2_count((这里dataframe直接指向s3,它只有必需的属性(

--Sample SQL
SELECT customer,sum(month_1_count),sum(month_2_count)
FROM dataframe
GROUP BY customer
--Dataframe API/SparkSQL
dataframe.groupBy("customer").sum("month_1_count","month_2_count").show() 

我是Spark/Scala的新手,如果你能提供指导,我可以进一步探索,那将是很有帮助的。

听起来你想在数据帧模式中出现的新列上一遍又一遍地执行相同的操作吗?这项工作:

from pyspark.sql import functions
#search for column names you want to sum, I put in "month"
column_search = lambda col_names: 'month' in col_names
#get column names of temp dataframe w/ only the columns you want to sum
relevant_columns = original_df.select(*filter(column_search, original_df.columns)).columns
#create dictionary with relevant column names to be passed to the agg function
columns = {col_names: "sum" for col_names in relevant_columns}
#apply agg function with your groupBy, passing in columns dictionary
grouped_df = original_df.groupBy("customer").agg(columns)
#show result
grouped_df.show()

一些重要的概念可以帮助你学习:

  1. DataFrames具有存储在列表中的数据属性:dataframe.columns

  2. 可以将函数应用于列表以创建新列表,如"column_search"中所示

  3. Agg函数接受字典中的多个表达式,如下所述,这就是我传递到"列"中的内容

  4. Spark是惰性的,所以在您执行类似show((的操作之前,它不会更改数据状态或执行操作。这意味着,像我一样编写临时数据帧以使用类似数据帧列的一个元素并不昂贵,即使如果您习惯了SQL,它可能看起来效率低下。

最新更新