所以我有此流数据框架,我正在尝试将此'customer_ids'列施加到一个简单的字符串。
schema = StructType()
.add("customer_ids", MapType(StringType(), StringType()))
.add("date", TimestampType())
original_sdf = spark.readStream.option("maxFilesPerTrigger", 800)
.load(path=source, ftormat="parquet", schema=schema)
.select('customer_ids', 'date')
此转换的目的是按本专栏进行分组,并像以下那样的max(日期)
将其分组original_sdf.groupBy('customer_ids')
.agg(max('date'))
.writeStream
.trigger(once=True)
.format("memory")
.queryName('query')
.outputMode("complete")
.start()
但是我得到了这个例外
AnalysisException: u'expression `customer_ids` cannot be used as a grouping expression because its data type map<string,string> is not an orderable data type.
如何施放此类流数据框列或其他任何方法来分组此列?
tl; dr 使用getItem
方法访问MapType
列中的每个键的值。
真正的问题是您要groupBy
的密钥,因为MapType
列可以具有各种键。每个键都可以是带有MAP列中值的列。
您可以使用column.getItem方法(或类似的Python Voodoo)访问密钥:
getItem(键:any):colum 表达式从数组中获取ordinal ordin的origation,或者通过maptype中的键获取值。
(我使用Scala,然后将其转换为Pyspark作为家庭练习)
val ds = Seq(Map("hello" -> "world")).toDF("m")
scala> ds.show(false)
+-------------------+
|m |
+-------------------+
|Map(hello -> world)|
+-------------------+
scala> ds.select($"m".getItem("hello") as "hello").show
+-----+
|hello|
+-----+
|world|
+-----+