在 Flink 中的一个作业中有多个作业或多个管道



我有一个用例,我想在 Flink 上运行 2 个独立的处理流。 所以 2 个流看起来像

源 1 -> 运算符 1 -> 接收器 1

源 2 -> 运算符 2 -> 汇 2

我想为两个流重用同一个 Flink 集群。我可以通过两种方式考虑这样做:

1( 在同一份 Flink 申请上提交 2 个不同的职位

2( 在同一作业中设置 2 个管道

我能够设置第一个选项,但不确定如何执行第二个选项。 以前有人尝试过这样的设置吗? 一个比另一个有什么优势?

只需在 setupJob(( 方法中创建多个管道(具有单独的或共享的源使用者(。下面是一个示例:

private void buildPipeline(StreamExecutionEnvironment env, String sourceName, String sinkName) {
DataStream<T> stream = env
.addSource(getInputs().get(sourceName))
.name(sourceName);
stream = stream.filter(evt -> filter());
....
}
@Override
public void setupJob(AthenaFlinkJobConfiguration jobConfig, StreamExecutionEnvironment env) throws Exception {
...
buildPipeline(env, sourceTopic1, sink1, ...);
buildPipeline(env, sourceTopic2, sink2, ...);
...
}

以下是两种方法的快速对比。使用单独作业的优缺点:

  • [+] 代码更简单。
  • [+] 设置低级配置(容错机制、堆大小、并行性等(的更大灵活性
  • [-] 由于资源不共享,基础设施成本更高。
  • [-] 维护和监控更加复杂和耗时。

在单个作业中使用单独管道的好处:

  • [+] 监视和调试单个作业更容易。
  • [+] 修补程序提交到单个存储库中,并部署到单个环境。
  • [+] 经济:降低基础设施硬件和运营成本。
  • [-] 无法绑定单个管道使用。
  • [-] 一个管道中的故障会影响另一个管道。
  • [-] 一个管道中的背压可能会影响整个作业,因为每个作业都会快照单个检查点。

第二种方法可以通过在同一StreamExecutionEnvironment中定义两个独立的管道并只调用StreamExecutionEnvironment.execute()一次来实现。

我会使用第一种方法,因为它可以为您提供更好的隔离。Flink 在发生故障时重新启动整个作业。因此,如果在同一作业中实现两个管道,则在发生故障时,将重置并重新启动两个管道。如果您遵循方法一,您也可以独立获取保存点。

相关内容

  • 没有找到相关文章

最新更新