我的要求是跳过或避免使用kafka流DSL API从INPUT主题接收的重复消息(具有相同的键)。
如果发生任何故障,源系统可能会向 INPUT 主题发送重复的消息。
流-
源系统 --> 输入主题 --> 卡夫卡流 --> 输出主题
目前我正在使用 flatMap 从有效负载中生成多个键,但 flatMap 是无状态的,因此无法避免从 INPUT 主题接收重复的消息处理。
我正在寻找DSL API,它可以跳过从输入主题接收的重复记录,并在发送到输出主题之前生成多个键/值。
Thought Exact Once配置在这里对于根据键删除从INPUT主题收到的重复消息很有用,但看起来它不起作用,可能我不明白Exact Once的用法。
你能不能给它一些说明。
我的要求是跳过或避免使用 kafka 流 DSL API 从 INPUT 主题接收的重复消息(具有相同的键)。
看看 https://github.com/confluentinc/kafka-streams-examples 的EventDeduplication
例子,它做到了这一点。然后,您可以使用特定于您的用例的所需flatMap
功能来调整示例。
下面是示例的要点:
final KStream<byte[], String> input = builder.stream(inputTopic);
final KStream<byte[], String> deduplicated = input.transform(
// In this example, we assume that the record value as-is represents a unique event ID by
// which we can perform de-duplication. If your records are different, adapt the extractor
// function as needed.
() -> new DeduplicationTransformer<>(windowSize.toMillis(), (key, value) -> value),
storeName);
deduplicated.to(outputTopic);
和
/**
* @param maintainDurationPerEventInMs how long to "remember" a known event (or rather, an event
* ID), during the time of which any incoming duplicates of
* the event will be dropped, thereby de-duplicating the
* input.
* @param idExtractor extracts a unique identifier from a record by which we de-duplicate input
* records; if it returns null, the record will not be considered for
* de-duping but forwarded as-is.
*/
DeduplicationTransformer(final long maintainDurationPerEventInMs, final KeyValueMapper<K, V, E> idExtractor) {
if (maintainDurationPerEventInMs < 1) {
throw new IllegalArgumentException("maintain duration per event must be >= 1");
}
leftDurationMs = maintainDurationPerEventInMs / 2;
rightDurationMs = maintainDurationPerEventInMs - leftDurationMs;
this.idExtractor = idExtractor;
}
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.context = context;
eventIdStore = (WindowStore<E, Long>) context.getStateStore(storeName);
}
public KeyValue<K, V> transform(final K key, final V value) {
final E eventId = idExtractor.apply(key, value);
if (eventId == null) {
return KeyValue.pair(key, value);
} else {
final KeyValue<K, V> output;
if (isDuplicate(eventId)) {
output = null;
updateTimestampOfExistingEventToPreventExpiry(eventId, context.timestamp());
} else {
output = KeyValue.pair(key, value);
rememberNewEvent(eventId, context.timestamp());
}
return output;
}
}
private boolean isDuplicate(final E eventId) {
final long eventTime = context.timestamp();
final WindowStoreIterator<Long> timeIterator = eventIdStore.fetch(
eventId,
eventTime - leftDurationMs,
eventTime + rightDurationMs);
final boolean isDuplicate = timeIterator.hasNext();
timeIterator.close();
return isDuplicate;
}
private void updateTimestampOfExistingEventToPreventExpiry(final E eventId, final long newTimestamp) {
eventIdStore.put(eventId, newTimestamp, newTimestamp);
}
private void rememberNewEvent(final E eventId, final long timestamp) {
eventIdStore.put(eventId, timestamp, timestamp);
}
@Override
public void close() {
// Note: The store should NOT be closed manually here via `eventIdStore.close()`!
// The Kafka Streams API will automatically close stores when necessary.
}
}
我正在寻找DSL API,它可以跳过从输入主题接收的重复记录,并在发送到输出主题之前生成多个键/值。
DSL 不包含开箱即用的此类功能,但上面的示例显示了如何使用Transformers
将 DSL 与 Kafka Streams 的处理器 API 相结合,轻松构建自己的重复数据消除逻辑。
Thought Exact Once配置在这里对于根据键删除从INPUT主题收到的重复消息很有用,但看起来它不起作用,可能我不明白Exact Once的用法。
正如Matthias J. Sax在他的回答中提到的,从Kafka的角度来看,从其精确一次处理语义的角度来看,这些"重复"并不是重复。Kafka 确保它自己不会引入任何此类重复项,但它不能为上游数据源做出开箱即用的决策,这对 Kafka 来说是黑盒。
它只能通过DSL实现,使用SessionWindows更改日志而不缓存。
- 用
duplicate
标志包装值 - 在时间窗口内将标志转入
reduce()
true
- 筛选出
true
标志值 - 解开原始键和值的包装
拓扑学:
Serde<K> keySerde = ...;
Serde<V> valueSerde = ...;
Duration dedupWindowSize = ...;
Duration gracePeriod = ...;
DedupValueSerde<V> dedupValueSerde = new DedupValueSerde<>(valueSerde);
new StreamsBuilder()
.stream("input-topic", Consumed.with(keySerde, valueSerde))
.mapValues(v -> new DedupValue<>(v, false))
.groupByKey()
.windowedBy(SessionWindows.ofInactivityGapAndGrace(dedupWindowSize, gracePeriod))
.reduce(
(value1, value2) -> new DedupValue<>(value1.value(), true),
Materialized
.<K, DedupValue<V>, SessionStore<Bytes, byte[]>>with(keySerde, dedupValueSerde)
.withCachingDisabled()
)
.toStream()
.filterNot((wk, dv) -> dv == null || dv.duplicate())
.selectKey((wk, dv) -> wk.key())
.mapValues(DedupValue::value)
.to("output-topic", Produced.with(keySerde, valueSerde));
值包装器:
record DedupValue<V>(V value, boolean duplicate) { }
值包装器 SerDe(示例):
public class DedupValueSerde<V> extends WrapperSerde<DedupValue<V>> {
public DedupValueSerde(Serde<V> vSerde) {
super(new DvSerializer<>(vSerde.serializer()), new DvDeserializer<>(vSerde.deserializer()));
}
private record DvSerializer<V>(Serializer<V> vSerializer) implements Serializer<DedupValue<V>> {
@Override
public byte[] serialize(String topic, DedupValue<V> data) {
byte[] vBytes = vSerializer.serialize(topic, data.value());
return ByteBuffer
.allocate(vBytes.length + 1)
.put(data.duplicate() ? (byte) 1 : (byte) 0)
.put(vBytes)
.array();
}
}
private record DvDeserializer<V>(Deserializer<V> vDeserializer) implements Deserializer<DedupValue<V>> {
@Override
public DedupValue<V> deserialize(String topic, byte[] data) {
ByteBuffer buffer = ByteBuffer.wrap(data);
boolean duplicate = buffer.get() == (byte) 1;
int remainingSize = buffer.remaining();
byte[] vBytes = new byte[remainingSize];
buffer.get(vBytes);
V value = vDeserializer.deserialize(topic, vBytes);
return new DedupValue<>(value, duplicate);
}
}
}
恰好一次可用于确保使用和处理输入主题不会导致输出主题重复。但是,从恰好一次的角度来看,您描述的输入主题中的重复项并不是真正的重复项,而是两条常规输入消息。
对于删除输入主题重复项,可以将transform()
步骤与附加的状态存储一起使用(DSL 中没有执行所需操作的内置运算符)。对于每个输入记录,首先检查是否在存储中找到相应的键。如果没有,请将其添加到存储区并转发邮件。如果在存储中找到它,则将输入作为重复项删除。请注意,只有在 Kafka Streams 应用程序中启用恰好一次处理时,这才适用于 100% 正确性保证。否则,即使您尝试执行重复数据删除,Kafka Streams 也可能在发生故障时重新引入重复。
此外,您需要确定要在商店中保留条目多长时间。如果您确定输入主题中不能再有重复项,则可以使用Punctuation
从存储中删除旧数据。执行此操作的一种方法是将记录时间戳(或偏移量)也存储在存储中。这样,您可以将当前时间与punctuate()
内的存储记录时间进行比较并删除旧记录(即,您将通过store#all()
迭代存储中的所有条目)。
transform()
后,您可以应用flatMap()
(或者也可以将flatMap()
代码直接合并到transform()
中。