包含删除和/或键更改的 Kafka 流聚合



我正在尝试定义一个 kafka 流,它接受来自主题(例如 EMPLOYEE)的记录,其中记录包含有关员工及其部门的属性,并将其转换为另一个主题 DEPARTMENT,其中包含部门属性和所有员工的列表(对员工进行了一些无状态转换)。

员工记录重复部门数据。 (我实际上正在处理一些 DICOM 标头数据,但我会坚持使用更普遍理解的关系。我正在尝试理解一个通用解决方案)。 此外,主题中的记录仅具有当前数据(即:如果部门更改,则没有先前的部门 ID。

这似乎是一项聚合工作。 我有一些东西似乎适用于简单的情况:

...
KStream<String, Employee> stream = kStreamBuilder.stream("EMPLOYEE"); // Stream from raw EMPLOYEE
stream.map((k, v) -> new KeyValue<>(k, transformEmployee(v))) // <-- some stateless enrichment of the employee
.groupBy((k, emp) -> emp.getDepartmentId(), jsonSerialisedWith(Employee.class))
// dummy reduce to a get a ktable for agg:
.reduce((aggValue, newEmp) -> newEmp) 
.groupBy((k, emp2) -> new KeyValue<>(emp2.getDepartmentId(), emp2), jsonSerialisedWith(Employee.class))
.aggregate(Department::new, this::addEmployee, this::removeEmployee,
jsonValueMaterializedAs("DEPARTMENT-AGG", Department.class))
.toStream()
.to("DEPARTMENT", jsonProducedWith(Department.class));
...
private Department addEmployee(String deptId, Employee employee, Department department) {
department.addEmployee(employee);
if (department.getId() == null) {
department.setId(employee.getDepartmentId());
department.setName(employee.getDepartmentName());
}
return department;
}

这适用于添加或更新。但是,随着时间的推移,员工可能会被删除或重新分配到另一个部门。 我收集的删除应该是发送到 EMPLOYEE 主题的逻辑删除记录(k:empId、v:null)。 但是,我不再拥有部门 ID,我必须执行空检查(并为部门 ID 返回空),因此删除员工时永远不会发生 removeEmployee。 更改部门 ID 的类似问题。

那么,卡夫卡的方法是什么呢?

我认为使用您的代码就足够了,但稍微改变了删除员工的语义。

您应该添加某种Mock部门(当用户从部门中删除时将使用)。

如果员工被移除,而是将部门设置为null,则应将其分配给Mock部门。

我们应该首先更改员工流的键,然后使用值连接器加入部门流,然后 groupByKey 进行聚合

KStream<String, Employee> stream = kStreamBuilder.stream("EMPLOYEE"); // Stream from raw EMPLOYEE
stream.selectKey((k,v) -> v.getDeptId())
.join(deptStream, <joiner function >)
.groupByKey()
.aggregrate(//init,add,//).toStream(); 

相关内容

  • 没有找到相关文章

最新更新