几个星期来我一直想弄清楚:我需要递归地搜索一个拓扑网络,在这个例子中,OpenStreetMap街道,寻找死角,以及与网络其余部分只隔一条边的社区。如果你所在的城市如此体贴,你可能会在这些地方看到禁止出口的标志。
我的表中有网络中每条边的记录。每条边都有一个"目标"one_answers"源"字段,用来标识该边所连接的节点。我添加了一个名为"悬浮"的二进制列,以指示边缘是否已被识别为终止段。我初始化这个列为FALSE,假设最好。
到目前为止,我已经能够使用以下SQL 来识别简单的分支死角WITH node_counts AS ( -- get all unique nodes
SELECT target AS node FROM edge_table WHERE NOT dangling
UNION ALL
SELECT source AS node FROM edge_table WHERE NOT dangling),
single_nodes AS ( -- select only those that occur once
SELECT node
FROM node_counts
GROUP BY node
HAVING count(*) = 1
) --
UPDATE edge_table SET dangling = true
FROM single_nodes
WHERE node = target OR node = source;
我只是继续运行这个查询,直到没有行更新。结果看起来像这样(红色是悬空= true):
https://i.stack.imgur.com/OE1rZ.png太棒了!这工作得很好…但是仍然存在死胡同社区,如果你愿意的话,它们只通过一条边连接到更大的网络。我如何识别它们?
我最好的猜测是,我将在某个时候需要一个WITH RECURSIVE,但这是我不懂数学的头脑所能做到的。有人能给我指个方向吗?
OK。我是这样想的:
我决定没有一种方法,或者至少不是一个简单的方法来实现这在SQL单独。我最终在PHP和SQL中实现了Tarjan算法,创建了一个临时节点表,将每个节点链接到图的强连接子组件。一旦完成,我更新任何触及不属于最大子组件的节点的段,作为"悬垂"。所有开始和结束于属于最大子组件的节点的边都属于主街道网络(不悬挂)。
下面是代码。请注意,在大型图上运行可能需要很长时间。这对工作记忆也很困难,但它对我的目的有效。
<?php
$username = '';
$password = '';
$database = '';
$edge_table = 'cincy_segments';
$v1 = 'target';
$v2 = 'source';
$dangling_boolean_field = 'dangling';
$edge_id_field = 'edge_id';
//global variables declared
$index = 0;
$component_index = 0;
$nodes = array();
$stack = array();
pg_connect("host=localhost dbname=$database user=$username password=$password");
// get vertices
echo "getting data from databasen";
$neighbors_query = pg_query("
WITH nodes AS (
SELECT DISTINCT $v1 AS node FROM $edge_table
UNION
SELECT DISTINCT $v2 AS node FROM $edge_table
),
edges AS (
SELECT
node,
$edge_id_field AS edge
FROM nodes JOIN $edge_table
ON node = $v1 OR node = $v2
)
SELECT
node,
array_agg(CASE WHEN node = $v2 THEN $v1
WHEN node = $v1 THEN $v2
ELSE NULL
END) AS neighbor
FROM edges JOIN $edge_table ON
(node = $v2 AND edge = $edge_id_field) OR
(node = $v1 AND edge = $edge_id_field)
GROUP BY node");
// now make the results into php results
echo "putting the results in an arrayn";
while($r = pg_fetch_object($neighbors_query)){ // for each node record
$nodes[$r->node]['id'] = $r->node;
$nodes[$r->node]['neighbors'] = explode(',',trim($r->neighbor,'{}'));
}
// create a temporary table to store results
pg_query("
DROP TABLE IF EXISTS temp_nodes;
CREATE TABLE temp_nodes (node integer, component integer);
");
// the big traversal
echo "traversing graph (this part takes a while)n";
foreach($nodes as $id => $values){
if(!isset($values['index'])){
tarjan($id, 'no parent');
}
}
// identify dangling edges
echo "identifying dangling edgesn";
pg_query("
UPDATE $edge_table SET $dangling_boolean_field = FALSE;
WITH dcn AS ( -- DisConnected Nodes
-- get nodes that are NOT in the primary component
SELECT node FROM temp_nodes WHERE component != (
-- select the number of the largest component
SELECT component
FROM temp_nodes
GROUP BY component
ORDER BY count(*) DESC
LIMIT 1)
),
edges AS (
SELECT DISTINCT e.$edge_id_field AS disconnected_edge_id
FROM
dcn JOIN $edge_table AS e ON dcn.node = e.$v1 OR dcn.node = e.$v2
)
UPDATE $edge_table SET $dangling_boolean_field = TRUE
FROM edges WHERE $edge_id_field = disconnected_edge_id;
");
// clean up after ourselves
echo "cleaning upn";
pg_query("DROP TABLE IF EXISTS temp_nodes;");
pg_query("VACUUM ANALYZE;");
// the recursive function definition
//
function tarjan($id, $parent)
{
global $nodes;
global $index;
global $component_index;
global $stack;
// mark and push
$nodes[$id]['index'] = $index;
$nodes[$id]['lowlink'] = $index;
$index++;
array_push($stack, $id);
// go through neighbors
foreach ($nodes[$id]['neighbors'] as $child_id) {
if ( !isset($nodes[$child_id]['index']) ) { // if neighbor not yet visited
// recurse
tarjan($child_id, $id);
// find lowpoint
$nodes[$id]['lowlink'] = min(
$nodes[$id]['lowlink'],
$nodes[$child_id]['lowlink']
);
} else if ($child_id != $parent) { // if already visited and not parent
// assess lowpoint
$nodes[$id]['lowlink'] = min(
$nodes[$id]['lowlink'],
$nodes[$child_id]['index']
);
}
}
// was this a root node?
if ($nodes[$id]['lowlink'] == $nodes[$id]['index']) {
do {
$w = array_pop($stack);
$scc[] = $w;
} while($id != $w);
// record results in table
pg_query("
INSERT INTO temp_nodes (node, component)
VALUES (".implode(','.$component_index.'),(',$scc).",$component_index)
");
$component_index++;
}
return NULL;
}
?>
在我看来,没有循环检测是不可能的。(悬空位是一种面包环检测)。下面的查询是一个分叉的y形,通向两条死胡同(1..)4和11..14)。如果在#19和#15之间添加链接,递归将不会停止。(也许我的逻辑不正确或不完整?)
DROP SCHEMA tmp CASCADE;
CREATE SCHEMA tmp ;
SET search_path=tmp;
CREATE TABLE edge_table
( source INTEGER NOT NULL
, target INTEGER NOT NULL
, dangling boolean NOT NULL DEFAULT False
);
INSERT INTO edge_table ( source, target) VALUES
(1,2) ,(2,3) ,(3,4)
,(11,12) ,(12,13) ,(13,14)
,( 15,16) ,(16,17) ,(17,18) ,( 18,19)
-- , (19,15) -- this will close the loop
, (19,1) -- Y-fork
, (19,11) -- Y-fork
;
-- EXPLAIN
WITH RECURSIVE cul AS (
SELECT e0.source AS source
, e0.target AS target
FROM edge_table e0
WHERE NOT EXISTS ( -- no way out ...
SELECT * FROM edge_table nx
WHERE nx.source = e0.target
)
UNION ALL
SELECT e1.source AS source
, e1.target AS target
FROM edge_table e1
JOIN cul ON cul.source = e1.target
WHERE 1=1
AND NOT EXISTS ( -- Only one incoming link; no *other* way to cul
SELECT * FROM edge_table nx
WHERE nx.target = cul.source
AND nx.source <> e1.source
)
)
SELECT * FROM cul
;