KSQL从表中选择*并没有返回任何结果,提示也并没有响应



我使用kafkaconnect将MySQL表中的行流式传输到kafka主题中。这很有效。

然后我用创建一个表

CREATE TABLE mytable (id INT, email VARCHAR, gender VARCHAR, first_name VARCHAR, last_name VARCHAR) WITH (KAFKA_TOPIC='mysql-my-table', VALUE_FORMAT='AVRO', KEY='id');

这也有效,正如我可以通过以下操作来确认的:

LIST TABLES; 
DESCRIBE EXTENDED mytable;

我看到mytable

问题是当我执行时

SELECT * FROM mytable;

然后我没有得到任何结果,提示也没有响应,我必须按ctrl+c才能恢复控制。

可能是什么问题?

经过一段时间的尝试和阅读文档后,我发现了问题。

我的主题中消息的字段id的类型是?INT,并且根据KSQL TABLES文档,它们需要是VARCHAR类型

所以我按照这里描述的步骤解决了这个问题,现在一切都很好。

因此步骤如下:

-- Create a stream on the original topic
CREATE STREAM users_with_wrong_key_format (userid INT, username VARCHAR, email VARCHAR)
WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON');
-- Derive a new stream with the required key changes.
-- 1) The CAST statement converts the key to the required format.
-- 2) The PARTITION BY clause re-partitions the stream based on the new, converted key.
CREATE STREAM users_with_proper_key
WITH(KAFKA_TOPIC='users-with-proper-key') AS
SELECT CAST(userid as VARCHAR) as userid_string, username, email
FROM users_with_wrong_key_format
PARTITION BY userid_string;
-- Now you can create the table on the properly keyed stream.
CREATE TABLE users_table (userid_string VARCHAR, username VARCHAR, email VARCHAR)
WITH (KAFKA_TOPIC='users-with-proper-key',
VALUE_FORMAT='JSON',
KEY='userid_string');
-- Now you can create the table on the properly keyed stream.
CREATE TABLE users_table (userid_string VARCHAR, username VARCHAR, email VARCHAR)
WITH (KAFKA_TOPIC='users-with-proper-key',
VALUE_FORMAT='JSON',
KEY='userid_string');

相关内容

  • 没有找到相关文章

最新更新