代码:
df_streaming = spark
.readStream
.format("kafka")
...
.load()
.xxx()
df_streaming = df_streaming
.groupBy(["name", "height"])
.apply(cal_feature)
stream_writer = df_streaming
.writeStream
.format("console")
.start()
stream_writer.awaitTermination()
df_streaming
像这样:
name height weight
jack 173 100
tom 175 110
tom 175 115
和cal_feature
:
@pandas_udf(FEATURE_SCHEMA, PandasUDFType.GROUPED_MAP)
def cal_feature(df):
feature_df = pd.DataFrame(columns=FEATURE_NAMES)
feature_df["name"] = df["name"].iloc[0]
feature_df["height"] = df["height"].iloc[0]
feature_df["max_weight"] = df["weight"].max()
# other complicated processing
xxx...
return feature_df
我知道agg(functions.max("weight"))
可以得到它,但我想在cal_feature
中使用其他panda函数。
当使用静态数据帧(来自csv文件(时,会有输出。agg(max(也得到了它,结构化流数据帧支持适用吗?
spark-2.4.5蟒蛇-3.7.10
我还没有看到任何在结构化流中的流聚合上使用Grouped Map Pandas UDF的成功证据。我最近写了一个关于结构化流媒体中Pandas Grouped Map UDF的相关问题。我在结构化流中从分组映射Pandas UDF中获得结果/记录输出没有问题,但让它们处理正确的数据集并返回正确的结果(而不是从部分输入中返回许多不正确的结果(有很多问题。然而,我的经验仅限于Python API。如果您成功或了解更多信息,请在此处留下最新消息。