假设我有2个kafka流(kafka-streams-scala库,版本2.2.0(:
val builder: StreamsBuilder = new StreamsBuilder
val stream1: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic1")
val stream2: KStream[String, GenericRecord] = builder.stream[String, GenericRecord]("topic2")
和他们的加入:
val stream3: KStream[String, MyClass] = flights.join(schedules)((r1, r2) => MyClass(r1.get("f1"), r2.get("f2")), JoinWindows.of(Duration.ofSeconds(30))
KSQL中的子句可用的等效内容是什么?(请参阅late_orders流(有关流API?只使用stream3.Filter是个好主意吗?这种方法的效率是否与KSQL创建的流相同的效率?
KSQL中的子句可用的等效内容是什么?(请参阅late_orders stream(有关流的API?
是:
-
KStream#filter()
,返回过滤的KStream
-
KTable#filter()
,返回过滤的KTable
https://kafka.apache.org/documentation/streams/developer-guide/dsl-api.html#stateless-transformations
只使用stream3.filter?
是个好主意吗?
是。
这种方法的效率是否与KSQL创建的流相同的效率?
是。