将JDBCTableSource与StreamTableEnvironment一起使用会产生ClassCastExcep



我正在尝试使用JDBC postgresql-db的源连接来实现流应用程序。一开始,我尝试了一个基本查询,但由于强制转换异常,我无法执行它。

这是的例外

Caused by: org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: Could not forward element to next operator
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:651)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.time.LocalDate
at org.apache.flink.api.common.typeutils.base.LocalDateSerializer.copy(LocalDateSerializer.java:30)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)

这是的代码片段

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
streamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecutionEnvironment);
JDBCOptions jdbcOptions = JDBCOptions.builder()
.setDriverName("org.postgresql.Driver")
.setDBUrl("jdbc:postgresql://192.168.4.69:5432/sessiondb")
.setUsername("postgres")
.setPassword("postgres")
.setTableName("sessions")
.build();
TableSchema tableSchema = TableSchema.builder()
.field("id", DataTypes.STRING())
.field("ip", DataTypes.STRING())
.field("port", DataTypes.INT())
.field("alias", DataTypes.STRING())
.field("connect_time", DataTypes.DATE())
.field("disconnect_time", DataTypes.DATE())
.build();
JDBCTableSource jdbcTableSource = JDBCTableSource.builder().setOptions(jdbcOptions).setSchema(tableSchema).build();
tableEnvironment.registerTableSource("sessions", jdbcTableSource);
Table table = tableEnvironment.sqlQuery("SELECT * FROM sessions");
tableEnvironment.toAppendStream(table, Row.class).print();
streamExecutionEnvironment.execute();

当我将DataType更改为TIMESTAMP_WITH_TIME_ZONE(这最初是postgresql上的真正DataType(时,我会收到这个错误,它根本不允许我运行应用程序

org.apache.flink.table.api.TableException: Unsupported conversion from data type
'TIMESTAMP(6) WITH TIME ZONE' (conversion class: java.time.OffsetDateTime) to type information. 
Only data types that originated from type information fully support a reverse conversion.

有什么办法解决这个问题吗?(Apache Flink API 1.9.0(

首先,数据类型TIMESTAMP WITH TIME ZONE的异常是因为两个规划者目前都不支持此数据类型。我建议查看兼容性表。

你试过使用DataTypes.TIMESTAMP(3)吗?

相关内容

  • 没有找到相关文章

最新更新