是我使用flink创建的表的create语句。
CREATE TABLE event_kafkaTable (
columnA string,
columnB string,
timeofevent string,
eventTime AS TO_TIMESTAMP(TimestampConverterUtil(timeofevent)),
WATERMARK FOR eventTime AS eventTime - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'topic' = 'event_name',
'properties.bootstrap.servers'='127.0.0.1:9092',
'properties.group.id' = 'action_hitGroup',
'format'= 'json',
'scan.startup.mode'='earliest-offset',
'json.fail-on-missing-field'='false',
'json.ignore-parse-errors'='true'
)
上表监听Kafka,并将主题中的数据存储在名为event_name的Kafka中。现在,我想通过添加一个新列来修改这个表。以下是我尝试从我的flink作业中运行的ALTER命令:
1. ALTER TABLE event_kafkaTable ADD COLUMN test6 string;
2. ALTER TABLE event_kafkaTable ADD test6 string;
这两个命令都引发了Flink SQL Parser异常。
The Flink的官方网站,https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/sql/alter.html,未列出在表中添加或删除列的语法。请告诉我,使用Flink的table API在表中添加或删除列的语法是什么。
(默认(SQL DDL语法中还不支持此操作,但您可以使用AddColumns
和DropColumns
表API方法来执行这些操作。
本文档页面提供了如何将它们用于每种支持的语言的示例。