Kafka distributed Connect produces duplicate messages



Environment


  • Three servers
  • Three Kafka brokers, Connect workers, and Schema Registry instances (confluent-7.1.0)
  • One FTP connector for testing (3 tasks)

Problem
  • Connect produces duplicate messages. I want the FTP connector to emit exactly one message per file.

Distributed Connect produces the same message three times (one message per connector task).

  • Below is the log from when a connector task produced the message.
  • This log is printed by each Connect process.
[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] poll (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:77)
[2022-06-26 15:23:12,839] INFO [ftp-test-conn|task-0] connect 10.0.0.138:None (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:294)
[2022-06-26 15:23:12,862] INFO [ftp-test-conn|task-0] successfully connected to the ftp server and logged in (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:311)
[2022-06-26 15:23:12,863] INFO [ftp-test-conn|task-0] passive we are (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:318)
[2022-06-26 15:23:12,870] INFO [ftp-test-conn|task-0] Found 4 items in /home/smheo/ftp-dir/* (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:245)
[2022-06-26 15:23:12,877] INFO [ftp-test-conn|task-0] meta store storage HASN'T /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.ConnectFileMetaDataStore:48)
[2022-06-26 15:23:12,878] INFO [ftp-test-conn|task-0] fetching /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:102)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] fetched /home/smheo/ftp-dir/msg-4, wasn't known before (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:218)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] dump entire /home/smheo/ftp-dir/msg-4 (com.datamountaineer.streamreactor.connect.ftp.source.FtpMonitor:219)
[2022-06-26 15:23:12,881] INFO [ftp-test-conn|task-0] got some fileChanges: /home/smheo/ftp-dir/msg-4, offset = -1 (com.datamountaineer.streamreactor.connect.ftp.source.FtpSourcePoller:96)

The consumer receives the same message three times:

(base) ubuntu@ubuntu:~/distributed-pipeline/confluent-7.1.0$ ./bin/kafka-console-consumer --bootstrap-server <BROKER_IP>:9092 --topic default-topic-1
hello
hello
hello
FTP connector
{
  "ftp-test-conn": {
    "info": {
      "name": "ftp-test-conn",
      "config": {
        "connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
        "connect.ftp.address": "<FTP HOST IP>",
        "connect.ftp.keystyle": "string",
        "compression.type": "gzip",
        "connect.ftp.user": "ftpusername",
        "connect.ftp.refresh": "PT1M",
        "tasks.max": "3",
        "connect.ftp.file.maxage": "P7D",
        "name": "ftp-test-conn",
        "connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
        "connect.ftp.timeout": "3000000",
        "connect.ftp.password": "<PASSWORD>"
      },
      "tasks": [
        {
          "connector": "ftp-test-conn",
          "task": 0
        },
        {
          "connector": "ftp-test-conn",
          "task": 1
        },
        {
          "connector": "ftp-test-conn",
          "task": 2
        }
      ],
      "type": "source"
    },
    "status": {
      "name": "ftp-test-conn",
      "connector": {
        "state": "RUNNING",
        "worker_id": "<BROKER 1 IP>:8083"
      },
      "tasks": [
        {
          "id": 0,
          "state": "RUNNING",
          "worker_id": "<BROKER 1 IP>:8083"
        },
        {
          "id": 1,
          "state": "RUNNING",
          "worker_id": "<BROKER 2 IP>:8083"
        },
        {
          "id": 2,
          "state": "RUNNING",
          "worker_id": "<BROKER 3 IP>:8083"
        }
      ],
      "type": "source"
    }
  }
}

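Each task is most likely reading the same file. Try setting tasks.max=1. More specifically, there is no filesystem locking between FTP clients (each task opens its own connection to the server), so you are limited to a single reader task.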
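One way to apply this, as a minimal sketch under assumptions: the Kafka Connect REST API accepts a full replacement config via `PUT /connectors/{name}/config`, so the existing config can be resubmitted with `tasks.max` set to `"1"`. The worker URL `http://localhost:8083` and the helpers `single_task_config` and `put_connector_config` are hypothetical; the config values are the placeholders from the question.

```python
import json
import urllib.request

# Hypothetical worker address: any worker in the Connect cluster serves the REST API.
CONNECT_URL = "http://localhost:8083"

# Connector config from the question (placeholders kept as-is).
ORIGINAL_CONFIG = {
    "connector.class": "com.datamountaineer.streamreactor.connect.ftp.source.FtpSourceConnector",
    "connect.ftp.address": "<FTP HOST IP>",
    "connect.ftp.keystyle": "string",
    "compression.type": "gzip",
    "connect.ftp.user": "ftpusername",
    "connect.ftp.refresh": "PT1M",
    "tasks.max": "3",
    "connect.ftp.file.maxage": "P7D",
    "name": "ftp-test-conn",
    "connect.ftp.monitor.update": "/home/username/ftp-dir/:default-topic-1",
    "connect.ftp.timeout": "3000000",
    "connect.ftp.password": "<PASSWORD>",
}


def single_task_config(config):
    """Hypothetical helper: return a copy of the config limited to one task."""
    updated = dict(config)
    updated["tasks.max"] = "1"  # Connect config values are passed as strings
    return updated


def put_connector_config(name, config, base_url=CONNECT_URL):
    """Hypothetical helper: PUT the full config to /connectors/{name}/config."""
    request = urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request) as response:
        return json.loads(response.read().decode("utf-8"))
```

Calling `put_connector_config("ftp-test-conn", single_task_config(ORIGINAL_CONFIG))` against a reachable worker should leave the connector with one task; `GET /connectors/ftp-test-conn/status` can then confirm that only task 0 is listed.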

If you look closely at the logs, you can see the task ID in [ftp-test-conn|task-0]. Also, running Connect on the same hosts as the brokers is not recommended.
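Because every log line carries the `[connector|task-N]` context, the logs from all three workers can be checked mechanically. The sketch below uses a hypothetical helper, `tasks_per_file` (not part of Connect or the FTP connector), that maps each path from the `fetching <path>` lines to the set of task IDs that fetched it. It assumes the log format shown in the question.

```python
import re
from collections import defaultdict

# Matches the "[connector|task-N]" context that Connect prints in each log line.
# The leading timestamp "[2022-...]" does not match because it has no "|task-".
TASK_RE = re.compile(r"\[(?P<connector>[^|\]]+)\|task-(?P<task>\d+)\]")

# Matches the FtpMonitor "fetching <path>" line (not the later "fetched" line).
FETCH_RE = re.compile(r"\bfetching (?P<path>\S+)")


def tasks_per_file(lines):
    """Hypothetical helper: map each fetched path to the set of task IDs that fetched it."""
    result = defaultdict(set)
    for line in lines:
        task = TASK_RE.search(line)
        fetch = FETCH_RE.search(line)
        if task and fetch:
            result[fetch.group("path")].add(int(task.group("task")))
    return dict(result)
```

Feeding it the concatenated logs of the three workers would show a path such as msg-4 mapped to more than one task ID if each task is reading the same file.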
