我正在设置一个Apache Spark长时间运行的流作业,以使用InputDStream执行(非并行化(流。
我试图实现的是,当队列上的批处理花费太长时间(基于用户定义的超时(时,我希望能够跳过批处理并完全放弃它 - 并继续执行其余部分。
我无法在 Spark API 或在线找到这个问题的解决方案——我考虑使用 StreamingContext awaitTerminationOrTimeout,但这会在超时时杀死整个 StreamingContext,而我想做的只是跳过/杀死当前批次。
我也考虑过使用mapWithState,但这似乎不适用于这个用例。最后,我正在考虑设置一个 StreamingListener 并在批处理开始时启动一个计时器,然后在达到某个超时阈值时让批处理停止/跳过/终止,但似乎仍然没有办法杀死批处理。
谢谢!
我看过一些来自 yelp 的文档,但我自己还没有做过。
使用UpdateStateByKey(update_func)
或mapWithState(stateSpec)
,
- 首次看到事件和初始化状态时附加超时
-
如果状态过期,则删除状态
def update_function(new_events, current_state): if current_state is None: current_state = init_state() attach_expire_datetime(new_events) ...... if is_expired(current_state): return None //current_state drops? if new_events: apply_business_logic(new_events, current_state)
看起来结构化流水印也会在事件超时时丢弃事件,如果这可能适用于您的作业/阶段超时丢弃。