>场景与经典的流-流连接略有不同
streamA:事务流:transTS,userid,productid,...
streamB:创建的新产品流:productid,productname,createTS,...(
我想使用 productId 加入事务,但找不到水印/连接条件的组合来实现这一点。
streamA_wm = streamA.withWatermark("transTS", "3 minutes")
streamB_wm = streamB.withWatermark("createTS", "1 day")
streamA_wm
.join(streamB_wm, "productId AND transTS >= createTS", "leftOuter")
结果为空。
我做错了什么?
我认为你可能在这里有错误的方法。虽然产品在创建和更新时是事务性的,但它们是相对于其他事务流的元数据。
我建议如下:
- 将事务流加入参考数据产品 - 不受流处理。
- 不要缓存产品,这可以确保您转到源。
- 使用镶木地板,KUDU作为产品。
但是可能有产品流的原因,但是...如果不再对产品进行更新,并且您通过事务流再次获取该产品的数据,会发生什么情况?