使用Apache Flink SQL从Kafka消息中获取嵌套字段



我正试图使用Apache Flink 1.11创建一个源表,在那里我可以访问JSON消息中的嵌套属性。我可以从根属性中提取值,但我不确定如何访问嵌套对象。

文档建议它应该是MAP类型,但当我设置它时,我会得到以下错误

: java.lang.UnsupportedOperationException: class org.apache.calcite.sql.SqlIdentifier: MAP

这是我的SQL

CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)

我的JSON看起来像这样:

{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar"
}
}

您可以使用ROW提取JSON消息中的嵌套字段。您的DDL语句看起来像:

CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR)
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);

[2022更新]

在Apache Flink 1.13版本中,没有系统内置的JSON函数。它们在1.14版本中引入。检查此

如果您使用的是<1.14,然后参见下面的解决方案。

如何使用嵌套的JSON输入创建表

JSON输入示例:

{
"id": "message-1",
"title": "Some Title",
"properties": {
"foo": "bar",
"nested_foo":{
"prop1" : "value1",
"prop2" : "value2"
}
}
}

创建语句

CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties ROW(`foo` VARCHAR, `nested_foo` ROW(`prop1` VARCHAR, `prop2` VARCHAR))
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
);

如何选择嵌套列

SELECT properties.foo, properties.nested_foo.prop1 FROM input;

请注意,如果您使用输出结果

SELECT properties FROM input

您可以看到行格式的结果。列properties的内容将是

+I[bar, +I[prop1,prop2]]

您也可以尝试

CREATE TABLE input(
id VARCHAR,
title VARCHAR,
properties MAP<STRING, STRING>
) WITH (
'connector' = 'kafka-0.11',
'topic' = 'my-topic',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'python-test',
'format' = 'json'
)

唯一的区别是:MAP<STRING, STRING>MAP

如果使用format=raw,您可以使用JSON_VALUE函数从payload中提取感兴趣的字段:下面是代码:

CREATE TABLE input(
payload STRING,
foo AS JSON_VALUE(payload, '$.properties.foo' RETURNING STRING),
) WITH (
'connector' = 'kafka',        
'format' = 'raw'
)

最新更新