Flink: how to emit only the merged result?



I define a Transaction class:

case class Transaction(accountId: Long, amount: Long, timestamp: Long)

TransactionSource simply emits a Transaction at a fixed interval. Now I want to compute the last 2 transaction timestamps per account id; see the code below:

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource

object LastNJob {

  final val QUERY =
    """
      |WITH last_n AS (
      |    SELECT accountId, `timestamp`
      |    FROM (
      |        SELECT *,
      |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
      |        FROM transactions
      |    )
      |    WHERE row_num <= 2
      |)
      |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) last2_timestamp
      |FROM last_n
      |GROUP BY accountId
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, settings)

    val txnStream: DataStream[Transaction] = streamEnv
      .addSource(new TransactionSource)
      .name("transactions")

    tableEnv.createTemporaryView("transactions", txnStream)

    tableEnv.executeSql(QUERY).print()
  }
}

When I run the program, I get:

+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| +I |                    1 |                  1546272000000 |
| +I |                    2 |                  1546272360000 |
| +I |                    3 |                  1546272720000 |
| +I |                    4 |                  1546273080000 |
| +I |                    5 |                  1546273440000 |
| -U |                    1 |                  1546272000000 |
| +U |                    1 |    1546272000000,1546273800000 |
| -U |                    2 |                  1546272360000 |
| +U |                    2 |    1546272360000,1546274160000 |
| -U |                    3 |                  1546272720000 |
| +U |                    3 |    1546272720000,1546274520000 |
| -U |                    4 |                  1546273080000 |
| +U |                    4 |    1546273080000,1546274880000 |
| -U |                    5 |                  1546273440000 |
| +U |                    5 |    1546273440000,1546275240000 |
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |
(to continue)

Let's focus on the last transaction of accountId=1 (from the output above). When account 1 gets a new transaction at timestamp=1546275600000, there are 4 operations in total:

+----+----------------------+--------------------------------+
| op |            accountId |                last2_timestamp |
+----+----------------------+--------------------------------+
| -U |                    1 |    1546272000000,1546273800000 |
| +U |                    1 |                  1546273800000 |
| -U |                    1 |                  1546273800000 |
| +U |                    1 |    1546273800000,1546275600000 |

However, via some kind of merging, I only want to emit the following "new status" to my downstream (say, another Kafka topic):

+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |    1546273800000,1546275600000 |

so that, over time, my downstream can consume each account's last two transaction timestamps:

+----------------------+--------------------------------+
|            accountId |                last2_timestamp |
+----------------------+--------------------------------+
|                    1 |                  1546272000000 |
|                    1 |    1546272000000,1546273800000 |
|                    1 |    1546273800000,1546275600000 |
(to continue)

What is the correct way to do this?

Below is a short answer from Timo Walther on the Apache user mailing list.


Solution 1: Use the DataStream API: convert the result with toRetractStream and filter out the delete (retraction) events, so that only additive events are sunk downstream.
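
A minimal sketch of Solution 1, reusing the streamEnv, tableEnv and QUERY from the job above (the variable names and job name below are only illustrative): the query result is converted into a retract stream and only the messages whose flag is true are kept, so no delete events reach the downstream sink.

import org.apache.flink.table.api.Table
import org.apache.flink.types.Row

// Run the same Top-2 + LISTAGG query, but keep the result as a Table
// instead of printing it directly.
val resultTable: Table = tableEnv.sqlQuery(QUERY)

// toRetractStream emits (flag, row): flag == true for add messages,
// flag == false for retraction (delete) messages.
val additiveOnly: DataStream[Row] = tableEnv
  .toRetractStream[Row](resultTable)
  .filter(_._1) // drop the retraction events
  .map(_._2)    // keep only the row payload

// Sink the remaining events, e.g. into a Kafka producer; print() is used
// here just to keep the sketch self-contained.
additiveOnly.print()

streamEnv.execute("last-2-timestamps-retract-filter")

Note that this only removes the -U messages; the intermediate +U row containing a single timestamp (1546273800000 in the output above) still goes downstream.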

Solution 2: Implement a user-defined aggregate function (UDF) that merges the Top-2 and LISTAGG operations into one.
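
A sketch of Solution 2, following the user-defined aggregate function pattern from the Flink documentation. The names Last2Acc, Last2TimestampAgg and LAST2 are invented for this example; the function keeps only the two most recent timestamps per account, so the Top-2 and LISTAGG steps collapse into a single aggregation.

import org.apache.flink.table.functions.AggregateFunction

// Accumulator holding the two most recent timestamps of one account.
// Long.MinValue marks a slot that has not been filled yet.
case class Last2Acc(var latest: Long = Long.MinValue, var previous: Long = Long.MinValue)

class Last2TimestampAgg extends AggregateFunction[String, Last2Acc] {

  override def createAccumulator(): Last2Acc = Last2Acc()

  // Called once per input row; keeps only the two largest timestamps.
  def accumulate(acc: Last2Acc, ts: Long): Unit = {
    if (ts >= acc.latest) {
      acc.previous = acc.latest
      acc.latest = ts
    } else if (ts > acc.previous) {
      acc.previous = ts
    }
  }

  // Emit "older,newer", matching the LISTAGG output format above.
  override def getValue(acc: Last2Acc): String = {
    if (acc.latest == Long.MinValue) null
    else if (acc.previous == Long.MinValue) acc.latest.toString
    else s"${acc.previous},${acc.latest}"
  }
}

The query then becomes a plain GROUP BY aggregation:

tableEnv.createTemporarySystemFunction("LAST2", classOf[Last2TimestampAgg])
tableEnv.executeSql(
  "SELECT accountId, LAST2(`timestamp`) AS last2_timestamp FROM transactions GROUP BY accountId"
).print()

The result is still an updating stream, but each new transaction now yields a single -U/+U pair with the merged value and no intermediate single-timestamp row, so combining this with the retract filter from Solution 1 (or an upsert sink) emits exactly one "new status" row per transaction.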
