使用KSQL将数据拆分到各个主题



这是源主题->gt;

{
"subscriber": {
"mem_ID": "587994622344",
"mem_PIN": "07066",
"case_NUMBER": "21307P019",
"mem_FIRST_NAME": "Henry",
"mem_MIDDLE_NAME": "S",
"mem_LAST_NAME": "Taylor",
"mem_ADD_1": "1255",
"mem_ADD_2": "New Street",
"mem_CITY": "Clark,NJ"
},
"patient": {
"pat_ID": "133235413144",
"pat_SEX": "M",
"pat_DOB": "198760321",
"case_NUMBER": "21307P019",
"pat_FIRST_NAME": "Olive",
"pat_MIDDLE_NAME": "",
"pat_LAST_NAME": "Taylor",
"pat_PLANE_TYPE": "HealthCare",
"pat_PLAN_NAME": "HC"
},
"case": {
"CASE_NUMBER": "21307P019",
"CASE_TYPE": "Medical",
"CASE_CODE": "MC",
"CASE_START_DATE": "20220723",
"CASE_END_DATE": "20220831",
"CASE_AUTH_TYPE": "Insurance",
"CASE_STATUS": "Initiated"
},
"service": {
"svc_ID": "134324564678",
"case_NUMBER": "21307P019",
"svc_TYPE": "Day Care",
"svc_CODE": "DC",
"svc_FAC_ID": "HC",
"svc_FAC_NAME": "Heart Center",
"svc_PHY_ID": "34343555",
"svc_PHY_NAME": "Dr.Charles"
}
}

如何使用KSQL将上述源主题拆分为不同的4个主题。对于例如>gt;订阅者主题应该只有类似的订阅者数据

{
"mem_ID": "587994622344",
"mem_PIN": "07066",
"mem_MIDDLE_NAME": "S",
"mem_LAST_NAME": "Taylor",
"mem_CITY": "Clark,NJ",
"mem_ADD_2": "New Street",
"mem_ADD_1": "1255",
"case_NUMBER": "21307P019",
"mem_FIRST_NAME": "Henry"
}

与其他3个主题类似,患者、病例和服务

从原始数据创建流,然后选择这些字段。。。

CREATE STREAM subscribers WITH(KAFKA_TOPIC="subscribers", VALUE_FORMAT="JSON") AS 
SELECT subscriber.* FROM source_stream;

https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/create-stream-as-select/

或者,从制作人中的4个独立主题开始,然后加入案例编号,例如

最新更新