我正在研究POC,我必须读取csv文件并将其插入ms sql server。 我创建了以下配置,但我正在以下删除:
(由以下原因引起:org.apache.kafka.connect.errors.ConnectException:值架构的类型必须为 Struct
配置如下:
1. 示例 csv 文件数据:
id,record1,record2,record3,created
1,1772056014794065487,160842,20668578,9999-12-31
2,1772056014794065487,160842,20668578,9999-12-31
3,1772056014794065487,160842,20668578,9999-12-31
4,1772056014794065487,160842,20668578,9999-12-31
5,1772056014794065487,160842,20668578,9999-12-31
2. 文件源连接器
{"name":"file-source",
"config":
{
"connector.class":"FileStreamSource",
"tasks.max":"1",
"file":"/tmp/my-connect-test.dat",
"topic":"connect-test",
"name":"file-source"},
"tasks":[{"connector":"file-source","task":0}],
"type":"source"}
3. JDBC 接收器连接器:
{"name":"test-sink",
"config": {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"topics":"connect-test",
"topic.prefix":"connect-test",
"insert.mode":"insert",
"table.name.format":"dz.temp_data",
"pk.mode":"record_value",
"pk.fields":"id",
"incrementing.column.name":"id",
"table.whitelist":"dz.temp_data",
"mode":"incrementing",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter.schema.registry.url":"http://localhost8081",
"connection.url":"jdbc:sqlserver://**;databaseName=**;username=**;password=***",
"name":"test-sink"},
"tasks":[{"connector":"test-sink","task":0}],
"type":"sink"}
4. MS SQL 表:
CREATE TABLE dz.temp_data (
id INTEGER IDENTITY(1,1) NOT NULL PRIMARY KEY,
record1 VARCHAR(255) NOT NULL,
record2 VARCHAR(255) NOT NULL,
record3 VARCHAR(255) NOT NULL,
record4 VARCHAR(255) NOT NULL,
created VARCHAR(255) NOT NULL
);
如果我通过 avro 消费者测试主题,我得到正确的输出。
kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
但是我在插入ms sql数据库时遇到异常。
请帮助解决这个问题。提前谢谢。
如果您使用的是 JDBC 接收器,则必须有一个架构。CSV 文件没有架构。因此,您需要在此过程中定义一个。
若要了解有关序列化和架构的详细信息,请参阅此处。
您在这里有几个选择:
-
使用可以在摄取时应用架构的连接器(例如 kafka-connect-spooldir(。如果这样做,请确保在 Kafka Connect 源配置中使用 AvroConverter 或 JSONConverter 和 value.converter
schemas.enable=true
。您可以在此处查看此操作的示例。 -
使用 ksqlDB 应用您的架构,然后以更合适的格式(例如 Avro(将数据重新序列化为其他主题,并使用该主题填充数据库。例如:
-- Declare a schema for the existing topic CREATE STREAM SOURCE_DATA (id INT, record1 VARCHAR, record2 VARCHAR, record3 VARCHAR, record4 VARCHAR, record5 VARCHAR) WITH (KAFKA_TOPIC='connect-test', VALUE_FORMAT='DELIMITED'); -- Write a new Kafka topic that serialises all the data -- from the first topic to a new one, in Avro CREATE STREAM SOURCE_DATA_RESERIALISED WITH (KAFKA_TOPIC='connect-test_avro', VALUE_FORMAT='AVRO') AS SELECT * FROM SOURCE_DATA;