在Kafka流中,目的是什么:
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
。考虑到塞尔德斯在builder
中被召唤:
final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic");
。在to
:
wordCounts.to(stringSerde, longSerde, "WordsWithCountsTopic");
在我的代码中,我删除了上述两个props.put
调用,一切似乎仍然正常工作。另外,请注意,我们不会将 Serdes.Long() 传递给VALUE_SERDE_CLASS_CONFIG,即使它在调用中使用to
。
从融合样本中提取的代码片段。
它为所有未指定 serde 的操作设置默认键或值 serde。
例如,您可以在不指定 serdes 的情况下创建一个流,例如
final KStream<String, String> textLines = builder.stream("TextLinesTopic");
在这种情况下,Streams 将使用在主题"TextLinexTopic"
的配置中指定为键/值 Serde 的任何内容。(顺便说一句,这两个参数都重命名为default.key.serde
并在即将到来的0.11
中default.value.serde
,以使其语义更清晰。
对于您链接到的示例:我没有仔细检查,但是如果所有运算符都指定了Serdes,则实际上不需要配置。