Apache Flink错误处理和条件处理



我是Flink的新手,已经浏览了多个网站/示例/博客。我很难正确使用运算符。基本上我有两个问题

问题1:Flink支持声明性异常处理吗?我需要处理parse/validate/。。。错误

  • 我可以使用org.apache.flink.runtime.operators.sor.ExceptionHandler或类似程序吗处理错误
  • 还是Rich/FlatMap功能是我的最佳选择?如果Rich/FlattMap是唯一的选项,那么有没有一种方法可以在Rich/FlatsMap函数中获得Stream的句柄,以便连接接收器进行错误处理

问题2:我可以有条件地连接不同的接收器吗

  • 根据键控分割流中的某些字段,我需要选择不同的接收器,我是再次分割流还是使用Rich/FlatMap来处理

我正在使用Flink 1.3.2。以下是我工作的相关部分

.....
.....
DataStream<String> eventTextStream = env.addSource(messageSource)
KeyedStream<EventPojo, Tuple> eventPojoStream = eventTextStream
// parse, transform or enrich
.flatMap(new MyParseTransformEnrichFunction())
.assignTimestampsAndWatermarks(new EventAscendingTimestampExtractor())
.keyBy("eventId");
// split stream based on eventType as different reduce and windowing functions need to be applied
SplitStream<EventPojo> splitStream = eventPojoStream
.split(new EventStreamSplitFunction());
// need to apply reduce function
DataStream<EventPojo> event1TypeStream = splitStream.select("event1Type");
// need to apply reduce function
DataStream<EventPojo> event2TypeStream = splitStream.select("event2Type");
// need to apply time based windowing function
DataStream<EventPojo> event3TypeStream = splitStream.select("event3Type");
....
....
env.execute("Event Processing");      

我在这里使用了正确的运算符吗?

更新1:

按照@alpinegizmo的建议,尝试使用ProcessFunction,但没有成功,因为它依赖于一个键控流,而在解析/验证输入之前,我没有这个键控流。我得到"InvalidProgramException:对于非复合类型,字段表达式必须等于'*'或'_'。"。

这是一个常见的用例,第一次解析/验证输入时还没有键控流,那么如何解决它呢?

谢谢你的耐心和帮助。

您忽略了一个关键的构建块。看看侧面输出。

该机制提供了一种类型安全的方式来生成任意数量的附加输出流。这可能是一种报告错误的干净方法,还有其他用途。在Flink中,1.3侧输出只能与ProcessFunction一起使用,但1.4将向ProcessWindowFunction添加侧输出。

相关内容

  • 没有找到相关文章

最新更新