我最近遇到了一些关于开发flink作业的问题,它引入了spring和hibernate,作业将在flink集群上运行。所以我需要在任务管理器而不是作业管理器上运行flink操作符之前初始化spring资源。但是我找不到任何合适的StreamExecutionEnvironment方法来做到这一点。
我试过下面这些方法:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
// etl business logic as flink operators
FlinkOperators.run();
env.execute();
但是,当并行度大于1的flink作业执行时,spring初始化将在每个任务管理器进程中NOT。所以我不能在flink工作中使用弹簧。
在flink作业中是否有初始化spring资源的方法?谢谢。
致以最亲切的问候。阿尔文号
每次您需要为每个任务管理器进行某种上下文初始化时,构建一个静态函数(如果使用scala,则是对象中的函数),将这些初始化值存储在静态变量中。
这应该足够了,因为静态值存储在每个任务管理器的内存中。我使用这种方法在每个任务管理器中加载属性文件,这些属性文件包含每个作业配置。如果您正在加载文件,请检查每个任务管理器是否都有您想要加载的文件的副本。