我正试图使用Apache Flink 1.11创建一个源表,在那里我可以访问JSON消息中的嵌套属性。我可以从根属性中提取值,但我不确定如何访问嵌套对象。
文档建议它应该是MAP
类型,但当我设置它时,我会得到以下错误
: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP
这是我的SQL
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
我的JSON看起来像这样:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar"
}
}
您可以使用ROW
提取JSON消息中的嵌套字段。您的DDL语句看起来像:
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
[2022更新]
在Apache Flink 1.13版本中,没有系统内置的JSON函数。它们在1.14版本中引入。检查此
如果您使用的是<1.14,然后参见下面的解决方案。
如何使用嵌套的JSON输入创建表
JSON输入示例:
{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar",
"nested_foo":{
"prop1" : "value1",
"prop2" : "value2"
}
}
}
创建语句
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR, `nested_foo` ROW(`prop1` VARCHAR, `prop2` VARCHAR))
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);
如何选择嵌套列
SELECT properties.foo, properties.nested_foo.prop1 FROM input;
请注意,如果您使用输出结果
SELECT properties FROM input
您可以看到行格式的结果。列properties
的内容将是
+I[bar, +I[prop1,prop2]]
您也可以尝试
CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP<STRING, STRING>
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)
唯一的区别是:MAP<STRING, STRING>
与MAP
如果使用format=raw
,您可以使用JSON_VALUE
函数从payload
中提取感兴趣的字段:下面是代码:
CREATE TABLE input(
payload STRING,
foo AS JSON_VALUE(payload, '$.properties.foo' RETURNING STRING),
) WITH (
'connector' = 'kafka',
'format' = 'raw'
)