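I'm trying to generate counts per key using the code below, which is based on the word count example. Oddly, if the mapValues function returns a String, the grouping works as noted in the commented-out line, but nothing happens when I use a String as the key and a GenericRecord as the value.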
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url","http://localhost:8081");
stringSerde.configure(serdeConfig, true); // `true` for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values
StreamsBuilder builder = new StreamsBuilder();
KStream<String, GenericRecord> textLines =
    builder.stream("ora-query-in", Consumed.with(stringSerde, valueGenericAvroSerde));
final KTable<String, Long> wordCounts = textLines
    .mapValues(new ValueMapperWithKey<String, GenericRecord, KeyValue<String, GenericRecord>>() {
        @Override
        public KeyValue<String, GenericRecord> apply(String arg0, GenericRecord arg1) {
            return new KeyValue<String, GenericRecord>(arg1.get("KEY_FIELD").toString(), arg1);
        }
    })
    // .groupBy((key, value) -> value) //THIS WORKS if value is STRING
    // .groupBy((key, value) -> key) //DOES NOT WORK EITHER
    .groupByKey() //THIS does nothing
    .count();
wordCounts.toStream().to("test.topic.out", Produced.with(stringSerde, longSerde));
Am I missing something in the configuration?
streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
You didn't say what exactly goes wrong, but this looks like a serialization issue.
You can pass the serdes explicitly using:
- KStream::groupBy(final KeyValueMapper<? super K, ? super V, KR> selector, final Grouped<KR, V> grouped), for example:
someStream.groupBy((key, value) -> value, Grouped.with(newKeySerdes, valueSerdes))
- KGroupedStream::count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized), for example:
someGroupedStream.count(Materialized.with(newKeySerdes, Serdes.Long()))
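Putting the two calls together, here is a minimal sketch of a count topology that passes serdes explicitly at every step instead of relying on the configured defaults. The class name, the method name, and the topic names `demo-in` and `demo-out` are made up for illustration, and it assumes Kafka Streams 2.1 or later, where `Grouped` is available. Building and describing the topology does not need a running broker.

```java
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical helper class, not part of the original question or answer.
public class ExplicitSerdesCount {

    // Builds a topology that re-keys each record with keySelector and counts per new key.
    // valueSerde must match the value type actually read from the input topic.
    static <V> Topology buildCountTopology(Serde<V> valueSerde,
                                           KeyValueMapper<String, V, String> keySelector) {
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        final StreamsBuilder builder = new StreamsBuilder();

        // "demo-in" is an assumed topic name.
        final KStream<String, V> input =
            builder.stream("demo-in", Consumed.with(stringSerde, valueSerde));

        final KTable<String, Long> counts = input
            // Changing the key forces a repartition, so the serdes for the
            // new key and the value are given here rather than taken from the defaults.
            .groupBy(keySelector, Grouped.with(stringSerde, valueSerde))
            // The count store holds String keys and Long values.
            .count(Materialized.with(stringSerde, longSerde));

        // "demo-out" is an assumed topic name.
        counts.toStream().to("demo-out", Produced.with(stringSerde, longSerde));
        return builder.build();
    }

    public static void main(String[] args) {
        // Demo with String values: group by the value itself, as in the word count example.
        final Topology topology = buildCountTopology(Serdes.String(), (key, value) -> value);
        System.out.println(topology.describe());
    }
}
```

For the stream in the question, the same method could presumably be called with the configured valueGenericAvroSerde and a selector such as `(key, value) -> value.get("KEY_FIELD").toString()`, so that the GenericRecord stays the value and only the key changes.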
It may be similar to:
Kafka Streams 2.1.1 class cast while flushing timed aggregation
KafkaStreams: Getting window final results