如何使用 EXTRACTJSONFIELD(message, '$.payload.version') 创建 KSQLdb 流



如何使用EXTRACTJSONFIELD创建value_type JSON的KSQLdb流?这会使用select语句来完成吗?我不清楚在使用EXTRACTJSONFIELD运算符创建流的过程中如何为流定义字段名。

感谢

使用AS对字段名进行别名。下面是一个例子。

伪源数据:

ksql> PRINT 'source_data' FROM BEGINNING;
Format:JSON
{"ROWTIME":1545239521600,"ROWKEY":"null","Header":{"RecType":"RecA"},"RAFld1":{"someFld":"some data","someOtherField":1.001},"RAFld2":{"aFld":"data","anotherFld":98.6}}
{"ROWTIME":1545239526600,"ROWKEY":"null","Header":{"RecType":"RecB"},"RBFld1":{"randomFld":"random data","randomOtherField":1.001}}

声明源流

CREATE STREAM my_stream (Header VARCHAR, 
RAFld1 VARCHAR, 
RAFld2 VARCHAR, 
RBFld1 VARCHAR) 
WITH (KAFKA_TOPIC='source_data', VALUE_FORMAT='JSON');

创建派生流

CREATE STREAM recA_data WITH (VALUE_FORMAT='AVRO') AS 
SELECT EXTRACTJSONFIELD(RAFld1,'$.someOtherField') AS someOtherField, 
EXTRACTJSONFIELD(RAFld1,'$.someFld')        AS someFld, 
EXTRACTJSONFIELD(RAFld2,'$.aFld')           AS aFld, 
EXTRACTJSONFIELD(RAFld2,'$.anotherFld')     AS anotherFld 
FROM my_stream 
WHERE EXTRACTJSONFIELD(Header,'$.RecType') = 'RecA';

(来源(

相关内容

最新更新