How to write to an S3 table sink in Flink without the "update and delete changes" error?



Consider this code:

import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

class Scratch {
    public static void main(String[] args) {
        StreamTableEnvironment tableEnv = /* some init code here */;
        tableEnv.executeSql("CREATE TABLE my_table (\n" +
                "  id STRING,\n" +
                "  createdDate DATE,\n" +
                "  `date` STRING\n" +
                ") PARTITIONED BY (`date`)\n" +
                "WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 's3://my-bucket/',\n" +
                "  'format' = 'json'\n" +
                ")");
        tableEnv.executeSql("CREATE TABLE output_table (\n" +
                "  id STRING,\n" +
                "  created_date DATE,\n" +
                "  count_value BIGINT,\n" +
                "  PRIMARY KEY (id, created_date) NOT ENFORCED\n" +
                ") WITH (\n" +
                "  'connector' = 'filesystem',\n" +
                "  'path' = 's3://some-bucket/output-table/',\n" +
                "  'format' = 'json'\n" +
                ")");
        Table temp = tableEnv.sqlQuery(
                "SELECT id as id, " +
                "max(createdDate) as created_date, " +
                "COUNT(DISTINCT(id)) as count_value " +
                "from my_table\n" +
                "GROUP BY createdDate, id");
        temp.executeInsert("output_table");
    }
}

This gives me the error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Table sink 'default_catalog.default_database.output_table' doesn't support consuming update changes which is produced by node GroupAggregate(select=[MIN($f0) AS id, MAX(createdDate) AS created_date, COUNT(DISTINCT $f2) AS count_value])

Is there a way to write aggregated results to S3 via Flink? (e.g. by running Flink in batch mode)

Since you are running the query in streaming mode, it requires a sink that can handle the updates and deletes coming from the aggregation. You can either:

  • produce the results in a CDC (changelog) format, such as Debezium (see the sketch after this list),
  • or run the job in batch mode.
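
For the first option, a minimal sketch of a changelog-capable sink, assuming you emit the changelog to Kafka in Debezium format (the topic name and broker address are placeholders, and tableEnv is the environment from the question):

// Hypothetical Kafka sink table: the 'debezium-json' format lets the sink
// encode the INSERT/UPDATE/DELETE messages produced by the aggregation.
tableEnv.executeSql("CREATE TABLE output_table (\n" +
        "  id STRING,\n" +
        "  created_date DATE,\n" +
        "  count_value BIGINT\n" +
        ") WITH (\n" +
        "  'connector' = 'kafka',\n" +
        "  'topic' = 'output-topic',\n" +
        "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
        "  'format' = 'debezium-json'\n" +
        ")");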

To run in batch mode, you can do this:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

EnvironmentSettings settings = EnvironmentSettings
        .newInstance()
        .inBatchMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);
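
Put together with the query from the question, a minimal batch-mode sketch might look like this (the two CREATE TABLE statements are the same as in the question and are elided):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

class BatchScratch {
    public static void main(String[] args) {
        // In batch mode the aggregation emits only final rows, so the
        // append-only filesystem/S3 sink no longer rejects the query.
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // ... CREATE TABLE my_table / output_table statements from the question ...

        Table temp = tableEnv.sqlQuery(
                "SELECT id as id, " +
                "max(createdDate) as created_date, " +
                "COUNT(DISTINCT(id)) as count_value " +
                "from my_table " +
                "GROUP BY createdDate, id");
        temp.executeInsert("output_table");
    }
}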

If you need to use the Table API in batch execution mode while also having access to the DataStream API, that is only possible since Flink 1.14.
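
A minimal sketch of that combination, assuming Flink 1.14 or later:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Run the DataStream runtime in batch execution mode; since Flink 1.14 a
// StreamTableEnvironment built on top of it runs in batch mode as well.
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);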
