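I am trying to implement the event sourcing pattern with Kafka Streams in the following way.
I have a Security service that handles two use cases:
- Register a user: handling RegisterUserCommand should produce a UserRegisteredEvent.
- Change a user's name: handling ChangeUserNameCommand should produce a UserNameChangedEvent.
I have two topics:
- A command topic, 'security-command'. Every command has a key, and the key is the user's email. For example: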
foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
- An event topic, 'security-event'. Every record is keyed by the user's email:
foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex", "version":0}}
foo@bar.com:{"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
Kafka Streams version 2.8.0
Kafka version 2.8
The implementation idea can be expressed with the following topology:
commandStream = builder.stream("security-command");
eventStream = builder.stream("security-event",
    Consumed.with(
        ...,
        new ZeroTimestampExtractor()
        /*always returns 0 to get the latest version of snapshot*/));
// build the snapshot to get the current state of the user.
userSnapshots = eventStream.groupByKey()
    .aggregate(() -> new UserSnapshot(),
        (key /*email*/, event, currentSnapshot) -> currentSnapshot.apply(event));
// join commands with latest snapshot at the time of the join
commandWithSnapshotStream =
    commandStream.leftJoin(
        userSnapshots,
        (command, snapshot) -> new CommandWithUserSnapshot(command, snapshot),
        joinParams
    );
// handle the command given the current snapshot
resultingEventStream = commandWithSnapshotStream.flatMap((key /*email*/, commandWithSnapshot) -> {
    var newEvents = commandHandler(commandWithSnapshot.command(), commandWithSnapshot.snapshot());
    return Arrays.stream(newEvents)
        .map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
        .toList();
});
// append events to events topic
resultingEventStream.to("security-event");
For this topology I am using EOS (exactly_once_beta).
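For reference, the processing guarantee is a regular streams configuration entry. The following is a minimal sketch using plain string keys; the class name, application id and broker address are placeholders chosen for illustration, not values from the actual setup.

```java
import java.util.Properties;

final class SecurityStreamsConfig {
    // Builds the properties that would be handed to the KafkaStreams instance.
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "security-service");   // hypothetical value
        props.put("bootstrap.servers", "localhost:9092");  // hypothetical value
        // Exactly-once setting named in the question (Kafka Streams 2.8).
        props.put("processing.guarantee", "exactly_once_beta");
        return props;
    }
}
```

The same keys are also exposed as constants on StreamsConfig; string keys are used here only to keep the sketch free of dependencies.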
A more explicit version of this topology:
KStream<String, Command<DomainEvent[]>> commandStream =
    builder.stream(
        commandTopic,
        Consumed.with(Serdes.String(), new SecurityCommandSerde()));
KStream<String, DomainEvent> eventStream =
    builder.stream(
        eventTopic,
        Consumed.with(
            Serdes.String(),
            new DomainEventSerde(),
            new LatestRecordTimestampExtractor() /*always returns 0 so that the join sees the latest version of the snapshot.*/));
// build the snapshots ktable by aggregating all the current events for a given user.
KTable<String, UserSnapshot> userSnapshots =
    eventStream.groupByKey()
        .aggregate(
            () -> new UserSnapshot(),
            (email, event, currentSnapshot) -> currentSnapshot.apply(event),
            Materialized.with(
                Serdes.String(),
                new UserSnapshotSerde()));
// join command stream and snapshot table to get the stream of pairs <Command, UserSnapshot>
Joined<String, Command<DomainEvent[]>, UserSnapshot> commandWithSnapshotJoinParams =
    Joined.with(
        Serdes.String(),
        new SecurityCommandSerde(),
        new UserSnapshotSerde()
    );
KStream<String, CommandWithUserSnapshot> commandWithSnapshotStream =
    commandStream.leftJoin(
        userSnapshots,
        (command, snapshot) -> new CommandWithUserSnapshot(command, snapshot),
        commandWithSnapshotJoinParams
    );
var resultingEventStream = commandWithSnapshotStream.flatMap((key /*email*/, commandWithSnapshot) -> {
    var command = commandWithSnapshot.command();
    if (command instanceof RegisterUserCommand registerUserCommand) {
        var handler = new RegisterUserCommandHandler();
        var events = handler.handle(registerUserCommand);
        // multiple events might be produced when a command is handled.
        return Arrays.stream(events)
            .map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
            .toList();
    }
    if (command instanceof ChangeUserNameCommand changeUserNameCommand) {
        var handler = new ChangeUserNameCommandHandler();
        var events = handler.handle(changeUserNameCommand, commandWithSnapshot.userSnapshot());
        return Arrays.stream(events)
            .map(e -> new KeyValue<String, DomainEvent>(e.email(), e))
            .toList();
    }
    throw new IllegalArgumentException("...");
});
resultingEventStream.to(eventTopic, Produced.with(Serdes.String(), new DomainEventSerde()));
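The aggregator above relies on UserSnapshot.apply(event), which is not shown. Purely as an illustration, a snapshot could fold events roughly as follows; the class name, its fields and the apply signature are assumptions, and the event is flattened into its three attributes to keep the sketch dependency-free.

```java
// Hypothetical, simplified stand-in for UserSnapshot; not the real class.
final class UserSnapshotSketch {
    final String email;
    final String name;
    final int version;

    UserSnapshotSketch(String email, String name, int version) {
        this.email = email;
        this.name = name;
        this.version = version;
    }

    // Initial value of the aggregate, before any event has been applied.
    static UserSnapshotSketch empty() {
        return new UserSnapshotSketch(null, null, -1);
    }

    // Returns a new snapshot reflecting the event. Both event types in the
    // question carry the email, the resulting name and the event version.
    UserSnapshotSketch apply(String eventEmail, String eventName, int eventVersion) {
        return new UserSnapshotSketch(eventEmail, eventName, eventVersion);
    }
}
```

Applying a UserRegisteredEvent (version 0) and then a UserNameChangedEvent (version 1) to the empty snapshot leaves it at name "Alex1", version 1, which is the state the next command is supposed to see.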
The problems I am facing:
- Starting the streams application with records already present in the command topic:
foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
Outcome:
1. Stream application fails when processing the ChangeUserNameCommand, because the snapshot is null.
2. The events topic has a record for successful registration, but nothing for changing the name:
/*OK*/foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex", "version":0}}
Thoughts:
When processing the ChangeUserNameCommand, the snapshot is missing in the aggregated KTable, userSnapshots. Restarting the application successfully produces the following record:
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
Tried increasing the max.task.idle.ms to 4 seconds - no effect.
- Starting the streams application and producing a batch of ChangeUserNameCommand commands at once (quickly)
Producing:
// Produce to command topic
foo@bar.com:{"type": "RegisterUserCommand", "command": {"name":"Alex","email":"foo@bar.com"}}
// event topic outcome
/*OK*/ foo@bar.com:{"type":"UserRegisteredEvent","event":{"email":"foo@bar.com","name":"Alex", "version":0}}
// Produce at once to command topic
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex1"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex2"}}
foo@bar.com:{"type": "ChangeUserNameCommand", "command": {"email":"foo@bar.com","newName":"Alex3"}}
// event topic outcome
/*OK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
/*NOK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex2","version":1}}
/*NOK*/foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex3","version":1}}
Thoughts:
'ChangeUserNameCommand' commands are joined with a stale version of snapshot (pay attention to the version attribute).
The expected outcome would be:
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex1","version":1}}
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex2","version":2}}
foo@bar.com: {"type":"UserNameChangedEvent","event":{"email":"foo@bar.com","name":"Alex3","version":3}}
Tried increasing max.task.idle.ms to 4 seconds: no effect. Setting cache.max.bytes.buffering to 0 also had no effect.
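The difference between the observed and the expected versions can be reproduced without Kafka. The sketch below assumes a hypothetical rule that a new event gets the joined snapshot's version plus one; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class StaleSnapshotDemo {
    // Assumed rule: the new event's version is the snapshot's version plus one.
    static int nextVersion(int snapshotVersion) {
        return snapshotVersion + 1;
    }

    // Every command is joined against the same snapshot, because the events
    // produced so far have not been folded back into the table yet.
    static List<Integer> handleAgainstStaleSnapshot(int snapshotVersion, int commandCount) {
        List<Integer> versions = new ArrayList<>();
        for (int i = 0; i < commandCount; i++) {
            versions.add(nextVersion(snapshotVersion));
        }
        return versions;
    }

    // Each produced event is applied to the snapshot before the next command
    // is handled, which is the behaviour the question expects.
    static List<Integer> handleAgainstFreshSnapshot(int snapshotVersion, int commandCount) {
        List<Integer> versions = new ArrayList<>();
        int current = snapshotVersion;
        for (int i = 0; i < commandCount; i++) {
            current = nextVersion(current);
            versions.add(current);
        }
        return versions;
    }
}
```

Starting from the snapshot at version 0 (after registration), three commands yield versions 1, 1, 1 in the stale case and 1, 2, 3 in the fresh case.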
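What am I missing when building such a topology? I expect every command to be processed against the latest version of the snapshot. If I produce the commands with a delay of a few seconds between them, everything works as expected.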
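I think you are missing the changelog restoration part of tables. Please read this article to understand what happens during changelog restoration.
For tables, it is more complex because they must maintain additional information (their state) to allow for stateful processing such as joins and aggregations like COUNT() or SUM(). To achieve this while also ensuring high processing performance, tables (through their state stores) are materialized on local disk within a Kafka Streams application instance or a ksqlDB server. But machines and containers can be lost, along with any locally stored data. How can we make tables fault tolerant, too?
The answer is that any data stored in a table is also stored remotely in Kafka. For this purpose, every table has its own change stream: a built-in change data capture (CDC) setup, we could say. So if we have a table of account balances by customer, every time an account balance is updated, a corresponding change event is recorded into the change stream of that table.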
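The changelog idea in the quoted passage can be sketched without Kafka: replaying the change stream in order, keeping the last value per key, rebuilds the table. The class names and sample data below are illustrative assumptions, not Kafka Streams API.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ChangelogRestoreDemo {
    // One change event: the new balance recorded for a customer.
    static final class Change {
        final String customer;
        final long balance;

        Change(String customer, long balance) {
            this.customer = customer;
            this.balance = balance;
        }
    }

    // Rebuilds the table by replaying the changelog from the beginning;
    // later changes for the same key overwrite earlier ones.
    static Map<String, Long> restore(List<Change> changelog) {
        Map<String, Long> table = new HashMap<>();
        for (Change change : changelog) {
            table.put(change.customer, change.balance);
        }
        return table;
    }
}
```

Replaying the changes alice=10, bob=5, alice=7 restores a table with alice at 7 and bob at 5.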
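Also keep in mind that restarting a Kafka Streams application should not reprocess events that were already processed. For that, you need to commit the offsets of the messages after processing them.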
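Found the root cause. Not sure whether this is by design or a bug, but a stream task waits for data in the other partitions only once per processing cycle. So if 2 records from the command topic are read first, the stream task waits for max.task.idle.ms, which allows a poll() phase to happen while the first command record is being processed. After that, while processing the second command, the stream task does not allow a poll that would fetch the newly generated events produced by handling the first command.
isProcessable() is called at the start of the processing phase. If it returns false, the poll phase is repeated.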
public boolean isProcessable(final long wallClockTime) {
    if (state() == State.CLOSED) {
        return false;
    }
    if (hasPendingTxCommit) {
        return false;
    }
    if (partitionGroup.allPartitionsBuffered()) {
        idleStartTimeMs = RecordQueue.UNKNOWN;
        return true;
    } else if (partitionGroup.numBuffered() > 0) {
        if (idleStartTimeMs == RecordQueue.UNKNOWN) {
            idleStartTimeMs = wallClockTime;
        }
        if (wallClockTime - idleStartTimeMs >= maxTaskIdleMs) {
            return true;
            // idleStartTimeMs is not reset to default, RecordQueue.UNKNOWN, value,
            // therefore the next time when the check for all buffered partitions is done, `true` is returned, meaning that the task is ready to be processed.
        } else {
            return false;
        }
    } else {
        // there's no data in any of the topics; we should reset the enforced
        // processing timer
        idleStartTimeMs = RecordQueue.UNKNOWN;
        return false;
    }
}
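The effect of the timer not being reset can be reproduced with a stripped-down model of the method above. This is a sketch, not the Kafka class: the class name, the UNKNOWN value of -1 and the buffered/total parameters are stand-ins for the partition group checks.

```java
final class IdleGateDemo {
    // Stand-in for RecordQueue.UNKNOWN; the exact value is an assumption.
    static final long UNKNOWN = -1L;

    private final long maxTaskIdleMs;
    private long idleStartTimeMs = UNKNOWN;

    IdleGateDemo(long maxTaskIdleMs) {
        this.maxTaskIdleMs = maxTaskIdleMs;
    }

    // buffered: input partitions that currently hold at least one record
    // total: all input partitions of the task
    boolean isProcessable(int buffered, int total, long wallClockTime) {
        if (buffered == total) {
            idleStartTimeMs = UNKNOWN;
            return true;
        } else if (buffered > 0) {
            if (idleStartTimeMs == UNKNOWN) {
                idleStartTimeMs = wallClockTime;
            }
            // As in the original, the timer is not reset once it has elapsed.
            return wallClockTime - idleStartTimeMs >= maxTaskIdleMs;
        } else {
            idleStartTimeMs = UNKNOWN;
            return false;
        }
    }
}
```

With maxTaskIdleMs = 4000 and only the command partition buffered (1 of 2), calls at wall-clock times 0 and 3999 return false and the call at 4000 returns true. A following call at 4001, still with only the command partition buffered, returns true immediately, so the second command is processed without another wait for the event partition.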