基于时间戳同步Apache Flink流



我有几个用例,我需要根据时间戳同步多个流。

这是一个示例,我想同步交易栏和报价栏,例如我从原始交易和报价中产生的,我汇总了:

val tradeBars: DataStream[TradeBar] = trades
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new TimeTradeBar(new DownTick()))
val quotesWithFlow = quotes
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")
  .countWindow(2, 1)
  .reduce((previousQuote, quote) => Quote.localOrderFlow(previousQuote, quote))
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")
val quoteBars: DataStream[QuoteBar] = quotesWithFlow
  .assignAscendingTimestamps(_.epochMillis)
  .keyBy("key")
  .window(TumblingEventTimeWindows.of(Time.minutes(5)))
  .process(new QuoteBars.TimeQuoteBar())
val joined: JoinedStreams[TradeBar, QuoteBar]#Where[LocalDateTime]#EqualTo = tradeBars
  .join(quoteBars)
  .where(_.start).equalTo(_.start)
  // need a window here, just want to sync on same time window

我尝试使用Flink的窗口连接函数,但显然这是希望现在有一个窗口功能,然后我可以做一个应用方法。我想要的是在同一时间窗口上同步流。我怀疑这不是加入方法的意图。

我有一个使用Flink Stream Connect方法的工作实现。我将其应用于贸易栏流和原始报价流,但这要求我自己编码一个非常混乱的协调功能

CoProcessTradeBarsAndQuotes() extends CoProcessFunction[TradeBar, Quote, (TradeBar, QuoteBar)]
{}

这很混乱,因为我必须在缓冲区中跟踪引号,并仔细地从Process1和Process2函数中执行聚合。我想一定有一种简单的方法,我只是看不到它。感谢任何帮助和想法。

您没有提及您使用的逻辑来决定要加入哪两个股票(可能有很多股票(,但是总的来说,我会通过从该产品中生成输出记录来解决此问题第一个窗口函数(开放,高,低,关闭,库存(具有附加字段,代表窗口的时间(截断到小时(,然后按照该时间字段进行键,然后执行另一个窗口操作以创建股票的连接您需要。

相关内容

  • 没有找到相关文章

最新更新