给定:DSL拓扑与KStream::transform。作为 Transformer::transform 执行的一部分,从输入消息生成多条消息(可能是来自单个输入消息的数千条输出消息(。
根据从数据库中检索的数据生成新消息。为了加快该过程,我想创建多个用户线程来并行访问数据库中的数据。生成新消息后,线程将调用 ProcessContext::forward 将消息发送到下游。
从不同的线程调用 ProcessContext::forward 是否安全?
它不安全,不允许从不同的线程调用ProcessorContext#forward()
。如果您尝试这样做,将引发异常。
作为一种解决方法,您可以让所有线程"缓冲"其结果数据,并在下次调用process()
时收集所有数据。或者,您还可以计划一个标点符号来收集和转发来自不同线程的数据。