Spring Kafka-Added Store无法从流进程访问



我在Spring Kafka中遇到了一个问题,即它无法从我将特定存储添加到拓扑/流中的过程事件中访问状态存储。

方法1:

@Component
@RequiredArgsConstructor
@EnableKafkaStreams
@Order(2)
public class TimelineVersionUpdatedStream implements EventStream {
private static final Logger logger =
LoggerFactory.getLogger(TimelineVersionUpdatedStream.class);
@Autowired
private StreamConfiguration configuration;
@Autowired
private TimeLineChangesCaptureService timeLineChangesCaptureService;
@Autowired
public void TimelineVersionUpdatedProccess(StreamsBuilder builder) {
final Serde<String> stringSerde = Serdes.String();
final SpecificAvroSerde<TimelineVersionUpdated> timelineVersionUpdatedSpecificAvroSerde = new SpecificAvroSerde<>();
timelineVersionUpdatedSpecificAvroSerde.configure(getSerdeConfig(), false);
final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
paymentChangedSpecificAvroSerde.configure(getSerdeConfig(), false);
KeyValueStoreBuilder paymentStoreBuilder = new KeyValueStoreBuilder(
Stores.persistentKeyValueStore("demo-store-2"),
stringSerde,
paymentChangedSpecificAvroSerde,
new SystemTime());

KStream<String, TimelineVersionUpdated> stream = builder.stream(
Topics.MOS_BUDGET_TIMELINE_VERSION,
Consumed.with(
stringSerde,
timelineVersionUpdatedSpecificAvroSerde
));
StreamsBuilder stateStore = builder.addStateStore(paymentStoreBuilder);
stream.process(new ProcessorSupplier<>() {
@Override
public Processor<String, TimelineVersionUpdated> get() {
return new Processor<>() {
private ProcessorContext context;
private KeyValueStore<String, PaymentChanged> store;
@Override
public void init(ProcessorContext processorContext) {
this.context = processorContext;
this.store = context.getStateStore("demo-store-2");
}
@Override
public void process(String s, TimelineVersionUpdated timelineVersionUpdated) {
logger.info("TimelineVersionUpdatedStream.TimelineVersionUpdatedProccess record key {} value{}", s, timelineVersionUpdated.toString());
if (timelineVersionUpdated == null) {
return;
}
timeLineChangesCaptureService.captureTimeLineChanges(timelineVersionUpdated, store);
}
@Override
public void close() {
}
};
}
});

Topology topology = builder.build();
logger.info("{}", topology.describe().toString());
}

当我运行上面的代码时,我得到了以下异常:

org.apache.kafka.streams.errors.StreamsException: failed to initialize processor KSTREAM-PROCESSOR-0000000001
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:127) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:879) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:234) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:494) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:754) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:636) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:564) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:523) ~[kafka-streams-2.7.2.jar:na]
Caused by: org.apache.kafka.streams.errors.StreamsException: Processor KSTREAM-PROCESSOR-0000000001 has no access to StateStore demo-store-2 as the store is not connected to the processor. If you add stores manually via '.addStateStore()' make sure to connect the added store to the processor by providing the processor name to '.addStateStore()' or connect them via '.connectProcessorAndStateStores()'. DSL users need to provide the store name to '.process()', '.transform()', or '.transformValues()' to connect the store to the corresponding operator, or they can provide a StoreBuilder by implementing the stores() method on the Supplier itself. If you do not add stores manually, please file a bug report at https://issues.apache.org/jira/projects/KAFKA.
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:149) ~[kafka-streams-2.7.2.jar:na]
at au.com.mybudget.mos.mostimelinekafkaetl.transport.stream.TimelineVersionUpdatedStream$1$1.init(TimelineVersionUpdatedStream.java:92) ~[classes/:na]
at org.apache.kafka.streams.processor.internals.ProcessorAdapter.init(ProcessorAdapter.java:57) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$init$0(ProcessorNode.java:120) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:883) ~[kafka-streams-2.7.2.jar:na]
at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:117) ~[kafka-streams-2.7.2.jar:na]
... 7 common frames omitted

然后我尝试添加一个商店,如下所示:方法2:

@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
final Serde<String> stringSerde = Serdes.String();
final SpecificAvroSerde<PaymentChanged> paymentChangedSpecificAvroSerde = new SpecificAvroSerde<>();
paymentChangedSpecificAvroSerde.configure(Collections.singletonMap(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()), false);
return factoryBean -> {
try {
final StreamsBuilder streamsBuilder = factoryBean.getObject();
streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("store-demo-3"),
stringSerde,
paymentChangedSpecificAvroSerde
));
} catch (Exception e) {
logger.error("StreamsBuilderFactoryBeanCustomizer exception:{}", e.getMessage());
}
};
}

然后尝试从进程访问该存储,但最终得到相同的异常。

请帮助理解这个问题。

Topology添加状态存储只是第一步,但这并不能使其可用:为了允许Processor使用状态存储,必须同时连接两者。

最简单的方法是在添加Processor:时传入状态存储名称

stream.process(..., "storeName");

最新更新