I defined a Transaction class:

case class Transaction(accountId: Long, amount: Long, timestamp: Long)

A TransactionSource simply emits Transaction records at fixed time intervals. Now I want to compute the last 2 transaction timestamps for each account id, see the code below:
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment, _}
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.walkthrough.common.entity.Transaction
import org.apache.flink.walkthrough.common.source.TransactionSource

object LastNJob {

  final val QUERY =
    """
      |WITH last_n AS (
      |    SELECT accountId, `timestamp`
      |    FROM (
      |        SELECT *,
      |            ROW_NUMBER() OVER (PARTITION BY accountId ORDER BY `timestamp` DESC) AS row_num
      |        FROM transactions
      |    )
      |    WHERE row_num <= 2
      |)
      |SELECT accountId, LISTAGG(CAST(`timestamp` AS STRING)) AS last2_timestamp
      |FROM last_n
      |GROUP BY accountId
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    val settings: EnvironmentSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(streamEnv, settings)

    val txnStream: DataStream[Transaction] = streamEnv
      .addSource(new TransactionSource)
      .name("transactions")

    tableEnv.createTemporaryView("transactions", txnStream)

    tableEnv.executeSql(QUERY).print()
  }
}
When I run the program, I get:
+----+----------------------+--------------------------------+
| op | accountId | last2_timestamp |
+----+----------------------+--------------------------------+
| +I | 1 | 1546272000000 |
| +I | 2 | 1546272360000 |
| +I | 3 | 1546272720000 |
| +I | 4 | 1546273080000 |
| +I | 5 | 1546273440000 |
| -U | 1 | 1546272000000 |
| +U | 1 | 1546272000000,1546273800000 |
| -U | 2 | 1546272360000 |
| +U | 2 | 1546272360000,1546274160000 |
| -U | 3 | 1546272720000 |
| +U | 3 | 1546272720000,1546274520000 |
| -U | 4 | 1546273080000 |
| +U | 4 | 1546273080000,1546274880000 |
| -U | 5 | 1546273440000 |
| +U | 5 | 1546273440000,1546275240000 |
| -U | 1 | 1546272000000,1546273800000 |
| +U | 1 | 1546273800000 |
| -U | 1 | 1546273800000 |
| +U | 1 | 1546273800000,1546275600000 |
(output continues)
Let's focus on the last transaction of accountId=1 (from above). When a new transaction on account 1 happens at timestamp=1546275600000, there are 4 operations in total:
+----+----------------------+--------------------------------+
| op | accountId | last2_timestamp |
+----+----------------------+--------------------------------+
| -U | 1 | 1546272000000,1546273800000 |
| +U | 1 | 1546273800000 |
| -U | 1 | 1546273800000 |
| +U | 1 | 1546273800000,1546275600000 |
Whereas I only want to emit the following "new status" to my downstream (say, another Kafka topic) via some kind of merging:
+----------------------+--------------------------------+
| accountId | last2_timestamp |
+----------------------+--------------------------------+
| 1 | 1546273800000,1546275600000 |
so that my downstream is able to consume literally every latest two transaction timestamps per account:
+----------------------+--------------------------------+
| accountId | last2_timestamp |
+----------------------+--------------------------------+
| 1 | 1546272000000 |
| 1 | 1546272000000,1546273800000 |
| 1 | 1546273800000,1546275600000 |
(and so on)
What is the right way to do this kind of merging?
Below is the short answer from Timo Walther on the Apache user mailing list.
Solution 1: Use the DataStream API's toRetractStream and a filter to drop the retraction events, so that only the insert/update events are sent downstream.
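A minimal sketch of Solution 1, reusing streamEnv, tableEnv, and QUERY from the program above; the Row import and the print() stand-in sink are assumptions, and a Kafka sink would take its place in practice:

import org.apache.flink.types.Row

// Run the same query, but as a Table we can convert explicitly.
val resultTable = tableEnv.sqlQuery(QUERY)

// Each element is a (flag, row) pair: flag == true marks accumulate
// messages (+I/+U), flag == false marks retraction messages (-U/-D).
val retractStream: DataStream[(Boolean, Row)] =
  tableEnv.toRetractStream[Row](resultTable)

retractStream
  .filter(_._1) // drop the retraction messages
  .map(_._2)    // unwrap the Row payload
  .print()      // stand-in for e.g. a Kafka sink

streamEnv.execute("LastNJob")

Note that this only removes the -U rows: the intermediate "+U | 1546273800000" state from the example above still reaches the sink, which is why a single merged operator (Solution 2) is also suggested.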
Solution 2: Implement a user-defined aggregate function that merges the Top-2 and LISTAGG operations into one.
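A minimal sketch of Solution 2, modeled on the AggregateFunction examples in the Flink documentation; Last2Acc, Last2TimestampAgg, and the LAST2 function name are hypothetical, not from the mailing list answer:

import org.apache.flink.table.functions.AggregateFunction

// Accumulator: the two latest timestamps seen so far (-1 = unset).
case class Last2Acc(var newest: Long = -1L, var older: Long = -1L)

// Keeps the two latest timestamps per group and emits them oldest-first
// as a comma-separated string, collapsing Top-2 + LISTAGG into one step.
class Last2TimestampAgg extends AggregateFunction[String, Last2Acc] {

  override def createAccumulator(): Last2Acc = Last2Acc()

  def accumulate(acc: Last2Acc, ts: Long): Unit = {
    if (ts >= acc.newest) {
      acc.older = acc.newest
      acc.newest = ts
    } else if (ts > acc.older) {
      acc.older = ts
    }
  }

  override def getValue(acc: Last2Acc): String =
    if (acc.older < 0) acc.newest.toString
    else s"${acc.older},${acc.newest}"
}

Registered and queried in place of the Top-N query:

tableEnv.createTemporarySystemFunction("LAST2", classOf[Last2TimestampAgg])
tableEnv.executeSql(
  "SELECT accountId, LAST2(`timestamp`) AS last2_timestamp FROM transactions GROUP BY accountId"
).print()

With a single aggregation, each new transaction produces one -U/+U pair instead of four operations, and filtering the retractions as in Solution 1 then leaves exactly the sequence of merged states shown in the desired output.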