Suppose I have a dataframe abc in Spark, as shown below:
ID Trxn_Date Order_Date Sales_Rep Order_Category Sales_Amount Discount
100 2021-03-24 2021-03-17 Mathew DailyStaples 1000 1.50
133 2021-01-22 2021-01-12 Camelia Medicines 2000 0.50
Goal:
Randomly pick one column for each data type and find its min and max, column by column.
- For a `numerical` column it should also compute the sum or average.
- For a `string` column it should compute the maximum and minimum length.
Then create another dataframe with the following structure:
Table_Name Column_Name  Min        Max        Sum
abc        Trxn_Date    2021-01-22 2021-03-24
abc        Sales_Rep    6          7          <-- len('Mathew') = 6 and len('Camelia') = 7
abc        Sales_Amount 1000       2000       3000
I am using the following code, but it picks up all columns. Also, when I run it in a Databricks/PySpark environment, I get the error shown below.
table_lst = ['table_1','table_2']
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df_list = []
for i in table_lst:
    sdf_i = spark.sql("SELECT * FROM schema_name.{0}".format(i))
    df_i = sdf_i.select("*").toPandas()
    df_list.append(df_i)
d = {}
for i,j in zip(table_name,dfs):
    d[i] = j
df_concat = []
for k,v in d.items():
    val_df = {}
    for i,j in zip(v.columns,v.dtypes):
        if 'object' in str(j):
            max_s = v[i].map(len).max()
            min_s = v[i].map(len).min()
            val_df[k+'-'+i+'_'+'Max_String_L'] = max_s
            val_df[k+'-'+i+'_'+'Min_String_L'] = min_s
        elif 'int' or 'float' in str(j):
            max_int = v[i].max()  # <-- Error line as indicated in Databricks
            min_int = v[i].min()
            val_df[k+'-'+i+'Max_Num'] = max_int
            val_df[k+'-'+i+'_'+'Min_Num'] = min_int
        elif 'datetime' in str(j):
            max_date = v[i].max()
            min_date = v[i].min()
            val_df[k+'-'+i+'_'+'Max_Date'] = max_date
            val_df[k+'-'+i+'_'+'Min_Date'] = min_date
        else:
            print('left anythg?')
    df_f_d = pd.DataFrame.from_dict(val_df,orient='index').reset_index()
    df_concat.append(df_f_d)
When I run this code on Databricks/PySpark, I get the following error:
TypeError: '>=' not supported between instances of 'float' and 'str'
Moreover, the code above does not produce the resulting dataframe described above.
I suspect that when converting the Spark DF to pandas, all data types are being converted to string.
So, how can this be resolved? Also, can the above code be modified to achieve the goal?
My solution is a bit long (perhaps that is to be expected given the requirements), and you can debug each step as needed. The overall idea is:
- distinguish which columns are strings and which are numeric
- use the `describe` function to find each column's min and max
- however, `describe` does not compute sum or average, so we have to aggregate those separately
# a.csv
# ID,Trxn_Date,Order_Date,Sales_Rep,Order_Category,Sales_Amount,Discount
# 100,2021-03-24,2021-03-17,Mathew,DailyStaples,1000,1.50
# 133,2021-01-22,2021-01-12,Camelia,Medicines,2000,0.50
from pyspark.sql import functions as F
df = spark.read.csv('a.csv', header=True, inferSchema=True)
all_cols = [col[0] for col in df.dtypes]
date_cols = ['Trxn_Date', 'Order_Date'] # Spark doesn't infer DateType so I have to handle it manually. You can ignore if your original schema already has it.
str_cols = [col[0] for col in df.dtypes if col[1] == 'string' and col[0] not in date_cols]
num_cols = [col[0] for col in df.dtypes if col[1] in ['int', 'double']]
# replace actual string values with its length
for col in str_cols:
df = df.withColumn(col, F.length(col))
# calculate min max and transpose dataframe
df1 = (df
.describe()
.where(F.col('summary').isin('min', 'max'))
.withColumn('keys', F.array([F.lit(c) for c in all_cols]))
.withColumn('values', F.array([F.col(c) for c in all_cols]))
.withColumn('maps', F.map_from_arrays('keys', 'values'))
.select('summary', F.explode('maps').alias('col', 'value'))
.groupBy('col')
.agg(
F.collect_list('summary').alias('keys'),
F.collect_list('value').alias('values')
)
.withColumn('maps', F.map_from_arrays('keys', 'values'))
.select('col', 'maps.min', 'maps.max')
)
df1.show(10, False)
# +--------------+----------+----------+
# |col |min |max |
# +--------------+----------+----------+
# |Sales_Amount |1000 |2000 |
# |Sales_Rep |6 |7 |
# |Order_Category|9 |12 |
# |ID |100 |133 |
# |Discount |0.5 |1.5 |
# |Trxn_Date |2021-01-22|2021-03-24|
# |Order_Date |2021-01-12|2021-03-17|
# +--------------+----------+----------+
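The `map_from_arrays` / `explode` / `collect_list` chain above is just a transpose: it turns the two `describe` rows (`min`, `max`) into one output row per column. A plain-Python sketch of the same reshaping, with a couple of hardcoded sample values for illustration:

```python
# Rows as describe() produces them: one dict per summary row, keyed by column.
# (Values are hardcoded here purely to illustrate the reshaping.)
rows = [
    {"summary": "min", "Sales_Amount": "1000", "Sales_Rep": "6"},
    {"summary": "max", "Sales_Amount": "2000", "Sales_Rep": "7"},
]

# Transpose: collect, per column, a {'min': ..., 'max': ...} mapping,
# which is what map_from_arrays + explode + groupBy/collect_list achieve.
transposed = {}
for row in rows:
    summary = row["summary"]  # 'min' or 'max'
    for col, value in row.items():
        if col != "summary":
            transposed.setdefault(col, {})[summary] = value

print(transposed)
# -> {'Sales_Amount': {'min': '1000', 'max': '2000'},
#     'Sales_Rep': {'min': '6', 'max': '7'}}
```

This is why `maps.min` and `maps.max` can be selected at the end: each column's row carries a map keyed by the summary names.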
# calculate sum and transpose dataframe
df2 = (df
.groupBy(F.lit(1).alias('sum'))
.agg(*[F.sum(c).alias(c) for c in num_cols])
.withColumn('keys', F.array([F.lit(c) for c in num_cols]))
.withColumn('values', F.array([F.col(c) for c in num_cols]))
.withColumn('maps', F.map_from_arrays('keys', 'values'))
.select(F.explode('maps').alias('col', 'sum'))
)
df2.show(10, False)
# +------------+------+
# |col |sum |
# +------------+------+
# |ID |233.0 |
# |Sales_Amount|3000.0|
# |Discount |2.0 |
# +------------+------+
# Join them together to get final dataframe
df1.join(df2, on=['col'], how='left').show()
# +--------------+----------+----------+------+
# | col| min| max| sum|
# +--------------+----------+----------+------+
# | Sales_Amount| 1000| 2000|3000.0|
# | Sales_Rep| 6| 7| null|
# |Order_Category| 9| 12| null|
# | ID| 100| 133| 233.0|
# | Discount| 0.5| 1.5| 2.0|
# | Trxn_Date|2021-01-22|2021-03-24| null|
# | Order_Date|2021-01-12|2021-03-17| null|
# +--------------+----------+----------+------+
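As an aside on your original pandas loop: one likely contributor to the odd branching is `elif 'int' or 'float' in str(j):`. Due to operator precedence this parses as `('int') or ('float' in str(j))`, and the non-empty string `'int'` is always truthy, so the numeric branch runs for every dtype that isn't `object` (datetimes included) and the `elif 'datetime'` branch is unreachable. A minimal sketch of the difference:

```python
# The original condition
#     elif 'int' or 'float' in str(j):
# parses as ('int') or ('float' in str(j)), which is always truthy,
# because 'int' is a non-empty string.
dtype_name = "datetime64[ns]"

always_true = 'int' or 'float' in dtype_name
print(bool(always_true))  # -> True, regardless of dtype_name

# The intended test names the substring check on both sides:
intended = 'int' in dtype_name or 'float' in dtype_name
print(intended)  # -> False for a datetime column
```

With the corrected condition, datetime columns reach their own branch instead of being treated as numeric.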