什么是Java/scala kafka流相等的ksql join where子句



假设我有2个kafka流(kafka-streams-scala库,版本2.2.0(:

val builder: StreamsBuilder = new StreamsBuilder
val stream1: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic1")
val stream2: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic2")

和他们的加入:

val stream3: KStream[String, MyClass] = flights.join(schedules)((r1, r2) =>  MyClass(r1.get("f1"), r2.get("f2")), JoinWindows.of(Duration.ofSeconds(30))

KSQL中的子句可用的等效内容是什么?(请参阅late_orders流(有关流API?只使用stream3.Filter是个好主意吗?这种方法的效率是否与KSQL创建的流相同的效率?

KSQL中的子句可用的等效内容是什么?(请参阅late_orders stream(有关流的API?

是:

  • KStream#filter(),返回过滤的KStream
  • KTable#filter(),返回过滤的KTable

https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#stateless-transformations

只使用stream3.filter?

是个好主意吗?

是。

这种方法的效率是否与KSQL创建的流相同的效率?

是。

相关内容

  • 没有找到相关文章

最新更新