PySpark to pandas DataFrame conversion: data types come out wrong after the conversion



Suppose I have a DataFrame `abc` in Spark, as shown below:

ID    Trxn_Date    Order_Date    Sales_Rep  Order_Category   Sales_Amount   Discount
100   2021-03-24   2021-03-17    Mathew     DailyStaples       1000           1.50
133   2021-01-22   2021-01-12    Camelia    Medicines          2000           0.50

Goal:

Randomly pick one column of each data type and find its minimum and maximum, column by column.

- For a `numerical` column it should also compute the sum or average.
- For a `string` column it should compute the maximum and minimum length.

Create another DataFrame with the following structure:

Table_Name    Column_Name     Min           Max           Sum
abc           Trxn_Date       2021-01-22    2021-03-24
abc           Sales_Rep       6             7                     <---- str.len('Mathew') = 6 and that of 'Camelia' is 7
abc           Sales_Amount    1000          2000          3000

I am using the code below, but it picks up all the columns. Also, when I run it in a Databricks/PySpark environment, I get the error shown further down.

import pandas as pd

table_lst = ['table_1','table_2']
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df_list = []
for i in table_lst:
    sdf_i = spark.sql("SELECT * FROM schema_name.{0}".format(i))
    df_i = sdf_i.select("*").toPandas()
    df_list.append(df_i)
d = {}
for i,j in zip(table_name,dfs):
    d[i] = j
df_concat = []
for k,v in d.items():
    val_df = {}
    for i,j in zip(v.columns,v.dtypes):
        if 'object' in str(j):
            max_s = v[i].map(len).max()
            min_s = v[i].map(len).min()
            val_df[k+'-'+i+'_'+'Max_String_L'] = max_s
            val_df[k+'-'+i+'_'+'Min_String_L'] = min_s
        elif 'int' or 'float' in str(j):
            max_int = v[i].max()      # <------ Error line as indicated in Databricks
            min_int = v[i].min()
            val_df[k+'-'+i+'Max_Num'] = max_int
            val_df[k+'-'+i+'_'+'Min_Num'] = min_int
        elif 'datetime' in str(j):
            max_date = v[i].max()
            min_date = v[i].min()
            val_df[k+'-'+i+'_'+'Max_Date'] = max_date
            val_df[k+'-'+i+'_'+'Min_Date'] = min_date
        else:
            print('left anythg?')
    df_f_d = pd.DataFrame.from_dict(val_df,orient='index').reset_index()
    df_concat.append(df_f_d)

When I run this code on Databricks PySpark, I get the following error:

TypeError: '>=' not supported between instances of 'float' and 'str' 

Also, the code above does not produce the result DataFrame described above.

I suspect that when converting the Spark DataFrame to pandas, all the data types are being converted to string.
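
One quick way to test that suspicion is to compare the Spark-side types with what pandas ends up with after `toPandas()`. A minimal diagnostic sketch, reusing the `schema_name.table_1` placeholders from the code above:

sdf = spark.sql("SELECT * FROM schema_name.table_1")
print(sdf.dtypes)   # Spark-side types, e.g. [('ID', 'int'), ('Trxn_Date', 'string'), ...]

pdf = sdf.toPandas()
print(pdf.dtypes)   # pandas-side types; string columns come back as 'object'

# If a column that should be numeric shows up as 'object', it holds mixed Python
# objects (strings and floats), and calling .max()/.min() on it can raise the same
# kind of TypeError as above, e.g.:
#   import pandas as pd
#   pd.Series([1.5, 'Mathew']).max()   # TypeError: '>=' not supported between ...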

So how can this be fixed? Also, can the code above be modified to achieve the goal?

My solution is a bit long (probably to be expected given the requirements), and you can debug each step as needed. The overall idea is:

  1. Work out which columns are strings and which are numeric
  2. Use the `describe` function to find each column's min and max
  3. However, `describe` does not compute the sum, so we have to aggregate that separately (a short sketch of what `describe` returns follows right below)
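
As a quick illustration of step 2 (a sketch; `df` is the Spark DataFrame built in the code right below), `describe()` returns one row per summary statistic, with every value typed as a string. That is why the code filters the `summary` column down to `min`/`max` and computes the sum in a separate aggregation:

from pyspark.sql import functions as F

# describe() with no arguments summarises every column and returns one row per
# statistic: count, mean, stddev, min and max. All statistics come back as strings.
df.describe().show()

# Keep only the rows the final result needs.
df.describe().where(F.col('summary').isin('min', 'max')).show()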
# a.csv
# ID,Trxn_Date,Order_Date,Sales_Rep,Order_Category,Sales_Amount,Discount
# 100,2021-03-24,2021-03-17,Mathew,DailyStaples,1000,1.50
# 133,2021-01-22,2021-01-12,Camelia,Medicines,2000,0.50
from pyspark.sql import functions as F

df = spark.read.csv('a.csv', header=True, inferSchema=True)

all_cols = [col[0] for col in df.dtypes]
date_cols = ['Trxn_Date', 'Order_Date'] # Spark doesn't infer DateType, so I handle these manually. You can ignore this if your original schema already has the right types.
str_cols = [col[0] for col in df.dtypes if col[1] == 'string' and col[0] not in date_cols]
num_cols = [col[0] for col in df.dtypes if col[1] in ['int', 'double']]

# replace the actual string values with their lengths
for col in str_cols:
    df = df.withColumn(col, F.length(col))

# calculate min and max, then transpose the dataframe
df1 = (df
    .describe()
    .where(F.col('summary').isin('min', 'max'))
    .withColumn('keys', F.array([F.lit(c) for c in all_cols]))
    .withColumn('values', F.array([F.col(c) for c in all_cols]))
    .withColumn('maps', F.map_from_arrays('keys', 'values'))
    .select('summary', F.explode('maps').alias('col', 'value'))
    .groupBy('col')
    .agg(
        F.collect_list('summary').alias('keys'),
        F.collect_list('value').alias('values')
    )
    .withColumn('maps', F.map_from_arrays('keys', 'values'))
    .select('col', 'maps.min', 'maps.max')
)
df1.show(10, False)
# +--------------+----------+----------+
# |col           |min       |max       |
# +--------------+----------+----------+
# |Sales_Amount  |1000      |2000      |
# |Sales_Rep     |6         |7         |
# |Order_Category|9         |12        |
# |ID            |100       |133       |
# |Discount      |0.5       |1.5       |
# |Trxn_Date     |2021-01-22|2021-03-24|
# |Order_Date    |2021-01-12|2021-03-17|
# +--------------+----------+----------+
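
A note on a design detail: `describe()` returns all of its statistics as strings, which is what lets `map_from_arrays` mix values coming from columns of different original types in one map. If you need numeric min/max values downstream, you could cast them afterwards; a small optional sketch, not part of the original answer:

# Optional: cast min/max back to double for the numeric columns only.
df1_num = (df1
    .where(F.col('col').isin(num_cols))
    .withColumn('min', F.col('min').cast('double'))
    .withColumn('max', F.col('max').cast('double'))
)
df1_num.show(10, False)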
# calculate the sum and transpose the dataframe
df2 = (df
    .groupBy(F.lit(1).alias('sum'))
    .agg(*[F.sum(c).alias(c) for c in num_cols])
    .withColumn('keys', F.array([F.lit(c) for c in num_cols]))
    .withColumn('values', F.array([F.col(c) for c in num_cols]))
    .withColumn('maps', F.map_from_arrays('keys', 'values'))
    .select(F.explode('maps').alias('col', 'sum'))
)
df2.show(10, False)
# +------------+------+
# |col         |sum   |
# +------------+------+
# |ID          |233.0 |
# |Sales_Amount|3000.0|
# |Discount    |2.0   |
# +------------+------+
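
The requirement also mentions sum or average; the same pass can carry the mean as well. A sketch (the `df2b` name and the struct/explode reshaping are mine, not from the original answer):

# Compute sum and average per numeric column in one aggregation, then explode
# one struct per column into rows. Casting the sums to double keeps the struct
# types identical so they can live in one array.
df2b = (df
    .agg(*[F.sum(c).alias('sum_' + c) for c in num_cols],
         *[F.avg(c).alias('avg_' + c) for c in num_cols])
    .select(F.explode(F.array(*[
        F.struct(
            F.lit(c).alias('col'),
            F.col('sum_' + c).cast('double').alias('sum'),
            F.col('avg_' + c).cast('double').alias('avg')
        ) for c in num_cols
    ])).alias('kv'))
    .select('kv.col', 'kv.sum', 'kv.avg')
)
df2b.show(10, False)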
# Join them together to get final dataframe
df1.join(df2, on=['col'], how='left').show()
# +--------------+----------+----------+------+
# |           col|       min|       max|   sum|
# +--------------+----------+----------+------+
# |  Sales_Amount|      1000|      2000|3000.0|
# |     Sales_Rep|         6|         7|  null|
# |Order_Category|         9|        12|  null|
# |            ID|       100|       133| 233.0|
# |      Discount|       0.5|       1.5|   2.0|
# |     Trxn_Date|2021-01-22|2021-03-24|  null|
# |    Order_Date|2021-01-12|2021-03-17|  null|
# +--------------+----------+----------+------+
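
To get exactly the layout asked for in the question (Table_Name, Column_Name, Min, Max, Sum), the joined result only needs a literal table name and some renaming. A final sketch (the `result` name is mine; 'abc' is the table name from the question):

result = (df1.join(df2, on=['col'], how='left')
    .withColumn('Table_Name', F.lit('abc'))
    .select(
        'Table_Name',
        F.col('col').alias('Column_Name'),
        F.col('min').alias('Min'),
        F.col('max').alias('Max'),
        F.col('sum').alias('Sum')
    )
)
result.show(10, False)

# For several tables (e.g. the table_lst in the question) you could run the same
# steps per table, tag each result with F.lit(table_name), and combine them with unionByName.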
