我正在用Python从Kafka向Flink发送消息。在一个Kafka主题中,我有两个不同的json根。我的json根与示例:
1-{"消息1":{"b":"c"}}
2-{'Message2':{e':"f"}}
Flink可以使用这些消息,但不能解析DDL格式。
CREATE TABLE audienceInput (
`messageKey` VARBINARY,
`message` VARBINARY,
`topic` VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'mytopic',
'properties.bootstrap.servers' = '****:9092',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'json'
)
"""
如何在Flink DDL中解析两种根类型的消息?
您可以直接声明表模式中的两个Message1
和Message2
字段。
CREATE TABLE audience_input (
Message1 ROW(b STRING, c STRING),
Message2 ROW(e STRING, f STRING),
...
) WITH
...
'value.format' = 'json'
)
然后,它们将显示为在结果列中具有嵌套字段的列。
有关Json格式的更多详细信息,请参阅Json格式文档。