我想加入两个具有自定义值的ktables。文档清楚地说明了默认类型(使用默认serdes(-https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#ktable-ktable加入
KTable<String, Long> left = ...;
KTable<String, Double> right = ...;
// Java 8+ example, using lambda expressions
KTable<String, String> joined = left.leftJoin(right,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue /* ValueJoiner */
);
但是当我使用自定义值时,我会得到一个序列化错误,并且没有用于传递自定义序列号的重载。我怎样才能做到这一点?
KTable<String, ModelA> left = ...;
KTable<String, ModelB> right = ...;
// Java 8+ example, using lambda expressions
KTable<String, ModelC> joined = left.leftJoin(right,
(leftValue, rightValue) -> new ModelC("left=" + leftValue.Name + ", right=" + rightValue.Name /* ValueJoiner */
);
我终于明白我做错了什么。错误消息有点误导:
更改StreamConfig中的默认序列号或提供正确的序列号通过方法参数
但我不想更改默认的serde,并且ktables join没有重载来传递serde。
真正的问题是,我使用stream.toTable方法创建了ktable,该方法也没有重载来传递serdes。我所做的是在前面声明ktable(使用serdes(,然后使用stream.to方法。
可能是新手犯的错误,但它就在这里。