使用 kafka-connect 读取 CSV 文件并接收到 ms sql 数据库中?



我正在研究POC,我必须读取csv文件并将其插入ms sql server。 我创建了以下配置,但我正在以下删除:

(由以下原因引起:org.apache.kafka.connect.errors.ConnectException:值架构的类型必须为 Struct

配置如下:

1. 示例 csv 文件数据:

id,record1,record2,record3,created
1,1772056014794065487,160842,20668578,9999-12-31
2,1772056014794065487,160842,20668578,9999-12-31
3,1772056014794065487,160842,20668578,9999-12-31
4,1772056014794065487,160842,20668578,9999-12-31
5,1772056014794065487,160842,20668578,9999-12-31

2. 文件源连接器

{"name":"file-source",
"config":
{
"connector.class":"FileStreamSource",
"tasks.max":"1",
"file":"/tmp/my-connect-test.dat",
"topic":"connect-test",
"name":"file-source"},
"tasks":[{"connector":"file-source","task":0}],
"type":"source"}

3. JDBC 接收器连接器:

{"name":"test-sink",
"config":   {
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"topics":"connect-test",
"topic.prefix":"connect-test", 
"insert.mode":"insert",
"table.name.format":"dz.temp_data",
"pk.mode":"record_value",
"pk.fields":"id",
"incrementing.column.name":"id",
"table.whitelist":"dz.temp_data",
"mode":"incrementing",
"key.converter.schemas.enable":"false",
"value.converter.schemas.enable":"false",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter.schema.registry.url":"http://localhost8081",
"connection.url":"jdbc:sqlserver://**;databaseName=**;username=**;password=***",
"name":"test-sink"},
"tasks":[{"connector":"test-sink","task":0}],
"type":"sink"}

4. MS SQL 表:

CREATE TABLE dz.temp_data (
id INTEGER IDENTITY(1,1) NOT NULL PRIMARY KEY,
record1 VARCHAR(255) NOT NULL,
record2 VARCHAR(255) NOT NULL,
record3 VARCHAR(255) NOT NULL,
record4 VARCHAR(255) NOT NULL,
created VARCHAR(255) NOT NULL
);

如果我通过 avro 消费者测试主题,我得到正确的输出。

kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning

但是我在插入ms sql数据库时遇到异常。

请帮助解决这个问题。提前谢谢。

如果您使用的是 JDBC 接收器,则必须有一个架构。CSV 文件没有架构。因此,您需要在此过程中定义一个。

若要了解有关序列化和架构的详细信息,请参阅此处。

您在这里有几个选择:

  1. 使用可以在摄取时应用架构的连接器(例如 kafka-connect-spooldir(。如果这样做,请确保在 Kafka Connect 源配置中使用 AvroConverter 或 JSONConverter 和 value.converterschemas.enable=true。您可以在此处查看此操作的示例。

  2. 使用 ksqlDB 应用您的架构,然后以更合适的格式(例如 Avro(将数据重新序列化为其他主题,并使用该主题填充数据库。例如:

    -- Declare a schema for the existing topic
    CREATE STREAM SOURCE_DATA (id INT, record1 VARCHAR, record2 VARCHAR, record3 VARCHAR, 
    record4 VARCHAR, record5 VARCHAR) 
    WITH (KAFKA_TOPIC='connect-test', VALUE_FORMAT='DELIMITED');
    -- Write a new Kafka topic that serialises all the data 
    -- from the first topic to a new one, in Avro
    CREATE STREAM SOURCE_DATA_RESERIALISED WITH (KAFKA_TOPIC='connect-test_avro',
    VALUE_FORMAT='AVRO') AS 
    SELECT * FROM SOURCE_DATA;
    

相关内容

  • 没有找到相关文章

最新更新