我正在使用Confluent JDBCSourCeconnector从Oracle表读取。我正在尝试使用SMT生成一个由3个串联字段组成的密钥。
transforms=createKey
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=BUS_OFC_ID_CD,SO_TYPE,SO_NO
使用上述转换,我得到了这样的东西:
{"BUS_OFC_ID_CD":"111","SO_TYPE":"I","SO_NO":"55555"}
我想要类似的东西:
111I55555
关于我如何仅加入值的任何想法?
我无法在属性文件中解决上述问题。因此,周围的工作是:
- 创建一个视图(我们已经必须这样做才能获得模式=时间戳到达使用我们的Oracle DB(
- 添加一个带有keyname的串联值 的额外字段
- 提取串联值以用作密钥。
例如:
CREATE VIEW XX_TEST_V AS
SELECT BUS_OFC_ID_CD, SO_TYPE, SO_NO, BUS_OFC_ID_CD||SO_TYPE||SO_NO as KEYNAME
FROM XX_TEST;
这将为您提供
的JSON关键信息{"KEYNAME ":"111I55555"}
剥离JSON以仅具有文本,然后在属性文件中完成
例如:
transforms=createKey,extractString
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=KEYNAME
transforms.extractString.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractString.field=KEYNAME
然后,这应该给您以下作为键
"111I55555"
问候彼得