我们正在评估Kinesis,我发现了以下行为。我使用 Kinesis 进行了简单的测试,以测试准确性和基本功能。
该测试将项生成到流中,如下所示:
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName( streamName );
putRecordRequest.setData(ByteBuffer.wrap(event.getBytes()));
putRecordRequest.setPartitionKey( message.getEventList().getEvents().get(0).getLicenseKey());
UsageServiceStatistics.instance().getKinesisSent().increase();
PutRecordResult putRecordResult = kinesisManager.getConnection().putRecord( putRecordRequest );
然后,我使用 Amazon Kinesis 客户端库 (KCL(,如下所示:
@Override
public void processRecords(List<Record> records, IRecordProcessorCheckpointer iRecordProcessorCheckpointer)
{
logger.debug("Received a list of records for processing with size:" + records.size());
for (Record record : records)
{
UsageServiceStatistics.instance().getKinesisConsumed().increase();
logger.debug("Kinesis consumed:" + UsageServiceStatistics.instance().getKinesisConsumed());
if (!processRecord(record))
{
logger.error("Couldn't process record " + record + ". Skipping the record.");
}
}
checkpointManager.checkpoint(iRecordProcessorCheckpointer);
}
我看到生产的数量与消费记录的数量之间存在差异。例如,当连续 2000 次发送 3 个项目的系列时,我看到以下内容:
Kinesis sent:counter=2000
Kinesis consumed:1999
Kinesis sent:counter=4000
Kinesis consumed:counter=3994
Kinesis sent:counter=6000
Kinesis consumed:counter=5999
为什么我没有看到完全相同的生产和消耗的数量?为什么在第二次运行后缺少 6 个项目,尽管我在运行 2 和运行 3 之间至少等待了 2 分钟,但我仅在运行 2006 中获得了
消耗的记录 3.最后,我在这个测试之前做了一组测试,检查点的频率更高,然后差异更大?Amazon KCL 使用什么规则来触发向同号发送记录?为什么它会停止发送并将项目保留在队列中(例如从运行 2 到 3(?发送的 6000 件中的最后一件在哪里?
提前感谢
我找到了根本原因。
这是我代码中的一个错误。
KCL 创建与特定流中的分片数相等的记录处理器数。
但是,我引入了一个错误,让他们在多线程环境中使用相同的Checkpointer 实体。当我修复它以使每个记录处理器都有自己的检查指针时,它运行良好并且计数一致。