递归搜索拓扑网络表中的死角



几个星期来我一直想弄清楚:我需要递归地搜索一个拓扑网络,在这个例子中,OpenStreetMap街道,寻找死角,以及与网络其余部分只隔一条边的社区。如果你所在的城市如此体贴,你可能会在这些地方看到禁止出口的标志。

我的表中有网络中每条边的记录。每条边都有一个"目标"one_answers"源"字段,用来标识该边所连接的节点。我添加了一个名为"悬浮"的二进制列,以指示边缘是否已被识别为终止段。我初始化这个列为FALSE,假设最好。

到目前为止,我已经能够使用以下SQL 来识别简单的分支死角
WITH node_counts AS ( -- get all unique nodes
SELECT target AS node FROM edge_table WHERE NOT dangling
UNION ALL
SELECT source AS node FROM edge_table WHERE NOT dangling),
single_nodes AS ( -- select only those that occur once
SELECT node 
FROM node_counts
GROUP BY node
HAVING count(*) = 1
) -- 
UPDATE edge_table SET dangling = true 
FROM single_nodes
WHERE node = target OR node = source;

我只是继续运行这个查询,直到没有行更新。结果看起来像这样(红色是悬空= true):

https://i.stack.imgur.com/OE1rZ.png

太棒了!这工作得很好…但是仍然存在死胡同社区,如果你愿意的话,它们只通过一条边连接到更大的网络。我如何识别它们?

我最好的猜测是,我将在某个时候需要一个WITH RECURSIVE,但这是我不懂数学的头脑所能做到的。有人能给我指个方向吗?

OK。我是这样想的:

我决定没有一种方法,或者至少不是一个简单的方法来实现这在SQL单独。我最终在PHP和SQL中实现了Tarjan算法,创建了一个临时节点表,将每个节点链接到图的强连接子组件。一旦完成,我更新任何触及不属于最大子组件的节点的段,作为"悬垂"。所有开始和结束于属于最大子组件的节点的边都属于主街道网络(不悬挂)。

下面是代码。请注意,在大型图上运行可能需要很长时间。这对工作记忆也很困难,但它对我的目的有效。

<?php
$username = '';
$password = '';
$database = '';
$edge_table = 'cincy_segments';
$v1 = 'target';
$v2 = 'source';
$dangling_boolean_field = 'dangling';
$edge_id_field = 'edge_id';
//global variables declared
$index = 0;
$component_index = 0;
$nodes = array();
$stack = array();
pg_connect("host=localhost dbname=$database user=$username password=$password");
// get vertices
echo "getting data from databasen";
$neighbors_query = pg_query("
WITH nodes AS (
    SELECT DISTINCT $v1 AS node FROM $edge_table
    UNION
    SELECT DISTINCT $v2 AS node FROM $edge_table
), 
edges AS (
SELECT 
    node,
    $edge_id_field AS edge
FROM nodes JOIN $edge_table
    ON node = $v1 OR node = $v2
)
SELECT
    node,
    array_agg(CASE WHEN node = $v2 THEN $v1 
    WHEN node = $v1 THEN $v2
    ELSE NULL
    END) AS neighbor    
FROM edges JOIN $edge_table ON 
    (node = $v2 AND edge = $edge_id_field) OR 
    (node = $v1 AND edge = $edge_id_field)
GROUP BY node");
// now make the results into php results
echo "putting the results in an arrayn";
while($r = pg_fetch_object($neighbors_query)){ // for each node record
    $nodes[$r->node]['id'] = $r->node;
    $nodes[$r->node]['neighbors'] = explode(',',trim($r->neighbor,'{}'));
}
// create a temporary table to store results
pg_query("
    DROP TABLE IF EXISTS temp_nodes;
    CREATE TABLE temp_nodes (node integer, component integer);
");
// the big traversal
echo "traversing graph (this part takes a while)n";
foreach($nodes as $id => $values){
    if(!isset($values['index'])){
        tarjan($id, 'no parent');
    }
}
// identify dangling edges
echo "identifying dangling edgesn";
pg_query("
    UPDATE $edge_table SET $dangling_boolean_field = FALSE; 
    WITH dcn AS ( -- DisConnected Nodes
        -- get nodes that are NOT in the primary component
        SELECT node FROM temp_nodes WHERE component != (
            -- select the number of the largest component
            SELECT component
            FROM temp_nodes 
            GROUP BY component 
            ORDER BY count(*) DESC
            LIMIT 1)
    ),
    edges AS (
        SELECT DISTINCT e.$edge_id_field AS disconnected_edge_id
        FROM 
            dcn JOIN $edge_table AS e ON dcn.node = e.$v1 OR dcn.node = e.$v2
    )
    UPDATE $edge_table SET $dangling_boolean_field = TRUE
    FROM edges WHERE $edge_id_field = disconnected_edge_id;
");
// clean up after ourselves
echo "cleaning upn";
pg_query("DROP TABLE IF EXISTS temp_nodes;");
pg_query("VACUUM ANALYZE;");
 // the recursive function definition
//
function tarjan($id, $parent)
{
    global $nodes;
    global $index;
    global $component_index;
    global $stack;
    // mark and push
    $nodes[$id]['index'] = $index;
    $nodes[$id]['lowlink'] = $index;
    $index++;
    array_push($stack, $id);
    // go through neighbors
    foreach ($nodes[$id]['neighbors'] as $child_id) {
        if ( !isset($nodes[$child_id]['index']) ) { // if neighbor not yet visited
            // recurse
            tarjan($child_id, $id);
            // find lowpoint
            $nodes[$id]['lowlink'] = min(
                $nodes[$id]['lowlink'],
                $nodes[$child_id]['lowlink']
            );
        } else if ($child_id != $parent) { // if already visited and not parent
            // assess lowpoint
            $nodes[$id]['lowlink'] = min(
                $nodes[$id]['lowlink'],
                $nodes[$child_id]['index']
            );
        }
    }
    // was this a root node?
    if ($nodes[$id]['lowlink'] == $nodes[$id]['index']) {
        do {
            $w = array_pop($stack);
            $scc[] = $w;
        } while($id != $w);
        // record results in table
        pg_query("
            INSERT INTO temp_nodes (node, component)
            VALUES (".implode(','.$component_index.'),(',$scc).",$component_index) 
        ");
        $component_index++;
    }
    return NULL;
}
?>

在我看来,没有循环检测是不可能的。(悬空位是一种面包环检测)。下面的查询是一个分叉的y形,通向两条死胡同(1..)4和11..14)。如果在#19和#15之间添加链接,递归将不会停止。(也许我的逻辑不正确或不完整?)

DROP SCHEMA tmp CASCADE;
CREATE SCHEMA tmp ;
SET search_path=tmp;
CREATE TABLE edge_table
        ( source INTEGER NOT NULL
        , target INTEGER NOT NULL
        , dangling boolean NOT NULL DEFAULT False
        );
INSERT INTO edge_table ( source, target) VALUES
 (1,2) ,(2,3) ,(3,4)
,(11,12) ,(12,13) ,(13,14)
,( 15,16) ,(16,17) ,(17,18) ,( 18,19)
-- , (19,15)    -- this will close the loop
, (19,1)        -- Y-fork
, (19,11)       -- Y-fork
        ;
-- EXPLAIN
WITH RECURSIVE cul AS (
        SELECT e0.source AS source
                , e0.target AS target
        FROM edge_table e0
        WHERE NOT EXISTS ( -- no way out ...
                SELECT * FROM edge_table nx
                WHERE nx.source = e0.target
                )
        UNION ALL
        SELECT e1.source AS source
                , e1.target AS target
        FROM edge_table e1
        JOIN cul ON cul.source = e1.target
        WHERE 1=1
        AND NOT EXISTS ( -- Only one incoming link; no *other* way to cul
                SELECT * FROM edge_table nx
                WHERE nx.target = cul.source
                AND nx.source <> e1.source
                )
        )
SELECT * FROM cul
        ;

[CTE当然是用来在update语句中设置悬空字段的]

最新更新