IBMCDC-Kafka的数据复制(LiveAudit上的自定义)



我们使用IBM AS400作为源表。我想做的是:

  1. 使用描述性column_name(如"收盘价"(而不是system_column_name("CPHKD001"(
  2. 将时间戳格式"2020-03-10 18:25:311.123456000000"转换为"2020-03-1T18:25:311.123Z">

我认为我应该处理文件KcopLiveAuditSingleRowIntegrated.java。对于时间戳,它可能有解决方案,但我不认为我能在那里找到答案。

KCOP基础设施允许您对Kafka生产者记录进行程序控制。对于源上的每个操作,您可以确定有多少消息被写入Kafka,它们也包含哪些主题,以及密钥和值字节是什么。

在KCOP中,您可以使用java将时间戳重新定义为任何您喜欢的格式。这是因为每次调用createProducerRecords都会为您提供一个建议的Avro通用记录,该记录具有一个模式,允许您标识表、列及其类型。

根据开源Avro文档中记录的Avro Generic Record行为,您可以选择感兴趣的相关值,并使用转换后的值创建一个新的Avro通用记录。然后将这个新的Avro Generic记录传递到KCOP的其余部分。

请注意,审计KCOP包括这样做的代码,单行Avro审计KCOP就是一个很好的例子。您可以在产品安装的samples.jar文件中找到我们所有集成KCOP的代码。

单行Avro审核KCOP获取Avro之前和之后的泛型记录,并生成一个新的Avro泛型记录,该记录是它们的组合。如果在复制值时检查了列的类型,则可以识别时间戳,并更改放入新的复合Avro泛型记录中的值。

然而,根据以下链接,我们确实提供了一些预格式化的灵活性

https://www.ibm.com/support/knowledgecenter/en/SSTRGZ_11.4.0/com.ibm.cdcdoc.mcadminguide.doc/refs/mirror_timestamp_write_format_Kafka.html

注意,要使用此可选的预格式化程序,您需要"要启用此参数,必须将mirror_write_format参数设置为DYNAMIC。">

您将注意到,可以设置一个数据存储参数,该参数将允许对KCOP接收到的时间戳数据进行通常所需的自定义。

"AVRO(默认(

将TIMESTAMP列值格式化为从1970年1月1日(ISO日历(开始的微秒数。为了进行此计算,时间戳假定为UTC。

对于精度大于微秒的TIMESTAMP列,值的格式为字符串。您可以使用timestamp_format数据存储参数指定字符串格式。时间戳_格式的默认值为yyyy-MM-dd HH:MM:ss.SSSNNNnnnppp.

TIMESTAMP WITH TIMEZONE列值的格式为字符串。可以使用timestamp_tz_format数据存储参数指定字符串格式。时间戳_ tz_format的默认值为yyyy-MM-dd HH:MM:ss.SSSNNNnnnppp T。">

如果这不能为您提供所需的确切格式,那么您可以选择最接近的格式并修改KCOP中时间戳列的值。

修改KCOP的步骤如下。。。

https://www.ibm.com/support/knowledgecenter/en/SSTRGZ_11.4.0/com.ibm.cdcdoc.mcadminguide.doc/tasks/createkafkacop.html

关于列名的问题也是一样的。如果使用的KCOP是使用Avro模式注册表,那么您可以通过程序更改在模式注册表中注册的模式。如果是JSON字符串,您可以在创建JSON字符串后更改它,或者在调用Avro to JSON方法之前进行更改。

或者,如果您的源支持派生列,我相信在管理控制台中,您可以定义一个具有新名称的派生列,该名称只是原始列的值。我当时的记忆是,你可以取消选择原来的栏,这样你的名字就会被更改。

相关内容

  • 没有找到相关文章

最新更新