Flink 表,创建表数组类型错误"ValidationException"



我创建了一个包含数据类型字段的flink表,但错误类型不匹配我想知道如何在flink表中创建一个包含数组类型的临时表。

public class FlinkConnectorClickhouse {
public static void main(String[] args) throws Exception {
// create environments of both APIs
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// create a DataStream
DataStream<Order> dataStream = env.fromCollection(Arrays.asList(
new Order(2L, "pen", 1, Arrays.asList("name01", "name02", "name03"), Arrays.asList(1, 2, 3)));
Table inputTable = tableEnv.fromDataStream(dataStreamMap, $("user").as("user_a"), $("product"), $("amount"), $("name_list"), $("id_list"));
// register the Table object as a view and query it
tableEnv.createTemporaryView("InputTable", inputTable);
tableEnv.executeSql("CREATE TABLE sink_table (n" +
"    `user_a` BIGINT,n" +
"    `product` VARCHAR,n" +
"    `amount` BIGINT,n" +
"    `name_list` ARRAY<STRING>,n" +
"    `id_list` ARRAY<INT>,n" +
"    PRIMARY KEY (user_a) NOT ENFORCED /* 如果指定 pk,进入 upsert 模式 */n" +
") WITH (n" +
")");
TableResult resultTable = tableEnv.executeSql("INSERT INTO sink_table SELECT user_a, product, amount,name_list,id_list FROM InputTable");
env.execute();
}
public static class Order {
private Long user;
private String product;
private Integer amount;
private List<String> name_list;
private List<Integer> id_list;
}
") WITH (n" +
")");

可能是哪里出了问题。水槽连接器在哪里?是卡夫卡吗?是蜂巢吗?它是一个带有JDBC的常规数据库吗?

你会在";表API连接器";此处:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/.请确保您遵循正确的JAR打包/部署/管理策略。

如果你是一个初学者,你可能会对这个与Flink捆绑在一起的产品感兴趣:

) WITH (
'connector' = 'filesystem',           -- required: specify the connector
'path' = 'file:///path/to/whatever',  -- required: path to a directory
'format' = 'csv'                     -- required: file system connector requires to specify a format,
)

或者这个:

WITH (
'connector' = 'print'
);

或者这个:

WITH (
'connector' = 'blackhole'
);

最新更新