Flink JDBCInputFormat 找不到方法 'setRowTypeInfo'



我想用flink-jdbc从mysql获取数据。 我在 Apache flink 网站上看过一个例子

// Read data from a relational database using the JDBC input format
DataSet<Tuple2<String, Integer> dbData =
env.createInput(
JDBCInputFormat.buildJDBCInputFormat()
.setDrivername("org.apache.derby.jdbc.EmbeddedDriver")
.setDBUrl("jdbc:derby:memory:persons")
.setQuery("select name, age from persons")
.setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish()
);

但是当我尝试编写演示时,我找不到方法"setRowTypeInfo"。 是这样的

import org.apache.flink.api.common.typeinfo.BasicTypeInfo
import org.apache.flink.api.java.ExecutionEnvironment
import org.apache.flink.api.java.io.jdbc.JDBCInputFormat
import org.apache.flink.api.scala._
/**
* Created by lulijun on 17/7/7.
*/
object FlinkJDBC {

def main(args:Array[String]): Unit = {
val env = ExecutionEnvironment.createLocalEnvironment()
val dbData = env.createInput(
JDBCInputFormat.buildJDBCInputFormat
.setDrivername("com.mysql.jdbc.Driver")
.setDBUrl("XXX")
.setUsername("xxx")
.setPassword("XXX")
.setQuery("select name, age from persons")
.setRowTypeInfo(new Nothing(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO))
.finish)
dbData.print()
env.execute()
}
}

"setRowTypeInfo"方法始终为红色,IDEA 提示 "无法解析符号集行类型信息"

我使用的 flink-jdbc 的 jar 版本是 1.0.0。

<dependencies>
<!-- Use this dependency if you are using the DataSet API -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.36</version>
</dependency>
</dependencies>

我搜索了很多,大多数人使用的方法与官方文件完全相同,但有一个提到了这个问题。 我怀疑我是否使用了错误的 flink-jdbc 版本,但我无法获得有关使用 flink-jdbc 的正确方法的任何信息。 如果你知道问题所在,请教我。谢谢。

我将 flink-jdbc 版本从 1.0.0 更改为 1.3.0,问题解决了。 但是当我在 maven websit 上搜索 flink-jdbc 时 https://mvnrepository.com/search?q=flink-jdbc,我在前几页中无法获得正确的信息,这让我觉得 flink-jdbc 的版本不需要与其他 flink jar 匹配。 但事实是 flink-jdbc/1.1.3 使用软件包 api.table 的类 RowTypeInfo,但 flink-jdbc/1.3.0 使用包 api 的类 RowTypeInfo .java它们彼此之间有着密切的联系。 我们必须确保版本匹配。

相关内容

  • 没有找到相关文章

最新更新