我是KSQL的新手,我觉得应该有一种方法可以将数据分组并添加到数组中。我从流中获取到这些单独的对象,例如:
{
"ENRLMT_ID": "I12345",
"STUS_CD": "06",
"STUS_RSN_CD": "081",
"STUS_RSN_DESC": "APPROVED FOR REVALIDATION",
"STUS_DESC": "APPROVED"
}
{
"ENRLMT_ID": "I12345",
"STUS_CD": "13",
"SRC_ENRLMT_STUS_HSTRY_SK": "OxdP6jOQnr/o+UfE4q0zr5p7lMvK0Fh9N",
"STUS_RSN_CD": "029",
"STUS_RSN_DESC": "THE PROVIDER OR SUPPLIER IS VOLUNTARILY WITHDRAWING",
"STUS_DESC": "DEACTIVATED"
}
我希望得到的结果是:
{
"ENRLMT_ID": "I12345",
"PAYLOAD": [
{
"STUS_CD": "06",
"STUS_RSN_CD": "081",
"STUS_RSN_DESC": "APPROVED FOR REVALIDATION",
"STUS_DESC": "APPROVED"
},
{
"STUS_CD": "13",
"STUS_RSN_CD": "029",
"STUS_RSN_DESC": "THE PROVIDER OR SUPPLIER IS VOLUNTARILY WITHDRAWING",
"STUS_DESC": "DEACTIVATED"
}
]
}
这是我目前写出的最接近目标的KSQL:
-- Derive a stream that keeps ENRLMT_ID and wraps the four status columns of
-- each source row into a single STRUCT column named PAYLOAD.
-- This is a row-by-row projection: one output record per input record, so
-- rows sharing an ENRLMT_ID are NOT combined into an array here.
CREATE STREAM ENROLLMENT_STATUS_STREAM AS
SELECT
    ESJ.ENRLMT_ID AS ENRLMT_ID,
    STRUCT(
        "STUS_CD"       := ESJ.STUS_CD,
        "STUS_RSN_CD"   := ESJ.STUS_RSN_CD,
        "STUS_RSN_DESC" := ESJ.STUS_RSN_DESC,
        "STUS_DESC"     := ESJ.STUS_DESC
    ) AS PAYLOAD
FROM ENROLLMENT_STATUS_DATA ESJ;
导致流输出:
{
"ENRLMT_ID": "I12345",
"PAYLOAD": {
"STUS_RSN_CD": "029",
"STUS_RSN_DESC": "THE PROVIDER OR SUPPLIER IS VOLUNTARILY WITHDRAWING",
"STUS_CD": "13",
"STUS_DESC": "DEACTIVATED"
}
}
是的,可以使用 collect_list 聚合函数:
-- Gather all status records that share an ENRLMT_ID into one PAYLOAD array of
-- STRUCTs (one array element per source row).
--
-- A query with GROUP BY is an aggregation, and in ksqlDB an aggregation
-- produces a TABLE, not a STREAM. The statement therefore has to be
-- CREATE TABLE ... AS SELECT; CREATE STREAM ... AS SELECT is rejected for an
-- aggregating query. The object name is kept as-is so existing references to
-- it continue to resolve, even though the result is a table.
--
-- NOTE(review): COLLECT_LIST over STRUCT values requires a ksqlDB version that
-- accepts non-primitive arguments -- confirm against the target version.
-- NOTE(review): the array length is presumably capped by the
-- ksql.functions.collect_list.limit setting -- verify if groups can be large.
CREATE TABLE ENROLLMENT_STATUS_STREAM AS
SELECT
    ENRLMT_ID,
    COLLECT_LIST(STRUCT(
        STUS_CD       := ESJ.STUS_CD,
        STUS_RSN_CD   := ESJ.STUS_RSN_CD,
        STUS_RSN_DESC := ESJ.STUS_RSN_DESC,
        STUS_DESC     := ESJ.STUS_DESC
    )) AS PAYLOAD
FROM ENROLLMENT_STATUS_DATA ESJ
GROUP BY ENRLMT_ID;