我正在创建一个简单的Apache Flink作业(使用Scala(,它只尝试打印表示RabbitMQ队列(RMQSource
(接收的事件的case类
我已经创建了自己的反序列化模式(使用Jackson(,当使用的消息实际上是表示case类的JSON时,它可以很好地工作。但是,如果队列接收到格式错误的事件(我想我们可以称之为"中毒消息"(,则该作业将失败并继续重新启动。我必须清除队列,然后作业状态更改为"正在运行"。
问题:
收到中毒消息时,如何防止作业失败?我可以在消费消息之前验证它吗?如果我可以在Rabbit中设置死信交换,我应该在哪里(如果可能的话(代表Apache Flink作为消费者进行负面确认?有更好的方法来处理这一问题,并保持作业运行,消耗下一个格式良好的消息吗?
我的自定义反序列化模式提供给RMQSource[Test]:
class eventSerializationSchema extends DeserializationSchema[Test] {
@throws(classOf[IOException])
def deserialize(message: Array[Byte]): Test = objectMapper.readValue(message, classOf[Test])
def isEndOfStream(nextElement: Test): Boolean = false
def getProducedType: TypeInformation[Test] = createTypeInformation[Test]
}
object eventSerializationSchema{
val objectMapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
}
中毒消息到达消耗队列时出错:
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'a': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"a"; line: 1, column: 2]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2337)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:720)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3593)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2688)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:870)
at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:762)
at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4684)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4586)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3609)
at org.angoglez.deserializers.eventSerializationSchema.deserialize(eventSerializationSchema.scala:17)
at org.angoglez.deserializers.eventSerializationSchema.deserialize(eventSerializationSchema.scala:14)
at org.apache.flink.streaming.connectors.rabbitmq.RMQDeserializationSchemaWrapper.deserialize(RMQDeserializationSchemaWrapper.java:47)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.processMessage(RMQSource.java:319)
at org.apache.flink.streaming.connectors.rabbitmq.RMQSource.run(RMQSource.java:331)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:116)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:73)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:323)
从我的角度来看,您有几个选择:
-
捕获反序列化方法内部引发的异常,并丢弃有害记录。
-
捕获反序列化方法中抛出的异常,并以某种方式将您想了解的有关这些有害记录的信息编码到正在生成的对象中。然后在下游流程函数中,过滤掉这些毒记录并将其发送到侧输出。
-
不要在反序列化程序中应用ObjectMapper,而是在链式进程函数中进行真正的反序列化,该函数可以直接将有害记录发送到辅助输出。