我们在Delta源中有一些嵌套结构的数据。在本例中,我们将重点关注Delta中的一个名为status
的特定字段,该字段有许多子字段:commissionDate
,decommissionDate
,isDeactivated
,isPreview
,terminationDate
。
在我们的转换中,我们使用PySpark读取Delta文件,使用df.toPandas()
将DF转换为pandas,并使用pandas API对该pandas DF进行操作。一旦我们有了这个pandas DF,我们就想在不使用行迭代的情况下访问它的字段。
当使用inventory_df["status"][0]
查询时,Pandas中的数据如下所示(即inventory_df["status"]
是一个列表):
Row(commissionDate='2011-07-24T00:00:00+00:00', decommissionDate='2013-07-15T00:00:00+00:00', isDeactivated=True, isPreview=False, terminationDate=None)
我们发现使用行迭代是成功的:
unit_df["Active"] = [
not row["isDeactivated"] for row in inventory_df["status"]
]
,但是每次我们想要访问inventory_df
中的数据时,我们都必须使用行迭代。这样更冗长,效率更低。
我们希望能够这样做:
unit_df["Active"] = [
not inventory_df["status.isDeactivated"]
]
,它类似于Spark的解构方法,并允许一次访问所有的行,但似乎没有等效的pandas逻辑。
PySpark中的数据有一个类似status: struct<commissionDate:string,decommissionDate:string,isDeactivated:boolean,isPreview:boolean,terminationDate:string>
的格式,我们可以使用上面提到的格式,选择一个类似df.select("status.isDeactivated")
的子列。
如何使用pandas实现此方法?
这可能会让你到达你认为你在的地方:
unit_df["Active"] = unit_df["Active"].apply(lambda x: pd.DataFrame(x.asDict()))
从这里我将做:
unit_df = pd.concat([pd.concat(unif_df["Active"], ignore_index=True), unit_df], axis=1)
这将得到一个单一的pd.DataFrame
,现在有commissiondate
,decomissiondate
等列。