我有两个闪烁dataStream
。例如:dataStream1
和dataStream2
。我想将两个流合并为 1 个流,以便我可以使用相同的进程函数来处理它们,因为两个dataStream
的 dag 是相同的。
截至目前,我需要对任一流的消息消耗具有相同的优先级。 dataStream2 的生产者每分钟生成 10 条消息,而 dataStream1 的生产者每秒生成 1000 条消息。此外,数据类型对于两个 dataStreams.DataSteam2 是相同的,更多的是应该尽快使用的高优先级队列。dataStream1 和 dataStream2 的消息之间没有关系
dataStream1.union(dataStream2)
是否会生成一个包含两个流元素的流?
可能是解决此问题的最简单解决方案,但根据数据源的确切规范,可能不是最有效的解决方案,可能是连接两个流。在此解决方案中,您可以使用CoProcessFunction
,它将为每个连接的流调用单独的方法。
在此解决方案中,您可以简单地缓冲一个流的元素,直到可以生成它们(例如以循环方式(。但请记住,如果源产生事件的频率之间存在非常大的差异,则这可能效率很低。
听起来这两个DataStream
具有不同类型的元素,尽管您没有明确指定。如果是这种情况,则通过每个流上的MapFunction
创建一个Either<stream1 type, stream2 type>
,然后union()
两个流。你不会得到两者的确切混合,因为 Flink 会交替使用每个流的网络缓冲区。
如果你真的想要很好的混合流,那么(正如其他人所指出的(你需要通过状态缓冲传入的元素,并且如果出于任何原因(例如不同的网络延迟,或者更可能是两个源之间的不同性能(你有两个流之间的数据速率非常不同,你还需要应用一些启发式方法来避免过度缓冲。
您可能希望使用实现InputSelectable
接口的自定义运算符,以减少所需的缓冲量。我在下面包含一个示例,该示例在没有任何缓冲的情况下实现交错,但请务必阅读文档中的警告,该警告解释了
。运营商可能会收到一些它目前不想处理的数据......
换句话说,不能依靠这个简单的例子来真正按原样工作。
public class Alternate {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Long> positive = env.generateSequence(1L, 100L);
DataStream<Long> negative = env.generateSequence(-100L, -1L);
AlternatingTwoInputStreamOperator op = new AlternatingTwoInputStreamOperator();
positive
.connect(negative)
.transform("Hack that needs buffering", Types.LONG, op)
.print();
env.execute();
}
}
class AlternatingTwoInputStreamOperator extends AbstractStreamOperator<Long>
implements TwoInputStreamOperator<Long, Long, Long>, InputSelectable {
private InputSelection nextSelection = InputSelection.FIRST;
@Override
public void processElement1(StreamRecord<Long> element) throws Exception {
output.collect(element);
nextSelection = InputSelection.SECOND;
}
@Override
public void processElement2(StreamRecord<Long> element) throws Exception {
output.collect(element);
nextSelection = InputSelection.FIRST;
}
@Override
public InputSelection nextSelection() {
return this.nextSelection;
}
}
另请注意,InputSelectable
是在 Flink 1.9.0 中添加的。