kafka Cassandra接收器连接器中出现ClassCastException



我使用source作为oracle,使用sink作为cassandra。

  1. 来源补充_时间(类型号(3,0((。在源连接器中,我将kafka的字段改为liketo_char(repensis_lead_time(
  2. sink-resumnis_lead_time(键入文本(。为了修复下面的异常,我试图将sink Cassandra字段类型更改为double、int、text,但我仍然看到了cast异常

非常感谢您的帮助。

java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.String at
com.datastax.dsbulk.commons.codecs.string.StringToStringCodec.externalToInternal(StringToStringCodec.java:14) at
com.datastax.dsbulk.commons.codecs.ConvertingCodec.encode(ConvertingCodec.java:63) at
com.datastax.kafkaconnector.RecordMapper.bindColumn(RecordMapper.java:239) at
com.datastax.kafkaconnector.RecordMapper.map(RecordMapper.java:140) at
com.datastax.kafkaconnector.DseSinkTask.mapAndQueueRecord(DseSinkTask.java:226) at
com.datastax.kafkaconnector.DseSinkTask.lambda$put$1(DseSinkTask.java:132) at
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ... 3 more 
[2020-11-12 13:03:34,022] ERROR WorkerSinkTask{id=dse-gcp-connector-mm-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179) org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception. at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561) at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322) at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224) at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192) at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177) at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227) at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at
java.util.concurrent.FutureTask.run(FutureTask.java:266) at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at
java.lang.Thread.run(Thread.java:748) Caused by: java.util.concurrent.CompletionException: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.String at
java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273) at
java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280) at
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1629) ... 3 more Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be cast to java.lang.String at
com.datastax.dsbulk.commons.codecs.string.StringToStringCodec.externalToInternal(StringToStringCodec.java:14) at
com.datastax.dsbulk.commons.codecs.ConvertingCodec.encode(ConvertingCodec.java:63) at
com.datastax.kafkaconnector.RecordMapper.bindColumn(RecordMapper.java:239) at
com.datastax.kafkaconnector.RecordMapper.map(RecordMapper.java:140) at
com.datastax.kafkaconnector.DseSinkTask.mapAndQueueRecord(DseSinkTask.java:226) at
com.datastax.kafkaconnector.DseSinkTask.lambda$put$1(DseSinkTask.java:132) at
java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626) ... 3 more 
[2020-11-12 13:03:34,023] ERROR WorkerSinkTask{id=dse-gcp-connector-mm-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)

水槽:

{
"name": "dse-gcp-connector-mm",
"config": {
"connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
"queryExecutionTimeout": "300",
"loadBalancing.localDc": "GCE-UC1A",
"auth.password": "*****",
"tasks.max": "3",
"topics": "mm_topic5",
"contactPoints": "uc1a-ecomdev-cas31.**.com,uc1a-ecomdev-cas30.**.com",
"auth.provider": "None",
"ignoreErrors": "false",
"topic.mm_topic5.middle_layer.mm_temp.consistencyLevel": "LOCAL_QUORUM",
"topic.mm_topic5.masterdata_middle_layer.material_master.consistencyLevel": "LOCAL_QUORUM",
"topic.mm_topic5.middle_layer.mm_temp.ttl": "864000",
"connectionPoolLocalSize": "12",
"topic.mm_topic5.codec.date": "yyyy-MM-dd",
"topic.mm_topic5.codec.timestamp": "yyyy-MM-dd HH:mm:ss",
"topic.mm_topic5.middle_layer.mm_temp.mapping": "datebucket=value.datebucket,country=value.country, brand=value.brand, mat_num=value.mat_num, sales_org=value.sales_org,plant=value.plant,cas_num=value.cas_num,mat_desc=value.mat_desc,mat_grp=value.mat_grp,mat_type=value.mat_type,old_mat_num=value.old_mat_num,prod_hier=value.prod_hier,sales_unit=value.sales_unit,strg_cond=value.strg_cond,updt_ts=value.updt_ts2,act_flg=value.act_flg,apo_rlv=value.apo_rlv,base_unit_of_msr=value.base_unit_of_msr,changed_by=value.changed_by,created_by=value.created_by,created_on=value.created_on,env_rlv=value.env_rlv,lab_mat_grp=value.lab_mat_grp,language=value.language,last_change=value.last_change,mat_ctg=value.mat_ctg, plant_desc=value.plant_desc,price_brand_ctg=value.price_brand_ctg,sys_id=value.sys_id,xplant_mat_sts=value.xplant_mat_sts ,division=value.division,prod_grp_sbu=value.prod_grp_sbu,glb_prod_hier=value.glb_prod_hier,portfolio_brand=value.portfolio_brand,xdchain_sts=value.xdchain_sts,dchain_sts=value.dchain_sts,mara_prod_hier=value.mara_prod_hier,tarif=value.tarif,brand_desc=value.brand_desc,tptm_znoinetsls=value.tptm_znoinetsls,plant_mat_sts=value.plant_mat_sts,item_ctg_grp=value.item_ctg_grp,price_ctg_grp=value.price_ctg_grp,prod_attr2=value.prod_attr2,special_proc_key=value.special_proc_key,replenis_lead_time=value.replenis_lead_time,purchase_grp=value.purchase_grp,gr_process_time=value.gr_process_time,mat_grp4=value.mat_grp4",
"name": "dse-gcp-connector-mm",
"topic.mm_topic5.masterdata_middle_layer.material_master.mapping": "country=value.country, brand=value.brand, mat_num=value.mat_num, sales_org=value.sales_org,plant=value.plant,cas_num=value.cas_num,mat_desc=value.mat_desc,mat_grp=value.mat_grp,mat_type=value.mat_type,old_mat_num=value.old_mat_num,prod_hier=value.prod_hier,sales_unit=value.sales_unit,strg_cond=value.strg_cond,updt_ts=value.updt_ts2,act_flg=value.act_flg,apo_rlv=value.apo_rlv,base_unit_of_msr=value.base_unit_of_msr,changed_by=value.changed_by,created_by=value.created_by,created_on=value.created_on,env_rlv=value.env_rlv,lab_mat_grp=value.lab_mat_grp,language=value.language,last_change=value.last_change,mat_ctg=value.mat_ctg, plant_desc=value.plant_desc,price_brand_ctg=value.price_brand_ctg,sys_id=value.sys_id,xplant_mat_sts=value.xplant_mat_sts ,division=value.division,prod_grp_sbu=value.prod_grp_sbu,glb_prod_hier=value.glb_prod_hier,portfolio_brand=value.portfolio_brand,xdchain_sts=value.xdchain_sts,dchain_sts=value.dchain_sts,mara_prod_hier=value.mara_prod_hier,tarif=value.tarif,brand_desc=value.brand_desc,tptm_znoinetsls=value.tptm_znoinetsls,plant_mat_sts=value.plant_mat_sts,item_ctg_grp=value.item_ctg_grp,price_ctg_grp=value.price_ctg_grp,prod_attr2=value.prod_attr2,special_proc_key=value.special_proc_key,replenis_lead_time=value.replenis_lead_time,purchase_grp=value.purchase_grp,gr_process_time=value.gr_process_time,mat_grp4=value.mat_grp4",
"auth.username": "sam"
}
}

来源:

{
"name": "jdbc_mm_test",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"timestamp.column.name": "UPDT_TS",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"connection.password": "****",
"tasks.max": "1",
"query": "select country||'-'||mat_num||'-'||sales_org||'-'||plant||'-'||sys_id as "id",act_flg as "act_flg",apo_rlv as "apo_rlv",base_unit_of_msr as "base_unit_of_msr",brand as "brand",cas_num as "cas_num",to_char(created_on,'YYYY-MM-DD HH24:MI:SS') as "created_on",country as "country",created_by as "created_by",changed_by as "changed_by",env_rlv as "env_rlv",lab_mat_grp as "lab_mat_grp",language as "language",to_char(last_change, 'YYYY-MM-DD HH24:MI:SS') as "last_change",mat_num as "mat_num",mat_ctg as "mat_ctg",mat_desc as "mat_desc",mat_grp as "mat_grp",mat_type as "mat_type",old_mat_num as "old_mat_num",plant as "plant",plant_desc as "plant_desc",price_brand_ctg as "price_brand_ctg",prod_hier as "prod_hier",sales_org as "sales_org",sales_unit as "sales_unit",strg_cond as "strg_cond",to_char(updt_ts, 'YYYY-MM-DD HH24:MI:SS') as "updt_ts2",updt_ts,to_char(updt_ts, 'YYYY-MM-DD') as "datebucket", sys_id as "sys_id",xplant_mat_sts as "xplant_mat_sts",division as "division",prod_grp_sbu as "prod_grp_sbu",glb_prod_hier as "glb_prod_hier",portfolio_brand as "portfolio_brand",xdchain_sts as "xdchain_sts", dchain_sts as"dchain_sts", mara_prod_hier as "mara_prod_hier",tarif as "tarif",brand_desc as "brand_desc",tptm_znoinetsls as "tptm_znoinetsls",plant_mat_sts as "plant_mat_sts" , item_ctg_grp as "item_ctg_grp", price_ctg_grp as "price_ctg_grp",prod_attr2 as "prod_attr2" , special_proc_key as "special_proc_key" , to_char(replenis_lead_time) as "replenis_lead_time" , purchase_grp as "purchase_grp" , to_char(gr_process_time) as "gr_process_time" , mat_grp4 as "mat_grp4" from material_master_enriched2",
"connection.attempts": "1000",
"transforms": "createKey,extractInt",
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"batch.max.rows": "3000",
"table.types": "query",
"mode": "timestamp",
"topic.prefix": "mm_topic5",
"transforms.extractInt.field": "id",
"connection.user": "***",
"schema.pattern": "",
"transforms.createKey.fields": "id",
"poll.interval.ms": "10000",
"name": "jdbc_mm_test",
"connection.url": "jdbc:oracle:thin:@oragen3dev.***.com:1545:gen3webd"
}
}

我使用了字符串类型的转换,然后它工作得很好

最新更新