如何重新运行 Apache Flink Postgres JDBC 作业而不会"No suitable driver found"异常



我有一个从初学者Maven项目派生的Flink作业。该作业有一个打开Postgres JDBC连接的源。我正在使用示例docker-compose.yml在我自己的Flink会话集群上执行作业。

当我第一次提交作业时,它执行成功。当我尝试再次提交时,我得到以下错误:

Caused by: java.sql.SQLException: No suitable driver found for jdbc:postgresql://host.docker.internal:5432/postgres?user=postgres&password=mypassword
at java.sql.DriverManager.getConnection(DriverManager.java:689)
at java.sql.DriverManager.getConnection(DriverManager.java:270)
at com.myorg.project.JdbcPollingSource.run(JdbcPollingSource.java:25)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

我必须重新启动集群才能重新运行作业。为什么会发生这种情况?如何在不重启集群的情况下再次提交作业?

Maven启动器项目中唯一增加的内容是:

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.24</version>
</dependency>

Flink源代码只打开一个JDBC连接,如下所示:

package com.mycompany;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
public class JdbcSource extends RichSourceFunction<Integer> {
private final String connString;
public JdbcSource(String connString) {
this.connString = connString;
}
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
try (Connection conn = DriverManager.getConnection(this.connString)) {
}
}
@Override
public void cancel() {
}
}

我在Flink版本1.14.0和1.13.2上测试了这个,结果相同。

注意,这个问题提供了在我的RichSourceFunction中使用Class.forName("org.postgresql.Driver");的解决方案。但是我想知道到底发生了什么。

您可以参考的第一个问题JDBC驱动程序不能在Apache Flink中从SQL数据库读取数据集时找到。

第二,如果您使用会话模式。可以很容易地重新运行Flink作业,而无需重新启动集群。您可以登录到作业管理器shell,然后使用命令rerun job。

Class.forName("org.postgresql.Driver");将触发静态方法块,因此您的DriverManager可以获得驱动类。看到:

// from org.postgresql.Driver
static {
try {
register();
} catch (SQLException var1) {
throw new ExceptionInInitializerError(var1);
}
}

我有这个pom.xml依赖于Postgres for Apache Flink 1.13:

<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>9.4-1201-jdbc41</version>
</dependency>

你可以有一个Postgres连接器类,例如:

public class PostgreSQLConnector {
private static volatile PostgreSQLConnector instance;
private Connection connectionDB = null;
public PostgreSQLConnector(your params) {
...
}
public static PostgreSQLConnector getInstance() {
PostgreSQLConnector postgreSQLConnector = instance;
if (postgreSQLConnector != null)
return postgreSQLConnector;
synchronized (PostgreSQLConnector.class) {
if (instance == null) {
instance = new PostgreSQLConnector(your params);
}
return instance;
}
}
public Connection getConnectionDB() throws SQLException {
if (checkNullConnection()) CreateConnection();
return connectionDB;
}
public void CheckConnection() throws SQLException {
if (checkNullConnection()) CreateConnection();
}
public void CreateConnection() throws SQLException {
try {
Class.forName(sink.driverName);
connectionDB = DriverManager.getConnection(fullUrl, username, password);
} catch (Exception e) {
...
}
}
public boolean checkNullConnection() throws SQLException {
return (connectionDB == null || connectionDB.isClosed());
}
}

则可以在overridesopen方法中创建RichSourceFunction并创建连接,而不是在run

中创建连接。
public class JdbcSource extends RichSourceFunction<Integer> {
private final String connString;
private static Connection dbConnection;
private static final PostgreSQLConnector postgreSQLConnector = PostgreSQLConnector.getInstance();
public JdbcSource(String connString) {
this.connString = connString;
}
@Override
public void open(Configuration parameters) throws SQLException {
dbConnection = postgreSQLConnector.getConnectionDB();
}
@Override
public void close() throws Exception {
if (dbConnection != null) dbConnection.close();
}
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
do something here with the connection
}
@Override
public void cancel() {
}
}

类似的东西你可以尝试一下,它应该可以工作

根据PostgreSQL JDBC驱动程序的官方文档,如果您使用的是Java 1.6+,则只需将驱动程序的jar文件放入类路径即可。驱动程序将由JVM自动加载。所以问题是如何将驱动程序的jar文件放入类路径中。

由于您正在使用docker来部署会话集群,因此有两种方法可以工作:

  1. 将驱动程序的jar文件放入docker映像

运行和访问的图像命令:

docker docker run -it -v $PWD:/tmp/flink <address to image> -- bash

将驱动程序的jar文件复制到文件夹/opt/flink/lib

从容器创建一个新映像。由于默认情况下/opt/flink/lib是作为类路径加载的,所以现在驱动程序的jar文件位于类路径中。

  1. 将驱动程序的jar包打包到您的用户jar

将maven-assembly-plugin添加到maven项目的pom.xml中。重新编译项目并获得一个包含依赖项的jar文件。在这个jar中,PostgreSQL JDBC驱动程序被打包在一起。

最新更新