如何向Flink工作节点提供KafkaSource SSL文件



我正在创建一个基于Kafka的Flink流应用程序,并试图创建一个关联的KafkaSource连接器来读取Kafka数据。

例如:

final KafkaSource<String> source = KafkaSource.<String>builder()
// standard source builder setters
// ...
.setProperty(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "truststore.jks")
.build();

truststore.jks文件是在执行应用程序之前在作业管理器节点上本地创建的,并且我已经验证了它的存在和正确填充。我的问题是,在分布式Flink应用程序中,这个truststore.jks不会自动存在于任务工作者节点上,因此执行上述代码时会产生FileNotFoundException

我尝试过的:

  • 使用env.registerCacheFilegetRuntimeContext().getDistributedCache().getFile()将文件分发到所有节点,但由于图正在构建中,应用程序尚未运行,因此RuntimeContext在此阶段不可用
  • 提供信任库的base64参数表示,并手动将其转换为.jks格式。我需要一些";预初始化";KafkaSource挂钩来完成此操作,并且在文档中没有发现任何此类功能
  • 使用外部数据存储,如s3,并从中检索文件。据我所知,内部Kafka使用者不支持非本地文件系统,因此我仍然需要一些预初始化方法来在每个任务节点上本地检索文件

在源初始化期间,使该文件可用于任务工作节点的最佳方法是什么?

我以前读过类似的问题:

  1. 如何在apacheflink中将文件分发到工作节点
  • 如上所述,在应用程序的这一点上,我无法访问RuntimeContext
  1. Flink Kafka连接器SSL支持
  • 这将信任库作为base64编码的字符串参数注入。我可以这样做,但由于内部Kafka使用者需要一个文件,所以在使用者初始化之前,我会遇到将参数转换为.jks格式的问题。我看不出有什么方法可以注册";预初始化";用于文档中KafkaSource的钩子

更新:

我可以通过使用ssl.truststore.certificates配置字段来解决这个问题。这允许我提供底层truststore.jks证书的base64编码表示,而不是本地文件路径。

[我还不得不将我的kafka-clients依赖项更新为2.7.x+,因为此配置在库的旧版本中不可用]

相关内容

  • 没有找到相关文章

最新更新