结构化流"apply"没有输出



代码:

df_streaming = spark 
.readStream 
.format("kafka") 
... 
.load() 
.xxx()
df_streaming = df_streaming 
.groupBy(["name", "height"]) 
.apply(cal_feature)
stream_writer = df_streaming 
.writeStream 
.format("console") 
.start()
stream_writer.awaitTermination()

df_streaming像这样:

name height weight
jack 173    100
tom  175    110
tom  175    115

cal_feature:

@pandas_udf(FEATURE_SCHEMA, PandasUDFType.GROUPED_MAP)
def cal_feature(df):
feature_df = pd.DataFrame(columns=FEATURE_NAMES)
feature_df["name"] = df["name"].iloc[0]
feature_df["height"] = df["height"].iloc[0]
feature_df["max_weight"] = df["weight"].max()

# other complicated processing
xxx...

return feature_df

我知道agg(functions.max("weight"))可以得到它,但我想在cal_feature中使用其他panda函数。

当使用静态数据帧(来自csv文件(时,会有输出。agg(max(也得到了它,结构化流数据帧支持适用吗?

spark-2.4.5蟒蛇-3.7.10

我还没有看到任何在结构化流中的流聚合上使用Grouped Map Pandas UDF的成功证据。我最近写了一个关于结构化流媒体中Pandas Grouped Map UDF的相关问题。我在结构化流中从分组映射Pandas UDF中获得结果/记录输出没有问题,但让它们处理正确的数据集并返回正确的结果(而不是从部分输入中返回许多不正确的结果(有很多问题。然而,我的经验仅限于Python API。如果您成功或了解更多信息,请在此处留下最新消息。

最新更新