Kafka Connect stops after 300k records



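I am trying to sink my MySQL table into Elasticsearch. The table has over 1 million records. The problem is that Elasticsearch stops receiving records after roughly 300 thousand have been inserted. I know that the first time I ran the pipeline, all the records went through. The problem appeared when I tried to run it again after deleting the ES index. I have tried resetting the update_ts field to a new timestamp, and I have tried the offset settings in the sink. Nothing seems to work.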

Here is my source config:

{
"name": "items3",
"config": {
"_comment": "The JDBC connector class. Don't change this if you want to use the JDBC Source.",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"_comment": "How to serialise the value of keys - here use the Confluent Avro serialiser. Note that the JDBC Source Connector always returns null for the key ",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"_comment": "Since we're using Avro serialisation, we need to specify the Confluent schema registry at which the created schema is to be stored. NB Schema Registry and Avro serialiser are both part of Confluent Open Source.",
"key.converter.schema.registry.url": "http://localhost:8081",
"_comment": "As above, but for the value of the message. Note that these key/value serialisation settings can be set globally for Connect and thus omitted for individual connector configs to make them shorter and clearer",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",

"_comment": " --- JDBC-specific configuration below here  --- ",
"_comment": "JDBC connection URL. This will vary by RDBMS. Consult your manufacturer's handbook for more information",
"connection.url": "jdbc:mysql://localhost:3306/db?user=user&password=password",
"_comment": "Which table(s) to include",
"table.whitelist": "items",
"_comment": "Pull all rows based on an timestamp column. You can also do bulk or incrementing column-based extracts. For more information, see http://docs.confluent.io/current/connect/connect-jdbc/docs/source_config_options.html#mode",
"mode": "timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "update_ts",

"_comment": "If the column is not defined as NOT NULL, tell the connector to ignore this  ",
"validate.non.null": "true",
"_comment": "The Kafka topic will be made up of this prefix, plus the table name  ",
"topic.prefix": "kafka-",
"auto.offset.reset" : "earliest"
}
}

Here is my sink config:

{
"name": "items-sink",
"config": {
"_comment": "-- standard converter stuff -- this can actually go in the worker config globally --",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",

"_comment": "--- Elasticsearch-specific config ---",
"_comment": "Elasticsearch server address",
"connection.url": "http://localhost:9200",
"_comment": "Elasticsearch mapping name. Gets created automatically if doesn't exist  ",
"type.name": "items",
"_comment": "Which topic to stream data from into Elasticsearch",
"topics": "kafka-items",
"auto.offset.reset" : "earliest",
"_comment": "If the Kafka message doesn't have a key (as is the case with JDBC source)  you need to specify key.ignore=true. If you don't, you'll get an error from the Connect task: 'ConnectException: Key is used as document id and can not be null.",
"key.ignore": "true"
}
}

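As you can see, I am setting auto.offset.reset to earliest so that, if something is tracking my records, it will start over from the beginning, but it has all been in vain.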

> "auto.offset.reset": "earliest" can only be used in the connect-distributed.properties file, not in the JSON connector config.

In that file, because it is a consumer setting, it is named consumer.auto.offset.reset.
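As a minimal sketch, the worker-level setting would look roughly like this. It assumes a distributed worker started from connect-distributed.properties; the worker has to be restarted to pick up the change, and the setting only matters when the sink's consumer group has no committed offsets.

```properties
# connect-distributed.properties (worker config, not the connector JSON)
# The "consumer." prefix passes the setting to the consumers used by sink tasks.
consumer.auto.offset.reset=earliest
```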

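Also, the consumer group is derived from the name field of the connector config. Unless you change that name, the sink will keep consuming from where the previous group of the same name left off, until you either reset the group's offsets or rename the connector. By default, the group name is connect-${connector_name}.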
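For illustration, renaming the sink connector might look like the sketch below. The name items-sink-v2 is a hypothetical choice, not something from the question; with it, the consumer group would be connect-items-sink-v2, which has no committed offsets yet. The auto.offset.reset entry is left out because it has no effect in the connector JSON.

```json
{
"name": "items-sink-v2",
"config": {
"_comment": "Hypothetical new name; the consumer group becomes connect-items-sink-v2",
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter.schema.registry.url": "http://localhost:8081",
"connection.url": "http://localhost:9200",
"type.name": "items",
"topics": "kafka-items",
"key.ignore": "true"
}
}
```

The old items-sink connector would need to be deleted first so that two sinks are not writing the same topic to Elasticsearch.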
