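I have data in a topic that needs to be counted at multiple levels, and all the code and articles I can find only cover the word-count example.
An example of the data:
Serial: 123  Country: USA  Date: 01/05/2018  State: New York  City: New York City  Visitors: 5
Serial: 123  Country: USA  Date: 01/06/2018  State: New York  City: Queens  Visitors: 10
Serial: 456  Date: 01/06/2018  Country: USA  State: New York  City: Queens  Visitors: 27
Serial: 123  Date: 01/06/2018  Country: USA  State: New York  City: New York City  Visitors: 867
I have the filter and the groupBy done, but what about the aggregate? Sorry for the mix of Java 8 and pre-8 style; I prefer 8 but I am still learning it.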
KTable<String, CountryVisitorModel> countryStream1 = inStream
    .filter((key, value) -> value.status.equalsIgnoreCase("TEST_DATA"))
    .groupBy((key, value) -> value.serial)
    .aggregate(
        new Initializer<CountryVisitorModel>() {
            @Override
            public CountryVisitorModel apply() {
                return new CountryVisitorModel();
            }
        },
        new Aggregator<String, InputModel, CountryVisitorModel>() {
            @Override
            public CountryVisitorModel apply(String key, InputModel value, CountryVisitorModel aggregate) {
                aggregate.serial = value.serial;
                aggregate.country_name = value.country_name;
                aggregate.city_name = value.city_name;
                aggregate.country_count++;
                aggregate.city_count++;
                aggregate.ip_count++;
                //
                return aggregate;
            }
        },
        Materialized.with(stringSerde, visitorSerde));
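For all records with the same serial_id (which would be the group-by key), I want to compute the total number of visitors per:
serial  country  state  city  total_num_visitors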
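If each record contributes to only one count, I would suggest splitting the stream with branch() and counting per sub-stream (in Kafka Streams 2.8 and later, branch() is deprecated in favor of split()):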
KStream stream = builder.stream(...);
KStream[] subStreams = stream.branch(...);
// each record of `stream` ends up in at most _one_ of the `subStreams`:
// the first one whose predicate matches (records matching none are dropped)
subStreams[0].groupByKey().count(); // or aggregate() instead of count()
subStreams[1].groupByKey().count();
// ...
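To make the "one sub-stream per record" behavior concrete, here is a minimal plain-Java model of it. It does not use the Kafka Streams API; `BranchModel`, `Visit` and `branchCounts` are hypothetical names chosen for illustration. Each record is routed to the first predicate it matches and is then counted per key within that branch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class BranchModel {
    /** Hypothetical record type holding only the fields this sketch needs. */
    public static final class Visit {
        public final String serial;
        public final String country;

        public Visit(String serial, String country) {
            this.serial = serial;
            this.country = country;
        }
    }

    /**
     * Routes each visit to the FIRST predicate it matches (visits matching
     * none are dropped) and counts visits per serial inside each branch.
     */
    public static List<Map<String, Long>> branchCounts(
            List<Visit> visits, List<Predicate<Visit>> predicates) {
        List<Map<String, Long>> counts = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i++) {
            counts.add(new HashMap<>());
        }
        for (Visit visit : visits) {
            for (int i = 0; i < predicates.size(); i++) {
                if (predicates.get(i).test(visit)) {
                    counts.get(i).merge(visit.serial, 1L, Long::sum);
                    break; // first match wins, so a visit is counted at most once
                }
            }
        }
        return counts;
    }
}
```

The order of the predicates matters in this model: a catch-all predicate placed first would capture every record.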
If branching does not work because a single record needs to go into multiple counts, you can "broadcast" and filter:
KStream stream = builder.stream(...);
// each record in `stream` will be "duplicated" and sent to all `filters`
stream.filter(...).groupByKey().count(); // or aggregate() instead of count()
stream.filter(...).groupByKey().count();
// ...
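When you use the same KStream object multiple times and apply several operators to it (filter() in our case), each record is "broadcast" to all of those operators. Note that the record (i.e., the object) is not physically copied in this case; the same input record object is used to call each filter().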
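A minimal plain-Java model of this broadcast-and-filter pattern, again without the Kafka Streams API and with hypothetical names (`BroadcastModel`, `countPerFilter`): every record is offered to every filter, so one record can contribute to several counts, and each filter receives the same object reference rather than a copy.

```java
import java.util.List;
import java.util.function.Predicate;

public class BroadcastModel {
    /**
     * Offers every record to every filter and returns one count per filter.
     * The same record instance is passed to each filter; nothing is copied.
     */
    public static <T> long[] countPerFilter(List<T> records, List<Predicate<T>> filters) {
        long[] counts = new long[filters.size()];
        for (T record : records) {
            for (int i = 0; i < filters.size(); i++) {
                if (filters.get(i).test(record)) {
                    counts[i]++; // no break: a record may match several filters
                }
            }
        }
        return counts;
    }
}
```

Since the object is shared between the filters, it is probably safest to treat it as read-only inside the predicates.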
You can keep the field values in a set and use the Set#size method to get the count for each key.
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.junit.jupiter.api.Test;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
public class SomeTest {

    // Input record as it arrives on the topic.
    public static class VisitorDetails {
        public long serial;
        public String country;
        public long date;
        public String state;
        public String city;
        public long visitors;
    }

    // Per-key aggregate: the sets hold distinct values, the counters mirror their sizes.
    public static class Aggregate {
        public Set<String> countrySet = new HashSet<>();
        public long countryCounter;
        public Set<String> citySet = new HashSet<>();
        public long cityCounter;
        public long totalVisitorCounter = 0;
    }
    // Generic JSON serializer backed by Gson.
    public static class CustomSerializer<T> implements Serializer<T> {
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        private static final Gson gson = new Gson();

        @Override
        public byte[] serialize(String topic, T data) {
            String line = gson.toJson(data);
            return line.getBytes(CHARSET);
        }
    }

    // Generic JSON deserializer backed by Gson; needs the target class at runtime.
    public static class CustomDeserializer<T> implements Deserializer<T> {
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        private static final Gson gson = new Gson();
        private final Class<T> tClass;

        public CustomDeserializer(Class<T> tClass) {
            this.tClass = tClass;
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            try {
                String person = new String(data, CHARSET);
                return gson.fromJson(person, tClass);
            } catch (Exception e) {
                throw new IllegalArgumentException("Deserialization failed:", e);
            }
        }
    }
    public static class AggregateSerde implements Serde<Aggregate> {
        @Override
        public Serializer<Aggregate> serializer() {
            return new CustomSerializer<Aggregate>();
        }

        @Override
        public Deserializer<Aggregate> deserializer() {
            return new CustomDeserializer<Aggregate>(Aggregate.class);
        }
    }

    public static class VisitorDetailsSerde implements Serde<VisitorDetails> {
        @Override
        public Serializer<VisitorDetails> serializer() {
            return new CustomSerializer<VisitorDetails>();
        }

        @Override
        public Deserializer<VisitorDetails> deserializer() {
            return new CustomDeserializer<VisitorDetails>(VisitorDetails.class);
        }
    }
    @Test
    void test() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.Long(), new VisitorDetailsSerde()))
                .groupByKey(Grouped.with(Serdes.Long(), new VisitorDetailsSerde()))
                .aggregate(
                        Aggregate::new,
                        (key, value, agg) -> {
                            // Sets drop duplicates, so their size is the distinct count.
                            agg.countrySet.add(value.country);
                            agg.countryCounter = agg.countrySet.size();
                            agg.citySet.add(value.city);
                            agg.cityCounter = agg.citySet.size();
                            agg.totalVisitorCounter += value.visitors;
                            return agg;
                        },
                        Materialized.<Long, Aggregate, KeyValueStore<Bytes, byte[]>>as("store-name-2")
                                .withKeySerde(Serdes.Long())
                                .withValueSerde(new AggregateSerde())
                                // Only for testing. Keep logging enabled in production,
                                // because the changelog provides fault tolerance.
                                .withLoggingDisabled()
                );

        Topology topology = builder.build();
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
        TestInputTopic<Long, VisitorDetails> inputTopic = testDriver.createInputTopic("input",
                Serdes.Long().serializer(), new CustomSerializer<VisitorDetails>());

        inputTopic.pipeInput(123L, visitorDetail(123L, "usa", "ny", 10L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "usa", "la", 20L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "krk", 30L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "wrs", 40L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "krk", 50L));

        KeyValueStore<Long, Aggregate> keyValueStore = testDriver.<Long, Aggregate>getKeyValueStore("store-name-2");
        assertThat(keyValueStore.get(123L).cityCounter).isEqualTo(4);
        assertThat(keyValueStore.get(123L).countryCounter).isEqualTo(2);
        assertThat(keyValueStore.get(123L).totalVisitorCounter).isEqualTo(150);
        testDriver.close();
    }
    // Test helper: builds a record with only the fields the aggregation reads.
    private VisitorDetails visitorDetail(long serial, String country, String city, long visitors) {
        VisitorDetails visitorDetails = new VisitorDetails();
        visitorDetails.serial = serial;
        visitorDetails.country = country;
        visitorDetails.city = city;
        visitorDetails.visitors = visitors;
        return visitorDetails;
    }
}
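The set-based counting used in the test above can also be checked without Kafka. The sketch below is plain Java with hypothetical names (`DistinctCounts`, `add`); it folds the same five (country, city, visitors) inputs into one aggregate.

```java
import java.util.HashSet;
import java.util.Set;

public class DistinctCounts {
    private final Set<String> countries = new HashSet<>();
    private final Set<String> cities = new HashSet<>();
    private long totalVisitors;

    /** Folds one record into the aggregate; the sets ignore repeated values. */
    public DistinctCounts add(String country, String city, long visitors) {
        countries.add(country);
        cities.add(city);
        totalVisitors += visitors;
        return this;
    }

    public int countryCount() { return countries.size(); }

    public int cityCount() { return cities.size(); }

    public long totalVisitors() { return totalVisitors; }

    public static void main(String[] args) {
        DistinctCounts agg = new DistinctCounts()
                .add("usa", "ny", 10)
                .add("usa", "la", 20)
                .add("pl", "krk", 30)
                .add("pl", "wrs", 40)
                .add("pl", "krk", 50);
        // prints "2 countries, 4 cities, 150 visitors"
        System.out.println(agg.countryCount() + " countries, "
                + agg.cityCount() + " cities, "
                + agg.totalVisitors() + " visitors");
    }
}
```

One trade-off of this approach: the sets are part of the stored aggregate, so the state kept for each key grows with the number of distinct values seen for that key.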