Flink DDL无法解析Kafka Topic中的两个不同json根



我正在用Python从Kafka向Flink发送消息。在一个Kafka主题中,我有两个不同的json根。我的json根与示例:

1-{"消息1":{"b":"c"}}

2-{'Message2':{e':"f"}}

Flink可以使用这些消息,但不能解析DDL格式。

CREATE TABLE audienceInput (
`messageKey` VARBINARY,
`message` VARBINARY,
`topic` VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'mytopic',
'properties.bootstrap.servers' = '****:9092',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'json'
)
""" 

如何在Flink DDL中解析两种根类型的消息?

您可以直接声明表模式中的两个Message1Message2字段。

CREATE TABLE audience_input (
Message1 ROW(b STRING, c STRING),
Message2 ROW(e STRING, f STRING),
...
) WITH
...
'value.format' = 'json'
)

然后,它们将显示为在结果列中具有嵌套字段的列。

有关Json格式的更多详细信息,请参阅Json格式文档。

相关内容

  • 没有找到相关文章

最新更新