I created a very simple stream with Spring Cloud Data Flow, using the predefined RabbitMQ applications from https://dataflow.spring.io/rabbitmq-maven-latest. My stream is:
file --mode=content --directory=/home/cnb/documents | script | log
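The problem is that when I deploy the stream, the file source reads the file before the script processor application has started. As a result I get an error saying that the file source has no subscribers to accept the parsed file payload. The log is below: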
2023-01-16 09:27:55.425 INFO [file-source,,] 333 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Creating binder: rabbit
2023-01-16 09:27:55.614 ERROR [file-source,88b4a7f13a17dd71,86aacf31285b41dd] 333 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'fileReadingFlow.channel#2'; nested exception is java.lang.IllegalStateException: The [bean 'fileReadingFlow.channel#2'; defined in: 'org.springframework.cloud.fn.supplier.file.FileSupplierConfiguration'; from source: 'bean method fileReadingFlow'] doesn't have subscribers to accept messages, failedMessage=GenericMessage [payload=byte[33511], headers={b3=88b4a7f13a17dd71-f5dbcbe1db7e1bff-0, nativeHeaders={}, file_name=test1.csv, file_originalFile=/home/cnb/documents/test1.csv, id=10dc5789-f59d-d3c3-7c11-4e46f761cff6, contentType=application/octet-stream, file_relativePath=test1.csv, timestamp=1673861275578}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:166)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:339)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:325)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:268)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:232)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:325)
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:268)
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:232)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:37)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:296)
at org.springframework.integration.endpoint.ReactiveStreamsConsumer$SubscriberDecorator.hookOnNext(ReactiveStreamsConsumer.java:277)
at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
at reactor.core.publisher.FluxRefCount$RefCountInner.onNext(FluxRefCount.java:200)
at reactor.core.publisher.FluxPublish$PublishSubscriber.drain(FluxPublish.java:477)
at reactor.core.publisher.FluxPublish$PublishSubscriber.onNext(FluxPublish.java:268)
at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:130)
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:491)
at reactor.core.publisher.EmitterProcessor.tryEmitNext(EmitterProcessor.java:299)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at org.springframework.integration.channel.FluxMessageChannel.tryEmitMessage(FluxMessageChannel.java:82)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:71)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$5(FluxMessageChannel.java:123)
at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:196)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth.lambda$null$6(ReactorSleuth.java:324)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.lang.IllegalStateException: The [bean 'fileReadingFlow.channel#2'; defined in: 'org.springframework.cloud.fn.supplier.file.FileSupplierConfiguration'; from source: 'bean method fileReadingFlow'] doesn't have subscribers to accept messages
at org.springframework.util.Assert.state(Assert.java:97)
at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
... 55 more
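If I put the file 'test1.csv' into the directory after the stream has been deployed, everything works. But if I deploy the stream while 'test1.csv' is already in the directory, I see the error above, caused by a race between the stream components as they start. Is there a way in Data Flow to:
Set the startup order of the stream components? (For example, I want to start my stream components in this order: 1 - script, 2 - file, 3 - log.)
Or maybe configure some startup timeout for the file source through the Data Flow UI? That way, by the time the file source starts reading the file, the stream processor would be ready to process it.
Thanks in advance, and please let me know if I can provide more details!
P.S. If I have several files in the directory and they have already been processed by my stream, will they be processed again when I restart the Data Flow server? Thanks in advance!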
The only way I can think of to solve this is probably to use a named destination (topic), splitting the stream in two:
file --mode=content --directory=/home/cnb/documents > :file-topic
:file-topic > script | log
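
As a sketch of how this split might be used (not a confirmed fix for the error above): in the Data Flow shell, the two definitions can be created as separate streams and the consuming side deployed first, so that its RabbitMQ queue should already be bound to the `file-topic` destination before the file source publishes anything. The stream names `file-process` and `file-ingest` are hypothetical, chosen only for illustration.

```
stream create --name file-process --definition ":file-topic > script | log"
stream deploy --name file-process
stream create --name file-ingest --definition "file --mode=content --directory=/home/cnb/documents > :file-topic"
stream deploy --name file-ingest
```

Deploying `file-ingest` only after `file-process` reports as deployed gives a manual ordering between the two halves, which a single `file | script | log` definition does not offer.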