使用不可变的数据结构改变数据



我想实现一个特定的算法,但我很难找到适合这项工作的数据结构。该算法的更简单版本的工作方式如下:

Input: A set of points.
Output: A new set of points.
Step 1: For each point, calculate the closest points in a radius.
Step 2: For each point, calculate a value "v" from the closest points subset.
Step 3: For each point, calculate a new value "w" from the closest points and
        the values "v" from the previous step, i.e, "w" depends on the neighbors
        and "v" of each neighbor.
Step 4: Update points.

在C++中,我可以像这样解决这个问题:

struct Point {
    Vector position;
    double v, w;
    std::vector<Point *> neighbors;
};
std::vector<Point> points = initializePoints();
calculateNeighbors(points);
calculateV(points); // points[0].v = value; for example.
calculateW(points);

对于点列表等朴素结构,我无法将值"v"更新为原始点集,并且需要计算邻居两次。我怎样才能避免这种情况并保持函数的纯性,因为计算邻居是算法中最昂贵的部分(超过 30% 的时间(?

PS.:对于那些在数值方法和CFD方面有经验的人,这是平滑粒子流体动力学方法的简化版本。

更新:更改了步骤3,使其更清晰。

一个普遍的神话是,Haskell根本不提供突变。实际上,它提供了一种非常特殊的突变:一个值可以只变异一次,从未评估到评估。利用这种特殊突变的艺术称为打结。我们将从数据结构开始,就像您在C++中的数据结构一样:

data Vector -- held abstract
data Point = Point
    { position  :: Vector
    , v, w      :: Double
    , neighbors :: [Point]
    }

现在,我们要做的是构建一个Array Point,其neighbors包含指向同一数组中其他元素的指针。以下代码中Array的主要特点是它懒惰(它不会太快强制其元素(并且具有快速随机访问;如果您愿意,可以将您喜欢的备用数据结构替换为这些属性。

邻居查找功能的接口有很多选择。为了具体起见并使我自己的工作变得简单,我将假设您有一个函数,该函数接受VectorVectors列表,并给出邻居的索引。

findNeighbors :: Vector -> [Vector] -> [Int]
findNeighbors = undefined

让我们也为computeVcomputeW设置一些类型。对于随机数,我们将要求computeV遵守您所说的非正式合同,即它可以查看任何Pointpositionneighbors字段,但不能查看vw字段。(同样,computeW可以查看除它能得到的任何Pointw字段之外的任何内容。实际上,可以在类型级别强制执行这一点,而无需太多的体操,但现在让我们跳过它。

computeV, computeW :: Point -> Double
(computeV, computeW) = undefined

现在我们已经准备好构建我们的(标记的(内存图。

buildGraph :: [Vector] -> Array Int Point
buildGraph vs = answer where
    answer = listArray (0, length vs-1) [point pos | pos <- vs]
    point pos = this where
        this = Point
            { position = pos
            , v = computeV this
            , w = computeW this
            , neighbors = map (answer!) (findNeighbors pos vs)
            }

就是这样,真的。现在你可以写你的

newPositions :: Point -> [Vector]
newPositions = undefined

newPositions可以完全自由地检查它所处理Point的任何字段,并将所有功能放在一起:

update :: [Vector] -> [Vector]
update = newPositions <=< elems . buildGraph

编辑:。。。解释开头的"特殊突变"注释:在评估过程中,你可以预期当你要求Pointw字段时,事情会按这个顺序发生:computeW会强制v字段;然后computeV将强制neighbors字段;然后neighbors字段将从未评估变为已评估;然后v字段将从未评估变为已评估;然后,w字段将从未评估变为已评估。最后三个步骤看起来与C++算法的三个突变步骤非常相似!

双重编辑:我决定要看到这个东西运行,所以我用虚拟实现实例化了上面所有抽象的东西。我也想看到它只评估一次,因为我甚至不确定我做对了!所以我打了一些trace电话。这是一个完整的文件:

import Control.Monad
import Data.Array
import Debug.Trace
announce s (Vector pos) = trace $ "computing " ++ s ++ " for position " ++ show pos
data Vector = Vector Double deriving Show
data Point = Point
    { position  :: Vector
    , v, w      :: Double
    , neighbors :: [Point]
    }
findNeighbors :: Vector -> [Vector] -> [Int]
findNeighbors (Vector n) vs = [i | (i, Vector n') <- zip [0..] vs, abs (n - n') < 1]
computeV, computeW :: Point -> Double
computeV (Point pos _ _ neighbors) = sum [n | Point { position = Vector n } <- neighbors]
computeW (Point pos v _ neighbors) = sum [v | Point { v = v } <- neighbors]
buildGraph :: [Vector] -> Array Int Point
buildGraph vs = answer where
    answer = listArray (0, length vs-1) [point pos | pos <- vs]
    point pos = this where { this = Point
        { position  = announce "position" pos $ pos
        , v         = announce "v" pos $ computeV this
        , w         = announce "w" pos $ computeW this
        , neighbors = announce "neighbors" pos $ map (answer!) (findNeighbors pos vs)
        } }
newPositions :: Point -> [Vector]
newPositions (Point { position = Vector n, v = v, w = w }) = [Vector (n*v), Vector w]
update :: [Vector] -> [Vector]
update = newPositions <=< elems . buildGraph

以及GHCI的运行:

*Main> length . show . update . map Vector $ [0, 0.25, 0.75, 1.25, 35]
computing position for position 0.0
computing v for position 0.0
computing neighbors for position 0.0
computing position for position 0.25
computing position for position 0.75
computing w for position 0.0
computing v for position 0.25
computing neighbors for position 0.25
computing v for position 0.75
computing neighbors for position 0.75
computing position for position 1.25
computing w for position 0.25
computing w for position 0.75
computing v for position 1.25
computing neighbors for position 1.25
computing w for position 1.25
computing position for position 35.0
computing v for position 35.0
computing neighbors for position 35.0
computing w for position 35.0
123

如您所见,每个位置最多计算一次字段。

你能做这样的事情吗?给定以下类型签名

calculateNeighbours :: [Point] -> [[Point]]
calculateV :: [Point] -> Double
calculateW :: [Point] -> Double -> Double

你可以写

algorithm :: [Point] -> [(Point, Double, Double)]
algorithm pts =                             -- pts  :: [Point]
    let nbrs = calculateNeighbours pts      -- nbrs :: [[Point]]
        vs   = map calculateV nbrs          -- vs   :: [Double]
        ws   = zipWith calculateW nbrs vs   -- ws   :: [Double]
     in zip3 pts vs ws                      --      :: [(Point,Double,Double)]

这只计算一次邻居列表,并在计算中重用该值进行vw

如果这不是你想要的,你能详细说明一下吗?

我认为您应该使用Map(HashMap(单独存储从Point中计数的v(和w(,或者使用可变变量来反映您的C++算法。第一种方法更"功能化",例如,你可以很容易地将parralelism添加到其中,因为所有数据都是不可变的,但它应该慢一点,因为每次你需要按点获取v时,你都必须计算哈希。

最新更新