我正在使用Table/SQL API
作业操作:
- 从Kafka中读取并使用SQL查找,以丰富并追加SQL表
- 在打乱SQL表之后,将相同的事件流到Kafka Sink
但是MySQL中的追加和Kafka中的插入是并行发生的,但我们想要一个像下面一样的串行顺序
我们想要的流量:LOOKUP KAFKA和SQL->UPSERT SQL->插入KAFKA
我们获得的流量:LOOKUP KAFKA和SQL->UPSERT SQL//插入KAFKA(Parralrel(
这不是一个简单的方法。接收器是作业图中的终端节点。
但是,您可以使用async i/o操作符来执行upstart,并安排它在upstart完成后仅向kafka接收器发送下游事件。
或者,您可以有第二个作业,从SQL中获取CDC流并插入到kafka中。