Why does Flink's ValueState.value() sometimes incorrectly return null?



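I'm hitting a bug in a Flink application where calling myValueState.value() inside a KeyedProcessFunction sometimes returns null, even though the logic in the code should guarantee that the object returned by .value() is never null. These nulls are returned rarely, and they do not recur when the application is restarted and re-run on the same data it previously failed on. Note: myValueState is of type ValueState<java.time.LocalDateTime>.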

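More context

  • I'm using Flink 1.15.2, hosted on AWS Kinesis Data Analytics; this is where the bug occurs
  • The bug does not occur locally
  • RocksDB is used as the state backend on AWS Kinesis Data Analytics
  • I'm using the DataStream API with Java 11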

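Code

  • Near the top of the process function, I run updateMinTimestamp
  • As far as I can tell, this function should ensure that the value of this state is never null
  • Later in the code, I call minTimestamp.value() inside the getLocalDateTimeValueState function, and some of the time it returns null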

import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class MyClass extends KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String, String>> {
    private transient ObjectMapper objectMapper;
    private transient ValueState<LocalDateTime> minTimestamp;

    @Override
    public void processElement(
            final Tuple2<String, byte[]> input,
            final KeyedProcessFunction<String, Tuple2<String, byte[]>, Tuple2<String, String>>.Context ctx,
            final Collector<Tuple2<String, String>> out) throws Exception {
        Event maybeDeserializedEvent = deserializeBytesToEvent(input.f1);

        if (maybeDeserializedEvent instanceof SuccessfullyDeserializedEvent) {
            SuccessfullyDeserializedEvent event = (SuccessfullyDeserializedEvent) maybeDeserializedEvent;
            System.out.printf(
                "Deserialized event category '%s' for txnId '%s' with timestamp '%s'\n",
                event.getCategory(), event.getTxnId(), event.getTimestamp()
            );

            updateMinTimestamp(event.getTimestamp());

            // some other stuff (processing + aggregating event, unrelated to the minTimestamp)...
            //....

            // this value is sometimes null, which triggers a NPE when calling `toString` on it
            // based on the logic of the updateMinTimestamp() method, `minTimestampValue` should never be null
            LocalDateTime minTimestampValue = getLocalDateTimeValueState(minTimestamp);

            // sometimes throws NPE
            String minTimestampStr = minTimestampValue.toString();

            // some more stuff, include ctx.out(...)
            //....
        }
    }

    @Override
    public void open(Configuration configuration) {
        objectMapper = new ObjectMapper();
        minTimestamp = getRuntimeContext().getState(createEventTimestampDescriptor("min-timestamp", 2));
    }

    private ValueStateDescriptor<LocalDateTime> createEventTimestampDescriptor(String name, Integer ttl) {
        ValueStateDescriptor<LocalDateTime> eventTimestampDescriptor = new ValueStateDescriptor<>(
            name,
            new LocalDateTimeSerializer()
        );
        eventTimestampDescriptor.enableTimeToLive(
            StateTtlConfig
                .newBuilder(Time.hours(ttl))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build()
        );
        return eventTimestampDescriptor;
    }

    private Event deserializeBytesToEvent(byte[] serializedEvent) {
        SuccessfullyDeserializedEvent event = new SuccessfullyDeserializedEvent();
        try {
            final JsonNode node = objectMapper.readTree(serializedEvent);
            event.setCategory(node.get("category").asLong());
            event.setTxnId(node.get("txnId").asText());
            event.setTimestamp(LocalDateTime.parse(node.get("timestamp").asText(), DateTimeFormatter.ISO_DATE_TIME));
            event.setPayload(objectMapper.readTree(node.get("payload").asText()));

            return event;
        } catch (IOException e) {
            System.out.printf(
                "Failed to deserialize event with category:'%s', txnId:'%s', timestamp:'%s', payload:'%s'\n",
                event.getCategory(),
                event.getTxnId(),
                event.getTimestamp(),
                event.getPayload()
            );
            return new UnsuccessfullyDeserializedEvent();
        }
    }

    void updateMinTimestamp(LocalDateTime newTimestamp) {
        try {
            final LocalDateTime currentMinTimestamp = minTimestamp.value();
            if (currentMinTimestamp == null || newTimestamp.isBefore(currentMinTimestamp)) {
                minTimestamp.update(newTimestamp);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private LocalDateTime getLocalDateTimeValueState(ValueState<LocalDateTime> localDateTimeValueState) {
        try {
            return localDateTimeValueState.value();
        } catch (IOException e) {
            // keep the original exception as the cause so the stack trace is not lost
            throw new RuntimeException("Error grabbing localdatetime from value state", e);
        }
    }

    public interface Event {}


    public class SuccessfullyDeserializedEvent implements Event {
        private Long category;
        private JsonNode payload;
        private String txnId;
        private LocalDateTime timestamp;

        SuccessfullyDeserializedEvent() {}

        // getters
        Long getCategory() {
            return this.category;
        }
        JsonNode getPayload() {
            return this.payload;
        }
        String getTxnId() {
            return this.txnId;
        }
        LocalDateTime getTimestamp() {
            return this.timestamp;
        }
        // setters
        void setCategory(Long category) {
            this.category = category;
        }
        void setPayload(JsonNode payload) {
            this.payload = payload;
        }
        void setTxnId(String txnId) {
            this.txnId = txnId;
        }
        void setTimestamp(LocalDateTime timestamp) {
            this.timestamp = timestamp;
        }
    }

    public class UnsuccessfullyDeserializedEvent implements Event {
    }
}

Any information on why this bug occurs, and how to prevent it, would be greatly appreciated.

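Answering my own question: I believe this bug was caused by a combination of my own poor coding practice and odd behavior in Flink's state-expiry (TTL) mechanism:

  1. In my code I was fetching the state, updating the state, and then fetching the updated state again, all within the same processElement call. This is inefficient: all you need to do is fetch the state and update it; there should be no need to fetch it again after the update.
  2. Flink's TTL behavior seems to be that if the state is in the process of being cleared, updating it and then retrieving it afterwards may still give you null (even if the update type is OnReadAndWrite). Some time has passed since I investigated this, but I can say with certainty that Flink's TTL mechanism has plenty of unexpected behavior.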
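
To illustrate point 1, here is a minimal sketch of the "read once, update, keep using the local value" idea. It is not Flink code: SimpleState and InMemoryState are hypothetical stand-ins for ValueState so the example runs without a Flink dependency, and updateAndGetMin is a name made up for this sketch. The method returns the minimum it computed locally instead of reading the state back, so its result cannot be null as long as the incoming timestamp is not null.

```java
import java.time.LocalDateTime;

/** Hypothetical stand-in for Flink's ValueState, so the sketch runs without Flink. */
interface SimpleState<T> {
    T value();
    void update(T newValue);
}

/** Hypothetical in-memory implementation, used only to exercise the sketch. */
final class InMemoryState<T> implements SimpleState<T> {
    private T current;

    @Override
    public T value() {
        return current;
    }

    @Override
    public void update(T newValue) {
        current = newValue;
    }
}

final class MinTimestampSketch {
    private MinTimestampSketch() {}

    /**
     * Reads the state exactly once, writes back only when the minimum changes,
     * and returns the locally computed minimum rather than re-reading the state.
     */
    static LocalDateTime updateAndGetMin(SimpleState<LocalDateTime> state, LocalDateTime incoming) {
        LocalDateTime stored = state.value(); // may be null: never set, or expired
        if (stored == null || incoming.isBefore(stored)) {
            state.update(incoming);
            return incoming;
        }
        return stored;
    }
}
```

In the question's code, the equivalent change would be to have updateMinTimestamp return the value it settled on and use that in processElement instead of calling minTimestamp.value() a second time.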

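In general, a safe pattern for working with Flink state is: fetch all of the state near the start of the stateful process function, check its values, and proceed accordingly (i.e. if all of the state is present do x, if some state has expired but other state has not do y, and so on). If the Flink state has a TTL, carelessness can create race conditions (will your code grab the state, or will it have expired unexpectedly?).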
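
The "snapshot everything first, then branch" pattern could be sketched roughly as below. This is plain Java under stated assumptions rather than Flink API: the arguments stand for values already read from state at the top of processElement, eventCount is a hypothetical second piece of state that is not in the original code, and the Action names are invented for illustration.

```java
import java.time.LocalDateTime;

final class StateSnapshotSketch {
    private StateSnapshotSketch() {}

    /** Hypothetical outcomes; the real "x" and "y" depend on the application. */
    enum Action { CONTINUE, INITIALIZE, REBUILD_PARTIAL }

    /**
     * Decides how to proceed from a one-time snapshot of the state values.
     * A null argument means that piece of state was never set or has expired.
     */
    static Action decide(LocalDateTime minTimestamp, Long eventCount) {
        boolean hasMin = minTimestamp != null;
        boolean hasCount = eventCount != null;
        if (hasMin && hasCount) {
            return Action.CONTINUE;        // all state present
        }
        if (!hasMin && !hasCount) {
            return Action.INITIALIZE;      // nothing present: treat the key as new
        }
        return Action.REBUILD_PARTIAL;     // some state expired while other state survived
    }
}
```

Because the decision is made from local variables captured once, a TTL expiry that happens partway through the call cannot change which branch runs.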