如何将 MySql 表数据集成到 Ksql 流或表中?



我正在尝试构建从MySqlKsql的数据管道。

用例:数据源是 MySql。我在MySql中创建了一个表。

我正在使用

./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties  ./etc/kafka-connect-jdbc/source-quickstart-sqlite.properties 

以启动独立连接器。而且它工作正常。

我以主题名称启动消费者,即

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test1Category --from-beginning

当我在 MySQL 表中插入数据时,我也在消费者中得到结果。我已经创建了具有相同主题名称的 KSQL 流。我也期待在我的 Kstream 中获得相同的结果,但是当我这样做时我没有得到任何结果

select * from <streamName>

连接器配置 - 源-快速入门-mysql.properties

name=jdbc_source_mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:mysql://localhost:3306/testDB?user=root&password=cloudera
#comment=Which table(s) to include
table.whitelist=ftest
mode=incrementing
incrementing.column.name=id
topic.prefix=ftopic

示例数据

  • MySql

1.(创建数据库:

CREATE DATABASE testDB;

2.( 使用数据库:

USE testDB;

3.( 创建表:

CREATE TABLE products (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL,
description VARCHAR(512),
weight FLOAT
);

4.( 将数据插入表中:

INSERT INTO products(id,name,description,weight)
VALUES (103,'car','Small car',20);
  • KSQL

1.(创建流:

CREATE STREAM pro_original (id int, name varchar, description varchar,weight bigint) WITH 
(kafka_topic='proproducts', value_format='DELIMITED');

2.( 选择查询:

Select * from pro_original;

预期输出

  1. 消费者

获取插入到 MySQL 表中的数据。

在这里,我正在获取MySQL中的数据。

  1. Ksql

应填充插入 Mysql 表中并反映在 Kafka 主题中的流内数据。

我在 ksql 中没有得到预期的结果

帮助我处理此数据管道。

您的数据采用AVRO格式,但采用VALUE_FORMAT,而不是您定义DELIMITEDAVRO。必须指示 KSQL 主题中存储的值的格式。以下应该可以为您解决问题。

CREATE STREAM pro_original_v2  
WITH (KAFKA_TOPIC='products', VALUE_FORMAT='AVRO');

执行插入 kafka 主题的数据

SELECT * FROM pro_original_v2;

现在应该在 KSQL 控制台窗口中可见。

您可以在此处查看 KSQL 中的一些 Avro 示例。

最新更新