PySpark - loop through a StructType and ArrayType and cast types inside the StructFields



I am new to pyspark and this problem is puzzling me. Basically, I am looking for a scalable way to loop through a StructType or ArrayType and apply type casts.

A sample of my data schema:

root
|-- _id: string (nullable = true)
|-- created: timestamp (nullable = true)
|-- card_rates: struct (nullable = true)
|    |-- rate_1: integer (nullable = true)
|    |-- rate_2: integer (nullable = true)
|    |-- rate_3: integer (nullable = true)
|    |-- card_fee: integer (nullable = true)
|    |-- payment_method: string (nullable = true)
|-- online_rates: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- rate_1: integer (nullable = true)
|    |    |-- rate_2: integer (nullable = true)
|    |    |-- online_fee: double (nullable = true)
|-- updated: timestamp (nullable = true)

As you can see here, card_rates is a struct and online_rates is an array of structs. I am looking for a way to iterate over all the fields above and conditionally cast them. Ideally, anything that is supposed to be numeric should be cast to double, and anything that is supposed to be a string should stay a string. I need to loop because those rate_* fields may grow over time.

For now, though, I would settle for being able to loop over them and cast all of them to string, since I am very new to pyspark and still trying to get a feel for it.

My desired output schema:

root
|-- _id: string (nullable = true)
|-- created: timestamp (nullable = true)
|-- card_rates: struct (nullable = true)
|    |-- rate_1: double (nullable = true)
|    |-- rate_2: double (nullable = true)
|    |-- rate_3: double (nullable = true)
|    |-- card_fee: double (nullable = true)
|    |-- payment_method: string (nullable = true)
|-- online_rates: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- rate_1: double (nullable = true)
|    |    |-- rate_2: double (nullable = true)
|    |    |-- online_fee: double (nullable = true)
|-- updated: timestamp (nullable = true)

I am stuck on how to do this.

I found a reference here: PySpark convert struct fields inside an array to string

But that solution hard-codes the fields and does not really loop over them.

Please help.

Here is one solution with the help of the StructType.simpleString and _parse_datatype_string built-in functions:

from pyspark.sql.types import *
# _parse_datatype_string is private, so "import *" does not pull it in
from pyspark.sql.types import _parse_datatype_string

df_schema = StructType([
    StructField("_id", StringType(), True),
    StructField("created", TimestampType(), True),
    StructField("card_rates", StructType([
        StructField("rate_1", IntegerType(), True),
        StructField("rate_2", IntegerType(), True),
        StructField("rate_3", IntegerType(), True),
        StructField("card_fee", IntegerType(), True),
        StructField("payment_method", StringType(), True)])),
    StructField("online_rates", ArrayType(
        StructType([
            StructField("rate_1", IntegerType(), True),
            StructField("rate_2", IntegerType(), True),
            StructField("online_fee", DoubleType(), True)
        ]), True), True),
    StructField("updated", TimestampType(), True)])

schema_str = df_schema.simpleString()
# this gives -> struct<_id:string,created:timestamp,card_rates:struct<rate_1:int,rate_2:int,rate_3:int,card_fee:int,payment_method:string>,online_rates:array<struct<rate_1:int,rate_2:int,online_fee:double>>,updated:timestamp>

double_schema = schema_str.replace(':int', ':double')

# convert back to StructType
final_schema = _parse_datatype_string(double_schema)
final_schema
  1. First turn the schema into a simple string with schema.simpleString
  2. Then replace every :int with :double
  3. Finally turn the modified string schema back into a StructType with _parse_datatype_string

Update:

To avoid the problem that @jxc pointed out with the string replacement, a better solution is to scan the elements recursively, as below:

def transform_schema(schema):
    if schema is None:
        return StructType()
    updated = []
    for f in schema.fields:
        if isinstance(f.dataType, IntegerType):
            # if IntegerType, convert to DoubleType
            updated.append(StructField(f.name, DoubleType(), f.nullable))
        elif isinstance(f.dataType, ArrayType):
            # if ArrayType, unpack the element type, recurse, then wrap the
            # result in ArrayType again, keeping containsNull and nullable
            updated.append(StructField(
                f.name,
                ArrayType(transform_schema(f.dataType.elementType),
                          f.dataType.containsNull),
                f.nullable))
        elif isinstance(f.dataType, StructType):
            # if StructType, recurse
            updated.append(StructField(f.name, transform_schema(f.dataType), f.nullable))
        else:
            # handle all the other cases, i.e. TimestampType, StringType, etc.
            updated.append(StructField(f.name, f.dataType, f.nullable))
    return StructType(updated)

# call the function with your schema
transform_schema(df_schema)

Explanation: the function walks every item in the schema (StructType) and tries to convert the int fields (StructField) to double. Finally, it hands the converted schema (StructType) back up to the layer above (the parent StructType).

Output:

StructType(List(
StructField(_id,StringType,true),
StructField(created,TimestampType,true),
StructField(card_rates,
StructType(List(StructField(rate_1,DoubleType,true),
StructField(rate_2,DoubleType,true),
StructField(rate_3,DoubleType,true),
StructField(card_fee,DoubleType,true),
StructField(payment_method,StringType,true))),true),
StructField(online_rates,ArrayType(
StructType(List(
StructField(rate_1,DoubleType,true),
StructField(rate_2,DoubleType,true),
StructField(online_fee,DoubleType,true))),true),true),
StructField(updated,TimestampType,true)))

Update (2020-02-02):

Here is an example of how to use the new schema together with an existing dataframe:

updated_schema = transform_schema(df.schema)
# cast each column to the new type
select_expr = [df[f.name].cast(f.dataType) for f in updated_schema.fields]
df.select(*select_expr).show()
