如何在KStream(Kafka Streams)中加入列表的每个元素



例如,我有一个购物车的KStream,每个购物车都有一个产品id列表。此外,还有一个带有产品的KStream。我该如何将它们连接在一起?

public class ShoppingCart {
List<ProductKey> productKeys;
}
public class Product {
ProductKey key;
String name;
}
public class ProductKey {
String id;
}
KStream<String, ShoppingCart> shoppingCartKStream;
KStream<ProductKey, Product> productKStream;

我想要的结果看起来像这个

KStream<String, ShoppingCartWithProducts> joinedStream;
public class ShoppingCartWithProducts {
List<Product> products;
}

有没有一种简单的方法来存档?

编辑:我知道有办法,但我觉得太复杂了。简单地说:

  1. 我需要平面地图购物车KStream

  2. 然后我可以将结果与产品KStream 结合起来

  3. 对中间结果进行分组和聚合

  4. 最后加入购物车KStream

    KStream<String, ProductKey> productKeyStream = shoppingCartKStream
    .flatMap((key, shoppingCart) -> shoppingCart.productKeys.stream()
    .map(productKey -> KeyValue.pair(key, productKey))
    .collect(Collectors.toList())
    );
    KTable<String, Product> productStreamWithShoppingCartKey = productKeyStream.toTable()
    .join(
    productKStream.toTable(),
    productKey -> productKey,
    (productKey, product) -> product
    );
    KTable<String, ArrayList<Product>> productListStream = productStreamWithShoppingCartKey
    .groupBy(KeyValue::pair)
    .aggregate(
    (Initializer<ArrayList<Product>>) ArrayList::new,
    (key, value, aggregate) -> addProductToList(aggregate, value),
    (key, value, aggregate) -> removeProductFromList(aggregate, value)
    );
    KStream<String, ShoppingCartWithProducts> shoppingCartWithProductsKStream = shoppingCartKStream.join(
    productListStream,
    (shoppingCart, productList) -> new ShoppingCartWithProducts(productList)
    );
    

当然它非常简单,我还需要处理tombstone等。

定义StreamsBuilder之后,它是Streams DSL的入口点。

StreamsBuilder builder = new StreamsBuilder();

您可以使用JoinWindows.of(Duration.ofMinutes(5))进行5分钟窗口的联接。您必须使用相同类型的两个流的密钥,否则kafka-stream无法比较不同类型的密钥。这就像一个数据库连接。因此,我将String用于ShoppingCartProduct。然后.join(...操作符匹配相同密钥的事件,您就可以构建新的事件ShoppingCartWithProducts

KStream<String, ShoppingCart> shoppingCartKStream = ...;
KStream<String, Product> productKStream = ...;
shoppingCartKStream.join(productKStream,
(shop, prod) -> {
log.info("ShoppingCart: {} with Product: {}", shop, prod);
ShoppingCartWithProducts shoppingCartWithProducts = new ShoppingCartWithProducts();
shoppingCartWithProducts.setShoppingCart(shop);
shoppingCartWithProducts.setProduct(prod);
return shoppingCartWithProducts;
},
JoinWindows.of(Duration.ofMinutes(5)),
StreamJoined.with(Serdes.String(),
new JsonSerde<>(ShoppingCart.class),
new JsonSerde<>(Product.class)))
.foreach((k, v) -> log.info("ShoppingCartWithProducts ID: {}, value: {}", k, v));

你可以在这里找到更详细的信息。

最新更新