Neo4j: Cypher query to parallelize over the result rows of a previous query



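I have a database in which sentences are related to each other. I need to run one large update over the whole database, so I am trying to parallelize it.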

The relevant Cypher query is:

match (s:Sentence)-[r:RELATED]-(t:Sentence)
return s as sentence, collect(t.embedding) as neighbours_embeddings

The embedding property is a list of numbers.

It returns results like this:

---------------------------------------
| sentence   |  neighbours_embeddings |
---------------------------------------
| sentence1  | [[1, 2, 3], [4, 5, 6]] | 
---------------------------------------
| sentence2  | [[2, 3, 5]]            |
---------------------------------------

Now I want to perform some operations on neighbours_embeddings and set a property on the corresponding Sentence node.

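I have looked at the different parallelization techniques in Neo4j, and as far as I can tell they all take a list as input. My input, however, is a tuple (sentence, neighbours_embeddings). How can I do this?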

The full query, for anyone interested:

match (s:Sentence)-[r:RELATED]-(t:Sentence)
with s as sentence, collect(t.embedding) as neighbours
with sentence, [
w in reduce(s=[], neighbour IN neighbours | 
case when size(s) = 0 then
neighbour else [
i in range(0, size(s)-1) |
s[i] + neighbour[i]] end) |
w / tofloat(size(neighbours))
] as average

with sentence, [
i in range(0, size(sentence.embedding)-1) |
(0.8 * sentence.embedding[i]) + (0.2 *average[i])
] as unnormalized

with sentence, unnormalized, sqrt(reduce(sum = 0.0, element in unnormalized | sum + element^2)) as divideby
set sentence.normalized = [
i in range(0, size(unnormalized)-1) | (unnormalized[i] / divideby)
]
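To make it easier to check what the full query computes, here is a minimal Python sketch of the same arithmetic: the element-wise mean of the neighbour embeddings, the 0.8/0.2 blend with the sentence's own embedding, and division by the L2 norm. The function names are hypothetical, and the sketch assumes all embeddings have the same length and that the blended vector has a non-zero norm.

```python
import math
from typing import List, Sequence

Vector = List[float]


def average(neighbours: Sequence[Sequence[float]]) -> Vector:
    """Element-wise mean of the neighbour embeddings (the reduce + division step)."""
    summed: Vector = []
    for neighbour in neighbours:
        if not summed:
            # First neighbour seeds the accumulator, like `case when size(s) = 0 then neighbour`
            summed = [float(x) for x in neighbour]
        else:
            # Add the next neighbour position by position, like `s[i] + neighbour[i]`
            summed = [a + b for a, b in zip(summed, neighbour)]
    return [w / float(len(neighbours)) for w in summed]


def blend(embedding: Sequence[float], avg: Sequence[float]) -> Vector:
    """0.8 * own embedding + 0.2 * neighbour average (the `unnormalized` step)."""
    return [0.8 * e + 0.2 * a for e, a in zip(embedding, avg)]


def normalize(vec: Sequence[float]) -> Vector:
    """Divide every component by the L2 norm (the `divideby` step); assumes norm != 0."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec]


def normalized_embedding(
    embedding: Sequence[float], neighbours: Sequence[Sequence[float]]
) -> Vector:
    """The value the query stores in sentence.normalized for one row."""
    return normalize(blend(embedding, average(neighbours)))
```

For sentence1 from the table above, average([[1, 2, 3], [4, 5, 6]]) gives [2.5, 3.5, 4.5].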

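For parallelization, APOC is your friend, in particular the apoc.periodic.iterate procedure. In your use case it is safe to parallelize, because each row only updates a property on a single node (its own Sentence node), so concurrent batches never write to the same node.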

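The trick is to have the outer statement return only the Sentence nodes and move the neighbour collection into the inner statement, so the input is a plain list of nodes rather than (sentence, neighbours) tuples. The resulting query looks like this: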

CALL apoc.periodic.iterate("
match (s:Sentence) RETURN s",
"
match (s)-[r:RELATED]-(t:Sentence)
with s as sentence, collect(t.embedding) as neighbours
with sentence, [
w in reduce(s=[], neighbour IN neighbours | 
case when size(s) = 0 then
neighbour else [
i in range(0, size(s)-1) |
s[i] + neighbour[i]] end) |
w / tofloat(size(neighbours))
] as average

with sentence, [
i in range(0, size(sentence.embedding)-1) |
(0.8 * sentence.embedding[i]) + (0.2 *average[i])
] as unnormalized

with sentence, unnormalized, sqrt(reduce(sum = 0.0, element in unnormalized | sum + element^2)) as divideby
set sentence.normalized = [
i in range(0, size(unnormalized)-1) | (unnormalized[i] / divideby)
]", {batchSize:1000, parallel:true})

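You can tune the batchSize parameter, which sets how many rows of the outer statement are processed per transaction. See the APOC documentation for more details.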
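Conceptually, apoc.periodic.iterate streams the rows of the first statement, groups them into batches of batchSize rows, runs the second statement once per batch in its own transaction and, with parallel:true, runs several batches concurrently. The Python sketch below only illustrates that batching pattern; batches and run_batched are hypothetical helpers, not part of APOC, and the thread pool merely stands in for whatever scheduling APOC actually uses.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def batches(rows: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Group the outer rows into lists of at most batch_size rows (like batchSize)."""
    batch: List[T] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        # Last, possibly smaller, batch
        yield batch


def run_batched(
    rows: Iterable[T],
    action: Callable[[List[T]], R],
    batch_size: int,
    parallel: bool,
) -> List[R]:
    """Apply `action` once per batch; batches run concurrently when parallel is True."""
    chunks = list(batches(rows, batch_size))
    if not parallel:
        return [action(chunk) for chunk in chunks]
    with ThreadPoolExecutor() as pool:
        # pool.map returns results in the same order as the input batches
        return list(pool.map(action, chunks))
```

This pattern is only safe when, as in this query, each row writes to its own target (one Sentence node per row), so no two batches update the same item.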
