我正在尝试实现一个postgresql -> Debezium Kafka Connect source -> Kafka -> Neo4j Kafka Connect sink -> Neo4j.debezium 发送的数据包含包含字段op = "c/u/d"
(创建/更新/删除)的事件。示例文档显示了使用FOREACH
来测试是否应该进行创建/更新以及该部分是否正常工作的模式。我无法开始工作的是在出现op = "d"
事件时如何删除节点(我在主题中看到它)。
我目前针对该 kafka 主题的密码行如下所示(格式化,原文是一长行):
FOREACH (run_me_once in CASE WHEN event.op <> 'd' THEN [1] ELSE [] END |
MERGE (p:DemoTable{id: event.after.id})
SET p.message = event.after.message, p.last_changed = event.ts_ms
)
WITH event
MATCH (p:DemoTable{id: event.after.id})
FOREACH (run_me_once in CASE WHEN event.op = 'd' THEN [1] ELSE [] END |
DELETE p
)
这不会显示任何错误,但也不会删除任何节点。
我尝试了删除部分的多个版本,直到我得出这个:
- 没有
WITH event
(关于MATCH
和FOREACH
的错误仅适用于WITH
), - 在比赛进行比赛时(不允许
FOREACH
MATCH
), - 在
FOREACH
中使用DELETE (p:DemoTable{id: event.after.id})
(代码末尾出现关于expected whitespace or a relationship pattern
的错误)
有条件地处理删除事件的正确模式是什么?
发现问题:来自 debezium 的删除事件有event.after = null
,因此在删除中,它需要MATCH (p:DemoTable{id: event.before.id})
。
最终代码:
FOREACH (run_me_once in CASE WHEN event.op <> 'd' THEN [1] ELSE [] END |
MERGE (p:DemoTable{id: event.after.id})
SET p.message = event.after.message, p.last_changed = event.ts_ms
)
WITH event
MATCH (p:DemoTable{id: event.before.id})
FOREACH (run_me_once in CASE WHEN event.op = 'd' THEN [1] ELSE [] END |
DELETE p
)