如何连接两个KTable并将结果KTable写入状态存储



我有两个KTable对象:

KTable<Long, byte[]> firstTable = builder.table("firstTopic", Consumed.with(Serdes.Long(), Serdes.ByteArray()));
KTable<Long, byte[]> secondTable = builder.table("secondTopic",
Consumed.with(Serdes.Long(), Serdes.ByteArray()));

之后我想加入这两个表:

firstTable.leftJoin(secondTable,
(leftValue, rightValue) -> {
try {
return utils.serializeNetwork(utils.deserializeNetwork(leftValue));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
)

所以我有两个表,我将它们连接到一个表中,我希望通过每个键将结果表存储在kafka状态存储中,但我不知道如何做到这一点。

您可以通过在leftJoin上指定Materialized参数并指定状态存储的名称来强制实体化到本地存储中。

firstTable.leftJoin(..., Materialized.as("my-store-name"));

最新更新