我构建了一个flink流作业,从kafka读取一个xml文件,转换文件并将其写入数据库。由于xml文件中的属性与数据库列名不匹配,我为映射构建了一个开关用例。
由于这不是真正灵活的,我想把这个硬连线的映射信息从代码中去掉。首先,我想到了一个映射文件的想法,它可以看起来像这样:
path.in.xml.to.attribut=database.column.name
当前作业逻辑如下:
switch(path.in.xml.to.attribute){
case "example.one.name":
return "name";
对于映射文件,我想我会使用Map将映射数据存储为键值对。
这将使目前的工作更加灵活。还有一个缺点是,对于我想应用的配置中的每一个更改,我都必须重新启动flink作业。
我的问题是,是否有可能在运行时注入这种映射逻辑,例如通过自己的kafka主题。当这种实现成为可能时,作为一个例子会是什么样子呢。
如果您只需要能够更新xml属性和数据库列名之间的映射,那么可以使用广播状态模式。此外,ApacheFlink中的广播状态实用指南也很有用。
我们的想法是创建一个流,订阅您自己的kafka主题,并使用数据库映射将更新广播给所有任务管理器。这些运算符将此Map<String, String>
作为一个状态进行维护,您可以使用此映射状态来解析列名,即使用map.get(path.in.xml.to.attribute))
而不是switch(path.in.xml.to.attribute)
。在这种情况下,map
运算符应替换为BroadcastProcessFunction
。