如何将maptype(StringType,StringType)的列转换为StringType



所以我有此流数据框架,我正在尝试将此'customer_ids'列施加到一个简单的字符串。

schema = StructType()
    .add("customer_ids", MapType(StringType(), StringType()))
    .add("date", TimestampType())
original_sdf = spark.readStream.option("maxFilesPerTrigger", 800)
    .load(path=source, ftormat="parquet", schema=schema)
    .select('customer_ids', 'date')

此转换的目的是按本专栏进行分组,并像以下那样的max(日期)

将其分组
original_sdf.groupBy('customer_ids')
  .agg(max('date')) 
  .writeStream 
  .trigger(once=True) 
  .format("memory") 
  .queryName('query') 
  .outputMode("complete") 
  .start()

但是我得到了这个例外

AnalysisException: u'expression `customer_ids` cannot be used as a grouping expression because its data type map<string,string> is not an orderable data type.

如何施放此类流数据框列或其他任何方法来分组此列?

tl; dr 使用getItem方法访问MapType列中的每个键的值。


真正的问题是您要groupBy的密钥,因为MapType列可以具有各种键。每个键都可以是带有MAP列中值的列。

您可以使用column.getItem方法(或类似的Python Voodoo)访问密钥:

getItem(键:any):colum 表达式从数组中获取ordinal ordin的origation,或者通过maptype中的键获取值。

(我使用Scala,然后将其转换为Pyspark作为家庭练习)

val ds = Seq(Map("hello" -> "world")).toDF("m")
scala> ds.show(false)
+-------------------+
|m                  |
+-------------------+
|Map(hello -> world)|
+-------------------+
scala> ds.select($"m".getItem("hello") as "hello").show
+-----+
|hello|
+-----+
|world|
+-----+

相关内容

  • 没有找到相关文章

最新更新