响应式程序在向Kafka发送所有消息之前提前退出



这是之前响应式kafka问题的后续问题(发送Flux数据到响应式kafka时的问题)

我正在尝试使用响应式方法向kafka发送一些日志记录。下面是使用响应式kafka发送消息的响应式代码。

public class LogProducer {
private final KafkaSender<String, String> sender;
public LogProducer(String bootstrapServers) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "log-producer");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
SenderOptions<String, String> senderOptions = SenderOptions.create(props);
sender = KafkaSender.create(senderOptions);
}
public void sendMessages(String topic, Flux<Logs.Data> records) throws InterruptedException {

AtomicInteger sentCount = new AtomicInteger(0);
sender.send(records
.map(record -> {
LogRecord lrec = record.getRecords().get(0);
String id = lrec.getId();
Thread.sleep(0, 5); // sleep for 5 ns
return SenderRecord.create(new ProducerRecord<>(topic, id,
lrec.toString()), id);
})).doOnNext(res -> sentCount.incrementAndGet()).then()
.doOnError(e -> {
log.error("[FAIL]: Send to the topic: '{}' failed. "
+ e, topic);
})
.doOnSuccess(s -> {
log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);
})
.subscribe();
}
}

public class ExecuteQuery implements Runnable {
private LogProducer producer = new LogProducer("localhost:9092");
@Override
public void run() {
Flux<Logs.Data> records = ...
producer.sendMessages(kafkaTopic, records);
.....
.....
// processing related to the messages sent
}
}

因此,即使Thread.sleep(0, 5);存在,有时它也不会将所有消息发送给kafka,并且程序存在的早期打印成功消息(log.info("[SUCCESS]: {} records sent to the topic: '{}'", sentCount, topic);)。有没有更具体的方法来解决这个问题?例如,使用某种回调,这样线程将等待所有消息成功发送。

我有一个spring控制台应用程序,并通过调度程序以固定速率运行ExecuteQuery,如下所示

public class Main {
private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
private ExecutorService executor = Executors.newFixedThreadPool(POOL_SIZE);
public static void main(String[] args) {
QueryScheduler scheduledQuery = new QueryScheduler();
scheduler.scheduleAtFixedRate(scheduledQuery, 0, 5, TimeUnit.MINUTES);
}
class QueryScheduler implements Runnable {
@Override
public void run() {
// preprocessing related to time
executor.execute(new ExecuteQuery());
// postprocessing related to time
}
}
}

你的Thread.sleep(0, 5); // sleep for 5 ns没有任何值主线程被阻塞,所以它退出时需要和你的ExecuteQuery可能还没有完成它的工作。

不清楚如何启动应用程序,但我建议在主线程中使用Thread.sleep()来阻塞。准确地说,在public static void main(String[] args) {方法中,impl.

最新更新