使用Debezium 1.5中的过滤器变换



我想过滤我创建的源连接器中的一些记录。该镜像包含/kafka/connect/debezium-connector-mysql中的debezium-scripting-1.5.0.Beta1.jar(使用连接基础镜像的"ENABLE_DEBEZIUM_SCRIPTING=true"属性启用)。

我的连接器有以下属性:

"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter"

在注册源连接器时,它无法使用以下stacktrace配置连接器:

Caused by: java.lang.ClassNotFoundException: io.debezium.config.EnumeratedValue
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:471)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)

消息很清楚:我没有找到EnumeratedValue类(这是debezium核心包的一部分)。但是当我将"transforms"属性更改为"unwrap"时,就不再有错误了。

这个方法对我有效。
我有<some-path>/debezium/debezium-connector-mongo/文件夹,和
plugin.path=<some-path>/debezium
当我启动连接器时,它工作正常。
然而,当我添加debezium-script插件,其中在<some-path>/debezium/debezium-script/文件夹(在相同的plugin.path),出现错误。

因此,我将<some-path>/debezium/debezium-script/中的任何插件移动到<some-path>/debezium/debezium-connector-mongo/中(将两个文件夹合并为一个),这有效。

最新更新