Kafka 连接 - 审核 - 在任务完成时触发事件



我们正在用kafka构建一个异常管理工具。将有源连接器 - 它将从物理文件中提取记录。另一方面,将有sink connect(mongodb-sinkconnect),它将从主题中提取记录并将其推送到mongoDb。一切正常。

我们需要在不同的主题中捕获事件(用于审核目的)。 诸如此类的事件,

  1. 源任务(文件轮询任务)启动事件 示例,如果文件 A 收到
  2. 源任务(文件轮询任务)结束事件 示例,如果文件 A 已完全处理
  3. 接收器任务(将记录推送到 mongodb 任务)启动事件 例如,文件 A 的记录由 mongodb-connect 开始处理
  4. 接收器任务(将记录推送到 mongodb 任务)结束事件 例如,文件 A 的记录完全推送到 MongoDB

我在这里有几个问题: 1.我们能够通过在SourceTask中实例化一个KafkaProduceer来将事件发送到不同的主题,一旦文件被完全处理,我们就会发送一个事件

public class FileSourceTask extends SourceTask {
private Producer<Key, Event> auditProducer;
public void start(Map<String, String> props) {
auditProducer = new KafkaProducer<Key, Event>(auditProps);
}
public List<SourceRecord> poll() {
List<SourceRecord> results = this.filePoller.poll();
if(results.isEmpty() && eventNotSentForCurrentFile) {
Event event = new Event();
auditProducer.send(
new ProducerRecord<Key, Event>(this.props.get("event.topic"), key, event));
}
// futher processing  
}

上述方法是否合用?

  1. 上述解决方案工作正常 - 因为它运行在一个任务(maxTasks = 1),但对于我们的用例,在接收器任务(mongoDB connect)中实现这一点非常困难。由于本主题已分区,因此将创建许多任务。我们无法跟踪接收器任务的开始事件和结束事件

请提出解决此问题的方法。

非常感谢。

我认为,你可以围绕Kafka-connect ReST API构建一些东西。

https://docs.confluent.io/current/connect/restapi.html#get--连接器-(字符串名称)-状态

但是有了这个,您需要让观察者保持连接器状态,一旦连接器的所有任务都完成,您就可以采取行动。

相关内容

  • 没有找到相关文章

最新更新