卡夫卡如何分组两次?



我想创建一个条形图,显示图像中有多少像素颜色;图像每3秒更新一次,因此我的条形图也会更新。

我有一个收集 JSON 对象的主题,该对象以键作为映像创建日期,值是十六进制值(例如 #FFF)。

我想按键分组,所以它是按图像分组,然后按每个组的十六进制值分组并执行 .count()。

你是怎么做到的?

我在想streams.groupByKey()...然后按十六进制值分组,但我需要将 KTable 转换为 KStream...

更新

很抱歉我在手机上输入时缺乏解释。 我将尝试再次解释。

顺便说一下,我改变了一些东西。如果你想阅读我在做什么,这是我的github:https://github.com/Lilmortal。

  • 我的项目"HexGraph-source-connector"拾取任何图像 指定目录,并将图像路径推送到主题。
  • "HexGraph"项目拾取它,使用Akka,演员将获得 所有像素十六进制代码单独并开始推送消息 到另一个话题。
  • "HexGraph-stream"是我的kafka流部分。

但它很长,我怀疑你会读它,哈哈。

无论如何,我从一个主题中阅读,我收到这样的消息{图像路径:{hexCode:#fff}}。 图像路径是键,十六进制代码是值。我可以有一到多的imagePaths,所以我的想法是我的前端将有一个websocket来拾取它。它将显示一个图像,并且它上面有一个条形图,其中包含像素颜色代码的数量。例如,有 4 个 #fff、28 个 #fef 等。

因此,我想按 imagePath 分组,然后我想计算该 imagePath 的每个像素。

例如:

  • {imagePath1: {hexCode: #fff, count: 47}}
  • {imagePath1: {hexCode: #fef, count: 61}}
  • {imagePath2: {hexCode: #fff, count: 23}}
  • {imagePath2: {hexCode: #fef, count: 55}}

所以这里 imagePath1 有 47 个 #fff,imagePath2 有 23 个 #fff。

这就是我试图做的自动取款机。

也许在分组之前通过组合键选择?像这样:

SteamsBuilder topology = new StreamsBuilder();
topology.stream("input")
.selectKey((k, v) -> k + v.hex)
.groupByKey()
.count()

这不会分组两次,但会得到你想要的效果。


评论后更新

class Image {
public String imagePath;
}
class ImageAggregation {
public String imagePath;
public int count;
}
class ImageSerde implements Serde<Image> {
// implement
}
class ImageAggregationSerde implements Serde<ImageAggregation> {
// implement   
}
KTable<String, ImageAggregation> table = topology
.stream("input", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
.groupBy((k, v) -> v.imagePath)
.aggregate(ImageAggregation::new,
(k, v, agg) -> {
agg.imagePath = v.imagePath;
agg.count = agg.count + 1;
return agg;
}, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageAggregationSerde());

更新后更新2

class ImageHex {
public String imagePath;
public String hex;
}
class ImageHexAggregation {
public String imagePath;
public Map<String, Integer> counts;
}
class ImageHexSerde implements Serde<ImageHex> {
// implement
}
class ImageHexAggregationSerde implements Serde<ImageHexAggregation> {
// implement   
}
KTable<String, ImageHexAggregation> table = topology
.stream("image-hex-observations", Consumed.with(new org.apache.kafka.common.serialization.Serdes.LongSerde(), new ImageSerde()))
.groupBy((k, v) -> v.imagePath)
.aggregate(ImageHexAggregation::new,
(k, v, agg) -> {
agg.imagePath = v.imagePath;
Integer currentCount = agg.counts.getOrDefault(v.hex, 0)
agg.counts.put(v.hex, currentCount + 1));
return agg;
}, Materialized.with(new org.apache.kafka.common.serialization.Serdes.StringSerde(), new ImageHexAggregationSerde());

相关内容

  • 没有找到相关文章

最新更新