复杂聚合



我有一个主题(topic),其中的数据需要在多个级别上分别计数,但我能找到的代码和文章都只讲单词计数(word count)这一个示例。

数据的示例是:

序列号(serial): 123, 国家: 美国, 日期: 01/05/2018, 州: 纽约州, 市: 纽约市, 访客: 5

序列号(serial): 123, 国家: 美国, 日期: 01/06/2018, 州: 纽约州, 市: 皇后区, 访客: 10

序列号(serial): 456, 日期: 01/06/2018, 国家: 美国, 州: 纽约州, 市: 皇后区, 访客: 27

序列号(serial): 123, 日期: 01/06/2018, 国家: 美国, 州: 纽约州, 市: 纽约市, 访客: 867

我已经完成了过滤和分组,但聚合这一步该怎么写?抱歉,代码里混用了 Java 8 和更早版本的写法;我更喜欢 Java 8 的写法,但目前还在边学边用。

KTable<String, CountryVisitorModel> countryStream1 = inStream
    .filter((key, value) -> value.status.equalsIgnoreCase("TEST_DATA"))
    .groupBy((key, value) -> value.serial)
    .aggregate(
            new Initializer<CountryVisitorModel>() {
            public CountryVisitorModelapply() {
                return new CountryVisitorModel();
            }
        },
        new Aggregator<String, InputModel, CountryVisitorModel>() {
            @Override
            public CountryVisitorModelapply(String key, InputModel value, CountryVisitorModel aggregate) {
    aggregate.serial = value.serial;
    aggregate.country_name = value.country_name;
    aggregate.city_name = value.city_name;
    aggregate.country_count++;
    aggregate.city_count++;
    aggregate.ip_count++;
        //
    return aggregate;
       }
},
Materialized.with(stringSerde, visitorSerde));

对于所有相同的 serial_id(这将是分组依据),按以下各个级别统计访客总数:

serial | 国家 | 州 | 城市 | total_num_visitors

如果每条记录只计入其中一个计数,我建议用 branch() 把流拆分,然后在每个子流上分别计数:

// Sketch: split the input into disjoint sub-streams and count each one separately.
// The `...` arguments are placeholders to be filled in by the reader.
KStream stream = builder.stream(...);
KStream[] subStreams = stream.branch(...);
// each record of `stream` will be contained in exactly _one_ of the `subStreams`
subStreams[0].groupByKey().count(); // or aggregate() instead of count()
subStreams[1].groupByKey().count();
// ...

如果分支不起作用,因为单个记录需要进入多个计数,您可以"广播"并过滤:

// Sketch: apply several independent filters to the same stream and count each result.
// The `...` arguments are placeholders to be filled in by the reader.
KStream stream = builder.stream(...);
// each record in `stream` will be "duplicated" and sent to all `filters`
stream.filter(...).groupByKey().count(); // or aggregate() instead of count()
stream.filter(...).groupByKey().count();
// ...

多次使用同一个 KStream 对象并对其应用多个运算符(在我们的例子中是 filter())时,每条记录都会被"广播"给所有运算符。请注意,在这种情况下记录(即对象)不会被物理复制,而是使用同一个输入记录对象来调用每个 filter()。

您可以将字段值保留在一个集合中,并使用 Set#size 方法获取每个键的计数。

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.junit.jupiter.api.Test;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * Shows how to keep several counts per key inside one Kafka Streams {@code aggregate()} call:
 * distinct values are collected in sets and the counters are derived from the set sizes.
 */
public class SomeTest {
    private static final String INPUT_TOPIC = "input";
    private static final String STORE_NAME = "store-name-2";

    /** Input record; public fields so Gson can (de)serialize it without accessors. */
    public static class VisitorDetails {
        public long serial;
        public String country;
        public long date;
        public String state;
        public String city;
        public long visitors;
    }

    /** Per-key aggregate: distinct countries and cities seen so far plus the visitor sum. */
    public static class Aggregate {
        public Set<String> countrySet = new HashSet<>();
        public long countryCounter;
        public Set<String> citySet = new HashSet<>();
        public long cityCounter;
        public long totalVisitorCounter = 0;
    }

    /**
     * Serializes any value as UTF-8 encoded JSON using Gson.
     *
     * @param <T> type of the value to serialize
     */
    public static class CustomSerializer<T> implements Serializer<T> {
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        private static final Gson gson = new Gson();

        /**
         * @param topic topic the record belongs to (unused)
         * @param data  value to serialize, may be {@code null}
         * @return the JSON bytes, or {@code null} when {@code data} is {@code null}
         */
        @Override
        public byte[] serialize(String topic, T data) {
            // A null value must stay null; gson.toJson(null) would produce the text "null".
            if (data == null) {
                return null;
            }
            String line = gson.toJson(data);
            return line.getBytes(CHARSET);
        }
    }

    /**
     * Deserializes UTF-8 encoded JSON into an instance of the configured class using Gson.
     *
     * @param <T> type of the value to produce
     */
    public static class CustomDeserializer<T> implements Deserializer<T> {
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        private static final Gson gson = new Gson();
        private final Class<T> tClass;

        /**
         * @param tClass class that Gson should instantiate for each payload
         */
        public CustomDeserializer(Class<T> tClass) {
            this.tClass = tClass;
        }

        /**
         * @param topic topic the record came from (unused)
         * @param data  raw bytes, may be {@code null}
         * @return the parsed value, or {@code null} when {@code data} is {@code null}
         * @throws IllegalArgumentException if the payload cannot be parsed
         */
        @Override
        public T deserialize(String topic, byte[] data) {
            // A missing value is passed through as null instead of being reported as a failure.
            if (data == null) {
                return null;
            }
            try {
                String json = new String(data, CHARSET);
                return gson.fromJson(json, tClass);
            } catch (RuntimeException e) {
                throw new IllegalArgumentException("Deserialization failed:", e);
            }
        }
    }

    /** Serde for {@link Aggregate} backed by the JSON serializer/deserializer above. */
    public static class AggregateSerde implements Serde<Aggregate> {
        @Override
        public Serializer<Aggregate> serializer() {
            return new CustomSerializer<>();
        }

        @Override
        public Deserializer<Aggregate> deserializer() {
            return new CustomDeserializer<>(Aggregate.class);
        }
    }

    /** Serde for {@link VisitorDetails} backed by the JSON serializer/deserializer above. */
    public static class VisitorDetailsSerde implements Serde<VisitorDetails> {
        @Override
        public Serializer<VisitorDetails> serializer() {
            return new CustomSerializer<>();
        }

        @Override
        public Deserializer<VisitorDetails> deserializer() {
            return new CustomDeserializer<>(VisitorDetails.class);
        }
    }

    /**
     * Pipes five records for key 123 through the topology and checks the distinct-city count,
     * the distinct-country count and the visitor total stored for that key.
     */
    @Test
    void test() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Long(), new VisitorDetailsSerde()))
                .groupByKey(Grouped.with(Serdes.Long(), new VisitorDetailsSerde()))
                .aggregate(
                        Aggregate::new,
                        (key, value, agg) -> {
                            // Sets ignore duplicates, so their sizes are the distinct counts.
                            agg.countrySet.add(value.country);
                            agg.countryCounter = agg.countrySet.size();
                            agg.citySet.add(value.city);
                            agg.cityCounter = agg.citySet.size();
                            agg.totalVisitorCounter += value.visitors;
                            return agg;
                        },
                        Materialized.<Long, Aggregate, KeyValueStore<Bytes, byte[]>>as(STORE_NAME)
                                .withKeySerde(Serdes.Long())
                                .withValueSerde(new AggregateSerde())
                                .withLoggingDisabled() // only for testing,
                                // recommended to not disable on prod as it provides fault tolerance
                );
        Topology topology = builder.build();

        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        // try-with-resources closes the driver even when an assertion below fails.
        try (TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties)) {
            TestInputTopic<Long, VisitorDetails> inputTopic = testDriver.createInputTopic(
                    INPUT_TOPIC, Serdes.Long().serializer(), new CustomSerializer<VisitorDetails>());
            inputTopic.pipeInput(123L, visitorDetail(123L, "usa", "ny", 10L));
            inputTopic.pipeInput(123L, visitorDetail(123L, "usa", "la", 20L));
            inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "krk", 30L));
            inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "wrs", 40L));
            inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "krk", 50L));

            KeyValueStore<Long, Aggregate> keyValueStore =
                    testDriver.<Long, Aggregate>getKeyValueStore(STORE_NAME);
            // Read the aggregate once; all three assertions look at the same stored value.
            Aggregate aggregate = keyValueStore.get(123L);
            assertThat(aggregate).isNotNull();
            assertThat(aggregate.cityCounter).isEqualTo(4);
            assertThat(aggregate.countryCounter).isEqualTo(2);
            assertThat(aggregate.totalVisitorCounter).isEqualTo(150);
        }
    }

    /**
     * Builds an input record; {@code date} and {@code state} are left at their defaults.
     *
     * @param serial   serial of the record
     * @param country  country name
     * @param city     city name
     * @param visitors number of visitors carried by the record
     * @return the populated record
     */
    private VisitorDetails visitorDetail(long serial, String country, String city, long visitors) {
        VisitorDetails visitorDetails = new VisitorDetails();
        visitorDetails.serial = serial;
        visitorDetails.country = country;
        visitorDetails.city = city;
        visitorDetails.visitors = visitors;
        return visitorDetails;
    }
}

相关内容

  • 没有找到相关文章

最新更新