我正在使用 Kafka Streams 的 DSL API。我有这样一个用例：需要检查某个条件，如果为真，就在快乐路径（happy path）之外，额外发送一条消息到另一个单独的主题。我的问题是：在 DSL API 中，如何把子处理器挂接到父处理器上？是否只需把流变量缓存下来，随后在两个地方使用它，并为这些流处理器命名即可？下面是一段简短的代码，说明了我想做的事情。我之所以使用 DSL API，是因为需要用到 foreignKeyJoin（外键连接）。
// Wire two children onto the *processed* stream. Records forwarded from inside the processor
// only reach that processor's own children, so both sinks must hang off myStream, not stream.
var myStream = stream.process(myProcessorSupplier); // 3.3: process() returns a KStream
// Q: will the forward ever land here? Only if the processor forwards without naming a specific
// child (plain forward / returned record), because that goes to ALL children.
myStream.to("happyThingTopic");
// Named.as(...) is the public factory (Named's constructor is protected). A forward with
// To.child("what-is-this") lands here and only here.
myStream.map(myKvMapper, Named.as("what-is-this")).to("myOtherTopic");
/**
 * Sketch of the per-record logic: when the value carries a flag, send an extra
 * {@code OtherThing} to the child processor named "what-is-this" only, then return the
 * happy-path value.
 *
 * <p>NOTE(review): in the Transformer-style API a non-null returned KeyValue is forwarded to
 * ALL children (including "what-is-this"). To route it exclusively, forward it with
 * {@code To.child(...)} as well and return {@code null}.
 */
public KeyValue<String, Object> process(Object key, Object value){
    // NOTE(review): `value` is declared as Object, which has no `hasFlag` member; presumably
    // the real value type exposes it — TODO confirm against the actual record type.
    if (value.hasFlag) {
        // The child name must match Named.as("what-is-this") used when wiring the topology;
        // Kafka Streams rejects a forward to an unknown child name at runtime.
        processorContext.forward(key, new OtherThing(),
                org.apache.kafka.streams.processor.To.child("what-is-this"));
    }
    // NOTE(review): raw KeyValue because `key` is declared Object while the return type
    // expects a String key — tighten the parameter types once the real signature is known.
    return new KeyValue(key, new HappyThing(value));
}
我编写了一些示例代码和单元测试来验证这一思路：确实可以为要向下游转发的消息选择特定的逻辑路径，但有一个问题。
如果在 transform 方法中返回一个值，Kafka Streams 仍会把原始消息发送给两个子流处理器，无论它们是否被命名。可以通过以下方式避免这种情况：
- 为所有子处理器命名；
- 在 Transformer 中使用 processorContext.forward，把所有消息显式转发给各个已命名的下游处理器；
- 并在 transform
中返回null

package com.events.streams;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.To;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Profile;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.stereotype.Component;

/**
 * Proof-of-concept topology showing how a DSL {@code transform(...)} can send an extra record
 * to one specific, named child processor via {@code ProcessorContext#forward(K, V, To)}.
 *
 * <p>Wiring added to the Spring-provided default {@link StreamsBuilder}:
 * <pre>
 *   INPUT_TOPIC -> PocTransformer -+-> to(OUTPUT_TOPIC)
 *                                  +-> map [named "forward-child"] -> to(OUTPUT_FORWARD_TOPIC)
 * </pre>
 *
 * <p>Active only under the "context-forward-poc" profile and when {@code kafka.enabled} is
 * {@code true} or unset.
 */
@Profile("context-forward-poc")
@Component
@EnableKafka
@ConditionalOnProperty(
        name = "kafka.enabled",
        havingValue = "true",
        matchIfMissing = true)
public class KafkaStreamsContextForwardPOCConfig {

    /** Name given to the map processor so the transformer can target it with To.child(...). */
    public final static String childProcessorName = "forward-child";
    public final static String INPUT_TOPIC = "context.input.topic";
    public final static String OUTPUT_TOPIC = "context.output.topic";
    public final static String OUTPUT_FORWARD_TOPIC = "context.forward.output.topic";

    private final StreamsBuilder builder;

    /**
     * Stores the shared default StreamsBuilder and immediately adds the POC topology to it.
     *
     * @param builder the default Kafka Streams builder bean managed by Spring Kafka
     */
    public KafkaStreamsContextForwardPOCConfig(
            @Autowired @Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
            StreamsBuilder builder) {
        this.builder = builder;
        initTopology();
    }

    /** Builds the two-branch topology described in the class Javadoc. */
    private void initTopology() {
        // NOTE(review): KStream#transform is deprecated in newer Kafka Streams releases in
        // favour of process(...) (KIP-820) — confirm against the version in use.
        KStream<String, String> stream = builder
                .stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
                .transform(PocTransformer::new);

        // Child 1: unnamed sink for the happy-path record.
        stream.to(OUTPUT_TOPIC);

        // Child 2: identity map with a fixed name so To.child(childProcessorName) can address it.
        stream
                .map((k, v) -> new KeyValue<>(k, v), Named.as(childProcessorName))
                .to(OUTPUT_FORWARD_TOPIC);
    }

    /**
     * For every input record, forwards a fixed marker value to the child named
     * {@code childProcessorName} only, then returns the original record.
     *
     * <p>Because the returned KeyValue is non-null, Kafka Streams forwards it to ALL children of
     * this node, so the named child receives both the marker and the original record. To route
     * exclusively, forward every record with {@code To.child(...)} and return {@code null}.
     */
    public static class PocTransformer
            implements Transformer<String, String, KeyValue<String, String>> {

        private ProcessorContext context;

        @Override
        public void init(ProcessorContext context) {
            // Keep the context so transform() can forward records to specific children.
            this.context = context;
        }

        @Override
        public KeyValue<String, String> transform(String key, String value) {
            // Reaches only the processor named childProcessorName (map -> OUTPUT_FORWARD_TOPIC).
            context.forward(key, "here-be-a-forwarded-message", To.child(childProcessorName));
            // Forwarded to every child: the OUTPUT_TOPIC sink AND the named map.
            return new KeyValue<>(key, value);
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    }
}

Unit Test

package com.events.streams;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.csx.events.MainApplication;
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestInstance.Lifecycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.test.context.ActiveProfiles;

/**
 * Drives the POC topology through a {@link TopologyTestDriver} and checks which output topics
 * receive the records emitted by {@code KafkaStreamsContextForwardPOCConfig.PocTransformer}.
 */
@SpringBootTest(classes = {MainApplication.class, KafkaStreamsContextForwardTest.class, KStreamsBeans.class})
@ActiveProfiles("context-forward-poc")
@TestInstance(Lifecycle.PER_CLASS)
public class KafkaStreamsContextForwardTest {

    private final Logger log = LoggerFactory.getLogger(KafkaStreamsContextForwardTest.class);

    @Autowired
    KafkaStreamsContextForwardPOCConfig kafkaStreamsConfig;

    @Autowired
    @Qualifier(KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_BUILDER_BEAN_NAME)
    StreamsBuilder builder;

    private TopologyTestDriver testDriver;
    private TestInputTopic<String, String> testInputTopic;
    private TestOutputTopic<String, String> outputTopic;
    private TestOutputTopic<String, String> outputContextForwardTopic;

    /** Creates the test driver over the Spring-built topology and the input/output topics. */
    @BeforeAll
    public void setup() {
        Properties props = new Properties();
        props.put(StreamsConfig.STATE_DIR_CONFIG, "./target/kstreamsstate/KStreamChildProcessorTest");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "KStreamChildProcessorTest");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "KStreamChildProcessorTest:3212");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        //the kafkaStreamsConfig class has already been loaded by the spring context so the topology exists here
        testDriver = new TopologyTestDriver(builder.build(), props);

        testInputTopic = testDriver.createInputTopic(
                KafkaStreamsContextForwardPOCConfig.INPUT_TOPIC,
                new StringSerializer(),
                new StringSerializer()
        );
        outputTopic = testDriver.createOutputTopic(
                KafkaStreamsContextForwardPOCConfig.OUTPUT_TOPIC,
                new StringDeserializer(),
                new StringDeserializer()
        );
        outputContextForwardTopic = testDriver.createOutputTopic(
                KafkaStreamsContextForwardPOCConfig.OUTPUT_FORWARD_TOPIC,
                new StringDeserializer(),
                new StringDeserializer()
        );
    }

    /** Closes the driver so its resources and state directory are released after the tests. */
    @AfterAll
    public void tearDown() {
        if (testDriver != null) {
            testDriver.close();
        }
    }

    @Test
    void shouldReceiveForwardedMessage() {
        testInputTopic.pipeInput("key1", "value1", Instant.now());

        final KeyValue<String, String> outputTopicMessage = outputTopic.readKeyValue();
        final KeyValue<String, String> outputTopicMessageFromContextForward =
                outputContextForwardTopic.readKeyValue();

        // JUnit's assertEquals takes (expected, actual); keep that order so failures read correctly.
        assertEquals("key1", outputTopicMessage.key);
        assertEquals("value1", outputTopicMessage.value);
        assertEquals("key1", outputTopicMessageFromContextForward.key);
        assertEquals("here-be-a-forwarded-message", outputTopicMessageFromContextForward.value);

        //!!! By returning the happy path message in the transform method,
        // kafka streams still sends the original message to both child stream processors regardless if they are named or not.
        // You can prevent this from happening by:
        // * naming all child processors
        // * using processorContext.forward in the transformer to send ALL message downstream to various named processors
        // * and returning null in the transform
        // * Or you can do it on the backend and put a named filter processor on all your downstream child processors to weed out the ones you don't want
        // * I dislike the latter option because it's smelly.
        final KeyValue<String, String> outputTopicHappyPathMessageFromContextForward =
                outputContextForwardTopic.readKeyValue();
        assertEquals("key1", outputTopicHappyPathMessageFromContextForward.key);
        assertEquals("value1", outputTopicHappyPathMessageFromContextForward.value);

        // Pin the exact fan-out: the To.child(...) forward never reaches the OUTPUT_TOPIC sink,
        // and the forward topic receives exactly the two records read above.
        assertTrue(outputTopic.isEmpty());
        assertTrue(outputContextForwardTopic.isEmpty());
    }
}