Flink作业提交在文件实际存在时抛出java.nio.file.NoSuchFileException



我试图提交一个已经打包在JAR中的flink作业。基本上,它使用了一个受SASL身份验证保护的kafka主题,因此它需要一个.jks文件,我已经将其包含在JAR中,并在代码中读取为:

try(InputStream resourceStream = loader.getResourceAsStream(configFile)){
properties.load(resourceStream);
properties.setProperty("ssl.truststore.location",
loader.getResource(properties.getProperty("ssl.truststore.location")).toURI().getPath());
}
catch(Exception e){
System.out.println("Failed to load config");
}

为了测试,我尝试在两个不同(不同VM规格(的独立服务器上提交作业。一台服务器成功运行,但另一台服务器抛出java.nio.file.NoSuchFileException,表示找不到我的.jks文件。有人能指出可能的问题吗?

在这里,flink部署在具有以下版本的独立集群模式上:

  • Flink版本:1.14.0
  • Java版本:11.0.13

我意识到我的问题真的很傻。这部分实际上返回null并触发异常。

loader.getResource(properties.getProperty("ssl.truststore.location")).toURI().getPath()

问题是我通过web UI提交了作业,因此我看不到打印的消息。因此,文件名解析为存储在configFile下的原始文件名,configFile是一个相对路径。为什么一台机器工作而另一台不工作?因为我之前在homedir上有.jks,用于另一个测试:(。

为了让其他人不要陷入这个错误,这里总结了.getResource()如果分别从IDE(gradle run task(和jar运行,将解决什么问题。

//      file:home/gradle-demo/build/resources/main/kafka-client.truststore.jks
//      jar:file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").toString());
//      home/gradle-demo/build/resources/main/kafka-client.truststore.jks
//      file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").getPath());
//      home/gradle-demo/build/resources/main/kafka-client.truststore.jks
//      null
System.out.println(loader.getResource("kafka-client.trustore.jks").toURI().getPath());
//      file:home/gradle-demo/build/resources/main/kafka-client.truststore.jks
//      jar:file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").toURI());

kafka客户端:2.4.1org.apache.kafka.common.security.ssl.SslEngineBuilder#285

try (InputStream in = Files.newInputStream(Paths.get(path))) {
KeyStore ks = KeyStore.getInstance(type);
// If a password is not set access to the truststore is still available, but integrity checking is disabled.
char[] passwordChars = password != null ? password.value().toCharArray() : null;
ks.load(in, passwordChars);
return ks;
} catch (GeneralSecurityException | IOException e) {
throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e);
}

看起来我们应该把jks文件放在文件系统(nfs或hdfs(中,任务管理器可以通过绝对路径进行访问。

最新更新