我有一个使用 Kafka Streams 中的 KTable 的单实例 Java 应用程序。直到最近,我可以使用KTable检索所有数据,突然间某些消息似乎消失了。那里应该有 ~33k 条带有唯一键的消息。
当我想按键检索消息时,我没有得到一些消息。我使用 ReadOnlyKeyValueStore 来检索消息:
final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store = ((KafkaStreams)streams).store(storeName, QueryableStoreTypes.keyValueStore());
store.get(key);
这些是我为 KafkaStreams 设置的配置设置。
final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
卡夫卡:0.10.2.0-cp1
汇合:3.2.0
调查使我得出了一些非常令人担忧的见解。使用 REST 代理,我手动读取分区,发现某些偏移返回错误。
请求:/topics/{topic}/partitions/{partition}/messages?offset={offset}
{
"error_code": 50002,
"message": "Kafka error: Fetch response contains an error code: 1"
}
没有客户端,无论是java还是命令行,但是返回任何错误。他们只是跳过
我有两个代理,所有主题的复制因子均为 2,并且是完全复制的。两个代理分别返回相同的内容。重新启动代理没有区别。
- 可能是什么原因?
- 如何在客户端中检测这种情况?
默认情况下,Kafka Broker 配置键cleanup.policy
设置为delete
。将其设置为compact
以保留每个键的最新消息。请参阅压缩。
删除旧邮件不会更改最小偏移量,因此尝试检索其下方的邮件会导致错误。错误非常模糊。Kafka Streams 客户端将从最小偏移量开始读取消息,因此不会出现错误。唯一可见的影响是 KTable 中缺少数据。
当应用程序由于缓存而运行时,即使从 Kafka 本身中删除消息,所有数据可能仍然可用。清理后它们会消失。