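I get the error below when publishing messages from a Flink application to a Kafka topic. The same code works in our test environment with a similar producer configuration, but fails in production. I have not been able to find the cause.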
2022-04-15 16:51:37,228 thread="Sink: spend-limit-publisher-sink (2/2)#15" level=WARN logger=o.a.f.r.t.Task - Sink: spend-limit-publisher-sink (2/2)#15 (d7b4646c840c3882bb784125393b484a) switched from RUNNING to FAILED with failure cause: java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null.
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:74)
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:97)
at com.ex******.streamplatform.sdk.kafka.flink.ProducerFunction$Tuple2ProducerFunction.apply(ProducerFunction.java:80)
at com.ex******.streamplatform.sdk.kafka.flink.ProducerFunction$Tuple2ProducerFunction.apply(ProducerFunction.java:72)
at com.ex******.streamplatform.sdk.kafka.flink.KafkaSerializationSchemaAdapter.serialize(KafkaSerializationSchemaAdapter.java:37)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:907)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)
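I found that the problem was caused by the stream reaching the sink without timestamps/watermarks assigned. The record timestamp therefore defaulted to Long.MIN_VALUE (-9223372036854775808), which the ProducerRecord constructor rejects because a timestamp must be non-negative or null.
Assigning timestamps and watermarks before the sink fixed it: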
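limitedStream1.union(limitedStream2)
    // Give every record a valid (non-negative) timestamp before it reaches the Kafka sink.
    .assignTimestampsAndWatermarks(
        WatermarkStrategy.<String>forMonotonousTimestamps()
            // Uses the current wall-clock time, i.e. processing time rather than event time.
            .withTimestampAssigner((id, streamRecordTimestamp) -> Instant.now().toEpochMilli()))
    .map(new DataMapper())
    .filter(new NotNull<>())
    .addSink(connectorFactory.createProducer(String.class, EventRecord.class));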
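If event time is not actually needed, another option may be to guard the timestamp at the point where the ProducerRecord is built: pass null when the incoming timestamp is missing or negative, and the Kafka producer then stamps the record with its own current time. The sketch below only shows that guard in plain Java; TimestampGuard and sanitize are hypothetical names for illustration and are not part of Flink, Kafka, or the SDK seen in the stack trace.

```java
public final class TimestampGuard {

    private TimestampGuard() {}

    /**
     * Returns the timestamp unchanged if it is usable by ProducerRecord,
     * or null if it is missing or negative (for example Long.MIN_VALUE).
     * Hypothetical helper, assumed to be called just before new ProducerRecord(...).
     */
    public static Long sanitize(Long timestamp) {
        if (timestamp == null || timestamp < 0) {
            return null; // let the Kafka producer assign the timestamp itself
        }
        return timestamp;
    }

    public static void main(String[] args) {
        System.out.println(sanitize(Long.MIN_VALUE));   // prints "null"
        System.out.println(sanitize(1650041497228L));   // prints "1650041497228"
    }
}
```

The trade-off is that the Kafka record timestamp then reflects when the record was produced rather than anything carried by the Flink stream, which is the same effect as the Instant.now() assigner above.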