Hi我尝试在PostgreSQL作为源到SQL Server作为目标之间建立Kafka连接管道。我使用了3个Kafka代理,需要消耗252个主题(一个主题与一个PostgreSQL表相同(。运行一个多小时后,它只能从252张桌子中拉出218张。我发现的错误是SQL Server中存在死锁机制,可以将事务保存到SQL Server并尝试重试,而且Debezium复制槽也存在。
我使用分布式连接器,水槽上最多有3个工人,但可能这似乎还不够。也可以尝试使用更高的offset.time_out.ms到60000以及更高的偏移分区(100(。恐怕这不是我想要的生产水平。有人能对这个案子提出建议吗?有没有计算来决定我需要的最佳工人人数?
更新
我犯了一些错误。我看到一些连接器坏了。有人告诉我死锁发生在SQL SERVER:中
[2020-03-26 15:06:28,494] ERROR WorkerSinkTask{id=sql_server_sink_XXA-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
更新2020年4月14日
我仍然有这个问题,我忘记告诉我如何部署连接器。现在我使用两个工人,一个用于源,一个用作汇。我在csv中列出了我的所有表和pk,并在行中循环以创建连接器,而无需睡眠或等待每分钟。我还使用了单主题分区和每个主题3个副本。但我仍然有sql服务器连接死锁
问题可能是在同一时间用多个任务访问同一SQL表并导致同步问题,如您所提到的死锁
由于您已经有大量的主题,并且您的连接器可以并行访问它们,我建议您将每个主题的分区数减少到1(Kafka不支持减少分区数,因此您应该删除并用新的分区数重新创建每个主题(
这样,每个主题都只有一个分区;每个分区只能在一个线程(/task/customer(中访问,因此没有机会对同一个表进行并行SQL事务。
或者,更好的方法是创建一个包含3个分区的主题(与您拥有的任务/使用者数量相同(,并让生产者使用SQL表名称作为消息键
Kafka保证具有相同密钥的消息总是去往同一个分区,因此具有相同表的所有消息都将驻留在一个分区上(单线程消耗(。
如果你觉得它很有用,我可以附上更多关于如何创建Kafka Producer和发送密钥消息的信息。