我在KSQL中创建表或流时遇到问题。
我已经制作了官方示例中显示的所有内容,我不明白为什么我的代码不起作用。示例来自https://docs.confluent.io/current/ksql/docs/tutorials/examples.html#joining:
CREATE TABLE pageviews_per_region_per_session AS
SELECT regionid,
windowStart(),
windowEnd(),
count(*)
FROM pageviews_enriched
WINDOW SESSION (60 SECONDS)
GROUP BY regionid;
现在我的代码:
我试着在命令prom中运行select,它运行得很好:
SELECT count(*) as attempts_count, "computer", (WINDOWSTART() / 1000) as row_time
FROM LOG_FLATTENED
WINDOW TUMBLING (SIZE 20 SECONDS)
WHERE "event_id" = 4625
GROUP BY "computer"
HAVING count(*) > 2;
但是,当我尝试基于这个选择(来自ksql命令行工具(创建表时:
CREATE TABLE `incorrect_logins` AS
SELECT count(*) as attempts_count, "computer", (WINDOWSTART() / 1000) as row_time
FROM LOG_FLATTENED
WINDOW TUMBLING (SIZE 20 SECONDS)
WHERE "event_id" = 4625
GROUP BY "computer"
HAVING count(*) > 2;
我得到一个错误-io.confluent.ksql.util.KsqlStatementException: Column COMPUTER cannot be resolved.
但是这个列存在并且select不带create table
语句就可以完美地工作。
我使用的是最新稳定的KSQL图像(confluentinc/cp-ksql-server:5.3.1
(
首先,我为我糟糕的英语道歉,如果我要说的话不够清楚,请毫不犹豫地回复我,我会尽力用更好的方式解释我。
我对KSQL了解不多,但我会根据我创建类似TABLE的STREAM的经验来帮助您。
1( 正如您可能知道的,除非您指定相反的内容,否则KSQL会将所有内容处理为UpperCase。
2( KSQL不支持CREATE查询中SELECT中的双引号,事实上,KSQL会忽略这些字符,并将您的字段作为UpperCase列处理,因此,在返回给您的错误中,出现COMPUTER而不是"COMPUTER"。
此问题的解决方法是:
-
首先,创建一个带有lowerCase字段的空表:
CREATE TABLE"correct_logins"("attempts_count"整数,"computer"VARCHAR,"row_time"整数(WITH(KAFKA_TOPIC='TOPIC_that_you_want',VALUE_FORMAT='avro'(
(如果主题不存在,则必须先创建它(
-
一旦创建了表,就可以使用SELECT查询在表中插入数据:
INSERT INTO"correct_logins"SELECT count((为"attempts_count","computer",(WINDOWSTART((/1000(为"row_time"来自LOG_flattend窗户翻滚(尺寸20秒(其中"event_id"=4625按"计算机"分组计数((>2;
希望它能帮助你!