Apache Beam Stateful DoFn定期输出所有K/V对



我正试图使用有状态DoFn(使用@ProcessElement@StateIdValueState元素(在Apache Beam中聚合(每个键(流数据源(通过Scio(。我认为这将是最适合我试图解决的问题。要求是:

  • 对于给定的键,记录是在所有时间内聚合的(基本上是求和的(-我不关心以前计算的聚合,只关心最近的
  • 根据我控制的某些条件,密钥可能会被从状态(state.clear()(中逐出
  • 每5分钟,无论是否看到任何新密钥,都应输出尚未从状态中逐出的所有密钥

考虑到这是一个流媒体管道,并且将无限期运行,在具有累积激发窗格的全局窗口上使用combinePerKey似乎会随着时间的推移继续增加其内存占用和运行所需的数据量,所以我希望避免它。此外,在测试这一点时,(也许正如预期的那样(它只是将新计算的聚合与历史输入一起附加到输出中,而不是使用每个键的最新值。

我的想法是,使用StatefulDoFn只需允许我输出到目前为止的所有全局状态((,但这似乎不是一个微不足道的解决方案。我看到过使用计时器人为地执行回调的暗示,以及可能使用缓慢增长的侧输入映射(当我创建PCollectionView<map<String,String>>时如何解决Duplicate values异常(并以某种方式刷新它,但这本质上需要迭代映射中的所有值,而不是加入它。

我觉得我可能忽略了一些简单的事情来实现这一点。我对Beam中的窗口和定时器的许多概念相对陌生,正在寻找如何解决这个问题的建议。谢谢

Stateful DoFn应该在这里为您提供帮助,这是对的。这是你可以做什么的基本草图。请注意,这只输出总和而不输出键。这可能不是你想要的,但它应该会帮助你前进。

class CombiningEmittingFn extends DoFn<KV<Integer, Integer>, Integer> {
@TimerId("emitter")
private final TimerSpec emitterSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@StateId("done")
private final StateSpec<ValueState<Boolean>> doneState = StateSpecs.value();
@StateId("agg")
private final StateSpec<CombiningState<Integer, int[], Integer>>
aggSpec = StateSpecs.combining(
Sum.ofIntegers().getAccumulatorCoder(null, VarIntCoder.of()), Sum.ofIntegers());
@ProcessElement
public void processElement(ProcessContext c,
@StateId("agg") CombiningState<Integer, int[], Integer> aggState,
@StateId("done") ValueState<Boolean> doneState,
@TimerId("emitter") Timer emitterTimer) throws Exception {
if (SOME CONDITION) {
countValueState.clear();
doneState.write(true);
} else {
countValueState.addAccum(c.element().getValue());
emitterTimer.align(Duration.standardMinutes(5)).setRelative();
}
}
}
@OnTimer("emitter")
public void onEmit(
OnTimerContext context,
@StateId("agg") CombiningState<Integer, int[], Integer> aggState,
@StateId("done") ValueState<Boolean> doneState,
@TimerId("emitter") Timer emitterTimer) {
Boolean isDone = doneState.read();
if (isDone != null && isDone) {
return;
} else {
context.output(aggState.getAccum());
// Set the timer to emit again
emitterTimer.align(Duration.standardMinutes(5)).setRelative();
}
}
}
}

很高兴能和你一起迭代一些有用的东西。

@Pablo确实正确地认为StatefulDoFn和计时器在这种情况下很有用。这是我能够使用的代码。

Stateful Do Fn

// DomainState is a custom case class I'm using
type DoFnT = DoFn[KV[String, DomainState], KV[String, DomainState]]
class StatefulDoFn extends DoFnT {
@StateId("key")
private val keySpec = StateSpecs.value[String]()
@StateId("domainState")
private val domainStateSpec = StateSpecs.value[DomainState]()
@TimerId("loopingTimer")
private val loopingTimer: TimerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME)
@ProcessElement
def process(
context: DoFnT#ProcessContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit = {
... logic to create key/value from potentially null values
if (keepState(value)) {
loopingTimer.align(Duration.standardMinutes(5)).setRelative()
stateKey.write(key)
stateValue.write(value)
if (flushState(value)) {
context.output(KV.of(key, value))
}
} else {
stateValue.clear()
}
}
@OnTimer("loopingTimer")
def onLoopingTimer(
context: DoFnT#OnTimerContext,
@StateId("key") stateKey: ValueState[String],
@StateId("domainState") stateValue: ValueState[DomainState],
@TimerId("loopingTimer") loopingTimer: Timer): Unit = {
... logic to create key/value checking for nulls
if (keepState(value)) {
loopingTimer.align(Duration.standardMinutes(5)).setRelative()
if (flushState(value)) {
context.output(KV.of(key, value))
}
}
}
}

带管道

sc
.pubsubSubscription(...)
.keyBy(...)
.withGlobalWindow()
.applyPerKeyDoFn(new StatefulDoFn())
.withFixedWindows(
duration = Duration.standardMinutes(5),
options = WindowOptions(
accumulationMode = DISCARDING_FIRED_PANES,
trigger = AfterWatermark.pastEndOfWindow(),
allowedLateness = Duration.ZERO,
// Only take the latest per key during a window
timestampCombiner = TimestampCombiner.END_OF_WINDOW
))
.reduceByKey(mostRecentEvent())
.saveAsCustomOutput(TextIO.write()...)

最新更新