尝试使用 kafka 流处理器 API 批处理记录。批处理基于大小和时间。假设如果批处理大小达到 10 或最后一个批处理在 10 秒前处理(大小或上次处理时间以先到者为准),然后调用外部 API 发送批处理并使用 ProcessingContext 提交。
使用punctuate
定期检查是否可以清除批处理并将其发送到外部系统。
问题 - 执行标点符号线程时,流 API 是否可以调用处理器 API process
方法?由于代码在标点线程中调用提交,context.commit()
提交尚未通过进程方法处理的记录吗?
中同时执行标点线程和进程方法?如果是这样,那么我有代码提交尚未处理的记录
public class TestProcessor extends AbstractProcessor<String, String> {
private ProcessorContext context;
private List<String> batchList = new LinkedList<>();
private AtomicLong lastProcessedTime = new AtomicLong(System.currentTimeMillis());
private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);
@Override
public void init(ProcessorContext context) {
LOG.info("Calling init method " + context.taskId());
this.context = context;
context.schedule(10000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if(batchList.size() > 0 && System.currentTimeMillis() - lastProcessedTime.get() >
10000){
//call external API
batchList.clear();
lastProcessedTime.set(System.currentTimeMillis());
}
context.commit();
});
}
@Override
public void process(String key, String value) {
batchList.add(value);
LOG.info("Context details " + context.taskId() + " " + context.partition() + " " +
"storeSize " + batchList.size());
if(batchList.size() == 10){
//call external API to send the batch
batchList.clear();
lastProcessedTime.set(System.currentTimeMillis());
}
context.commit();
}
@Override
public void close() {
if(batchList.size() > 0){
//call external API to send the left over records
batchList.clear();
}
}
}
流 API 是否可以在以下情况下调用处理器 API
process
方法 正在执行punctuate
线程?
不,这是不可能的,因为Processor
在单个线程(用于两种方法的同一线程)中执行process
和punctuate
方法。
是否有可能标点符号线程和进程方法 在不同的线程中同时执行?
响应是"这是不可能的",上面提供了说明。
考虑到每个主题分区都有自己的类TestProcessor
实例。 而不是局部变量 batchList
和 lastProcessedTime
我建议使用 Kafka 状态存储,如 KeyValueStore
,这样你的流将是容错的。