如何在flink环境中初始化flink作业的spring资源



我最近遇到了一些关于开发flink作业的问题,它引入了spring和hibernate,作业将在flink集群上运行。所以我需要在任务管理器而不是作业管理器上运行flink操作符之前初始化spring资源。但是我找不到任何合适的StreamExecutionEnvironment方法来做到这一点。

我试过下面这些方法:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
// etl business logic as flink operators
FlinkOperators.run();  
env.execute();

但是,当并行度大于1的flink作业执行时,spring初始化将在每个任务管理器进程中NOT。所以我不能在flink工作中使用弹簧。

在flink作业中是否有初始化spring资源的方法?

谢谢。

致以最亲切的问候。阿尔文号

每次您需要为每个任务管理器进行某种上下文初始化时,构建一个静态函数(如果使用scala,则是对象中的函数),将这些初始化值存储在静态变量中。

这应该足够了,因为静态值存储在每个任务管理器的内存中。

我使用这种方法在每个任务管理器中加载属性文件,这些属性文件包含每个作业配置。如果您正在加载文件,请检查每个任务管理器是否都有您想要加载的文件的副本。

相关内容

  • 没有找到相关文章