我有以下代码在本地运行,没有集群:
val count = new AtomicInteger()
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val text: DataStream[String] = env.readTextFile("file:///flink/data2")
val mapped: DataStream[Map[String, Any]] = text.map((x: String) => Map("user" -> x.split(",")(0), "val" -> x.split(",")(1)))
val pattern: ...
CEP.pattern(mapped, pattern).select(eventMap => {
println("Found: " + (patternName, eventMap))
count.incrementAndGet()
})
env.execute()
println(count)
我的数据是以下格式的CSV文件(user, val):
1,1
1,2
1,3
2,1
2,2
2,3
...
我正在尝试检测模式的事件,其中event(val=1) -> event(val=2) -> event(val=3)
。当我在一个大的输入流上运行它时,我知道流中存在一定数量的事件,我得到的检测到的事件数不一致,几乎总是少于系统中的事件数。如果我执行env.setParallelism(1)
(就像我在代码的第3行所做的那样),将检测到所有事件。
我假设问题是,当并行度> 1时,多个线程正在处理来自流的事件,这意味着当一个线程具有event(val=1) -> event(val=2)
时,event(val=3)
可能被发送到不同的线程,并且整个模式可能不会被检测到。
我在这里错过了什么吗?我不能丢失流中的任何模式,但是将并行度设置为1似乎违背了使用Flink这样的系统来检测事件的目的。
更新:我已经尝试使用:
val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user"))
虽然这可以防止不同用户的事件相互干扰:
1,1
2,2
1,3
这并不能阻止Flink将事件无序地发送到节点,这意味着非确定性仍然存在。
最可能的问题在于在map操作符之后应用keyBy操作符。
所以,而不是:
val mapped: KeyedStream[Map[String, Any]] = text.map(...).keyBy((m) => m.get("user"))
应该有:
val mapped: KeyedStream[Map[String, Any]] = text.keyBy((m) => m.get("user")).map(...)
我知道这是个老问题,但也许它能帮助到某人。
您是否考虑过使用userid(您的第一个值)为您的流设置键?Flink保证一个键的所有事件到达相同的处理节点。当然,只有当您想检测每个用户val=1->val=2->val=3的模式时,这才有帮助。