Flink转换(联接(流中有一些元素正在重新处理,即使它们没有被修改。
假设我们有3个元素:1、2和3。当它们被插入时,会发生这种情况:
- 插入第一个元素1时,输出为:1
- 当插入第二个元素2时,输出为:1->2(1被重新处理并输出(
- 第三元素插入:1->2->3(1和2被重新处理(
在上次插入中,1或2没有任何更改,因此没有理由对它们进行重新处理。
再处理规则:
- 只有同一出版商的书才会被重新处理。这意味着当插入出版商2的书籍时,只有出版商2的书被重新处理。我们的目标是不重新处理任何内容,因为它们不受现有新书的影响
- 修改某个出版商时,只会重新处理该出版商的图书。(可以(
加入后将使用全局窗口,如下所示:
bookStream
.join(publisherStream)
.where(book -> book.publisherId)
.equalTo(publisher -> publisher.id)
.window(GlobalWindows.create())
.trigger(new ForeverTrigger<>())
.apply(new JoinFunction<Book, Publisher, Book_Publisher>() {
@Override
public Book_Publisher join(Book book, Publisher publisher) throws Exception {
return new Book_Publisher(book, publisher);
}
})
ForeverTrigger实现:
public class ForeverTrigger<T, E extends Window> extends Trigger<T, E> {
@Override
public TriggerResult onElement(T element, long timestamp, E window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE;
}
@Override
public TriggerResult onProcessingTime(long time, E window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long time, E window, TriggerContext ctx) throws Exception {
return TriggerResult.CONTINUE;
}
@Override
public void clear(E window, TriggerContext ctx) throws Exception {}
}
对于这个用例,有必要始终存储所有元素,因为如果一本书被更新,我们需要有相应的出版商加入,反之亦然。因此,从bookStream
或publisherStream
中删除元素不是一种选择。
一个解决方案是使用TableAPI,如下所述:为什么Flink在DataStream join+Global窗口上发出重复记录?。这将起作用,然后可以转换为数据流。然而,我希望避免将表API使用与数据流API使用混合在一起,特别是因为项目的主要目标是推广和自动创建flink管道,这意味着将有两个API来推广,而不是一个。因此,如果有一个不同的有效解决方案,那就太好了。
另一种解决方案是驱逐或过滤元素,正如上面链接的同一篇文章中所提到的,但这似乎效率低下,因为它仍然需要处理元素,以便驱逐/过滤它们。这将需要保留以前状态的列表,并比较传入的元素。
理想情况下,Flink会知道只处理包含更改的元素。有没有一个有效的解决方案可以用数据流执行这种连接,并且只处理修改后的元素?
窗口联接的设计并没有考虑到这种情况。为了有效地处理这个问题,我认为您需要在API堆栈中降低一个级别并使用KeyedCoProcessFunctions,或者提高一个级别,并使用Table API。