我正在使用kafka-> flink->弹性搜索。
在Kafka上,将产生从0到数千个事件/sec(例如特定主题)的数量不可预测的事件。
。{"gid":"abcd-8910-2ca4227527f9", "state":"stateA", "timestamp:1465566255, "other unusefull info":"..."}
flink将消耗此事件,并应每秒沉入弹性搜索每个状态中的事件数量:
{"stateA":54, "stateB":100, ... "stateJ":34}
我有10个状态:[Created, ... , Deleted]
,平均生命周期为15分钟。国家可以改变两次。理论上可以添加新状态。
为了每秒接收流,我想使用Flink的时间Windows https://flink.apache.org/news/2015/12/12/04/introducing-windows.html
问题是我需要具有有关guid->previous-state
和stateX->count
的信息的状态对象,以便能够在发生新事件时增加/减少计数。
我找到了有关状态蒸汽处理的文件草案,https://cwiki.apache.org/confluence/display/flink/stateful stateful screteam processing
我是Flink和流处理的新手,我还没有挖掘Flink Flink Flink Fell Stream Processing。对于第一阶段,我正在考虑为此使用静态对象,但是当启动多个flink实例时,这种方法将无法使用。
我想问你:
- 您如何看待这种方法?
- Flink适合这种流处理?
- 您解决此问题的方法是什么?
还要感谢一些窗户状态流解决方案(或其他解决方案)的代码段。
谢谢,
如以下内容如何?
它使用15分钟的窗户,然后将清理窗口状态。它还使用自定义触发器,每秒每秒评估窗口。就窗口操作而言,有一个简单功能,仅保留每个GUID的最新状态,以及一个放射A(状态为1)元组的窗口功能。然后,我们按照这种状态进行键入并总结。我认为这应该给您您想要的结果。
val env = StreamExecutionEnvironment.getExecutionEnvironment()
val stream = env.addSource(new FlinkKafkaProducer(...))
val results = stream
.keyBy(_.guid)
.timeWindow(Time.minutes(15))
.trigger(ProcessingTimeTriggerWithPeriodicFirings(1000))
.apply(
(e1, e2) => e2,
(k, w, i, c: Collector[(String, Long)]) => {
if (i.head != null) c.collect((i.head.state, 1))
}
)
.keyBy(0)
.timeWindow(Time.seconds(1))
.sum(1)
.addSink(new ElasticsearchSink<>(...))
env.execute("Count States")
processingTimetriggerwithperiodicFirings定义如下:
object ProcessingTimeTriggerWithPeriodicFirings {
def apply(intervalMs: Long) = {
new ProcessingTimeTriggerWithPeriodicFirings(intervalMs)
}
}
class ProcessingTimeTriggerWithPeriodicFirings(intervalMs: Long)
extends Trigger[Event, TimeWindow] {
private val startTimeDesc =
new ValueStateDescriptor[Long]("start-time", classOf[Long], 0L)
override def onElement(element: Event, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
val startTime = ctx.getPartitionedState(startTimeDesc)
if (startTime.value == 0) {
startTime.update(window.getStart)
ctx.registerProcessingTimeTimer(window.getEnd)
ctx.registerProcessingTimeTimer(System.currentTimeMillis() + intervalMs)
}
TriggerResult.CONTINUE
}
override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
if (time == window.getEnd) {
TriggerResult.PURGE
}
else {
ctx.registerProcessingTimeTimer(time + intervalMs)
TriggerResult.FIRE
}
}
override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
TriggerResult.CONTINUE
}
}