我有两个表模式(MySQL 5.6,所以没有CTE(,大致看起来像这样:
CREATE TABLE nodes (
node_id INT PRIMARY KEY,
name VARCHAR(10)
);
CREATE TABLE edges (
edge_id INT PRIMARY KEY,
source INT,
target INT,
FOREIGN KEY (source) REFERENCES nodes(node_id),
FOREIGN KEY (target) REFERENCES nodes(node_id)
);
在我们的设计中,两个节点之间的逻辑边(逻辑n1 -> n2
(实际上表示为数据库中的(n1 -> proxy node -> n2
(。我们使用两个边和一个代理节点作为逻辑边的原因是我们可以在边缘上存储属性。因此,当客户端查询由边缘连接的两个节点时,查询将转换为查询三个连接的节点。
我编写了一个查询来获取具有固定长度的路径。例如,"给我所有路径,这些路径以具有某些属性的节点开头,以具有某些属性的节点结束,路径上正好有 5 条边。这是在 SQL 端不使用递归完成的;我只是以编程方式生成一个具有指定固定长度的长查询。
挑战在于,我们希望支持对可变长度路径的查询。例如,"给我所有路径,这些路径以具有某些属性的节点开头,以具有某些属性的节点结束,路径上不少于3条边且不超过10条边。在没有(甚至有(CTE的情况下,这是否可行?
编辑:
一些示例数据:
-- Logical nodes are 1, 3, 5, 7, 9, 11. The rest are proxy nodes.
INSERT INTO nodes VALUES
(1, 'foo'),
(2, '_proxy_'),
(3, 'foo'),
(4, '_proxy_'),
(5, 'bar'),
(6, '_proxy_'),
(7, 'bar'),
(8, '_proxy_'),
(9, 'bar'),
(10, '_proxy_'),
(11, 'bar');
-- Connects 1 -> 2 -> ... -> 11.
INSERT INTO edges VALUES
(1, 1, 2),
(2, 2, 3),
(3, 3, 4),
(4, 4, 5),
(5, 5, 6),
(6, 6, 7),
(7, 7, 8),
(8, 8, 9),
(9, 9, 10),
(10, 10, 11);
查询可以是"选择路径上所有节点的 ID 和名称,以便路径以名为 'foo' 的节点开头,以名为 'bar' 的节点结束,路径上至少有 2 个节点和最多 4 个节点。这些路径包括1 -> 3 -> 5
、1 -> 3 -> 5 -> 7
、3 -> 5
、3 -> 5 -> 7
和3 -> 5 -> 7 -> 9
。因此,结果集应包括节点 1、3、5、7、9 的 ID 和名称。
以下查询以逗号分隔的字符串返回所有感兴趣的路径。
with recursive rcte as (
select e.source, e.target, 1 as depth, concat(e.source) as path
from nodes n
join edges e on e.source = n.node_id
where n.name = 'foo' -- start node name
union all
select e.source, e.target, r.depth + 1 as depth, concat_ws(',', r.path, e.source)
from rcte r
join edges p on p.source = r.target -- p for proxy
join edges e on e.source = p.target
where r.depth < 4 -- max path nodes
)
select r.path
from rcte r
join nodes n on n.node_id = r.source
where r.depth >= 2 -- min path nodes
and n.name = 'bar' -- end node name
结果如下所示:
| path |
| ------- |
| 3,5 |
| 1,3,5 |
| 3,5,7 |
| 1,3,5,7 |
| 3,5,7,9 |
在DB Fiddle上查看
现在可以解析应用程序代码中的字符串并合并/合并数组。如果只需要包含的节点 ID,还可以将外部查询更改为:
select distinct r2.source
from rcte r
join nodes n on n.node_id = r.source
join rcte r2 on find_in_set(r2.source, r.path)
where r.depth >= 2 -- min path nodes
and n.name = 'bar' -- end node name
结果:
| source |
| ------ |
| 1 |
| 3 |
| 5 |
| 7 |
| 9 |
在DB Fiddle上查看
请注意,如果 JOIN onFIND_IN_SET()
包含太多行,则rcte
可能会很慢。我宁愿在应用程序代码中执行此步骤,这在过程语言中应该非常简单。
MySQL 5.6 解决方案
在MySQL 8.0和MariaDB 10.2之前,没有递归的方法。此外,还有许多其他限制,这使得解决方法变得困难。例如:
- 存储函数中没有动态查询
- 无法在单个语句中两次使用临时表
- 引擎中没有文本类型
memmory
但是 - 可以在两个(临时(表之间移动行的存储过程中模拟 RCTE。以下过程执行此操作:
delimiter //
create procedure get_path(
in source_name text,
in target_name text,
in min_depth int,
in max_depth int
)
begin
create temporary table tmp_sources (id int, depth int, path text) engine=innodb;
create temporary table tmp_targets like tmp_sources;
insert into tmp_sources (id, depth, path)
select n.node_id, 1, n.node_id
from nodes n
where n.name = source_name;
set @depth = 1;
while @depth < max_depth do
set @depth = @depth+1;
insert into tmp_targets(id, depth, path)
select e.target, @depth, concat_ws(',', t.path, e.target)
from tmp_sources t
join edges p on p.source = t.id
join edges e on e.source = p.target
where t.depth = @depth - 1;
insert into tmp_sources (id, depth, path)
select id, depth, path
from tmp_targets;
truncate tmp_targets;
end while;
select t.path
from tmp_sources t
join nodes n on n.node_id = t.id
where n.name = target_name
and t.depth >= min_depth;
end //
delimiter ;
将其用作:
call get_path('foo', 'bar', 2, 4)
结果:
| path |
| ------- |
| 3,5 |
| 1,3,5 |
| 3,5,7 |
| 1,3,5,7 |
| 3,5,7,9 |
在DB Fiddle上查看
这远非最佳。如果结果具有多个路径或长路径,则可能需要在临时表上定义一些索引。我也不喜欢在 stroed 过程中创建(临时(表的想法。将其视为"概念验证"。使用它的风险自负。
我已经用传递闭包表解决了这类问题。这将枚举通过节点的每个直接和间接路径。您当前拥有的边是长度为 1 的路径。但是您还需要长度为 0 的路径(即,节点具有指向自身的路径(,然后对于长度大于 1 的路径,从一个源节点到最终目标节点的每个路径。
create table closure (
source int,
target int,
length int,
is_direct bool,
primary key (source, target)
);
insert into closure values
(1, 1, 0, false), (1, 2, 1, true), (1, 3, 2, false), (1, 4, 3, false), (1, 5, 4, false), (1, 6, 5, false), (1, 7, 6, false), (1, 8, 7, false), (1, 9, 8, false), (1, 10, 9, false), (1, 11, 10, false),
(2, 2, 0, false), (2, 3, 1, true), (2, 4, 2, false), (2, 5, 3, false), (2, 6, 4, false), (2, 7, 5, false), (2, 8, 6, false), (2, 9, 7, false), (2, 10, 8, false), (2, 11, 9, false),
(3, 3, 0, false), (3, 4, 1, true), (3, 5, 2, false), (3, 6, 3, false), (3, 7, 4, false), (3, 8, 5, false), (3, 9, 6, false), (3, 10, 7, false), (3, 11, 8, false),
(4, 4, 0, false), (4, 5, 1, true), (4, 6, 2, false), (4, 7, 3, false), (4, 8, 4, false), (4, 9, 5, false), (4, 10, 6, false), (4, 11, 7, false),
(5, 5, 0, false), (5, 6, 1, true), (5, 7, 2, false), (5, 8, 3, false), (5, 9, 4, false), (5, 10, 5, false), (5, 11, 6, false),
(6, 6, 0, false), (6, 7, 1, true), (6, 8, 2, false), (6, 9, 3, false), (6, 10, 4, false), (6, 11, 5, false),
(7, 7, 0, false), (7, 8, 1, true), (7, 9, 2, false), (7, 10, 3, false), (7, 11, 4, false),
(8, 8, 0, false), (8, 9, 1, true), (8, 10, 2, false), (8, 11, 3, false),
(9, 9, 0, false), (9, 10, 1, true), (9, 11, 2, true),
(10, 10, 0, false), (10, 11, 1, true),
(11, 11, 0, false);
现在我们可以编写您的查询:
选择路径上所有节点的 ID 和名称,使路径以名为"foo"的节点开始,以名为"bar"的节点结束,路径上至少有 2 个节点,最多 4 个节点。
我将其转换为长度为 4,6,8 的路径,因为每个路径之间都有一个代理节点,因此在节点之间确实需要两个跃点。
select source.node_id as source_node, target.node_id as target_node, c.length
from nodes as source
join closure as c on source.node_id = c.source
join nodes as target on c.target = target.node_id
where source.name='foo' and target.name = 'bar' and c.length in (4,6,8)
下面是结果,实际上还包括节点 11:
+-------------+-------------+--------+
| source_node | target_node | length |
+-------------+-------------+--------+
| 1 | 5 | 4 |
| 1 | 7 | 6 |
| 1 | 9 | 8 |
| 3 | 7 | 4 |
| 3 | 9 | 6 |
| 3 | 11 | 8 |
+-------------+-------------+--------+
保罗·斯皮格尔的评论:
获得路径的终结点后,可以查询从源开始并在具有目标路径的节点结束的所有路径的闭包。
select source.node_id as source_node, target.node_id as target_node,
group_concat(i1.target order by i1.target) as interim_nodes
from nodes as source
join closure as c on source.node_id = c.source
join nodes as target on c.target = target.node_id
join closure as i1 on source.node_id = i1.source
join closure as i2 on target.node_id = i2.target and i1.target = i2.source
where source.name='foo' and target.name = 'bar' and c.length in (4,6,8)
group by source.node_id, target.node_id
+-------------+-------------+---------------------+
| source_node | target_node | interim_nodes |
+-------------+-------------+---------------------+
| 1 | 5 | 1,2,3,4,5 |
| 1 | 7 | 1,2,3,4,5,6,7 |
| 1 | 9 | 1,2,3,4,5,6,7,8,9 |
| 3 | 7 | 3,4,5,6,7 |
| 3 | 9 | 3,4,5,6,7,8,9 |
| 3 | 11 | 3,4,5,6,7,8,9,10,11 |
+-------------+-------------+---------------------+