我正在尝试将所有出现的 ID 数据分组到 JSON 有效负载中作为数组

  • 本文关键字:JSON 有效 负载 数组 数据 ID ksqldb
  • 更新时间 :
  • 英文 :


我是KSQL的新手,我觉得应该有一种方法可以将数据分组并添加到数组中我从流中获取这些单独的对象,例如

{
"ENRLMT_ID": "I12345",
"STUS_CD": "06",      
"STUS_RSN_CD": "081",
"STUS_RSN_DESC": "APPROVED FOR REVALIDATION",
"STUS_DESC": "APPROVED"  
}
{
"ENRLMT_ID": "I12345",
"STUS_CD": "13",
"SRC_ENRLMT_STUS_HSTRY_SK": "OxdP6jOQnr/o+UfE4q0zr5p7lMvK0Fh9N",
"STUS_RSN_CD": "029",
"STUS_RSN_DESC": "THE PROVIDER OR SUPPLIER IS VOLUNTARILY WITHDRAWING",
"STUS_DESC": "DEACTIVATED"  
}

我希望得到的结果是:

{     
"ENRLMT_ID": "I12345",
PAYLOAD: [ 
{
"STUS_CD": "06",              
"STUS_RSN_CD": "081",
"STUS_RSN_DESC": "APPROVED FOR REVALIDATION",
"STUS_DESC": "APPROVED"
},

{
"STUS_CD": "13",              
"STUS_RSN_CD": "029",
"STUS_RSN_DESC": "THE PROVIDER OR SUPPLIER IS VOLUNTARILY WITHDRAWING",
"STUS_DESC": "DEACTIVATED"  
}
]
}

这是我曾经尽可能接近的KSQL:

CREATE STREAM ENROLLMENT_STATUS_STREAM AS SELECT
ENRLMT_ID AS ENRLMT_ID,
STRUCT(
"STUS_CD":= ESJ.STUS_CD,
"STUS_RSN_CD" := ESJ.STUS_RSN_CD,
"STUS_RSN_DESC":= ESJ.STUS_RSN_DESC,  
"STUS_DESC":= ESJ.STUS_DESC    
) AS PAYLOAD
FROM ENROLLMENT_STATUS_DATA ESJ;

导致流输出:

{
"ENRLMT_ID": "I12345",
"PAYLOAD": {
"STUS_RSN_CD": "029",
"STUS_RSN_DESC": THE PROVIDER OR SUPPLIER IS VOLUNTARILY WITHDRAWING",
"STUS_CD": "13",
"STUS_DESC": "DEACTIVATED"    
}
}

是的,可以使用collect_list聚合函数

CREATE STREAM ENROLLMENT_STATUS_STREAM 
AS 
SELECT
ENRLMT_ID,
collect_list(struct(
STUS_CD := ESJ.STUS_CD,
STUS_RSN_CD := ESJ.STUS_RSN_CD,
STUS_RSN_DESC := ESJ.STUS_RSN_DESC,  
STUS_DESC := ESJ.STUS_DESC    
)) AS PAYLOAD
FROM ENROLLMENT_STATUS_DATA ESJ
GROUP BY ENRLMT_ID;

最新更新