是否可以重新键入全球KTable



我想重新键入 GlobalKTable(可能在初始化它时,因为我相信它们只在创建一次时被读取(。

这可能吗?

我有两个主题,我正在Spring/Java Kafka Streams应用程序中工作。第一个没有压实,第二个是。两者都使用 Avro 作为其键和值。

该应用程序流式传输来自第一个(非压缩(主题的记录,并通过 KStream#leftJoin 附加来自压缩主题的其他数据。压缩的主题已作为 GlobalKTable 引入应用程序,通过 StreamsBuilder#globalTable() 创建,并且需要保持这种方式(我需要应用程序的每个实例中可用的主题所有分区的每条记录(。

我知道有人谈论支持非主键连接(https://issues.apache.org/jira/browse/KAFKA-3705(,但据我所知,我还不能这样做......

@Configuration
@EnableKafkaStreams
public class StreamsConfig {
  @Autowired
  private MyCustomSerdes serdes;
  @Bean
  public KStream<AvroKeyOne, AvroValueOne> reKeyJoin(StreamsBuilder streamsBuilder) {
    GlobalKTable<AvroKeyOne, AvroValueOne> globalTable = streamsBuilder.globalTable("topicOne", Consumed.with(
      serdes.getAvroKeyOne()
      serdes.getAvroValueOne()
    ));
    KStream<AvroKeyTwo, AvroValueOne> kStream = streamsBuilder.stream("topicTwo", Consumed.with(
      serdes.getAvroKeyTwo(),
      serdes.getAvroValueOne()
    ));
    kStream.join(
      globalTable,
       /**
        * the KeyValueMapper. I need to rekey the Global table as well to the
        * corresponding String (which it's data will have) if I want this join
        * to return results
        */
      (streamKey, streamValue) -> {return streamKey.getNewStringKey()},
      (/**ValueJoiner Deal**/)
    );
  }
}

我想重新键入 GlobalKTable(可能在初始化它时,因为我相信它们只有在创建一次时才会被读取(。

这可能吗?

今天对此没有直接支持。您已经提到了即将开展的工作,例如向全局表添加对非主键联接的支持,但这尚不可用。

您现在可以做什么:您可以将原始 Kafka 主题重新键入(重新分区(为新主题,然后将重新键入的主题读取到全局 KTable 中。也许这是您的一个选择。

相关内容

  • 没有找到相关文章

最新更新