Confluent kafka connect elasticsearch document ID creation



>我正在使用 confluent 来连接我的数据库和 ES,得到异常:

org.apache.kafka.connect.errors.DataException: STRUCT is not supported as the document id.
    at io.confluent.connect.elasticsearch.DataConverter.convertKey(DataConverter.java:75)
    at io.confluent.connect.elasticsearch.DataConverter.convertRecord(DataConverter.java:84)
    at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:210)
    at io.confluent.connect.elasticsearch.ElasticsearchSinkTask.put(ElasticsearchSinkTask.java:119)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:429)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

我在 kafka-connect-JDBC 中的配置是:

name=task-view-list-stage
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10 
connection.url=jdbc:postgresql://localhost:5432/postgres?user=postgres&password=test
table.types=TABLE
query=select * from employee_master
mode=timestamp+incrementing
incrementing.column.name=employee_master_id
timestamp.column.name=modified_date
validate.non.null=false
topic.prefix=my-id-app

我的kafka-connect Elasticsearch配置是:

name=es-id-view
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=my-id-app
topics.key.ignore=false
transforms=InsertKey
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id
connection.url=http://localhost:9200
type.name=type_id

我的表结构为:

employee_master_id | emp_name | modified_date
-----------------------------------------------------------
1                  |  Bala    |  "2017-05-18 11:51:46.721182+05:30"
-------------------------------------------------------------------
2                  |  murugan | "2017-05-21 15:59:11.443901+05:30"
-------------------------------------------------------------------

请帮助我解决此问题

以及ValueToKey您需要ExtractField将键从对象转换为纯字段:

transforms=InsertKey,ExtractId
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=employee_master_id    
transforms.ExtractId.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.ExtractId.field=employee_master_id

相关内容

  • 没有找到相关文章

最新更新