我想实现一个特定的算法,但我很难找到适合这项工作的数据结构。该算法的更简单版本的工作方式如下:
Input: A set of points.
Output: A new set of points.
Step 1: For each point, calculate the closest points in a radius.
Step 2: For each point, calculate a value "v" from the closest points subset.
Step 3: For each point, calculate a new value "w" from the closest points and
the values "v" from the previous step, i.e, "w" depends on the neighbors
and "v" of each neighbor.
Step 4: Update points.
在C++中,我可以像这样解决这个问题:
struct Point {
Vector position;
double v, w;
std::vector<Point *> neighbors;
};
std::vector<Point> points = initializePoints();
calculateNeighbors(points);
calculateV(points); // points[0].v = value; for example.
calculateW(points);
对于点列表等朴素结构,我无法将值"v"更新为原始点集,并且需要计算邻居两次。我怎样才能避免这种情况并保持函数的纯性,因为计算邻居是算法中最昂贵的部分(超过 30% 的时间(?
PS.:对于那些在数值方法和CFD方面有经验的人,这是平滑粒子流体动力学方法的简化版本。
更新:更改了步骤3,使其更清晰。
一个普遍的神话是,Haskell根本不提供突变。实际上,它提供了一种非常特殊的突变:一个值可以只变异一次,从未评估到评估。利用这种特殊突变的艺术称为打结。我们将从数据结构开始,就像您在C++中的数据结构一样:
data Vector -- held abstract
data Point = Point
{ position :: Vector
, v, w :: Double
, neighbors :: [Point]
}
现在,我们要做的是构建一个Array Point
,其neighbors
包含指向同一数组中其他元素的指针。以下代码中Array
的主要特点是它懒惰(它不会太快强制其元素(并且具有快速随机访问;如果您愿意,可以将您喜欢的备用数据结构替换为这些属性。
邻居查找功能的接口有很多选择。为了具体起见并使我自己的工作变得简单,我将假设您有一个函数,该函数接受Vector
和Vectors
列表,并给出邻居的索引。
findNeighbors :: Vector -> [Vector] -> [Int]
findNeighbors = undefined
让我们也为computeV
和computeW
设置一些类型。对于随机数,我们将要求computeV
遵守您所说的非正式合同,即它可以查看任何Point
的position
和neighbors
字段,但不能查看v
或w
字段。(同样,computeW
可以查看除它能得到的任何Point
的w
字段之外的任何内容。实际上,可以在类型级别强制执行这一点,而无需太多的体操,但现在让我们跳过它。
computeV, computeW :: Point -> Double
(computeV, computeW) = undefined
现在我们已经准备好构建我们的(标记的(内存图。
buildGraph :: [Vector] -> Array Int Point
buildGraph vs = answer where
answer = listArray (0, length vs-1) [point pos | pos <- vs]
point pos = this where
this = Point
{ position = pos
, v = computeV this
, w = computeW this
, neighbors = map (answer!) (findNeighbors pos vs)
}
就是这样,真的。现在你可以写你的
newPositions :: Point -> [Vector]
newPositions = undefined
newPositions
可以完全自由地检查它所处理Point
的任何字段,并将所有功能放在一起:
update :: [Vector] -> [Vector]
update = newPositions <=< elems . buildGraph
编辑:。。。解释开头的"特殊突变"注释:在评估过程中,你可以预期当你要求Point
的w
字段时,事情会按这个顺序发生:computeW
会强制v
字段;然后computeV
将强制neighbors
字段;然后neighbors
字段将从未评估变为已评估;然后v
字段将从未评估变为已评估;然后,w
字段将从未评估变为已评估。最后三个步骤看起来与C++算法的三个突变步骤非常相似!
双重编辑:我决定要看到这个东西运行,所以我用虚拟实现实例化了上面所有抽象的东西。我也想看到它只评估一次,因为我甚至不确定我做对了!所以我打了一些trace
电话。这是一个完整的文件:
import Control.Monad
import Data.Array
import Debug.Trace
announce s (Vector pos) = trace $ "computing " ++ s ++ " for position " ++ show pos
data Vector = Vector Double deriving Show
data Point = Point
{ position :: Vector
, v, w :: Double
, neighbors :: [Point]
}
findNeighbors :: Vector -> [Vector] -> [Int]
findNeighbors (Vector n) vs = [i | (i, Vector n') <- zip [0..] vs, abs (n - n') < 1]
computeV, computeW :: Point -> Double
computeV (Point pos _ _ neighbors) = sum [n | Point { position = Vector n } <- neighbors]
computeW (Point pos v _ neighbors) = sum [v | Point { v = v } <- neighbors]
buildGraph :: [Vector] -> Array Int Point
buildGraph vs = answer where
answer = listArray (0, length vs-1) [point pos | pos <- vs]
point pos = this where { this = Point
{ position = announce "position" pos $ pos
, v = announce "v" pos $ computeV this
, w = announce "w" pos $ computeW this
, neighbors = announce "neighbors" pos $ map (answer!) (findNeighbors pos vs)
} }
newPositions :: Point -> [Vector]
newPositions (Point { position = Vector n, v = v, w = w }) = [Vector (n*v), Vector w]
update :: [Vector] -> [Vector]
update = newPositions <=< elems . buildGraph
以及GHCI的运行:
*Main> length . show . update . map Vector $ [0, 0.25, 0.75, 1.25, 35]
computing position for position 0.0
computing v for position 0.0
computing neighbors for position 0.0
computing position for position 0.25
computing position for position 0.75
computing w for position 0.0
computing v for position 0.25
computing neighbors for position 0.25
computing v for position 0.75
computing neighbors for position 0.75
computing position for position 1.25
computing w for position 0.25
computing w for position 0.75
computing v for position 1.25
computing neighbors for position 1.25
computing w for position 1.25
computing position for position 35.0
computing v for position 35.0
computing neighbors for position 35.0
computing w for position 35.0
123
如您所见,每个位置最多计算一次字段。
你能做这样的事情吗?给定以下类型签名
calculateNeighbours :: [Point] -> [[Point]]
calculateV :: [Point] -> Double
calculateW :: [Point] -> Double -> Double
你可以写
algorithm :: [Point] -> [(Point, Double, Double)]
algorithm pts = -- pts :: [Point]
let nbrs = calculateNeighbours pts -- nbrs :: [[Point]]
vs = map calculateV nbrs -- vs :: [Double]
ws = zipWith calculateW nbrs vs -- ws :: [Double]
in zip3 pts vs ws -- :: [(Point,Double,Double)]
这只计算一次邻居列表,并在计算中重用该值进行v
和w
。
如果这不是你想要的,你能详细说明一下吗?
我认为您应该使用Map(HashMap(单独存储从Point
中计数的v(和w(,或者使用可变变量来反映您的C++算法。第一种方法更"功能化",例如,你可以很容易地将parralelism添加到其中,因为所有数据都是不可变的,但它应该慢一点,因为每次你需要按点获取v时,你都必须计算哈希。