我已经实现了自定义WindowAssigner
:
/**
 * Window assigner that puts every {@link LogItem} into the one {@link SessionWindow}
 * identified by the element's session UID.
 */
public class SessionWindowAssigner extends WindowAssigner<LogItem, SessionWindow> {

    /**
     * Timeout handed to the default {@link SessionTrigger}. The trigger adds it to element
     * timestamps, so it is presumably expressed in milliseconds (confirm against the
     * timestamp assigner in use).
     */
    private static final long DEFAULT_SESSION_TIMEOUT = 60_000L;

    /**
     * Assigns the element to exactly one window, keyed by its session UID.
     *
     * @param element   the incoming log item
     * @param timestamp the element timestamp; not used for the assignment
     * @return a single-element collection holding the session's window
     */
    @Override
    public Collection<SessionWindow> assignWindows(LogItem element, long timestamp) {
        final SessionWindow sessionWindow = new SessionWindow(element.getSessionUid());
        return Collections.singletonList(sessionWindow);
    }

    /**
     * Supplies the trigger used when the job does not configure one explicitly.
     *
     * @param env the execution environment; not consulted
     * @return a {@link SessionTrigger} configured with the default session timeout
     */
    @Override
    public Trigger<LogItem, SessionWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return new SessionTrigger(DEFAULT_SESSION_TIMEOUT);
    }

    /**
     * Supplies the serializer for {@link SessionWindow} instances.
     *
     * @param executionConfig the execution config; not consulted
     * @return a new {@link SessionWindow.Serializer}
     */
    @Override
    public TypeSerializer<SessionWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new SessionWindow.Serializer();
    }
}
,Window
:
/**
 * A window identified solely by a session UID. Two windows are equal exactly when their
 * session UIDs are equal. The window reports {@link Long#MAX_VALUE} as its maximum
 * timestamp, so it has no time-based end of its own.
 */
public class SessionWindow extends Window {

    private final String sessionUid;

    /**
     * @param sessionUid identifier of the session this window represents; must be non-null
     *                   for {@link #equals(Object)}, {@link #hashCode()} and serialization to work
     */
    public SessionWindow(String sessionUid) {
        this.sessionUid = sessionUid;
    }

    /** @return the session UID that identifies this window */
    public String getSessionUid() {
        return sessionUid;
    }

    /** @return always {@link Long#MAX_VALUE} */
    @Override
    public long maxTimestamp() {
        return Long.MAX_VALUE;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        SessionWindow that = (SessionWindow) o;
        return sessionUid.equals(that.sessionUid);
    }

    @Override
    public int hashCode() {
        return sessionUid.hashCode();
    }

    /**
     * Stateless serializer that writes a {@link SessionWindow} as its session UID in
     * modified UTF-8 ({@code writeUTF}/{@code readUTF}).
     */
    public static class Serializer extends TypeSerializer<SessionWindow> {

        private static final long serialVersionUID = 1L;

        /** {@link SessionWindow} has only a final field, so instances are never mutated. */
        @Override
        public boolean isImmutableType() {
            return true;
        }

        /** The serializer holds no state, so the same instance can be shared. */
        @Override
        public TypeSerializer<SessionWindow> duplicate() {
            return this;
        }

        /** @return {@code null}; there is no meaningful empty {@link SessionWindow} */
        @Override
        public SessionWindow createInstance() {
            return null;
        }

        /** Returns the argument itself, since windows are immutable. */
        @Override
        public SessionWindow copy(SessionWindow from) {
            return from;
        }

        /** Returns {@code from} itself; {@code reuse} is ignored because windows are immutable. */
        @Override
        public SessionWindow copy(SessionWindow from, SessionWindow reuse) {
            return from;
        }

        /**
         * The serialized form is a UTF string whose size depends on the session UID, so the
         * type is variable-length. The {@code TypeSerializer} contract uses {@code -1} for
         * that case; {@code 0} would wrongly claim a fixed zero-byte record.
         *
         * @return {@code -1}, marking a variable-length type
         */
        @Override
        public int getLength() {
            return -1;
        }

        /** Writes the window's session UID to {@code target}. */
        @Override
        public void serialize(SessionWindow record, DataOutputView target) throws IOException {
            target.writeUTF(record.sessionUid);
        }

        /** Reads a session UID from {@code source} and wraps it in a new window. */
        @Override
        public SessionWindow deserialize(DataInputView source) throws IOException {
            return new SessionWindow(source.readUTF());
        }

        /** Same as {@link #deserialize(DataInputView)}; {@code reuse} is ignored. */
        @Override
        public SessionWindow deserialize(SessionWindow reuse, DataInputView source) throws IOException {
            return new SessionWindow(source.readUTF());
        }

        /** Copies one serialized window (a single UTF string) from {@code source} to {@code target}. */
        @Override
        public void copy(DataInputView source, DataOutputView target) throws IOException {
            target.writeUTF(source.readUTF());
        }

        /** All instances are interchangeable, so any two serializers of this class are equal. */
        @Override
        public boolean equals(Object obj) {
            return obj instanceof Serializer;
        }

        @Override
        public boolean canEqual(Object obj) {
            return obj instanceof Serializer;
        }

        /** Constant hash, consistent with {@link #equals(Object)} treating all instances as equal. */
        @Override
        public int hashCode() {
            return 0;
        }
    }
}
和Trigger
:
/**
 * Event-time trigger that fires and purges a {@link SessionWindow} once no element has
 * arrived for {@code sessionTimeout} time units after the latest element's timestamp.
 *
 * <p>The timestamp of the currently registered timer is kept in partitioned
 * {@link ValueState} obtained from the {@code TriggerContext}, so the trigger object itself
 * holds no mutable fields.
 */
public class SessionTrigger extends Trigger<LogItem, SessionWindow> {

    private final long sessionTimeout;

    /** Descriptor of the state holding the timestamp of the pending timer; {@code null} when none. */
    private final ValueStateDescriptor<Long> previousFinishTimestampDesc = new ValueStateDescriptor<>("SessionTrigger.timestamp", LongSerializer.INSTANCE, null);

    /**
     * @param sessionTimeout gap added to each element's timestamp to compute when the
     *                       session should be considered finished
     */
    public SessionTrigger(long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    /**
     * Moves the session's timer: deletes the previously registered timer (if any), registers
     * a new one at {@code timestamp + sessionTimeout}, and remembers that time in state.
     *
     * @return always {@link TriggerResult#CONTINUE}; firing happens only in {@link #onEventTime}
     */
    @Override
    public TriggerResult onElement(LogItem element, long timestamp, SessionWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);
        Long previousFinishTimestamp = previousFinishTimestampState.value();
        long newFinishTimestamp = timestamp + sessionTimeout;
        if (previousFinishTimestamp != null) {
            ctx.deleteEventTimeTimer(previousFinishTimestamp);
        }
        ctx.registerEventTimeTimer(newFinishTimestamp);
        previousFinishTimestampState.update(newFinishTimestamp);
        return TriggerResult.CONTINUE;
    }

    /** Any event-time timer reaching this trigger ends the session: fire and purge. */
    @Override
    public TriggerResult onEventTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    /**
     * @throws UnsupportedOperationException always; this trigger works on event time only
     */
    @Override
    public TriggerResult onProcessingTime(long time, SessionWindow window, TriggerContext ctx) throws Exception {
        throw new UnsupportedOperationException("This is not processing time trigger");
    }

    /**
     * Removes the pending timer (if one is recorded) and clears the stored timestamp.
     */
    @Override
    public void clear(SessionWindow window, TriggerContext ctx) throws Exception {
        ValueState<Long> previousFinishTimestampState = ctx.getPartitionedState(previousFinishTimestampDesc);
        Long previousFinishTimestamp = previousFinishTimestampState.value();
        // The state is null when no element was recorded or it was already cleared;
        // unboxing null into deleteEventTimeTimer(long) would throw a NullPointerException.
        if (previousFinishTimestamp != null) {
            ctx.deleteEventTimeTimer(previousFinishTimestamp);
        }
        previousFinishTimestampState.clear();
    }
}
用于通过超时检测会话结束,即如果最后一个事件发生在N秒前,则评估窗口函数。正如您所看到的,我将最后一个事件时间戳保存在ValueState中,因为我想在失败后恢复它。
似乎我应该在这个触发器中为保存/恢复保存点(和检查点)快照实现Checkpointed
接口,因为我不想在重新部署流的过程中丢失触发器状态。
所以,有人能告诉我如何在部署过程中正确保存SessionTrigger
触发器(可能还有相关窗口)的状态吗?
据我所知,我只需要为SessionTrigger
实现Checkpointed
接口,因为只有它有状态,对吗?那么SessionWindow
实例和SessionWindowAssigner
呢?它们会在部署后自动恢复吗?还是我应该手动恢复?
以下触发器取自会话窗口示例:
/**
 * Session trigger for {@link GlobalWindow}s. It keeps the timestamp of the last element seen
 * in partitioned state (default {@code -1}, meaning "nothing seen yet") and fires and purges
 * the window once the gap since that element reaches the session timeout.
 */
private static class SessionTrigger extends Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {

    private static final long serialVersionUID = 1L;

    private final Long sessionTimeout;

    /** State descriptor for the last-seen element timestamp; defaults to -1. */
    private final ValueStateDescriptor<Long> stateDesc =
            new ValueStateDescriptor<>("last-seen", Long.class, -1L);

    /**
     * @param sessionTimeout maximum gap between consecutive elements of one session
     */
    public SessionTrigger(Long sessionTimeout) {
        this.sessionTimeout = sessionTimeout;
    }

    /**
     * Replaces the pending timeout timer with one based on the new element and records the
     * element's timestamp. Fires and purges right away when a previous element exists and
     * the gap to it is strictly greater than the timeout.
     */
    @Override
    public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
        final ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
        final long previous = lastSeenState.value();
        final long gap = timestamp - previous;

        // Drop the timer that belonged to the previous element, then remember the new
        // element and arm a timer for its timeout.
        ctx.deleteEventTimeTimer(previous + sessionTimeout);
        lastSeenState.update(timestamp);
        ctx.registerEventTimeTimer(timestamp + sessionTimeout);

        final boolean firstElement = previous == -1;
        if (firstElement || gap <= sessionTimeout) {
            return TriggerResult.CONTINUE;
        }
        System.out.println("FIRING ON ELEMENT: " + element + " ts: " + timestamp + " last " + previous);
        return TriggerResult.FIRE_AND_PURGE;
    }

    /**
     * Fires and purges when the timer time is at least one timeout past the last element
     * seen; otherwise leaves the window untouched.
     */
    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        final long lastSeen = ctx.getPartitionedState(stateDesc).value();
        if (time - lastSeen < sessionTimeout) {
            return TriggerResult.CONTINUE;
        }
        System.out.println("CTX: " + ctx + " Firing Time " + time + " last seen " + lastSeen);
        return TriggerResult.FIRE_AND_PURGE;
    }

    /** Processing-time timers are ignored by this trigger. */
    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    /** Deletes the pending timeout timer, if an element has been seen, and clears the state. */
    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        final ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
        final long lastSeen = lastSeenState.value();
        if (lastSeen != -1) {
            ctx.deleteEventTimeTimer(lastSeen + sessionTimeout);
        }
        lastSeenState.clear();
    }
}