我想使用一些回调 API 为我的 rabbitmq 生产者找出失败的消息。我已经用 [{rabbit, [{vm_memory_high_watermark, 0.001}]}] 配置了 rabbitmq。并尝试推送很多消息,但所有消息都被接受并且 TimeoutException 稍后出现,消息没有发送到队列 enter code here
,请告诉我如何捕获它。
发送消息的代码:
// #create-sink - producer
final Sink<ByteString, CompletionStage<Done>> amqpSink =
AmqpSink.createSimple(
AmqpSinkSettings.create(connectionProvider)
.withRoutingKey(AkkaConstants.queueName)
.withDeclaration(queueDeclaration));
// #run-sink
//final List<String> input = Arrays.asList("one", "two", "three", "four", "five");
//Source.from(input).map(ByteString::fromString).runWith(amqpSink, materializer);
String filePath = "D:\subrata\code\akkaAmqpTest-master\akkaAmqpTest-master\logs2\dummy.txt";
Path path = Paths.get(filePath);
// List containing 78198 individual message
List<String> contents = Files.readAllLines(path);
System.out.println("********** file reading done ....");
int times = 5;
// Send 78198*times message to Queue [From console i can see 400000 number of messages being sent]
for(int i=0;i<times;i++) {
Source.from(contents).map(ByteString::fromString).runWith(amqpSink, materializer);
}
System.out.println("************* sending to queue is done");
不幸的是,
目前不支持开箱即用。理想情况下,生产者将被建模为一个Flow
,它将所有传入的消息发送到 AMQP 代理,并将发出相同的消息,结果是它是否已成功发送到代理。有一张票可以跟踪Alpakka问题跟踪器上的这种可能的改进。