Error when joining two tables in Kafka ksqlDB: "Invalid join condition: table-table joins require to join on the primary key of the right input table"



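I need to create a Kafka topic that combines nine other topics, all of them produced by the Debezium PostgreSQL source connector in AVRO format. As a first step, I am trying (so far without success) to combine fields from just two of those topics.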

So, first I create a ksqlDB table on top of the "REQUEST" topic:

ksql> CREATE TABLE TB_REQUEST (ID STRUCT<REQUEST_ID BIGINT> PRIMARY KEY)
WITH (KAFKA_TOPIC='REQUEST', FORMAT='AVRO');

Everything looks fine to me:

ksql> DESCRIBE TB_REQUEST;
Name                 : TB_REQUEST
Field       | Type
-----------------------------------------------------------------------------------------------------------------------
ID          | STRUCT<REQUEST_ID BIGINT> (primary key)
BEFORE      | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
AFTER       | STRUCT<REQUEST_ID BIGINT, REQUESTER_ID INTEGER, STATUS_ID>
SOURCE      | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>
OP          | VARCHAR(STRING)
TS_MS       | BIGINT
TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>
-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

Then I create another table on top of the "EMPLOYEE" topic:

ksql> CREATE TABLE TB_EMPLOYEE (ID STRUCT<EMPLOYEE_ID INT> PRIMARY KEY)
WITH (KAFKA_TOPIC='EMPLOYEE', FORMAT='AVRO');

Again, everything seems fine:

ksql> DESCRIBE TB_EMPLOYEE;
Name                 : TB_EMPLOYEE
Field       | Type                                                       
-----------------------------------------------------------------------------------------------------------------------
ID          | STRUCT<EMPLOYEE_ID INTEGER> (primary key)
BEFORE      | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
AFTER       | STRUCT<EMPLOYEE_ID INTEGER, NAME VARCHAR(STRING), HIRING_DATE DATE>
SOURCE      | STRUCT<VERSION VARCHAR(STRING), CONNECTOR VARCHAR(STRING), NAME VARCHAR(STRING), TS_MS BIGINT, SNAPSHOT VARCHAR(STRING), DB VARCHAR(STRING), SEQUENCE VARCHAR(STRING), SCHEMA VARCHAR(STRING), TABLE VARCHAR(STRING), TXID BIGINT, LSN BIGINT, XMIN BIGINT>
OP          | VARCHAR(STRING)
TS_MS       | BIGINT
TRANSACTION | STRUCT<ID VARCHAR(STRING), TOTAL_ORDER BIGINT, DATA_COLLECTION_ORDER BIGINT>
-----------------------------------------------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

But then I try to create my target table by joining the two previous tables on the employee ID:

ksql> CREATE TABLE REQUEST_EMPLOYEE AS 
SELECT RQ.ID->REQUEST_ID, RQ.AFTER->REQUESTER_ID, RQ.AFTER->STATUS_ID, EM.ID->EMPLOYEE_ID, EM.AFTER->NAME AS REQUESTER
FROM TB_REQUEST RQ
JOIN TB_EMPLOYEE EM ON RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID;

I get the following error:

Could not determine output schema for query due to error: Invalid join condition: table-table joins require to join on the primary key of the right input table. Got RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID.
Statement: CREATE TABLE REQUEST_EMPLOYEE WITH (KAFKA_TOPIC='REQUEST_EMPLOYEE', PARTITIONS=1, REPLICAS=1) AS SELECT
RQ.ID->REQUEST_ID REQUEST_ID,
RQ.AFTER->REQUESTER_ID REQUESTER_ID,
RQ.AFTER->STATUS_ID STATUS_ID,
EM.ID->EMPLOYEE_ID EMPLOYEE_ID,
EM.AFTER->NAME REQUESTER
FROM TB_REQUEST RQ
INNER JOIN TB_EMPLOYEE EM ON ((RQ.AFTER->REQUESTER_ID = EM.ID->EMPLOYEE_ID))
EMIT CHANGES;

Looking at the output of "DESCRIBE TB_EMPLOYEE", it seems to me that "EM.ID->EMPLOYEE_ID" is the right choice. What am I missing?

Thanks in advance.

The ksqlDB version is 0.21.0.

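I think you should use at least one row key in your join statement. In earlier ksqlDB versions the only way to join tables was on their row keys; in your current version, 0.21.0, foreign-key joins are also available.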

Consider the following example:

CREATE TABLE orders_with_users AS
SELECT * FROM orders JOIN users ON orders.u_id = users.u_id EMIT CHANGES;

Here u_id is defined as the primary key of users and is therefore the row key:

CREATE TABLE users (
u_id VARCHAR PRIMARY KEY,
name VARCHAR
) WITH (
kafka_topic = 'users',
partitions = 3,
value_format = 'json'
);

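The following statement is similar. It is how the join was written in older ksqlDB versions, where the key column was always named ROWKEY: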

CREATE TABLE orders_with_users AS
SELECT * FROM orders JOIN users ON orders.u_id = users.ROWKEY EMIT CHANGES;
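For comparison, here is a minimal sketch of what a foreign-key join could look like with the users table above. The orders table, its o_id column, and the orders_enriched name are hypothetical and only serve as illustration; the answer does not show how orders is actually defined.

```sql
-- Hypothetical orders table keyed by its own order id (o_id).
-- u_id is an ordinary value column that refers to a row in users.
CREATE TABLE orders (
  o_id VARCHAR PRIMARY KEY,
  u_id VARCHAR
) WITH (
  kafka_topic = 'orders',
  partitions = 3,
  value_format = 'json'
);

-- Foreign-key join: the right-hand side of ON is the primary key column of
-- users, while the left-hand side is a non-key column of orders.
-- The left table's primary key (o_id) is kept in the projection because it
-- becomes the key of the result table.
CREATE TABLE orders_enriched AS
  SELECT o.o_id AS order_id, o.u_id AS user_id, u.name AS user_name
  FROM orders o
  JOIN users u ON o.u_id = u.u_id
  EMIT CHANGES;
```

The point of the sketch is the shape of the ON clause: whatever appears on the right must be the primary key column of the right table as it was declared.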

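Another observation: ksqlDB treats the key of TB_EMPLOYEE as a STRUCT, so the primary key column is ID as a whole and not the nested field ID->EMPLOYEE_ID. That may be why a condition on EM.ID->EMPLOYEE_ID is rejected.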
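One way to act on that observation, as an untested sketch: compare against the whole key column EM.ID and build a matching struct on the left side with the STRUCT constructor. This assumes that the ON clause accepts an expression on the left side, which may depend on the ksqlDB version, and that the struct built here is compatible with the AVRO key schema. The REQUEST_KEY alias is an illustrative name.

```sql
-- Untested sketch. Assumption: the ON clause accepts a STRUCT(...) expression
-- on the left side; this may depend on the ksqlDB version.
CREATE TABLE REQUEST_EMPLOYEE AS
  SELECT
    RQ.ID                  AS REQUEST_KEY,   -- left table's primary key, kept whole
    RQ.AFTER->REQUESTER_ID AS REQUESTER_ID,
    RQ.AFTER->STATUS_ID    AS STATUS_ID,
    EM.AFTER->NAME         AS REQUESTER
  FROM TB_REQUEST RQ
  JOIN TB_EMPLOYEE EM
    -- Build a struct shaped like the key, STRUCT<EMPLOYEE_ID INT>, and compare
    -- it with the key column EM.ID itself rather than with a field inside it.
    ON STRUCT(EMPLOYEE_ID := RQ.AFTER->REQUESTER_ID) = EM.ID
  EMIT CHANGES;
```

If the statement is still rejected, the error text should at least show whether the complaint is about the join condition or about the projection.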
