Kafka 流处理器 API 批处理大小和时间



尝试使用 kafka 流处理器 API 批处理记录。批处理基于大小和时间。假设如果批处理大小达到 10 或最后一个批处理在 10 秒前处理(大小或上次处理时间以先到者为准),然后调用外部 API 发送批处理并使用 ProcessingContext 提交。

使用punctuate定期检查是否可以清除批处理并将其发送到外部系统。

问题 - 执行标点符号线程时,流 API 是否可以调用处理器 API process方法?由于代码在标点线程中调用提交,context.commit()提交尚未通过进程方法处理的记录吗?

是否有可能在不同的线程

中同时执行标点线程和进程方法?如果是这样,那么我有代码提交尚未处理的记录

public class TestProcessor extends AbstractProcessor<String, String> {
    private ProcessorContext context;
    private List<String> batchList = new LinkedList<>();
    private AtomicLong lastProcessedTime = new AtomicLong(System.currentTimeMillis());
    private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);
    @Override
    public void init(ProcessorContext context) {
        LOG.info("Calling init method " + context.taskId());
        this.context = context;
        context.schedule(10000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            if(batchList.size() > 0 && System.currentTimeMillis() - lastProcessedTime.get() >
                    10000){
                //call external API
                batchList.clear();
                lastProcessedTime.set(System.currentTimeMillis());
            }
            context.commit();
        });
    }
    @Override
    public void process(String key, String value) {
        batchList.add(value);
        LOG.info("Context details " + context.taskId() + " " + context.partition() + " " +
                "storeSize " + batchList.size());
        if(batchList.size() == 10){
            //call external API to send the batch
            batchList.clear();
            lastProcessedTime.set(System.currentTimeMillis());
        }
        context.commit();
    }
    @Override
    public void close() {
        if(batchList.size() > 0){
            //call external API to send the left over records
            batchList.clear();
        }
    }
}

流 API 是否可以在以下情况下调用处理器 API process方法 正在执行punctuate线程?

不,这是不可能的,因为Processor在单个线程(用于两种方法的同一线程)中执行processpunctuate方法。

是否有

可能标点符号线程和进程方法 在不同的线程中同时执行?

响应是"这是不可能的",上面提供了说明。

考虑到每个主题分区都有自己的类TestProcessor实例。 而不是局部变量 batchListlastProcessedTime 我建议使用 Kafka 状态存储,如 KeyValueStore ,这样你的流将是容错的。

相关内容

  • 没有找到相关文章

最新更新