I am new to Kafka Streams, and I am running into a problem when I try to aggregate JSON data. My streams code is below, along with sample input and the error; I am using Kafka version 2.12. I have been trying various implementations for a few days and am now stuck on this error and cannot move forward. Can someone help me?
public static void main(String[] args) throws InterruptedException {
    // TODO Auto-generated method stub
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank_stream_001");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

    final Serializer<JsonNode> jsonSerialize = new JsonSerializer();
    final Deserializer<JsonNode> jsonDeserialize = new JsonDeserializer();
    final Serde<JsonNode> jsonSerd = Serdes.serdeFrom(jsonSerialize, jsonDeserialize);

    String inputtopic = "bank_topic1";
    String outputtopic = "bankout_topic1";
    StreamsBuilder builder = new StreamsBuilder();

    // stream for reading
    KStream<String, JsonNode> stream1 = builder.stream(inputtopic, Consumed.with(Serdes.String(), jsonSerd));

    // create balance node for calculations
    ObjectNode initial_balance = JsonNodeFactory.instance.objectNode();
    initial_balance.put("count", 0);
    initial_balance.put("balance", 0);
    initial_balance.put("time", Instant.ofEpochMilli(0L).toString());

    stream1.peek((k, v) -> System.out.println(k + " : " + v));

    KTable<String, JsonNode> bank_bal = stream1.groupByKey()
        .aggregate(
            () -> initial_balance,
            (key, value, cur_balance) -> newbalance(value, cur_balance)
        );

    bank_bal.toStream().peek((k, v) -> System.out.println(k + " : " + v));
    bank_bal.toStream().to(outputtopic, Produced.with(Serdes.String(), jsonSerd));

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.cleanUp();
    streams.start();
    System.out.println(stream1.toString());
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}

private static JsonNode newbalance(JsonNode transaction, JsonNode balance) {
    ObjectNode newbal = JsonNodeFactory.instance.objectNode();
    newbal.put("count", balance.get("count").asInt() + 1);
    newbal.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());
    Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
    Long transEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();
    Instant newbalanceinstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transEpoch));
    newbal.put("time", newbalanceinstant.toString());
    return newbal;
}
My data in topic bank_topic1 is:
aruna {"name":"aruna","amount":58,"time":"2021-02-26T15:12:43.811Z"}
varun {"name":"varun","amount":3,"time":"2021-02-26T15:12:45.081Z"}
kali {"name":"kali","amount":16,"time":"2021-02-26T15:12:46.082Z"}
aruna {"name":"aruna","amount":80,"time":"2021-02-26T15:13:32.806Z"}
varun {"name":"varun","amount":33,"time":"2021-02-26T15:13:34.015Z"}
kali {"name":"kali","amount":30,"time":"2021-02-26T15:13:35.016Z"}
I am getting the error below from the server:
java.lang.ClassCastException: java.lang.String cannot be cast to [B at
org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:21)
Could you please help me?
I think you need to use the overloaded version of KGroupedStream.aggregate that accepts a Materialized configuration object. With Materialized you can provide the correct Serdes to use with the state store; without it, the store falls back to the default serdes from the config (a byte-array serde unless you override it), which is consistent with the "String cannot be cast to [B" ClassCastException you are seeing.
So you can try updating your aggregate call like this:
.aggregate(
    () -> initial_balance,
    (key, value, cur_balance) -> newbalance(value, cur_balance),
    Materialized.with(Serdes.String(), jsonSerd)
);
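As a side note, if you also want to give the state store an explicit name (for example to make it queryable later), you can build the Materialized from as(...) and set the serdes on it. A minimal sketch, assuming a hypothetical store name "bank-balance-store" and reusing the jsonSerd, initial_balance, and newbalance from your code (it also needs imports for org.apache.kafka.common.utils.Bytes and org.apache.kafka.streams.state.KeyValueStore):

    KTable<String, JsonNode> bank_bal = stream1.groupByKey()
        .aggregate(
            () -> initial_balance,
            (key, value, cur_balance) -> newbalance(value, cur_balance),
            // "bank-balance-store" is just an example name; any unique store name works
            Materialized.<String, JsonNode, KeyValueStore<Bytes, byte[]>>as("bank-balance-store")
                .withKeySerde(Serdes.String())
                .withValueSerde(jsonSerd)
        );

Either variant tells the state store to use a String key serde and your JSON value serde instead of the default byte-array serdes.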
Let me know how it goes.
HTH, Bill