I want to match a CEP pattern in Flink 1.4.0 with the following code:
DataStream<Event> input = inputFromSocket.map(new IncomingMessageProcessor()).filter(new FilterEmptyAndInvalidEvents());
DataStream<Event> inputFiltered = input.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator());
KeyedStream<Event, String> partitionedInput = inputFiltered.keyBy(new MyKeySelector());
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
.where(new ActionCondition("action1"))
.followedBy("middle").where(new ActionCondition("action2"))
.followedBy("end").where(new ActionCondition("action3"));
pattern = pattern.within(Time.seconds(30));
PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
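To make the semantics of this pattern concrete, here is a plain-Java approximation (no Flink dependency) of begin(...).followedBy(...).followedBy(...) with within(Time.seconds(30)), applied to the events of a single key. It is a simplified sketch, not Flink's NFA. SimpleEvent and RelaxedSequenceMatcher are hypothetical names. The events are assumed to be already sorted by timestamp, as CEP does in event time. followedBy is modelled as relaxed contiguity, meaning that unrelated events between the steps are skipped.

```java
import java.util.List;

/** Hypothetical stand-in for the question's Event: only action and timestamp matter here. */
final class SimpleEvent {
    final String action;
    final long timestamp;

    SimpleEvent(String action, long timestamp) {
        this.action = action;
        this.timestamp = timestamp;
    }
}

/**
 * Simplified approximation of begin("start").followedBy("middle").followedBy("end").within(window)
 * for one key. Assumes eventsOfOneKey is sorted by timestamp and steps is non-empty.
 */
final class RelaxedSequenceMatcher {
    private RelaxedSequenceMatcher() {}

    static boolean matches(List<SimpleEvent> eventsOfOneKey, List<String> steps, long windowMillis) {
        // every event that satisfies the first step is a potential start of a match
        for (int start = 0; start < eventsOfOneKey.size(); start++) {
            SimpleEvent first = eventsOfOneKey.get(start);
            if (!first.action.equals(steps.get(0))) {
                continue;
            }
            int step = 1;
            for (int i = start + 1; i < eventsOfOneKey.size() && step < steps.size(); i++) {
                SimpleEvent e = eventsOfOneKey.get(i);
                // approximation of within(...): stop once the window measured from the start is exhausted
                if (e.timestamp - first.timestamp >= windowMillis) {
                    break;
                }
                // relaxed contiguity: events that do not satisfy the next step are simply ignored
                if (e.action.equals(steps.get(step))) {
                    step++;
                }
            }
            if (step == steps.size()) {
                return true;
            }
        }
        return false;
    }
}
```

An unrelated event between action1 and action2 does not break the match. An action3 that arrives 40 seconds after action1 does.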
Event is just a POJO:
public class Event {
private UUID id;
private String action;
private String senderID;
private long occurrenceTimeStamp;
......
}
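The events are read from my custom source (Google Pub/Sub). The first filter, FilterEmptyAndInvalidEvents(), only drops malformed events, but that does not happen in this case, as I can verify from the log output. So every event runs through the MyKeySelector.getKey() method.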
BoundedOutOfOrdernessGenerator just extracts the timestamp from one field:
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {
private static Logger LOG = LoggerFactory.getLogger(BoundedOutOfOrdernessGenerator.class);
private final long maxOutOfOrderness = 5500; // 5.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(Event element, long previousElementTimestamp) {
long timestamp = element.getOccurrenceTimeStamp();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
Watermark newWatermark = new Watermark(currentMaxTimestamp - maxOutOfOrderness);
return newWatermark;
}
}
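The arithmetic of this generator can be replayed without Flink. The sketch below is a hypothetical plain-Java replica (EventDrivenWatermarkSim is not a Flink class) that keeps the same two statements. Feeding it the three timestamps from the log output shows that the watermark stays 5.5 seconds behind the newest event and does not change when it is polled again without new input.

```java
/**
 * Hypothetical plain-Java replica of the question's BoundedOutOfOrdernessGenerator arithmetic
 * (no Flink types), used to show when its watermark moves.
 */
final class EventDrivenWatermarkSim {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp; // defaults to 0, like the original field

    EventDrivenWatermarkSim(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Same update as extractTimestamp(): remember the highest event timestamp seen. */
    long onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    /** Same formula as getCurrentWatermark(): depends only on event timestamps, never on the clock. */
    long currentWatermark() {
        return currentMaxTimestamp - maxOutOfOrderness;
    }
}
```

Polling more often does not help with this formula. The result only changes when a higher event timestamp is seen.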
MyKeySelector just extracts a string value from one field:
public class MyKeySelector implements KeySelector<Event, String> {
private static Logger LOG = LoggerFactory.getLogger(MyKeySelector.class);
@Override
public String getKey(Event value) throws Exception {
String senderID = value.getSenderID();
LOG.info("Partioning event {} by key {}", value, senderID);
return senderID;
}
}
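Because the pattern is applied to a KeyedStream, CEP keeps its matching state per key. action1, action2 and action3 can therefore only complete a match if they share the same senderID. The following Flink-free sketch illustrates that per-key view with plain Java streams. SenderEvent and PerKeyView are hypothetical names. The sketch says nothing about how Flink physically distributes keys across parallel instances.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical minimal event: only the key field (senderID) and the action are modelled. */
final class SenderEvent {
    final String senderID;
    final String action;

    SenderEvent(String senderID, String action) {
        this.senderID = senderID;
        this.action = action;
    }
}

final class PerKeyView {
    private PerKeyView() {}

    /**
     * Conceptual view of keyBy(senderID): every sender gets its own sub-stream (in arrival order),
     * and a CEP pattern on a keyed stream is evaluated separately on each of them.
     */
    static Map<String, List<String>> actionsBySender(List<SenderEvent> events) {
        return events.stream()
                .collect(Collectors.groupingBy(
                        e -> e.senderID,
                        Collectors.mapping(e -> e.action, Collectors.toList())));
    }
}
```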
ActionCondition simply compares one field of the event and looks like this:
public class ActionCondition extends SimpleCondition<Event> {
private static Logger LOG = LoggerFactory.getLogger(ActionCondition.class);
private String filterForCommand = "";
public ActionCondition(String filterForCommand) {
this.filterForCommand = filterForCommand;
}
@Override
public boolean filter(Event value) throws Exception {
LOG.info("Filtering event for {} action: {}", filterForCommand, value);
if (value == null) {
return false;
}
if (value.getAction() == null) {
return false;
}
if (value.getAction().equals(filterForCommand)) {
LOG.info("It's a hit for the {} action for event {}", filterForCommand, value);
return true;
} else {
LOG.info("It's a miss for the {} action for event {}", filterForCommand, value);
return false;
}
}
}
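The null handling in ActionCondition.filter() can be collapsed into one expression by calling equals() on the command string. Below is a Flink-free sketch of the same decision as a java.util.function.Predicate. ActionEvent and ActionPredicates are hypothetical names. In the real job, the same expression could serve as the body of filter() in the SimpleCondition subclass.

```java
import java.util.Objects;
import java.util.function.Predicate;

/** Hypothetical minimal event; only the action field matters for this check. */
final class ActionEvent {
    private final String action;

    ActionEvent(String action) {
        this.action = action;
    }

    String getAction() {
        return action;
    }
}

final class ActionPredicates {
    private ActionPredicates() {}

    /**
     * Same decision as ActionCondition.filter(): a null event or a null action never matches.
     * Calling equals() on the non-null command makes a separate null check on the action unnecessary.
     */
    static Predicate<ActionEvent> actionIs(String command) {
        Objects.requireNonNull(command, "command");
        return value -> value != null && command.equals(value.getAction());
    }
}
```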
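Unfortunately, when I start the job and send events that should match the pattern, they are received and partitioned correctly, but the CEP pattern does not match.
For example, I send the following events: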
- Action1
- Action2
- Action3
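In the log output of the Flink job I can see that the events run through the MyKeySelector.getKey() method correctly, because I added logging there. So the events seem to arrive in the stream correctly, but unfortunately they do not match the pattern.
The log output looks like this: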
FilterEmptyAndInvalidEvents - Letting event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 through
MyKeySelector - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents - Letting event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 through
MyKeySelector - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
FilterEmptyAndInvalidEvents - Letting event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 through
MyKeySelector - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::27ef8d25-8c3b-43fc-a228-fa0dda8e564d --- action: start, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448701 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::18b45a9c-b837-4b61-acf3-0b545a097203 --- action: click, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448702 by key RHHLWUi8sXH33AJIAAAA
MyKeySelector - Partioning event Event::fe1486ab-d702-421d-be32-98dd38a1d306 --- action: connect, sender: RHHLWUi8sXH33AJIAAAA, timestamp: 1518194448703 by key RHHLWUi8sXH33AJIAAAA
The time characteristic is set to event time via
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
and the events carry correct timestamps.
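If I now send another 3 events (but with new timestamps, etc.):
- Action1
- Action2
- Action3
the pattern matches the first set of events. I know that it is the first set that matched, because I tag every event for debugging purposes and print out the events of each match.
When I send these 3 events a third and a fourth time, it is always the previous set that gets matched. So there seems to be some kind of "offset" in the pattern detection. However, this does not seem to be a timing issue: even if I wait a long time after sending, the first set of events is still not matched (although I can see the events being partitioned by Flink).
What is wrong with my code, and why does Flink always match only the previous set of events against the pattern?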
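I did sort it out. I had been searching in the stream source, but my event processing was actually completely fine. The problem was that my watermark did not advance continuously. As you can see in the code above, I only produced a new, higher watermark when an event was received.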
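But after sending the first 3 events, no further events arrived in my setup, so no new watermark was generated. And since no watermark ever got past the timestamps of the last received events of the sequence, Flink never actually processed those elements. The reason is explained in the Flink CEP documentation. The important sentence is: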
...when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed.
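Since my BoundedOutOfOrdernessGenerator generates watermarks with a 5.5-second delay, the latest watermark was always 5.5 seconds behind the timestamp of the last event, so those events were never processed. This also explains the apparent "offset": only when the next set of events arrived did the watermark move past the previous set, which was then processed and matched.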
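The buffering rule from the CEP documentation, combined with a watermark that lags 5.5 seconds behind the newest event, reproduces the observed "offset". The following sketch is a simplified model, not Flink's CEP operator. EventTimeBufferSim is a hypothetical name, and elements are reduced to their timestamps. The release condition follows the documented rule (timestamp smaller than the watermark). Flink's exact boundary handling is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Simplified model of the CEP event-time buffer: elements wait, sorted by timestamp,
 * until a watermark passes them. Not Flink code; it only reproduces the ordering rule.
 */
final class EventTimeBufferSim {
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();

    void add(long timestamp) {
        buffer.add(timestamp);
    }

    /** Releases, in timestamp order, every buffered element whose timestamp is smaller than the watermark. */
    List<Long> onWatermark(long watermark) {
        List<Long> released = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() < watermark) {
            released.add(buffer.poll());
        }
        return released;
    }
}
```

Suppose the first set has timestamps 1000, 1001 and 1002 and the lag is 5500 ms. The watermark after the first set is then -4498, so nothing is released. Once a second set arrives around 20000, the watermark 14502 releases exactly the first set, and the second set now waits in turn.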
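So one solution is to generate watermarks periodically that trail the current time by a fixed lateness. To make Flink poll the generator periodically, we set setAutoWatermarkInterval on the ExecutionConfig: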
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
..
ExecutionConfig executionConfig = env.getConfig();
executionConfig.setAutoWatermarkInterval(1000L);
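This makes Flink call the watermark generator periodically at the given interval (here every second) and pull a new watermark from it. Setting the time characteristic to EventTime already enables a default interval of 200 ms, so this call mainly adjusts the frequency.
In addition, we need to adapt the timestamp/watermark generator so that it emits new watermarks even when no new events flow in. To do this, I adapted the timestamp extractor that ships with Flink (TimestampExtractor.java):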
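import java.time.Instant;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<Event> {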
private static final long serialVersionUID = 1L;
/** The current maximum timestamp seen so far. */
private long currentMaxTimestamp;
/** The timestamp of the last emitted watermark. */
private long lastEmittedWatermark = Long.MIN_VALUE;
/**
* The (fixed) interval between the maximum seen timestamp seen in the records
* and that of the watermark to be emitted.
*/
private final long maxOutOfOrderness;
public BoundedOutOfOrdernessGenerator() {
Time maxOutOfOrderness = Time.seconds(5);
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException("Tried to set the maximum allowed " + "lateness to " + maxOutOfOrderness
+ ". This parameter cannot be negative.");
}
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
}
public long getMaxOutOfOrdernessInMillis() {
return maxOutOfOrderness;
}
/**
* Extracts the timestamp from the given element.
*
* @param element The element that the timestamp is extracted from.
* @return The new timestamp.
*/
public long extractTimestamp(Event element) {
long timestamp = element.getOccurrenceTimeStamp();
return timestamp;
}
@Override
public final Watermark getCurrentWatermark() {
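Instant instant = Instant.now();
long nowTimestampMillis = instant.toEpochMilli();
// advance the maximum to the wall-clock time, so that the watermark keeps moving
// (and stays maxOutOfOrderness behind "now") even when no events arrive;
// subtracting maxOutOfOrderness here as well would make the watermark lag twice as far
if (nowTimestampMillis >= currentMaxTimestamp) {
currentMaxTimestamp = nowTimestampMillis;
}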
// this guarantees that the watermark never goes backwards.
long potentialWM = currentMaxTimestamp - maxOutOfOrderness;
if (potentialWM >= lastEmittedWatermark) {
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
@Override
public final long extractTimestamp(Event element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) {
currentMaxTimestamp = timestamp;
}
return timestamp;
}
}
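The core of this generator can be checked without a running cluster if the clock is injected. The sketch below is a hypothetical, Flink-free reduction. WallClockWatermarkSim and the LongSupplier clock are assumptions, not part of the code above. The watermark is the larger of the highest event timestamp and the current wall-clock time, minus a fixed lag, and it never decreases.

```java
import java.util.function.LongSupplier;

/**
 * Hypothetical Flink-free sketch: watermark = max(highest event timestamp, wall clock) - lag,
 * never moving backwards. The clock is injected so the behaviour can be tested without sleeping.
 */
final class WallClockWatermarkSim {
    private final LongSupplier clockMillis;
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp = Long.MIN_VALUE;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    WallClockWatermarkSim(LongSupplier clockMillis, long maxOutOfOrderness) {
        this.clockMillis = clockMillis;
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Counterpart of extractTimestamp(): remember the highest event timestamp. */
    long onEvent(long timestamp) {
        currentMaxTimestamp = Math.max(currentMaxTimestamp, timestamp);
        return timestamp;
    }

    /** Counterpart of getCurrentWatermark(): advances with the clock even if no event arrived. */
    long currentWatermark() {
        long now = clockMillis.getAsLong();
        currentMaxTimestamp = Math.max(currentMaxTimestamp, now);
        // the watermark never goes backwards, even if the clock does
        lastEmittedWatermark = Math.max(lastEmittedWatermark, currentMaxTimestamp - maxOutOfOrderness);
        return lastEmittedWatermark;
    }
}
```

With a 5000 ms lag, an event stamped 1_000_000 gives a watermark of 995_000. Six seconds later, with no new event, the watermark has reached 1_001_000 and therefore passed the event. The event-driven version of the generator never gets there.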
In getCurrentWatermark() you can see that I take the current epoch timestamp, subtract the maximum lateness we expect, and create a watermark from that timestamp.
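Taken together, Flink now pulls a new watermark every second, and the watermark always lags 5 seconds behind the current time. This means a sequence is matched roughly 5 seconds (plus at most one watermark interval) after its last event has been received.
Whether this works for you depends on your scenario. It also means that events which are already more than 5 seconds old when Flink receives them (their timestamp is behind the watermark) are dropped and no longer processed.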
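The trade-off above can be expressed as a simple check. This is a hypothetical helper, not a Flink API. It approximates the watermark at arrival time as arrival time minus the lag, ignoring the polling interval and any event timestamp newer than the clock. It treats an event as late when its timestamp is smaller than that watermark. Exact boundary handling in Flink may differ.

```java
/** Hypothetical helper, not part of Flink: estimates whether an event would be treated as late. */
final class LatenessCheck {
    private LatenessCheck() {}

    /**
     * Approximates the watermark at arrival as arrivalMillis - lagMillis, ignoring the polling
     * interval and event timestamps newer than the wall clock.
     */
    static boolean wouldBeLate(long eventTimestamp, long arrivalMillis, long lagMillis) {
        long approximateWatermark = arrivalMillis - lagMillis;
        return eventTimestamp < approximateWatermark;
    }
}
```

For example, with a 5-second lag, an event stamped 1_000_000 that reaches Flink at 1_006_000 (six seconds old) counts as late. The same event arriving at 1_003_000 does not.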