我试图提交一个已经打包在JAR中的flink作业。基本上,它使用了一个受SASL身份验证保护的kafka主题,因此它需要一个.jks文件,我已经将其包含在JAR中,并在代码中读取为:
try(InputStream resourceStream = loader.getResourceAsStream(configFile)){
properties.load(resourceStream);
properties.setProperty("ssl.truststore.location",
loader.getResource(properties.getProperty("ssl.truststore.location")).toURI().getPath());
}
catch(Exception e){
System.out.println("Failed to load config");
}
为了测试,我尝试在两个不同(不同VM规格(的独立服务器上提交作业。一台服务器成功运行,但另一台服务器抛出java.nio.file.NoSuchFileException
,表示找不到我的.jks文件。有人能指出可能的问题吗?
在这里,flink部署在具有以下版本的独立集群模式上:
- Flink版本:
1.14.0
- Java版本:
11.0.13
我意识到我的问题真的很傻。这部分实际上返回null并触发异常。
loader.getResource(properties.getProperty("ssl.truststore.location")).toURI().getPath()
问题是我通过web UI提交了作业,因此我看不到打印的消息。因此,文件名解析为存储在configFile下的原始文件名,configFile是一个相对路径。为什么一台机器工作而另一台不工作?因为我之前在homedir上有.jks
,用于另一个测试:(。
为了让其他人不要陷入这个错误,这里总结了.getResource()
如果分别从IDE(gradle run task(和jar运行,将解决什么问题。
// file:home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// jar:file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").toString());
// home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").getPath());
// home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// null
System.out.println(loader.getResource("kafka-client.trustore.jks").toURI().getPath());
// file:home/gradle-demo/build/resources/main/kafka-client.truststore.jks
// jar:file:home/gradle-demo/build/libs/gradle-demo-1.0-SNAPSHOT.jar!/kafka-client.truststore.jks
System.out.println(loader.getResource("kafka-client.trustore.jks").toURI());
kafka客户端:2.4.1org.apache.kafka.common.security.ssl.SslEngineBuilder#285
try (InputStream in = Files.newInputStream(Paths.get(path))) {
KeyStore ks = KeyStore.getInstance(type);
// If a password is not set access to the truststore is still available, but integrity checking is disabled.
char[] passwordChars = password != null ? password.value().toCharArray() : null;
ks.load(in, passwordChars);
return ks;
} catch (GeneralSecurityException | IOException e) {
throw new KafkaException("Failed to load SSL keystore " + path + " of type " + type, e);
}
看起来我们应该把jks文件放在文件系统(nfs或hdfs(中,任务管理器可以通过绝对路径进行访问。