Apache flink完全外部联接的错误结果



我有两个数据流,它们是从两个表创建的,比如:

Table orderRes1 = ste.sqlQuery(
"SELECT orderId, userId, SUM(bidPrice) as q FROM " + tble +
" Group by orderId, userId");
Table orderRes2 = ste.sqlQuery(
"SELECT orderId, userId, SUM(askPrice) as q FROM " + tble +
" Group by orderId, userId");
DataStream<Tuple2<Boolean, Row>> ds1 = ste.toRetractStream(orderRes1 , Row.class).
filter(order-> order.f0);
DataStream<Tuple2<Boolean, Row>> ds2 = ste.toRetractStream(orderRes2 , Row.class).
filter(order-> order.f0);

我想对这两个流执行完全的外部联接,并且我使用了两个orderRes1.fullOuterJoin(orderRes2 ,$(exp))和一个包含完整外部联接的sql查询,如下所示:

Table bidOrdr = ste.fromDataStream(bidTuple, $("orderId"),
$("userId"), $("price"));

Table askOrdr = ste.fromDataStream(askTuple, $("orderId"),
$("userId"), $("price"));
Table result = ste.sqlQuery(
"SELECT COALESCE(bidTbl.orderId,askTbl.orderId) , " +
" COALESCE(bidTbl.userId,askTbl.orderId)," +
" COALESCE(bidTbl.bidTotalPrice,0) as bidTotalPrice, " +
" COALESCE(askTbl.askTotalPrice,0) as askTotalPrice, " + 
" FROM " +
" (SELECT orderId, userId," +
" SUM(price) AS bidTotalPrice " +
" FROM " + bidOrdr +
" Group by orderId, userId) bidTbl full outer JOIN " +
" (SELECT orderId, userId," +
" SUM(price) AS askTotalPrice" +
" FROM " + askOrdr +
" Group by orderId, userId) askTbl " +
" ON (bidTbl.orderId = askTbl.orderId" +
" AND bidTbl.userId= askTbl.userId) ") ;
DataStream<Tuple2<Boolean, Row>> =  ste.toRetractStream(result, Row.class).filter(order -> order.f0);

然而,在某些情况下的结果是不正确的:想象用户A以价格向B出售3次,之后用户B向A出售2次,第二次的结果是:

7>(真,123,a,300.0,0.0(

7>(true,123,a,300.0200.0(

10>(真,123,b,0.0300.0(

10>(真,123,b,200.0300.0(

第二行和第四行是流的预期结果,但它也会生成第一行和第三行。值得一提的是,coGroup是另一个解决方案,但我不想在这种情况下使用窗口化,而非窗口化解决方案只能在有界流(DataSet(中访问。

提示:orderId和userId将在两个流中重复,我希望在每个操作中生成2行,包含:orderId,userId1,bidTotalPrice,askTotalPrice ANDorderId,userId2,bidTotalPrice,askTotalPrice

流式查询(或者换句话说,在动态表上执行查询(会出现类似的情况。与传统数据库不同,在传统数据库中,查询的输入关系在查询执行期间保持静态,流式查询的输入不断更新,因此结果也必须不断更新。

如果我理解这里的设置;不正确的";行1和3上的结果是正确的,直到来自CCD_ 2的相关行被处理为止。如果这些行从未到达,那么第1行和第3行将保持正确。

你应该期待的是最终正确的结果,包括必要的撤回。可以通过启用小批量聚合来减少中间结果的数量。

这个邮件列表线程提供了更多的见解。如果我误解了你的情况,请提供一个可重复的例子来说明这个问题。

相关内容

  • 没有找到相关文章

最新更新