任何遇到这种情况的人都应该始终是非负的或空的.从flink应用程序发布事件到kafka主题时出错? &



从flink应用程序向kafka主题发布消息时出现此错误。相同的代码在我们的测试环境中工作,具有类似的生产者配置,但在生产环境中失败。我找不到这个问题的原因。

2022-04-15 16:51:37,228 thread="Sink: spend-limit-publisher-sink (2/2)#15" level=WARN  logger=o.a.f.r.t.Task - Sink: spend-limit-publisher-sink (2/2)#15 (d7b4646c840c3882bb784125393b484a) switched from RUNNING to FAILED with failure cause: java.lang.IllegalArgumentException: Invalid timestamp: -9223372036854775808. Timestamp should always be non-negative or null.
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:74)
at org.apache.kafka.clients.producer.ProducerRecord.<init>(ProducerRecord.java:97)
at com.ex******.streamplatform.sdk.kafka.flink.ProducerFunction$Tuple2ProducerFunction.apply(ProducerFunction.java:80)
at com.ex******.streamplatform.sdk.kafka.flink.ProducerFunction$Tuple2ProducerFunction.apply(ProducerFunction.java:72)
at com.ex******.streamplatform.sdk.kafka.flink.KafkaSerializationSchemaAdapter.serialize(KafkaSerializationSchemaAdapter.java:37)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:907)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:684)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:639)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:623)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.base/java.lang.Thread.run(Unknown Source)

我发现这个问题是因为水槽缺乏水印。所以时间戳默认为Long。MIN_VALUE (-9223372036854775808)

添加水印

limitedStream1.union(limitedStream2)
            .assignTimestampsAndWatermarks(WatermarkStrategy.<String>forMonotonousTimestamps()
                    .withTimestampAssigner((id, streamRecordTimestamp) -> Instant.now().toEpochMilli()))
            .map(new DataMapper())
            .filter(new NotNull<>())
            .addSink(connectorFactory.createProducer(String.class, EventRecord.class));
           

最新更新