如何在Flink中连接2个以上的流



我有3个不同类型的键控数据流。

DataStream<A> first;
DataStream<B> second;
DataStream<C> third;

每个流都定义了自己的处理逻辑,并在它们之间共享一个状态。我想连接这3个流,每当数据在任何流中可用时触发相应的处理功能。可以在两个流上连接。

first.connect(second).process(<CoProcessFunction>)

我不能使用并集(允许多个数据流(,因为类型不同。我希望避免创建包装器,并将所有流转换为相同的类型。

包装器的方法确实不错。您可以创建一个类似于Flink现有Either<Left, Right>EitherOfThree<T1, T2, T3>包装类,然后在单个函数中处理这些记录的流。类似于:

DataStream <EitherOfThree<A,B,C>> combo = first.map(r -> new EitherOfThree<A,B,C>(r, null, null))
.union(second.map(r -> new EitherOfThree<A,B,C>(null, r, null)))
.union(third.map(r -> new EitherOfThree<A,B,C>(null, null, r)));
combo.process(new MyProcessFunction());

Flink的Either类有一个更优雅的实现,但对于您的用例来说,一些简单的东西应该可以工作。

除了并集,标准方法是在级联中使用连接,例如

first.connect(second).process(...).connect(third).process(...)

您将无法在一个地方共享所有三个流之间的状态。您可以让第一个流程函数输出后续流程函数所需的任何内容,但第三个流无法影响第一个流程功能中的状态,这对某些用例来说是个问题。

另一种可能是利用较低级别的机制——请参阅FLIP-92:在Flink中添加N元流运算符。但是,此机制是供内部使用的(Table/SQL API将其用于n路联接(,需要谨慎对待。有关详细信息,请参阅邮件列表讨论。我提到这一点是为了完整性,但在接口进一步开发之前,我怀疑这是否是个好主意。

您可能还想看看有状态函数api,它克服了数据流api的许多限制。

相关内容

  • 没有找到相关文章

最新更新